Coverage for kwai/cli/bus.py: 0%
79 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-09-05 17:55 +0000
« prev ^ index » next coverage.py v7.3.0, created at 2023-09-05 17:55 +0000
1"""bus contains subcommands for the event bus."""
2import os
3from asyncio import run
5import typer
6from redis.asyncio import Redis
7from rich import print
8from rich.tree import Tree
9from typer import Typer
11from kwai.core.dependencies import container
12from kwai.core.events.stream import RedisStream
13from kwai.core.settings import ENV_SETTINGS_FILE, Settings
def check():
    """Abort the cli unless the settings environment variable is defined.

    Looks up the variable named by ENV_SETTINGS_FILE. When it is missing, an
    error message is printed and the cli exits with code 1. Otherwise the
    value of the variable is reported.

    Raises:
        typer.Exit: with code 1 when the environment variable is not set.
    """
    # Values in os.environ are always strings, so None can only mean "not set".
    path = os.environ.get(ENV_SETTINGS_FILE)
    if path is None:
        print(
            f"[bold red]Please set env variable {ENV_SETTINGS_FILE} to "
            f"the configuration file.[/bold red]"
        )
        raise typer.Exit(code=1)

    print(f"Settings will be loaded from [bold green]{path}[/bold green].")
# Typer application that groups the event bus subcommands. `check` is
# registered as the application callback.
app = Typer(
    callback=check,
    pretty_exceptions_short=True,
)
@app.command(help="Show the event bus (redis) settings.")
def show(password: bool = typer.Option(False, help="Show the password")):
    """Command for showing the active event bus (redis) settings.

    Prints the redis host and port from the settings and, on request, the
    password.

    Args:
        password: show or hide the password (default is hide).

    Raises:
        typer.Exit: with code 1 when the settings could not be loaded.
    """
    # Imported locally: only needed to print the password safely.
    from rich.markup import escape

    try:
        settings = container[Settings]
        print(f"Host: [bold]{settings.redis.host}[/bold]")
        print(f"Port: [bold]{settings.redis.port}[/bold]")
        if password:
            # A password is arbitrary text. Without escaping, a part that
            # looks like "[tag]" would be parsed as console markup instead of
            # being shown literally.
            secret = escape(str(settings.redis.password))
            print(f"Password: [bold]{secret}[/bold]")
    except Exception as ex:
        print("[bold red]Failed![/bold red] Could not load the settings!")
        print(ex)
        raise typer.Exit(code=1) from None
@app.command(help="Test the redis connection.")
def test():
    """Command for testing the redis connection.

    Requests the Redis dependency from the container. Any exception raised
    while doing so is reported as a failed connection.

    Raises:
        typer.Exit: with code 1 when the Redis dependency raised an exception.
    """
    try:
        container[Redis]
    except Exception as ex:
        print("[bold red]Failed![/bold red] Could not connect to redis!")
        print(ex)
        raise typer.Exit(code=1) from None
    else:
        print("[bold green]Success![/bold green] Connection to redis established!")
def _message_label(meta) -> str:
    """Build the markup label for the meta section of a stream message.

    Args:
        meta: The value stored under the "meta" key of the message data.

    Returns:
        The name (green and bold, followed by a colon) and the date, for
        whichever of the "name" and "date" keys are present. An empty string
        when neither key is present.
    """
    from rich.markup import escape

    label = ""
    if "name" in meta:
        # Values come from the stream, so escape them to keep bracketed text
        # from being interpreted as console markup.
        label = f"[green][bold]{escape(str(meta['name']))}[/bold][/green]:"
    if "date" in meta:
        label += f" {escape(str(meta['date']))}"
    return label


@app.command(help="Get information about a stream")
def stream(  # noqa
    name: str = typer.Option(..., help="The name of the stream"),
    messages: bool = typer.Option(False, help="List all messages"),
):
    """Command for getting information about a stream.

    Prints the name, the number of messages and the first and last entry of
    the stream "kwai.<name>". When requested, all messages are read starting
    from id "0-0" and printed as a tree. Confirmation is asked first when the
    stream holds more than 100 messages.

    Args:
        name: The name of the stream.
        messages: List all messages or not? Default is False.

    Raises:
        typer.Exit: with code 1 when the Redis dependency raised an exception.
    """
    # Imported locally: only needed to print message payloads safely.
    from rich.markup import escape

    try:
        redis = container[Redis]
    except Exception as ex:
        print("[bold red]Failed![/bold red] Could not connect to redis!")
        print(ex)
        raise typer.Exit(code=1) from None

    stream_name = f"kwai.{name}"

    async def _main():
        """Closure for handling the async code."""
        stream_ = RedisStream(redis, stream_name)
        info = await stream_.info()
        print(f"Stream: [bold]{stream_.name}[/bold]")
        print(f"Number of messages: [bold]{info.length}[/bold]")
        print(f"First entry: [bold]{info.first_entry}[/bold]")
        print(f"Last entry: [bold]{info.last_entry}[/bold]")

        if not messages:
            return

        # Only bother the user with a question when the listing is long.
        if info.length > 100 and not typer.confirm(
            f"You are about to browse {info.length} messages. Are you sure?"
        ):
            return

        tree = Tree("Messages")
        last_id = "0-0"
        while True:
            # Read the message that follows the last one seen; None ends the
            # listing.
            message = await stream_.read(last_id)
            if message is None:
                break
            last_id = message.id

            leaf = tree.add(f"[bold]{message.id}[/bold]")
            if "meta" in message.data:
                label = _message_label(message.data["meta"])
                if label:
                    # Hang the payload below the meta label when there is one.
                    leaf = leaf.add(label)
            # The payload is arbitrary data: escape it so that text such as
            # "[draft]" is displayed instead of being parsed as markup.
            leaf.add(escape(str(message.data["data"])))

        print(tree)

    run(_main())