Coverage for src/kwai/events/v1/identity/user_invitation_tasks.py: 0%

28 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2024-01-01 00:00 +0000

1"""Module that defines entry points for tasks for user invitations.""" 

2 

3from typing import Any 

4 

5import inject 

6 

7from loguru import logger 

8 

9from kwai.core.db.database import Database 

10from kwai.core.db.uow import UnitOfWork 

11from kwai.core.domain.exceptions import UnprocessableException 

12from kwai.core.domain.value_objects.email_address import EmailAddress 

13from kwai.core.events.event_router import EventRouter 

14from kwai.core.mail.mailer import Mailer 

15from kwai.core.mail.recipient import Recipient, Recipients 

16from kwai.core.settings import Settings 

17from kwai.core.template.mail_template import MailTemplate 

18from kwai.core.template.template_engine import TemplateEngine 

19from kwai.modules.identity.mail_user_invitation import ( 

20 MailUserInvitation, 

21 MailUserInvitationCommand, 

22) 

23from kwai.modules.identity.user_invitations.user_invitation_db_repository import ( 

24 UserInvitationDbRepository, 

25) 

26from kwai.modules.identity.user_invitations.user_invitation_events import ( 

27 UserInvitationCreatedEvent, 

28) 

29from kwai.modules.identity.user_invitations.user_invitation_repository import ( 

30 UserInvitationNotFoundException, 

31) 

32 

33 

34@inject.autoparams() 

35async def email_user_invitation_task( 

36 event: dict[str, Any], 

37 settings: Settings, 

38 database: Database, 

39 mailer: Mailer, 

40 template_engine: TemplateEngine, 

41): 

42 """Task for sending the user invitation email.""" 

43 command = MailUserInvitationCommand(uuid=event["data"]["uuid"]) 

44 

45 try: 

46 async with UnitOfWork(database): 

47 await MailUserInvitation( 

48 UserInvitationDbRepository(database), 

49 mailer, 

50 Recipients( 

51 from_=Recipient( 

52 email=EmailAddress(settings.website.email), 

53 name=settings.website.name, 

54 ) 

55 ), 

56 MailTemplate( 

57 template_engine.create("mail/identity/invitation_html"), 

58 template_engine.create("mail/identity/invitation_txt"), 

59 ), 

60 ).execute(command) 

61 except UnprocessableException as ex: 

62 logger.error(f"Task not processed: {ex}") 

63 except UserInvitationNotFoundException: 

64 logger.error( 

65 f"Mail not send because user invitation does not exist " 

66 f"with uuid {command.uuid}" 

67 ) 

68 

69 

70router = ( 

71 EventRouter(event=UserInvitationCreatedEvent, callback=email_user_invitation_task), 

72)