Coverage for src/kwai/core/events/event_router.py: 58%
19 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
1"""Module for defining an event router."""
3import inspect
5from dataclasses import dataclass
6from typing import Any, Callable, Type
8from loguru import logger
10from kwai.core.events.event import Event
# Signature of an event handler: it is called with the event data as a
# dictionary. The handler may be a plain function or a coroutine function.
EventCallbackType = Callable[[dict[str, Any]], Any]
@dataclass(frozen=True, slots=True, kw_only=True)
class EventRouter:
    """A router that defines which method must be called on an event.

    Attributes:
        event: The event class associated with the callback.
        callback: The handler that is called with the event data. It can be
            a plain callable, a coroutine function, or any callable that
            returns an awaitable.
    """

    event: Type[Event]
    callback: EventCallbackType

    async def execute(self, event_data: dict[str, Any]) -> bool:
        """Execute the callback with the given event data.

        Args:
            event_data: The event data passed unchanged to the callback.

        Returns:
            True when the callback finished without raising an exception,
            False when it raised (the exception is logged as a warning and
            not propagated).
        """
        try:
            outcome = self.callback(event_data)
            # Inspect the returned object instead of the callable itself:
            # callable objects with an async ``__call__`` or sync wrappers
            # that return a coroutine are not coroutine functions, but what
            # they return must still be awaited or the handler never runs.
            if inspect.isawaitable(outcome):
                await outcome
        except Exception as ex:
            # Best effort by design: a failing handler is reported through
            # the return value and must not break the caller.
            logger.warning(f"The handler raised an exception: {ex!r}")
            return False
        return True