Coverage for kwai/kwai_bus.py: 0%

35 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-09-05 17:55 +0000

1"""Entry point for starting the event bus.""" 

2import asyncio 

3import os 

4import sys 

5 

6from loguru import logger 

7from redis.asyncio import Redis 

8 

9from kwai.core.dependencies import container 

10from kwai.core.events.redis_bus import RedisBus 

11from kwai.core.settings import Settings, SettingsException 

12 

13 

def create_bus():
    """Create the event bus.

    Loads the application settings from the dependency container, optionally
    reconfigures the loguru logger from ``settings.redis.logger`` and
    subscribes every identity task to its event.

    Returns:
        The ``RedisBus`` instance with all identity tasks subscribed.

    Raises:
        SystemExit: With a non-zero status when the settings cannot be loaded.
    """
    try:
        settings = container[Settings]
    except SettingsException as ex:
        logger.error(f"Could not load settings: {ex}")
        # A configuration failure is fatal; exit with a non-zero status so
        # the caller (shell, supervisor, ...) can detect that startup failed.
        sys.exit(1)

    logger_settings = settings.redis.logger
    if logger_settings:
        try:
            logger.remove(0)  # Remove the default logger
        except ValueError:
            pass  # ignore the non-existence of the default logger

        def log_format(record):
            """Create a logging format for an event.

            The event id and the stream/message id are only part of the
            format when they are present in the extra data of the record.
            """
            extra = record["extra"]
            new_format = "{time} - {level}"
            if "event_id" in extra:
                new_format += " - ({extra[event_id]})"
            if "message_id" in extra and "stream" in extra:
                new_format += " - ({extra[stream]}: {extra[message_id]})"
            # The line terminator is appended explicitly to the format.
            new_format += " - {message}" + os.linesep

            return new_format

        # Retention and rotation only make sense for a file sink. They are
        # left out when logging falls back to stderr, so that the stream sink
        # does not receive options it cannot handle.
        file_options = {}
        if logger_settings.file:
            file_options["retention"] = logger_settings.retention
            file_options["rotation"] = logger_settings.rotation

        logger.add(
            logger_settings.file or sys.stderr,
            format=log_format,
            level=logger_settings.level,
            colorize=True,
            **file_options,
        )

    bus = RedisBus(container[Redis])

    # Imported here, as in the original code, instead of at module level.
    from kwai.modules.identity.tasks import tasks as identity_tasks

    # Subscribe all identity tasks to their event
    for event, task_fn in identity_tasks.items():
        bus.subscribe(event, task_fn)

    return bus

57 

58 

if __name__ == "__main__":
    # Script entry point: build the bus first, then drive its run()
    # coroutine with asyncio until it completes.
    event_bus = create_bus()
    asyncio.run(event_bus.run())