Coverage for src/kwai/events/__main__.py: 0%

36 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module for starting the event bus.""" 

2 

3import asyncio 

4import os 

5import sys 

6 

7from typing import Any 

8 

9from faststream import BaseMiddleware, FastStream 

10from faststream.redis import RedisBroker, RedisRouter 

11from faststream.security import SASLPlaintext 

12from loguru import logger 

13 

14from kwai.core.settings import LoggerSettings, get_settings 

15from kwai.events.v1 import router as v1_router 

16 

17 

class LoggerMiddleware(BaseMiddleware):
    """FastStream middleware that wraps message consumption in a loguru context."""

    async def on_consume(self, msg: Any) -> Any:
        """Consume the message inside a loguru ``contextualize`` block.

        Args:
            msg: The incoming message, passed unchanged to the parent class.

        Returns:
            Whatever the parent ``on_consume`` returns.
        """
        # NOTE(review): no values are passed to contextualize() here, so this
        # block itself does not bind anything (e.g. an event_id) to ``extra``.
        with logger.contextualize():
            consumed = await super().on_consume(msg)
        return consumed

25 

26 

def configure_logger(logger_settings: LoggerSettings) -> None:
    """Configure loguru for the event bus.

    The default loguru handler is removed and a single handler is added that
    writes to the file named in the settings, or to stderr when no file is
    configured.

    Args:
        logger_settings: Settings providing ``file``, ``level``, ``retention``
            and ``rotation``.
    """
    try:
        logger.remove(0)  # Remove the default logger
    except ValueError:
        # The default handler is already gone; nothing to remove.
        pass

    def log_format(record):
        """Change the format when an event_id is set in extra."""
        if "event_id" in record["extra"]:
            new_format = (
                "{time} - {level} - ({extra[event_id]}) - {message}" + os.linesep
            )
        else:
            new_format = "{time} - {level} - {message}" + os.linesep
        if record["exception"]:
            new_format += "{exception}" + os.linesep
        return new_format

    # rotation/retention are options of loguru's file sink only. Passing them
    # (even as None) together with a stream sink such as stderr makes
    # logger.add() raise TypeError, so only forward them when logging to a file.
    file_sink_options = {}
    if logger_settings.file:
        file_sink_options["retention"] = logger_settings.retention
        file_sink_options["rotation"] = logger_settings.rotation

    logger.add(
        logger_settings.file or sys.stderr,
        format=log_format,
        level=logger_settings.level,
        colorize=True,
        backtrace=False,
        diagnose=False,
        **file_sink_options,
    )

56 

57 

settings = get_settings()

# Routes: every v1 event route is published below the "kwai/" prefix.
router = RedisRouter(prefix="kwai/")
router.include_router(v1_router)

# Broker: connects to the Redis instance from the settings. Only a password
# is supplied; the username is left empty.
broker = RedisBroker(
    url=f"redis://{settings.redis.host}:{settings.redis.port}",
    middlewares=[LoggerMiddleware],
    security=SASLPlaintext(username="", password=settings.redis.password),
)
broker.include_router(router)

71 

72 

async def main():
    """Create the FastStream application for the broker and run it."""
    await FastStream(broker).run()

77 

78 

# Only start the event bus when this module is executed (for example with
# ``python -m kwai.events``); a plain import must not start a blocking loop.
if __name__ == "__main__":
    asyncio.run(main())