Coverage for src/kwai/modules/portal/repositories/author_db_repository.py: 94%

34 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2024-01-01 00:00 +0000

1"""Module that implements an AuthorRepository for a database.""" 

2 

3from kwai.core.db.database import Database 

4from kwai.core.domain.value_objects.unique_id import UniqueId 

5from kwai.modules.portal.domain.author import AuthorEntity, AuthorIdentifier 

6from kwai.modules.portal.repositories._tables import AuthorRow 

7from kwai.modules.portal.repositories.author_db_query import ( 

8 AuthorDbQuery, 

9 AuthorQueryRow, 

10) 

11from kwai.modules.portal.repositories.author_query import AuthorQuery 

12from kwai.modules.portal.repositories.author_repository import ( 

13 AuthorNotFoundException, 

14 AuthorRepository, 

15) 

16 

17 

18class AuthorDbRepository(AuthorRepository): 

19 """A author repository for a database.""" 

20 

21 def __init__(self, database: Database): 

22 self._database = database 

23 super().__init__() 

24 

25 def create_query(self) -> AuthorQuery: 

26 """Create a base query.""" 

27 return AuthorDbQuery(self._database) 

28 

29 async def get(self, id_: AuthorIdentifier) -> AuthorEntity: 

30 query = self.create_query().filter_by_id(id_) 

31 row = await query.fetch_one() 

32 if row: 

33 return AuthorQueryRow.map(row).create_entity() 

34 

35 raise AuthorNotFoundException() 

36 

37 async def get_by_uuid(self, uuid: UniqueId) -> AuthorEntity: 

38 query = self.create_query().filter_by_uuid(uuid) 

39 row = await query.fetch_one() 

40 if row: 

41 return AuthorQueryRow.map(row).create_entity() 

42 

43 raise AuthorNotFoundException() 

44 

45 async def get_all(self, query=None, limit=0, offset=0): 

46 query = query or self.create_query() 

47 

48 async for row in query.fetch(limit, offset): 

49 yield AuthorQueryRow.map(row).create_entity() 

50 

51 async def create(self, author: AuthorEntity) -> AuthorEntity: 

52 await self._database.insert(AuthorRow.__table_name__, AuthorRow.persist(author)) 

53 return author 

54 

55 async def delete(self, author: AuthorEntity) -> None: 

56 await self._database.delete( 

57 author.id.value, AuthorRow.__table_name__, "user_id" 

58 )