Coverage for src/kwai/events/__main__.py: 0%
36 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
1"""Module for starting the event bus."""
3import asyncio
4import os
5import sys
7from typing import Any
9from faststream import BaseMiddleware, FastStream
10from faststream.redis import RedisBroker, RedisRouter
11from faststream.security import SASLPlaintext
12from loguru import logger
14from kwai.core.settings import LoggerSettings, get_settings
15from kwai.events.v1 import router as v1_router
class LoggerMiddleware(BaseMiddleware):
    """Event bus middleware that wraps message consumption in a logger context."""

    async def on_consume(self, msg: Any) -> Any:
        """Consume the message inside a fresh loguru context.

        Args:
            msg: The incoming message, passed unchanged to the parent class.

        Returns:
            Whatever the parent ``on_consume`` returns.
        """
        with logger.contextualize():
            consumed = await super().on_consume(msg)
        return consumed
def configure_logger(logger_settings: LoggerSettings):
    """Configure the loguru logger used by the event bus.

    The default handler is removed and a single handler is added that writes
    to ``logger_settings.file`` when it is set, and to ``sys.stderr``
    otherwise.

    Args:
        logger_settings: Settings providing ``file``, ``level``,
            ``retention`` and ``rotation``.
    """
    try:
        logger.remove(0)  # Remove the default logger
    except ValueError:
        # Best effort: the default handler is already gone, nothing to remove.
        pass

    def log_format(record):
        """Change the format when an event_id is set in extra."""
        if "event_id" in record["extra"]:
            new_format = (
                "{time} - {level} - ({extra[event_id]}) - {message}" + os.linesep
            )
        else:
            new_format = "{time} - {level} - {message}" + os.linesep
        if record["exception"]:
            new_format += "{exception}" + os.linesep
        return new_format

    # retention/rotation are options of loguru's file sink. Passing them along
    # with a stream sink such as sys.stderr makes logger.add() fail with a
    # TypeError, so they are only forwarded when a log file is configured.
    file_sink_options = {}
    if logger_settings.file:
        file_sink_options["retention"] = logger_settings.retention
        file_sink_options["rotation"] = logger_settings.rotation

    logger.add(
        logger_settings.file or sys.stderr,
        format=log_format,
        level=logger_settings.level,
        colorize=True,
        backtrace=False,
        diagnose=False,
        **file_sink_options,
    )
# Application settings; the redis section supplies host, port and password.
settings = get_settings()

# All event routes are grouped under the "kwai/" prefix.
router = RedisRouter(prefix="kwai/")
router.include_router(v1_router)

# Redis broker with password authentication (empty username) and the
# logging middleware applied to the broker.
broker = RedisBroker(
    url=f"redis://{settings.redis.host}:{settings.redis.port}",
    security=SASLPlaintext(
        username="",
        password=settings.redis.password,
    ),
    middlewares=[LoggerMiddleware],
)
broker.include_router(router)
async def main():
    """Create the FastStream application for the broker and run it."""
    await FastStream(broker).run()
# Only start the event bus when this module is executed as a program
# (e.g. ``python -m kwai.events``), not when it is merely imported.
if __name__ == "__main__":
    asyncio.run(main())