Coverage for src/kwai/events/v1/identity/user_invitation_tasks.py: 0%

26 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that defines entry points for tasks for user invitations.""" 

2 

3from typing import Any 

4 

5from fast_depends import Depends 

6from faststream.redis import RedisRouter 

7from loguru import logger 

8 

9from kwai.core.db.uow import UnitOfWork 

10from kwai.core.domain.exceptions import UnprocessableException 

11from kwai.core.domain.value_objects.email_address import EmailAddress 

12from kwai.core.events.dependencies import ( 

13 create_database, 

14 create_mailer, 

15 create_template_engine, 

16) 

17from kwai.core.mail.recipient import Recipient, Recipients 

18from kwai.core.settings import get_settings 

19from kwai.core.template.mail_template import MailTemplate 

20from kwai.modules.identity.mail_user_invitation import ( 

21 MailUserInvitation, 

22 MailUserInvitationCommand, 

23) 

24from kwai.modules.identity.user_invitations.user_invitation_db_repository import ( 

25 UserInvitationDbRepository, 

26) 

27from kwai.modules.identity.user_invitations.user_invitation_events import ( 

28 UserInvitationCreatedEvent, 

29) 

30from kwai.modules.identity.user_invitations.user_invitation_repository import ( 

31 UserInvitationNotFoundException, 

32) 

33 

34 

# Router on which the subscribers of this module are registered.
router = RedisRouter()


@router.subscriber(stream=UserInvitationCreatedEvent.meta.name)
async def email_user_invitation_task(
    event: dict[str, Any],
    settings=Depends(get_settings),
    database=Depends(create_database),
    mailer=Depends(create_mailer),
    template_engine=Depends(create_template_engine),
):
    """Handle a user invitation event by mailing the invitation.

    The task is subscribed to the stream named after
    UserInvitationCreatedEvent. It runs the MailUserInvitation use case
    inside a unit of work on the injected database.

    When the use case raises UnprocessableException or
    UserInvitationNotFoundException, the problem is logged as an error and
    the task ends normally. Any other exception is not handled here.

    Args:
        event: The received event. The uuid of the invitation is read from
            ``event["data"]["uuid"]``.
        settings: The application settings. ``website.email`` and
            ``website.name`` are used for the sender of the mail.
        database: The database, used for the unit of work and the
            user invitation repository.
        mailer: The mailer that is handed to the use case.
        template_engine: The engine that creates the
            "identity/invitation_html" and "identity/invitation_txt"
            templates.
    """
    invitation_uuid = event["data"]["uuid"]
    command = MailUserInvitationCommand(uuid=invitation_uuid)

    try:
        async with UnitOfWork(database):
            # Collaborators are created inside the unit of work, in the
            # order the use case expects them.
            repository = UserInvitationDbRepository(database)
            sender = Recipient(
                email=EmailAddress(settings.website.email),
                name=settings.website.name,
            )
            recipients = Recipients(from_=sender)
            html_template = template_engine.create("identity/invitation_html")
            text_template = template_engine.create("identity/invitation_txt")
            mail_template = MailTemplate(html_template, text_template)

            use_case = MailUserInvitation(
                repository,
                mailer,
                recipients,
                mail_template,
            )
            await use_case.execute(command)
    except UnprocessableException as exc:
        logger.error(f"Task not processed: {exc}")
    except UserInvitationNotFoundException:
        logger.error(
            f"Mail not send because user invitation does not exist "
            f"with uuid {command.uuid}"
        )