Coverage for kwai/cli/bus.py: 0%

79 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-09-05 17:55 +0000

1"""bus contains subcommands for the event bus.""" 

2import os 

3from asyncio import run 

4 

5import typer 

6from redis.asyncio import Redis 

7from rich import print 

8from rich.tree import Tree 

9from typer import Typer 

10 

11from kwai.core.dependencies import container 

12from kwai.core.events.stream import RedisStream 

13from kwai.core.settings import ENV_SETTINGS_FILE, Settings 

14 

15 

16def check(): 

17 """Check if the environment variable is set. If not, stop the cli.""" 

18 if ENV_SETTINGS_FILE not in os.environ: 

19 print( 

20 f"[bold red]Please set env variable {ENV_SETTINGS_FILE} to " 

21 f"the configuration file.[/bold red]" 

22 ) 

23 raise typer.Exit(code=1) 

24 env_file = os.environ[ENV_SETTINGS_FILE] 

25 print(f"Settings will be loaded from [bold green]{env_file}[/bold green].") 

26 

27 

28app = Typer(pretty_exceptions_short=True, callback=check) 

29 

30 

31@app.command(help="Show the event bus (redis) settings.") 

32def show(password: bool = typer.Option(False, help="Show the password")): 

33 """Command for showing the active database settings. 

34 

35 Args: 

36 password: show or hide the password (default is hide). 

37 """ 

38 try: 

39 settings = container[Settings] 

40 print(f"Host: [bold]{settings.redis.host}[/bold]") 

41 print(f"Port: [bold]{settings.redis.port}[/bold]") 

42 if password: 

43 print(f"Password: [bold]{settings.redis.password}[/bold]") 

44 except Exception as ex: 

45 print("[bold red]Failed![/bold red] Could not load the settings!") 

46 print(ex) 

47 raise typer.Exit(code=1) from None 

48 

49 

50@app.command(help="Test the redis connection.") 

51def test(): 

52 """Command for testing the redis connection.""" 

53 try: 

54 container[Redis] 

55 except Exception as ex: 

56 print("[bold red]Failed![/bold red] Could not connect to redis!") 

57 print(ex) 

58 raise typer.Exit(code=1) from None 

59 

60 print("[bold green]Success![/bold green] Connection to redis established!") 

61 

62 

63@app.command(help="Get information about a stream") 

64def stream( # noqa 

65 name: str = typer.Option(..., help="The name of the stream"), 

66 messages: bool = typer.Option(False, help="List all messages"), 

67): 

68 """Command for getting information about a stream. 

69 

70 Args: 

71 name: The name of the stream. 

72 messages: List all messages or not? Default is False. 

73 """ 

74 try: 

75 redis = container[Redis] 

76 except Exception as ex: 

77 print("[bold red]Failed![/bold red] Could not connect to redis!") 

78 print(ex) 

79 raise typer.Exit(code=1) from None 

80 

81 stream_name = f"kwai.{name}" 

82 

83 async def _main(): 

84 """Closure for handling the async code.""" 

85 stream_ = RedisStream(redis, stream_name) 

86 info = await stream_.info() 

87 print(f"Stream: [bold]{stream_.name}[/bold]") 

88 print(f"Number of messages: [bold]{info.length}[/bold]") 

89 print(f"First entry: [bold]{info.first_entry}[/bold]") 

90 print(f"Last entry: [bold]{info.last_entry}[/bold]") 

91 

92 if not messages: 

93 return 

94 

95 if info.length > 100: 

96 if not typer.confirm( 

97 f"You are about to browse {info.length} messages. Are you sure?" 

98 ): 

99 return 

100 

101 stream_ = RedisStream(redis, stream_name) 

102 

103 tree = Tree("Messages") 

104 last_id = "0-0" 

105 while True: 

106 message = await stream_.read(last_id) 

107 if message is None: 

108 break 

109 last_id = message.id 

110 

111 leaf = tree.add(f"[bold]{message.id}[/bold]") 

112 if "meta" in message.data: 

113 if "name" in message.data["meta"]: 

114 text = ( 

115 "[green]" 

116 f"[bold]{message.data['meta']['name']}[/bold]" 

117 "[/green]:" 

118 ) 

119 else: 

120 text = "" 

121 if "date" in message.data["meta"]: 

122 text += f" {message.data['meta']['date']}" 

123 if len(text) > 0: 

124 leaf = leaf.add(text) 

125 leaf.add(str(message.data["data"])) 

126 

127 print(tree) 

128 

129 run(_main())