Coverage for src/kwai/modules/portal/repositories/author_db_repository.py: 94%
32 statements
« prev ^ index » next coverage.py v7.7.1, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2024-01-01 00:00 +0000
1"""Module that implements an AuthorRepository for a database."""
3from sql_smith.functions import on
4from sql_smith.query import SelectQuery
6from kwai.core.db.database import Database
7from kwai.core.domain.value_objects.unique_id import UniqueId
8from kwai.modules.portal.domain.author import AuthorEntity, AuthorIdentifier
9from kwai.modules.portal.repositories._tables import AuthorRow, UserRow
10from kwai.modules.portal.repositories.author_repository import (
11 AuthorNotFoundException,
12 AuthorRepository,
13)
class AuthorDbRepository(AuthorRepository):
    """An author repository for a database.

    Every query selects from the author table joined with the user table, so
    the uuid stored on the user row is available when an entity is created.
    """

    def __init__(self, database: Database):
        """Initialize the repository.

        Args:
            database: The database used to build and execute the queries.
        """
        self._database = database
        super().__init__()

    def _create_query(self) -> SelectQuery:
        """Create a base query.

        Returns:
            A select query on the author table, joined with the user table on
            the author's user id, that selects the aliased columns of both
            tables.
        """
        return (
            self._database.create_query_factory()
            .select()
            .from_(AuthorRow.__table_name__)
            # The aliases of both tables are concatenated first and then
            # unpacked as a whole.
            .columns(*(AuthorRow.get_aliases() + UserRow.get_aliases()))
            .join(
                UserRow.__table_name__,
                on(UserRow.column("id"), AuthorRow.column("user_id")),
            )
        )

    async def _fetch_author(self, query: SelectQuery) -> AuthorEntity:
        """Execute the query and map the first row to an author entity.

        Args:
            query: A query based on the base query of this repository.

        Returns:
            The author entity created from the fetched row.

        Raises:
            AuthorNotFoundException: Raised when the query returns no row.
        """
        row = await self._database.fetch_one(query)
        if not row:
            raise AuthorNotFoundException()

        # The uuid is part of the user row, not of the author row.
        user_row = UserRow.map(row)
        return AuthorRow.map(row).create_entity(
            UniqueId.create_from_string(user_row.uuid)
        )

    async def get(self, id: AuthorIdentifier) -> AuthorEntity:
        """Get the author with the given identifier.

        Args:
            id: The identifier of the author; its value is matched against
                the user_id column of the author table.

        Returns:
            The author entity.

        Raises:
            AuthorNotFoundException: Raised when no row matches.
        """
        query = self._create_query().where(AuthorRow.field("user_id").eq(id.value))
        return await self._fetch_author(query)

    async def get_by_uuid(self, uuid: UniqueId) -> AuthorEntity:
        """Get the author whose user has the given unique id.

        Args:
            uuid: The unique id to match against the uuid column of the user
                table.

        Returns:
            The author entity.

        Raises:
            AuthorNotFoundException: Raised when no row matches.
        """
        # The uuid column is read back as a string (see create_from_string in
        # _fetch_author), so bind the string form instead of the value object.
        # NOTE(review): assumes str(UniqueId) yields the stored text form —
        # confirm against UniqueId.__str__.
        query = self._create_query().where(UserRow.field("uuid").eq(str(uuid)))
        return await self._fetch_author(query)

    async def create(self, author: AuthorEntity) -> AuthorEntity:
        """Insert the author into the author table.

        Args:
            author: The author to persist.

        Returns:
            The author entity that was passed in.
        """
        await self._database.insert(AuthorRow.__table_name__, AuthorRow.persist(author))
        return author

    async def delete(self, author: AuthorEntity) -> None:
        """Delete the author from the author table.

        Args:
            author: The author to delete; the row is selected by matching the
                value of the author's id against the user_id column.
        """
        await self._database.delete(
            author.id.value, AuthorRow.__table_name__, "user_id"
        )