Coverage for src/kwai/modules/portal/repositories/author_db_repository.py: 94%

32 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2024-01-01 00:00 +0000

1"""Module that implements an AuthorRepository for a database.""" 

2 

3from sql_smith.functions import on 

4from sql_smith.query import SelectQuery 

5 

6from kwai.core.db.database import Database 

7from kwai.core.domain.value_objects.unique_id import UniqueId 

8from kwai.modules.portal.domain.author import AuthorEntity, AuthorIdentifier 

9from kwai.modules.portal.repositories._tables import AuthorRow, UserRow 

10from kwai.modules.portal.repositories.author_repository import ( 

11 AuthorNotFoundException, 

12 AuthorRepository, 

13) 

14 

15 

16class AuthorDbRepository(AuthorRepository): 

17 """A author repository for a database.""" 

18 

19 def __init__(self, database: Database): 

20 self._database = database 

21 super().__init__() 

22 

23 def _create_query(self) -> SelectQuery: 

24 """Create a base query.""" 

25 return ( 

26 self._database.create_query_factory() 

27 .select() 

28 .from_(AuthorRow.__table_name__) 

29 .columns(*(AuthorRow.get_aliases()) + UserRow.get_aliases()) 

30 .join( 

31 UserRow.__table_name__, 

32 on(UserRow.column("id"), AuthorRow.column("user_id")), 

33 ) 

34 ) 

35 

36 async def get(self, id: AuthorIdentifier) -> AuthorEntity: 

37 query = self._create_query().where(AuthorRow.field("user_id").eq(id.value)) 

38 row = await self._database.fetch_one(query) 

39 if row: 

40 user_row = UserRow.map(row) 

41 return AuthorRow.map(row).create_entity( 

42 UniqueId.create_from_string(user_row.uuid) 

43 ) 

44 

45 raise AuthorNotFoundException() 

46 

47 async def get_by_uuid(self, uuid: UniqueId) -> AuthorEntity: 

48 query = self._create_query().where(UserRow.field("uuid").eq(uuid)) 

49 row = await self._database.fetch_one(query) 

50 if row: 

51 user_row = UserRow.map(row) 

52 return AuthorRow.map(row).create_entity( 

53 UniqueId.create_from_string(user_row.uuid) 

54 ) 

55 

56 raise AuthorNotFoundException() 

57 

58 async def create(self, author: AuthorEntity) -> AuthorEntity: 

59 await self._database.insert(AuthorRow.__table_name__, AuthorRow.persist(author)) 

60 return author 

61 

62 async def delete(self, author: AuthorEntity) -> None: 

63 await self._database.delete( 

64 author.id.value, AuthorRow.__table_name__, "user_id" 

65 )