Coverage for src/kwai/core/events/fast_stream_publisher.py: 100%

10 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module for defining a publisher using FastStream.""" 

2 

3from faststream.redis import RedisBroker 

4from loguru import logger 

5 

6from kwai.core.events.event import Event 

7from kwai.core.events.publisher import Publisher 

8 

9 

class FastStreamPublisher(Publisher):
    """Publisher implementation that hands events to a FastStream Redis broker."""

    def __init__(self, broker: RedisBroker):
        """Keep a reference to the broker used by ``publish``.

        Args:
            broker: The FastStream Redis broker that receives the events.
        """
        self._broker = broker

    async def publish(self, event: Event):
        """Log the event and publish its data through the broker.

        The payload is ``event.data``; the target stream is named by
        ``event.meta.full_name``.

        Args:
            event: The event to publish.
        """
        meta = event.meta
        logger.info(f"Publishing event {meta.name} to {meta.module}")
        await self._broker.publish(event.data, stream=meta.full_name)