Coverage for kwai/modules/portal/applications/application_db_repository.py: 90%
42 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-09-05 17:55 +0000
« prev ^ index » next coverage.py v7.3.0, created at 2023-09-05 17:55 +0000
1"""Module that implements an application repository for a database."""
2from typing import AsyncIterator
4from kwai.core.db.database import Database
5from kwai.core.domain.entity import Entity
6from kwai.modules.portal.applications.application import (
7 ApplicationEntity,
8 ApplicationIdentifier,
9)
10from kwai.modules.portal.applications.application_db_query import ApplicationDbQuery
11from kwai.modules.portal.applications.application_query import ApplicationQuery
12from kwai.modules.portal.applications.application_repository import (
13 ApplicationNotFoundException,
14 ApplicationRepository,
15)
16from kwai.modules.portal.applications.application_tables import (
17 ApplicationRow,
18 ApplicationsTable,
19)
22def _create_entity(row):
23 return ApplicationsTable(row).create_entity()
26class ApplicationDbRepository(ApplicationRepository):
27 """An application database repository.
29 Attributes:
30 _database: the database for this repository.
31 """
33 def __init__(self, database: Database):
34 self._database = database
36 def create_query(self) -> ApplicationQuery:
37 return ApplicationDbQuery(self._database)
39 async def get_by_id(self, id_: ApplicationIdentifier) -> ApplicationEntity:
40 query = self.create_query()
41 query.filter_by_id(id_)
43 if row := await query.fetch_one():
44 return _create_entity(row)
46 raise ApplicationNotFoundException(f"Application with {id} does not exist.")
48 async def get_by_name(self, name: str) -> ApplicationEntity:
49 query = self.create_query()
50 query.filter_by_name(name)
52 if row := await query.fetch_one():
53 return _create_entity(row)
55 raise ApplicationNotFoundException(f"Application with {name} does not exist.")
57 async def get_all(
58 self,
59 query: ApplicationQuery | None = None,
60 limit: int | None = None,
61 offset: int | None = None,
62 ) -> AsyncIterator[ApplicationEntity]:
63 if query is None:
64 query = self.create_query()
65 async for row in query.fetch(limit, offset):
66 yield _create_entity(row)
68 async def create(self, application: ApplicationEntity) -> ApplicationEntity:
69 new_id = await self._database.insert(
70 ApplicationsTable.table_name, ApplicationRow.persist(application)
71 )
72 await self._database.commit()
73 return Entity.replace(application, id_=ApplicationIdentifier(new_id))
75 async def update(self, application: ApplicationEntity) -> None:
76 await self._database.update(
77 application.id.value,
78 ApplicationsTable.table_name,
79 ApplicationRow.persist(application),
80 )
81 await self._database.commit()
83 async def delete(self, application: ApplicationEntity) -> None:
84 await self._database.delete(application.id.value, ApplicationsTable.table_name)
85 await self._database.commit()