Coverage for src/kwai/modules/portal/applications/application_db_repository.py: 95%
42 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
1"""Module that implements an application repository for a database."""
3from typing import AsyncIterator
5from kwai.core.db.database import Database
6from kwai.core.domain.entity import Entity
7from kwai.modules.portal.applications.application import (
8 ApplicationEntity,
9 ApplicationIdentifier,
10)
11from kwai.modules.portal.applications.application_db_query import ApplicationDbQuery
12from kwai.modules.portal.applications.application_query import ApplicationQuery
13from kwai.modules.portal.applications.application_repository import (
14 ApplicationNotFoundException,
15 ApplicationRepository,
16)
17from kwai.modules.portal.applications.application_tables import (
18 ApplicationRow,
19 ApplicationsTable,
20)
def _create_entity(row):
    """Turn a fetched application row into an entity.

    The row is handed to ``ApplicationsTable`` and the entity is produced by
    calling ``create_entity()`` on the object that call returns.

    Args:
        row: A row as returned by the application query's fetch methods.

    Returns:
        Whatever ``create_entity()`` returns; the repository methods that use
        this helper are annotated as returning ``ApplicationEntity``.
    """
    mapped_row = ApplicationsTable(row)
    return mapped_row.create_entity()
class ApplicationDbRepository(ApplicationRepository):
    """An application database repository.

    Attributes:
        _database: the database for this repository.
    """

    def __init__(self, database: Database):
        """Initialize the repository.

        Args:
            database: The database used for all queries and statements.
        """
        self._database = database

    def create_query(self) -> ApplicationQuery:
        """Create a new application query bound to this repository's database."""
        return ApplicationDbQuery(self._database)

    async def get_by_id(self, id_: ApplicationIdentifier) -> ApplicationEntity:
        """Get the application with the given id.

        Args:
            id_: The identifier of the application to look up.

        Returns:
            The entity created from the fetched row.

        Raises:
            ApplicationNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        query.filter_by_id(id_)

        if row := await query.fetch_one():
            return _create_entity(row)

        # Use the parameter ``id_`` here: a bare ``id`` would format the
        # builtin function instead of the identifier that was looked up.
        raise ApplicationNotFoundException(f"Application with {id_} does not exist.")

    async def get_by_name(self, name: str) -> ApplicationEntity:
        """Get the application with the given name.

        Args:
            name: The name of the application to look up.

        Returns:
            The entity created from the fetched row.

        Raises:
            ApplicationNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        query.filter_by_name(name)

        if row := await query.fetch_one():
            return _create_entity(row)

        raise ApplicationNotFoundException(f"Application with {name} does not exist.")

    async def get_all(
        self,
        query: ApplicationQuery | None = None,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncIterator[ApplicationEntity]:
        """Yield an entity for every row fetched by the query.

        Args:
            query: The query to execute. When None, a new query from
                ``create_query()`` is used.
            limit: Passed as first argument to ``query.fetch``.
            offset: Passed as second argument to ``query.fetch``.

        Yields:
            An entity for each fetched row.
        """
        if query is None:
            query = self.create_query()
        async for row in query.fetch(limit, offset):
            yield _create_entity(row)

    async def create(self, application: ApplicationEntity) -> ApplicationEntity:
        """Insert the application into the applications table and commit.

        Args:
            application: The application to persist.

        Returns:
            The result of ``Entity.replace`` on the given application, with
            ``id_`` set to an identifier built from the value returned by the
            insert.
        """
        new_id = await self._database.insert(
            ApplicationsTable.table_name, ApplicationRow.persist(application)
        )
        await self._database.commit()
        return Entity.replace(application, id_=ApplicationIdentifier(new_id))

    async def update(self, application: ApplicationEntity) -> None:
        """Update the row of the given application and commit.

        Args:
            application: The application to update; the value of its id is
                passed to the database update call.
        """
        await self._database.update(
            application.id.value,
            ApplicationsTable.table_name,
            ApplicationRow.persist(application),
        )
        await self._database.commit()

    async def delete(self, application: ApplicationEntity) -> None:
        """Delete the row of the given application and commit.

        Args:
            application: The application to delete; the value of its id is
                passed to the database delete call.
        """
        await self._database.delete(application.id.value, ApplicationsTable.table_name)
        await self._database.commit()