Coverage for kwai/modules/portal/applications/application_db_repository.py: 90%

42 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-09-05 17:55 +0000

1"""Module that implements an application repository for a database.""" 

2from typing import AsyncIterator 

3 

4from kwai.core.db.database import Database 

5from kwai.core.domain.entity import Entity 

6from kwai.modules.portal.applications.application import ( 

7 ApplicationEntity, 

8 ApplicationIdentifier, 

9) 

10from kwai.modules.portal.applications.application_db_query import ApplicationDbQuery 

11from kwai.modules.portal.applications.application_query import ApplicationQuery 

12from kwai.modules.portal.applications.application_repository import ( 

13 ApplicationNotFoundException, 

14 ApplicationRepository, 

15) 

16from kwai.modules.portal.applications.application_tables import ( 

17 ApplicationRow, 

18 ApplicationsTable, 

19) 

20 

21 

22def _create_entity(row): 

23 return ApplicationsTable(row).create_entity() 

24 

25 

26class ApplicationDbRepository(ApplicationRepository): 

27 """An application database repository. 

28 

29 Attributes: 

30 _database: the database for this repository. 

31 """ 

32 

33 def __init__(self, database: Database): 

34 self._database = database 

35 

36 def create_query(self) -> ApplicationQuery: 

37 return ApplicationDbQuery(self._database) 

38 

39 async def get_by_id(self, id_: ApplicationIdentifier) -> ApplicationEntity: 

40 query = self.create_query() 

41 query.filter_by_id(id_) 

42 

43 if row := await query.fetch_one(): 

44 return _create_entity(row) 

45 

46 raise ApplicationNotFoundException(f"Application with {id} does not exist.") 

47 

48 async def get_by_name(self, name: str) -> ApplicationEntity: 

49 query = self.create_query() 

50 query.filter_by_name(name) 

51 

52 if row := await query.fetch_one(): 

53 return _create_entity(row) 

54 

55 raise ApplicationNotFoundException(f"Application with {name} does not exist.") 

56 

57 async def get_all( 

58 self, 

59 query: ApplicationQuery | None = None, 

60 limit: int | None = None, 

61 offset: int | None = None, 

62 ) -> AsyncIterator[ApplicationEntity]: 

63 if query is None: 

64 query = self.create_query() 

65 async for row in query.fetch(limit, offset): 

66 yield _create_entity(row) 

67 

68 async def create(self, application: ApplicationEntity) -> ApplicationEntity: 

69 new_id = await self._database.insert( 

70 ApplicationsTable.table_name, ApplicationRow.persist(application) 

71 ) 

72 await self._database.commit() 

73 return Entity.replace(application, id_=ApplicationIdentifier(new_id)) 

74 

75 async def update(self, application: ApplicationEntity) -> None: 

76 await self._database.update( 

77 application.id.value, 

78 ApplicationsTable.table_name, 

79 ApplicationRow.persist(application), 

80 ) 

81 await self._database.commit() 

82 

83 async def delete(self, application: ApplicationEntity) -> None: 

84 await self._database.delete(application.id.value, ApplicationsTable.table_name) 

85 await self._database.commit()