Coverage for kwai/kwai_bus.py: 0%
35 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-09-05 17:55 +0000
« prev ^ index » next coverage.py v7.3.0, created at 2023-09-05 17:55 +0000
1"""Entry point for starting the event bus."""
2import asyncio
3import os
4import sys
6from loguru import logger
7from redis.asyncio import Redis
9from kwai.core.dependencies import container
10from kwai.core.events.redis_bus import RedisBus
11from kwai.core.settings import Settings, SettingsException
def create_bus():
    """Create the event bus.

    Loads the application settings from the dependency container, optionally
    configures a loguru sink for the bus, and subscribes every identity task
    to its event.

    Returns:
        The RedisBus instance with all identity tasks subscribed.

    Raises:
        SystemExit: With exit status 1 when the settings cannot be loaded.
    """
    try:
        settings = container[Settings]
    except SettingsException as ex:
        logger.error(f"Could not load settings: {ex}")
        # A failed start must not report success to the caller/supervisor,
        # so exit with a non-zero status.
        sys.exit(1)

    if settings.redis.logger:
        try:
            logger.remove(0)  # Remove the default logger
        except ValueError:
            pass  # ignore the non-existence of the default logger

        def log_format(record):
            """Create a logging format for an event.

            The event id and the stream/message id are only added to the
            format when they are present in the extra data of the record.
            """
            new_format = "{time} - {level}"
            if "event_id" in record["extra"]:
                new_format += " - ({extra[event_id]})"
            if "message_id" in record["extra"] and "stream" in record["extra"]:
                new_format += " - ({extra[stream]}: {extra[message_id]})"
            new_format += " - {message}" + os.linesep

            return new_format

        logger.add(
            # Fall back to stderr when no log file is configured.
            settings.redis.logger.file or sys.stderr,
            format=log_format,
            level=settings.redis.logger.level,
            colorize=True,
            retention=settings.redis.logger.retention,
            rotation=settings.redis.logger.rotation,
        )

    bus = RedisBus(container[Redis])

    # NOTE(review): imported here instead of at module level; presumably so
    # the task modules are only loaded after settings and logging are set up
    # - confirm before moving it.
    from kwai.modules.identity.tasks import tasks as identity_tasks

    # Subscribe all identity tasks to their event
    for event, task_fn in identity_tasks.items():
        bus.subscribe(event, task_fn)

    return bus
if __name__ == "__main__":
    # Build the bus first, then hand its run() coroutine to the event loop.
    event_bus = create_bus()
    asyncio.run(event_bus.run())