Coverage for src/kwai/events/__main__.py: 0%
48 statements
« prev ^ index » next coverage.py v7.7.1, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2024-01-01 00:00 +0000
1"""Module for defining the event application."""
3import asyncio
4import os
5import signal
6import sys
8import inject
10from loguru import logger
11from redis.asyncio import Redis
13from kwai.core.events import dependencies
14from kwai.core.events.redis_bus import RedisBus
15from kwai.core.settings import LoggerSettings, Settings
16from kwai.events.v1 import router
def configure_logger(logger_settings: LoggerSettings):
    """Configure the loguru logger used by the event application.

    The default handler (id 0) is removed when it is still registered, and a
    single handler is added that writes to ``logger_settings.file`` or, when
    no file is set, to ``sys.stderr``.

    Args:
        logger_settings: Supplies the sink file, the log level and the
            retention/rotation policy of the log file.
    """
    try:
        logger.remove(0)  # Remove the default logger
    except ValueError:
        # The default handler was already removed; nothing left to do.
        pass

    def log_format(record):
        """Build the format, adding stream and message_id when set in extra."""
        new_format = "{time} - {level}"
        if "stream" in record["extra"]:
            new_format += " - {extra[stream]}"
        if "message_id" in record["extra"]:
            new_format += " - ({extra[message_id]})"
        new_format += " - {message}" + os.linesep
        if record["exception"]:
            new_format += "{exception}" + os.linesep
        return new_format

    # retention and rotation are file-sink options in loguru. Only forward
    # them when a log file is configured, so the stderr fallback is not handed
    # keyword arguments that a stream sink does not accept.
    file_sink_options = {}
    if logger_settings.file:
        file_sink_options["retention"] = logger_settings.retention
        file_sink_options["rotation"] = logger_settings.rotation

    logger.add(
        logger_settings.file or sys.stderr,
        format=log_format,
        level=logger_settings.level,
        colorize=True,
        backtrace=False,
        diagnose=False,
        **file_sink_options,
    )
def shutdown(sig, frame):
    """Handle an exit signal by cancelling all tasks of the running loop.

    Args:
        sig: The number of the received signal.
        frame: Unused; present to match the signal handler signature.
    """
    signal_name = signal.Signals(sig).name
    print(f"Received exit signal {signal_name}")

    # Must be called while the event loop is running.
    running_loop = asyncio.get_running_loop()
    for pending_task in asyncio.all_tasks(running_loop):
        pending_task.cancel()
@inject.autoparams()
async def main(settings: Settings):
    """Run the event bus until it stops or is cancelled.

    A Redis client is created from the redis settings, the logger is
    configured when the redis settings carry logger settings, every element
    of the router is subscribed to the bus, and handlers for SIGINT and
    SIGTERM are installed before the bus is started.

    Args:
        settings: The application settings (injected).
    """
    redis_settings = settings.redis
    redis = Redis(
        host=redis_settings.host,
        port=redis_settings.port,
        password=redis_settings.password,
    )

    logger_settings = settings.redis.logger
    if logger_settings:
        configure_logger(logger_settings)

    bus = RedisBus(redis)
    for route in router:
        bus.subscribe(route)

    # Both signals trigger the same shutdown routine; the frame argument is
    # not available from the loop, so None is passed.
    running_loop = asyncio.get_running_loop()
    for exit_signal in (signal.SIGINT, signal.SIGTERM):
        running_loop.add_signal_handler(exit_signal, shutdown, exit_signal, None)

    logger.info("Starting the event bus.")
    await bus.run()
if __name__ == "__main__":
    # Set up dependency injection before main() asks for its settings.
    dependencies.configure()
    try:
        asyncio.run(main())
    except asyncio.CancelledError:
        # shutdown() cancels every task, including the one running main(), so
        # asyncio.run() can re-raise the cancellation here. That is the
        # expected way to stop, not an error, so it must not skip the final
        # log message.
        pass
    logger.info("The bus has stopped!")