Coverage for kwai/modules/identity/user_invitations/user_invitation_db_repository.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-09-05 17:55 +0000

1"""Module that implements a user invitation repository for a database.""" 

2from typing import AsyncIterator 

3 

4from kwai.core.db.database import Database 

5from kwai.core.domain.entity import Entity 

6from kwai.core.domain.value_objects.unique_id import UniqueId 

7from kwai.modules.identity.user_invitations.user_invitation import ( 

8 UserInvitationEntity, 

9 UserInvitationIdentifier, 

10) 

11from kwai.modules.identity.user_invitations.user_invitation_db_query import ( 

12 UserInvitationDbQuery, 

13) 

14from kwai.modules.identity.user_invitations.user_invitation_query import ( 

15 UserInvitationQuery, 

16) 

17from kwai.modules.identity.user_invitations.user_invitation_repository import ( 

18 UserInvitationNotFoundException, 

19 UserInvitationRepository, 

20) 

21from kwai.modules.identity.user_invitations.user_invitation_tables import ( 

22 UserInvitationRow, 

23 UserInvitationsTable, 

24) 

25from kwai.modules.identity.users.user_tables import UsersTable 

26 

27 

def _create_entity(row) -> UserInvitationEntity:
    """Build a user invitation entity out of a database row.

    The same row is handed to both table wrappers: the entity produced by
    ``UsersTable`` is passed on to ``UserInvitationsTable`` to create the
    invitation entity.
    """
    invitations_table = UserInvitationsTable(row)
    related_user = UsersTable(row).create_entity()
    return invitations_table.create_entity(related_user)

31 

32 

class UserInvitationDbRepository(UserInvitationRepository):
    """A user invitation repository for a database.

    Attributes:
        _database(Database): the database for this repository.
    """

    def __init__(self, database: Database):
        """Store the database that all operations of this repository use."""
        self._database = database

    def create_query(self) -> UserInvitationQuery:
        """Return a new database query bound to this repository's database."""
        return UserInvitationDbQuery(self._database)

    async def get_all(
        self,
        query: UserInvitationQuery,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncIterator[UserInvitationEntity]:
        """Yield an entity for every row fetched by the query.

        Args:
            query: The query used to fetch the rows.
            limit: Passed as-is to ``query.fetch``.
            offset: Passed as-is to ``query.fetch``.

        Yields:
            A user invitation entity for each fetched row.
        """
        async for row in query.fetch(limit, offset):
            yield _create_entity(row)

    async def get_invitation_by_id(
        self, id_: UserInvitationIdentifier
    ) -> UserInvitationEntity:
        """Return the user invitation with the given id.

        Args:
            id_: The identifier to filter on.

        Raises:
            UserInvitationNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        query.filter_by_id(id_)

        if row := await query.fetch_one():
            return _create_entity(row)

        # Interpolate the parameter `id_`; a bare `id` would render the
        # Python builtin function instead of the requested identifier.
        raise UserInvitationNotFoundException(
            f"User invitation with {id_} does not exist."
        )

    async def get_invitation_by_uuid(self, uuid: UniqueId) -> UserInvitationEntity:
        """Return the user invitation with the given unique id.

        Args:
            uuid: The unique id to filter on.

        Raises:
            UserInvitationNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        query.filter_by_uuid(uuid)

        if row := await query.fetch_one():
            return _create_entity(row)

        raise UserInvitationNotFoundException(
            f"User invitation with uuid {uuid} does not exist."
        )

    async def create(self, invitation: UserInvitationEntity) -> UserInvitationEntity:
        """Insert the invitation into the database and commit.

        Args:
            invitation: The invitation to persist.

        Returns:
            The result of ``Entity.replace`` on the invitation, with ``id_``
            set to an identifier wrapping the value returned by the insert.
        """
        new_id = await self._database.insert(
            UserInvitationsTable.table_name, UserInvitationRow.persist(invitation)
        )
        await self._database.commit()
        return Entity.replace(invitation, id_=UserInvitationIdentifier(new_id))

    async def update(self, invitation: UserInvitationEntity) -> None:
        """Update the row of the invitation in the database and commit.

        Args:
            invitation: The invitation to persist; ``invitation.id.value``
                selects the row.
        """
        await self._database.update(
            invitation.id.value,
            UserInvitationsTable.table_name,
            UserInvitationRow.persist(invitation),
        )
        await self._database.commit()

    async def delete(self, invitation: UserInvitationEntity) -> None:
        """Delete the row of the invitation from the database and commit.

        Args:
            invitation: The invitation to remove; ``invitation.id.value``
                selects the row.
        """
        await self._database.delete(
            invitation.id.value, UserInvitationsTable.table_name
        )
        await self._database.commit()