Coverage for src/kwai/modules/identity/user_invitations/user_invitation_db_repository.py: 100%

38 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that implements a user invitation repository for a database.""" 

2 

3from typing import AsyncIterator 

4 

5from kwai.core.db.database import Database 

6from kwai.core.domain.value_objects.unique_id import UniqueId 

7from kwai.modules.identity.user_invitations.user_invitation import ( 

8 UserInvitationEntity, 

9 UserInvitationIdentifier, 

10) 

11from kwai.modules.identity.user_invitations.user_invitation_db_query import ( 

12 UserInvitationDbQuery, 

13) 

14from kwai.modules.identity.user_invitations.user_invitation_query import ( 

15 UserInvitationQuery, 

16) 

17from kwai.modules.identity.user_invitations.user_invitation_repository import ( 

18 UserInvitationNotFoundException, 

19 UserInvitationRepository, 

20) 

21from kwai.modules.identity.user_invitations.user_invitation_tables import ( 

22 UserInvitationRow, 

23) 

24from kwai.modules.identity.users.user_tables import UserRow 

25 

26 

27def _create_entity(row) -> UserInvitationEntity: 

28 """Create a user invitation from a row.""" 

29 return UserInvitationRow.map(row).create_entity(UserRow.map(row).create_entity()) 

30 

31 

32class UserInvitationDbRepository(UserInvitationRepository): 

33 """A user invitation repository for a database. 

34 

35 Attributes: 

36 _database(Database): the database for this repository. 

37 """ 

38 

39 def __init__(self, database: Database): 

40 self._database = database 

41 

42 def create_query(self) -> UserInvitationQuery: 

43 return UserInvitationDbQuery(self._database) 

44 

45 async def get_all( 

46 self, 

47 query: UserInvitationQuery, 

48 limit: int | None = None, 

49 offset: int | None = None, 

50 ) -> AsyncIterator[UserInvitationEntity]: 

51 async for row in query.fetch(limit, offset): 

52 yield _create_entity(row) 

53 

54 async def get_invitation_by_id( 

55 self, id_: UserInvitationIdentifier 

56 ) -> UserInvitationEntity: 

57 query = self.create_query() 

58 query.filter_by_id(id_) 

59 

60 if row := await query.fetch_one(): 

61 return _create_entity(row) 

62 

63 raise UserInvitationNotFoundException( 

64 f"User invitation with {id} does not exist." 

65 ) 

66 

67 async def get_invitation_by_uuid(self, uuid: UniqueId) -> UserInvitationEntity: 

68 query = self.create_query() 

69 query.filter_by_uuid(uuid) 

70 

71 if row := await query.fetch_one(): 

72 return _create_entity(row) 

73 

74 raise UserInvitationNotFoundException( 

75 f"User invitation with uuid {uuid} does not exist." 

76 ) 

77 

78 async def create(self, invitation: UserInvitationEntity) -> UserInvitationEntity: 

79 new_id = await self._database.insert( 

80 UserInvitationRow.__table_name__, UserInvitationRow.persist(invitation) 

81 ) 

82 return invitation.set_id(UserInvitationIdentifier(new_id)) 

83 

84 async def update(self, invitation: UserInvitationEntity) -> None: 

85 await self._database.update( 

86 invitation.id.value, 

87 UserInvitationRow.__table_name__, 

88 UserInvitationRow.persist(invitation), 

89 ) 

90 

91 async def delete(self, invitation: UserInvitationEntity) -> None: 

92 await self._database.delete( 

93 invitation.id.value, UserInvitationRow.__table_name__ 

94 )