Coverage for src/kwai/events/__main__.py: 0%

48 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2024-01-01 00:00 +0000

1"""Module for defining the event application.""" 

2 

3import asyncio 

4import os 

5import signal 

6import sys 

7 

8import inject 

9 

10from loguru import logger 

11from redis.asyncio import Redis 

12 

13from kwai.core.events import dependencies 

14from kwai.core.events.redis_bus import RedisBus 

15from kwai.core.settings import LoggerSettings, Settings 

16from kwai.events.v1 import router 

17 

18 

def configure_logger(logger_settings: LoggerSettings):
    """Configure the loguru logger used by the event application.

    The handler with id 0 (loguru's default handler) is removed when it is
    still registered, and one new sink is added: the file given by
    ``logger_settings.file`` or, when that is not set, ``sys.stderr``.

    Args:
        logger_settings: Settings providing ``file``, ``level``,
            ``retention`` and ``rotation``.
    """
    try:
        logger.remove(0)  # Remove the default logger
    except ValueError:
        # No handler with id 0 is registered (anymore); nothing to remove.
        pass

    def log_format(record):
        """Build the format, adding stream and message id when set in extra."""
        new_format = "{time} - {level}"
        if "stream" in record["extra"]:
            new_format += " - {extra[stream]}"
        if "message_id" in record["extra"]:
            new_format += " - ({extra[message_id]})"
        new_format += " - {message}" + os.linesep
        if record["exception"]:
            new_format += "{exception}" + os.linesep
        return new_format

    # rotation/retention are file-sink options: loguru raises a TypeError when
    # they are passed together with a stream sink such as sys.stderr, so they
    # are only forwarded when a log file is configured.
    file_sink_options = {}
    if logger_settings.file:
        file_sink_options["retention"] = logger_settings.retention
        file_sink_options["rotation"] = logger_settings.rotation

    logger.add(
        logger_settings.file or sys.stderr,
        format=log_format,
        level=logger_settings.level,
        colorize=True,
        backtrace=False,
        diagnose=False,
        **file_sink_options,
    )

48 

49 

def shutdown(sig, frame):
    """Handle an exit signal by cancelling all tasks of the running loop.

    Args:
        sig: Number of the received signal; its symbolic name is printed.
        frame: Not used; kept so the function has the shape of a signal
            handler.

    Raises:
        RuntimeError: Raised by ``asyncio.get_running_loop`` when no event
            loop is running in the current thread.
    """
    signal_name = signal.Signals(sig).name
    print(f"Received exit signal {signal_name}")
    for pending in asyncio.all_tasks(asyncio.get_running_loop()):
        pending.cancel()

56 

57 

@inject.autoparams()
async def main(settings: Settings):
    """Set up the Redis event bus and run it.

    A Redis client is created from ``settings.redis``, the logger is
    configured when ``settings.redis.logger`` is set, every element of
    ``router`` is subscribed on the bus, and ``shutdown`` is registered as the
    handler for SIGINT and SIGTERM before the bus is started.

    Args:
        settings: The application settings. The entry point calls ``main()``
            without arguments, so the value is expected to be provided by
            ``inject.autoparams``.
    """
    redis_settings = settings.redis
    connection = Redis(
        host=redis_settings.host,
        port=redis_settings.port,
        password=redis_settings.password,
    )

    logger_settings = redis_settings.logger
    if logger_settings:
        configure_logger(logger_settings)

    bus = RedisBus(connection)
    for route in router:
        bus.subscribe(route)

    # Both exit signals are routed to shutdown(); no frame is available when
    # the loop invokes the handler, so None is passed for it.
    loop = asyncio.get_running_loop()
    for exit_signal in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(exit_signal, shutdown, exit_signal, None)

    logger.info("Starting the event bus.")
    await bus.run()

79 

80 

if __name__ == "__main__":
    dependencies.configure()
    try:
        asyncio.run(main())
    except asyncio.CancelledError:
        # shutdown() cancels every task of the loop, the main one included,
        # and asyncio.run() re-raises that cancellation. Treat it as a normal
        # stop so the message below is logged instead of ending in a traceback.
        pass
    logger.info("The bus has stopped!")