Coverage for src/kwai/cli/commands/bus.py: 0%

152 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""bus contains subcommands for the event bus.""" 

2 

3import os 

4 

5from asyncio import run 

6from typing import Optional 

7 

8import inject 

9import typer 

10 

11from redis import RedisError 

12from redis.asyncio import Redis 

13from rich import print 

14from rich.tree import Tree 

15from typer import Typer 

16 

17from kwai.core.events.stream import RedisStream 

18from kwai.core.settings import ENV_SETTINGS_FILE, get_settings 

19 

20 

21def check(): 

22 """Check if the environment variable is set. If not, stop the cli.""" 

23 if ENV_SETTINGS_FILE not in os.environ: 

24 print( 

25 f"[bold red]Please set env variable {ENV_SETTINGS_FILE} to " 

26 f"the configuration file.[/bold red]" 

27 ) 

28 raise typer.Exit(code=1) 

29 env_file = os.environ[ENV_SETTINGS_FILE] 

30 print(f"Settings will be loaded from [bold green]{env_file}[/bold green].") 

31 

32 

33app = Typer(pretty_exceptions_short=True, callback=check) 

34 

35 

36async def _create_groups_tree(stream_: RedisStream) -> Tree: 

37 tree = Tree("Groups:") 

38 groups_ = await stream_.get_groups() 

39 for group_info in groups_.values(): 

40 leaf = tree.add(group_info.name) 

41 leaf.add(f"[bold]Pending: [/bold]{group_info.pending}") 

42 leaf.add(f"[bold]Consumers: [/bold]{group_info.consumers}") 

43 leaf.add(f"[bold]Last delivered: [/bold]{group_info.last_delivered_id}") 

44 return tree 

45 

46 

47@app.command(name="show", help="Show the event bus (redis) settings.") 

48def show_command(password: bool = typer.Option(False, help="Show the password")): 

49 """Command for showing the active database settings. 

50 

51 Args: 

52 password: show or hide the password (default is hide). 

53 """ 

54 try: 

55 settings = get_settings() 

56 print(f"Host: [bold]{settings.redis.host}[/bold]") 

57 print(f"Port: [bold]{settings.redis.port}[/bold]") 

58 if password: 

59 print(f"Password: [bold]{settings.redis.password}[/bold]") 

60 except Exception as ex: 

61 print("[bold red]Failed![/bold red] Could not load the settings!") 

62 print(ex) 

63 raise typer.Exit(code=1) from None 

64 

65 

66@app.command(name="test", help="Test the redis connection.") 

67def test_command(): 

68 """Command for testing the redis connection. 

69 

70 When a connection was successful, a PING command will be sent to the redis server. 

71 """ 

72 

73 @inject.autoparams() 

74 async def execute(redis: Redis): 

75 try: 

76 ping_result = await redis.ping() 

77 if ping_result: 

78 print("Ping [bold green]success[/bold green]!") 

79 else: 

80 print("Ping [bold red]failed[/bold red]!") 

81 raise typer.Exit(code=1) from None 

82 except RedisError as ex: 

83 print("[bold red]Failed![/bold red] Could not connect to redis!") 

84 print(ex) 

85 print("The show command can be used to check the settings.") 

86 raise typer.Exit(code=1) from None 

87 

88 run(execute()) 

89 

90 

91@app.command(name="groups", help="") 

92def groups_command( 

93 stream: str = typer.Option(help="Name of the stream"), 

94 group: Optional[str] = typer.Option(default=None, help="Name of the group"), 

95 delete: Optional[bool] = typer.Option(default=False, help="Delete the group?"), 

96): 

97 """Command for showing groups of a stream.""" 

98 

99 @inject.autoparams() 

100 async def execute(redis: Redis): 

101 stream_ = RedisStream(redis, stream) 

102 

103 if group is None: # Print all groups 

104 try: 

105 tree = await _create_groups_tree(stream_) 

106 print(tree) 

107 except RedisError as exc: 

108 print("Could not retrieve group(s)!") 

109 raise typer.Exit(code=1) from exc 

110 return 

111 

112 try: 

113 group_info = await stream_.get_group(group) 

114 if group_info is None: 

115 print( 

116 f"Group [bold red]{group}[/bold red] does not exist " 

117 f"in stream [bold red]{stream}[/bold red]" 

118 ) 

119 raise typer.Exit(code=1) from None 

120 except RedisError as ex: 

121 print("Could not retrieve group(s)!") 

122 raise typer.Exit(code=1) from ex 

123 

124 tree = Tree(group_info.name) 

125 tree.add(f"[bold]Pending: [/bold]{group_info.pending}") 

126 tree.add(f"[bold]Consumers: [/bold]{group_info.consumers}") 

127 tree.add(f"[bold]Last delivered: [/bold]{group_info.last_delivered_id}") 

128 print(tree) 

129 

130 if delete: 

131 if group_info.pending > 0: 

132 print( 

133 f"[bold]There are still " 

134 f"[bold red]{group_info.pending}[/bold red] " 

135 f"messages pending![/bold]" 

136 ) 

137 if not typer.confirm( 

138 f"Are you sure to delete the group {group} from stream {stream}?" 

139 ): 

140 return 

141 

142 try: 

143 await stream_.delete_group(group) 

144 except RedisError as exc: 

145 print(f"Could not delete group {group}!") 

146 raise typer.Exit(code=1) from exc 

147 

148 run(execute()) 

149 

150 

151@app.command(name="streams", help="Get a list of available streams") 

152def streams_command(groups: bool = typer.Option(False, help="List all groups")): 

153 """Command for getting a list of streams from Redis. 

154 

155 Args: 

156 groups: List all groups of a stream or not? Default is False. 

157 """ 

158 

159 @inject.autoparams() 

160 async def execute(redis: Redis): 

161 try: 

162 async for stream in redis.scan_iter(type_="STREAM"): 

163 print(stream.decode("utf-8")) 

164 if groups: 

165 tree = await _create_groups_tree(RedisStream(redis, stream)) 

166 print(tree) 

167 except RedisError as ex: 

168 print("Could not retrieve streams!") 

169 raise typer.Exit(code=1) from ex 

170 

171 run(execute()) 

172 

173 

174@app.command(name="stream", help="Get information about a stream") 

175def stream_command( # noqa 

176 name: str = typer.Option(..., help="The name of the stream"), 

177 messages: bool = typer.Option(False, help="List all messages"), 

178): 

179 """Command for getting information about a stream. 

180 

181 Args: 

182 name: The name of the stream. 

183 messages: List all messages or not? Default is False. 

184 """ 

185 

186 async def create_message_tree(stream: RedisStream) -> Tree: 

187 tree = Tree("Messages:") 

188 last_id = "0-0" 

189 while True: 

190 message = await stream.read(last_id) 

191 if message is None: 

192 break 

193 last_id = message.id 

194 

195 leaf = tree.add(f"[bold]{message.id}[/bold]") 

196 if "meta" in message.data: 

197 if "name" in message.data["meta"]: 

198 text = ( 

199 f"[green][bold]{message.data['meta']['name']}[/bold][/green]:" 

200 ) 

201 else: 

202 text = "" 

203 if "date" in message.data["meta"]: 

204 text += f" {message.data['meta']['date']}" 

205 if len(text) > 0: 

206 leaf = leaf.add(text) 

207 leaf.add(str(message.data["data"])) 

208 return tree 

209 

210 @inject.autoparams() 

211 async def execute(redis: Redis): 

212 """Closure for handling the async code.""" 

213 stream = RedisStream(redis, name) 

214 info = await stream.info() 

215 if info is None: 

216 print(f"Stream [bold red]{stream.name}[/bold red] does not exist!") 

217 raise typer.Exit(code=1) from None 

218 

219 print(f"Stream: [bold]{stream.name}[/bold]") 

220 print(f"Number of messages: [bold]{info.length}[/bold]") 

221 print(f"First entry: [bold]{info.first_entry}[/bold]") 

222 print(f"Last entry: [bold]{info.last_entry}[/bold]") 

223 

224 print(await _create_groups_tree(stream)) 

225 

226 if not messages: 

227 return 

228 

229 if info.length > 100: 

230 if not typer.confirm( 

231 f"You are about to browse {info.length} messages. Are you sure?" 

232 ): 

233 return 

234 

235 try: 

236 tree = await create_message_tree(RedisStream(redis, name)) 

237 print(tree) 

238 except RedisError: 

239 print("Could not retrieve messages!") 

240 raise typer.Exit(code=1) from None 

241 

242 run(execute())