Coverage for src/kwai/modules/portal/applications/application_db_repository.py: 95%

42 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that implements an application repository for a database.""" 

2 

3from typing import AsyncIterator 

4 

5from kwai.core.db.database import Database 

6from kwai.core.domain.entity import Entity 

7from kwai.modules.portal.applications.application import ( 

8 ApplicationEntity, 

9 ApplicationIdentifier, 

10) 

11from kwai.modules.portal.applications.application_db_query import ApplicationDbQuery 

12from kwai.modules.portal.applications.application_query import ApplicationQuery 

13from kwai.modules.portal.applications.application_repository import ( 

14 ApplicationNotFoundException, 

15 ApplicationRepository, 

16) 

17from kwai.modules.portal.applications.application_tables import ( 

18 ApplicationRow, 

19 ApplicationsTable, 

20) 

21 

22 

def _create_entity(row):
    """Create an application entity from a fetched row.

    The row is passed to ``ApplicationsTable`` and the entity is produced by
    calling ``create_entity()`` on the result.
    """
    mapped_row = ApplicationsTable(row)
    return mapped_row.create_entity()

25 

26 

class ApplicationDbRepository(ApplicationRepository):
    """An application database repository.

    Attributes:
        _database: the database for this repository.
    """

    def __init__(self, database: Database):
        """Initialize the repository.

        Args:
            database: the database used for all queries and write operations.
        """
        self._database = database

    def create_query(self) -> ApplicationQuery:
        """Create a new application query bound to this repository's database."""
        return ApplicationDbQuery(self._database)

    async def get_by_id(self, id_: ApplicationIdentifier) -> ApplicationEntity:
        """Get the application with the given id.

        Args:
            id_: the identifier of the application.

        Returns:
            The entity created from the row that was found.

        Raises:
            ApplicationNotFoundException: when the query returns no row.
        """
        query = self.create_query()
        query.filter_by_id(id_)

        if row := await query.fetch_one():
            return _create_entity(row)

        # Interpolate the parameter ``id_``; ``id`` would be the Python builtin
        # and would hide the identifier that was actually searched for.
        raise ApplicationNotFoundException(f"Application with {id_} does not exist.")

    async def get_by_name(self, name: str) -> ApplicationEntity:
        """Get the application with the given name.

        Args:
            name: the name of the application.

        Returns:
            The entity created from the row that was found.

        Raises:
            ApplicationNotFoundException: when the query returns no row.
        """
        query = self.create_query()
        query.filter_by_name(name)

        if row := await query.fetch_one():
            return _create_entity(row)

        raise ApplicationNotFoundException(f"Application with {name} does not exist.")

    async def get_all(
        self,
        query: ApplicationQuery | None = None,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncIterator[ApplicationEntity]:
        """Iterate over the applications returned by a query.

        Args:
            query: the query to execute. When None, a new query is created
                with create_query().
            limit: passed as the first argument to the query's fetch().
            offset: passed as the second argument to the query's fetch().

        Yields:
            An entity for each row produced by the query.
        """
        if query is None:
            query = self.create_query()
        async for row in query.fetch(limit, offset):
            yield _create_entity(row)

    async def create(self, application: ApplicationEntity) -> ApplicationEntity:
        """Insert a new application and commit.

        Args:
            application: the application to persist.

        Returns:
            The result of Entity.replace() on the given application, with its
            id set to the identifier returned by the insert.
        """
        new_id = await self._database.insert(
            ApplicationsTable.table_name, ApplicationRow.persist(application)
        )
        await self._database.commit()
        return Entity.replace(application, id_=ApplicationIdentifier(new_id))

    async def update(self, application: ApplicationEntity) -> None:
        """Update the row of an existing application and commit.

        Args:
            application: the application to update; its id value selects the
                row to update.
        """
        await self._database.update(
            application.id.value,
            ApplicationsTable.table_name,
            ApplicationRow.persist(application),
        )
        await self._database.commit()

    async def delete(self, application: ApplicationEntity) -> None:
        """Delete the row of an application and commit.

        Args:
            application: the application to delete; its id value selects the
                row to delete.
        """
        await self._database.delete(application.id.value, ApplicationsTable.table_name)
        await self._database.commit()