Coverage for src/kwai/modules/identity/users/user_account_db_repository.py: 98%

46 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that implements a user account repository for a database.""" 

2 

3from collections.abc import AsyncGenerator 

4from dataclasses import replace 

5 

6from kwai.core.db.database import Database 

7from kwai.core.domain.value_objects.email_address import EmailAddress 

8from kwai.core.domain.value_objects.unique_id import UniqueId 

9from kwai.modules.identity.users.user import UserIdentifier 

10from kwai.modules.identity.users.user_account import ( 

11 UserAccountEntity, 

12 UserAccountIdentifier, 

13) 

14from kwai.modules.identity.users.user_account_db_query import UserAccountDbQuery 

15from kwai.modules.identity.users.user_account_query import UserAccountQuery 

16from kwai.modules.identity.users.user_account_repository import ( 

17 UserAccountNotFoundException, 

18 UserAccountRepository, 

19) 

20from kwai.modules.identity.users.user_tables import ( 

21 UserAccountRow, 

22) 

23 

24 

25class UserAccountDbRepository(UserAccountRepository): 

26 """User account repository for a database.""" 

27 

28 def __init__(self, database: Database): 

29 self._database = database 

30 

31 def create_query(self) -> UserAccountQuery: 

32 return UserAccountDbQuery(self._database) 

33 

34 async def get_all( 

35 self, 

36 query: UserAccountQuery | None = None, 

37 limit: int | None = None, 

38 offset: int | None = None, 

39 ) -> AsyncGenerator[UserAccountEntity, None]: 

40 query = query or self.create_query() 

41 async for row in query.fetch(limit, offset): 

42 yield UserAccountRow.map(row).create_entity() 

43 

44 async def get_user_by_email(self, email: EmailAddress) -> UserAccountEntity: 

45 query = self.create_query().filter_by_email(email) 

46 if row := await query.fetch_one(): 

47 return UserAccountRow.map(row).create_entity() 

48 

49 raise UserAccountNotFoundException() 

50 

51 async def exists_with_email(self, email: EmailAddress) -> bool: 

52 try: 

53 await self.get_user_by_email(email) 

54 except UserAccountNotFoundException: 

55 return False 

56 

57 return True 

58 

59 async def get_user_by_uuid(self, uuid: UniqueId) -> UserAccountEntity: 

60 query = self.create_query() 

61 query.filter_by_uuid(uuid) 

62 

63 row = await query.fetch_one() 

64 if row: 

65 return UserAccountRow.map(row).create_entity() 

66 

67 raise UserAccountNotFoundException() 

68 

69 async def create(self, user_account: UserAccountEntity) -> UserAccountEntity: 

70 new_id = await self._database.insert( 

71 UserAccountRow.__table_name__, UserAccountRow.persist(user_account) 

72 ) 

73 user = user_account.user.set_id(UserIdentifier(new_id)) 

74 return replace(user_account, user=user).set_id(UserAccountIdentifier(new_id)) 

75 

76 async def update(self, user_account: UserAccountEntity): 

77 await self._database.update( 

78 user_account.id.value, 

79 UserAccountRow.__table_name__, 

80 UserAccountRow.persist(user_account), 

81 ) 

82 

83 async def delete(self, user_account): 

84 await self._database.delete( 

85 user_account.id.value, UserAccountRow.__table_name__ 

86 )