Coverage for src/kwai/cli/commands/bus.py: 0%
152 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
1"""bus contains subcommands for the event bus."""
3import os
5from asyncio import run
6from typing import Optional
8import inject
9import typer
11from redis import RedisError
12from redis.asyncio import Redis
13from rich import print
14from rich.tree import Tree
15from typer import Typer
17from kwai.core.events.stream import RedisStream
18from kwai.core.settings import ENV_SETTINGS_FILE, get_settings
21def check():
22 """Check if the environment variable is set. If not, stop the cli."""
23 if ENV_SETTINGS_FILE not in os.environ:
24 print(
25 f"[bold red]Please set env variable {ENV_SETTINGS_FILE} to "
26 f"the configuration file.[/bold red]"
27 )
28 raise typer.Exit(code=1)
29 env_file = os.environ[ENV_SETTINGS_FILE]
30 print(f"Settings will be loaded from [bold green]{env_file}[/bold green].")
33app = Typer(pretty_exceptions_short=True, callback=check)
36async def _create_groups_tree(stream_: RedisStream) -> Tree:
37 tree = Tree("Groups:")
38 groups_ = await stream_.get_groups()
39 for group_info in groups_.values():
40 leaf = tree.add(group_info.name)
41 leaf.add(f"[bold]Pending: [/bold]{group_info.pending}")
42 leaf.add(f"[bold]Consumers: [/bold]{group_info.consumers}")
43 leaf.add(f"[bold]Last delivered: [/bold]{group_info.last_delivered_id}")
44 return tree
47@app.command(name="show", help="Show the event bus (redis) settings.")
48def show_command(password: bool = typer.Option(False, help="Show the password")):
49 """Command for showing the active database settings.
51 Args:
52 password: show or hide the password (default is hide).
53 """
54 try:
55 settings = get_settings()
56 print(f"Host: [bold]{settings.redis.host}[/bold]")
57 print(f"Port: [bold]{settings.redis.port}[/bold]")
58 if password:
59 print(f"Password: [bold]{settings.redis.password}[/bold]")
60 except Exception as ex:
61 print("[bold red]Failed![/bold red] Could not load the settings!")
62 print(ex)
63 raise typer.Exit(code=1) from None
66@app.command(name="test", help="Test the redis connection.")
67def test_command():
68 """Command for testing the redis connection.
70 When a connection was successful, a PING command will be sent to the redis server.
71 """
73 @inject.autoparams()
74 async def execute(redis: Redis):
75 try:
76 ping_result = await redis.ping()
77 if ping_result:
78 print("Ping [bold green]success[/bold green]!")
79 else:
80 print("Ping [bold red]failed[/bold red]!")
81 raise typer.Exit(code=1) from None
82 except RedisError as ex:
83 print("[bold red]Failed![/bold red] Could not connect to redis!")
84 print(ex)
85 print("The show command can be used to check the settings.")
86 raise typer.Exit(code=1) from None
88 run(execute())
91@app.command(name="groups", help="")
92def groups_command(
93 stream: str = typer.Option(help="Name of the stream"),
94 group: Optional[str] = typer.Option(default=None, help="Name of the group"),
95 delete: Optional[bool] = typer.Option(default=False, help="Delete the group?"),
96):
97 """Command for showing groups of a stream."""
99 @inject.autoparams()
100 async def execute(redis: Redis):
101 stream_ = RedisStream(redis, stream)
103 if group is None: # Print all groups
104 try:
105 tree = await _create_groups_tree(stream_)
106 print(tree)
107 except RedisError as exc:
108 print("Could not retrieve group(s)!")
109 raise typer.Exit(code=1) from exc
110 return
112 try:
113 group_info = await stream_.get_group(group)
114 if group_info is None:
115 print(
116 f"Group [bold red]{group}[/bold red] does not exist "
117 f"in stream [bold red]{stream}[/bold red]"
118 )
119 raise typer.Exit(code=1) from None
120 except RedisError as ex:
121 print("Could not retrieve group(s)!")
122 raise typer.Exit(code=1) from ex
124 tree = Tree(group_info.name)
125 tree.add(f"[bold]Pending: [/bold]{group_info.pending}")
126 tree.add(f"[bold]Consumers: [/bold]{group_info.consumers}")
127 tree.add(f"[bold]Last delivered: [/bold]{group_info.last_delivered_id}")
128 print(tree)
130 if delete:
131 if group_info.pending > 0:
132 print(
133 f"[bold]There are still "
134 f"[bold red]{group_info.pending}[/bold red] "
135 f"messages pending![/bold]"
136 )
137 if not typer.confirm(
138 f"Are you sure to delete the group {group} from stream {stream}?"
139 ):
140 return
142 try:
143 await stream_.delete_group(group)
144 except RedisError as exc:
145 print(f"Could not delete group {group}!")
146 raise typer.Exit(code=1) from exc
148 run(execute())
151@app.command(name="streams", help="Get a list of available streams")
152def streams_command(groups: bool = typer.Option(False, help="List all groups")):
153 """Command for getting a list of streams from Redis.
155 Args:
156 groups: List all groups of a stream or not? Default is False.
157 """
159 @inject.autoparams()
160 async def execute(redis: Redis):
161 try:
162 async for stream in redis.scan_iter(type_="STREAM"):
163 print(stream.decode("utf-8"))
164 if groups:
165 tree = await _create_groups_tree(RedisStream(redis, stream))
166 print(tree)
167 except RedisError as ex:
168 print("Could not retrieve streams!")
169 raise typer.Exit(code=1) from ex
171 run(execute())
174@app.command(name="stream", help="Get information about a stream")
175def stream_command( # noqa
176 name: str = typer.Option(..., help="The name of the stream"),
177 messages: bool = typer.Option(False, help="List all messages"),
178):
179 """Command for getting information about a stream.
181 Args:
182 name: The name of the stream.
183 messages: List all messages or not? Default is False.
184 """
186 async def create_message_tree(stream: RedisStream) -> Tree:
187 tree = Tree("Messages:")
188 last_id = "0-0"
189 while True:
190 message = await stream.read(last_id)
191 if message is None:
192 break
193 last_id = message.id
195 leaf = tree.add(f"[bold]{message.id}[/bold]")
196 if "meta" in message.data:
197 if "name" in message.data["meta"]:
198 text = (
199 f"[green][bold]{message.data['meta']['name']}[/bold][/green]:"
200 )
201 else:
202 text = ""
203 if "date" in message.data["meta"]:
204 text += f" {message.data['meta']['date']}"
205 if len(text) > 0:
206 leaf = leaf.add(text)
207 leaf.add(str(message.data["data"]))
208 return tree
210 @inject.autoparams()
211 async def execute(redis: Redis):
212 """Closure for handling the async code."""
213 stream = RedisStream(redis, name)
214 info = await stream.info()
215 if info is None:
216 print(f"Stream [bold red]{stream.name}[/bold red] does not exist!")
217 raise typer.Exit(code=1) from None
219 print(f"Stream: [bold]{stream.name}[/bold]")
220 print(f"Number of messages: [bold]{info.length}[/bold]")
221 print(f"First entry: [bold]{info.first_entry}[/bold]")
222 print(f"Last entry: [bold]{info.last_entry}[/bold]")
224 print(await _create_groups_tree(stream))
226 if not messages:
227 return
229 if info.length > 100:
230 if not typer.confirm(
231 f"You are about to browse {info.length} messages. Are you sure?"
232 ):
233 return
235 try:
236 tree = await create_message_tree(RedisStream(redis, name))
237 print(tree)
238 except RedisError:
239 print("Could not retrieve messages!")
240 raise typer.Exit(code=1) from None
242 run(execute())