Coverage for src/kwai/modules/identity/user_invitations/user_invitation_db_repository.py: 100%
38 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
1"""Module that implements a user invitation repository for a database."""
3from typing import AsyncIterator
5from kwai.core.db.database import Database
6from kwai.core.domain.value_objects.unique_id import UniqueId
7from kwai.modules.identity.user_invitations.user_invitation import (
8 UserInvitationEntity,
9 UserInvitationIdentifier,
10)
11from kwai.modules.identity.user_invitations.user_invitation_db_query import (
12 UserInvitationDbQuery,
13)
14from kwai.modules.identity.user_invitations.user_invitation_query import (
15 UserInvitationQuery,
16)
17from kwai.modules.identity.user_invitations.user_invitation_repository import (
18 UserInvitationNotFoundException,
19 UserInvitationRepository,
20)
21from kwai.modules.identity.user_invitations.user_invitation_tables import (
22 UserInvitationRow,
23)
24from kwai.modules.identity.users.user_tables import UserRow
def _create_entity(row) -> UserInvitationEntity:
    """Build a user invitation entity from a fetched row.

    The same row is mapped twice: once as a user invitation row and once as
    a user row. The entity created from the user row is handed to the
    invitation row when it creates the invitation entity.
    """
    invitation_row = UserInvitationRow.map(row)
    user = UserRow.map(row).create_entity()
    return invitation_row.create_entity(user)
class UserInvitationDbRepository(UserInvitationRepository):
    """A user invitation repository for a database.

    Attributes:
        _database(Database): the database for this repository.
    """

    def __init__(self, database: Database):
        """Initialize the repository.

        Args:
            database: The database used for queries and for insert, update
                and delete statements.
        """
        self._database = database

    def create_query(self) -> UserInvitationQuery:
        """Create a user invitation query that uses this repository's database."""
        return UserInvitationDbQuery(self._database)

    async def get_all(
        self,
        query: UserInvitationQuery,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncIterator[UserInvitationEntity]:
        """Yield a user invitation entity for each row fetched by the query.

        Args:
            query: The query to fetch rows from.
            limit: Passed unchanged to the fetch of the query.
            offset: Passed unchanged to the fetch of the query.

        Yields:
            A user invitation entity for each fetched row.
        """
        async for row in query.fetch(limit, offset):
            yield _create_entity(row)

    async def get_invitation_by_id(
        self, id_: UserInvitationIdentifier
    ) -> UserInvitationEntity:
        """Get the user invitation with the given id.

        Args:
            id_: The identifier of the invitation.

        Returns:
            The entity created from the row that was found.

        Raises:
            UserInvitationNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        query.filter_by_id(id_)

        if row := await query.fetch_one():
            return _create_entity(row)

        # Interpolate the parameter `id_`; a bare `id` would format the
        # Python builtin function instead of the requested identifier.
        raise UserInvitationNotFoundException(
            f"User invitation with {id_} does not exist."
        )

    async def get_invitation_by_uuid(self, uuid: UniqueId) -> UserInvitationEntity:
        """Get the user invitation with the given unique id.

        Args:
            uuid: The unique id of the invitation.

        Returns:
            The entity created from the row that was found.

        Raises:
            UserInvitationNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        query.filter_by_uuid(uuid)

        if row := await query.fetch_one():
            return _create_entity(row)

        raise UserInvitationNotFoundException(
            f"User invitation with uuid {uuid} does not exist."
        )

    async def create(self, invitation: UserInvitationEntity) -> UserInvitationEntity:
        """Insert the invitation into the user invitation table.

        Args:
            invitation: The invitation to persist.

        Returns:
            The result of setting the id returned by the insert on the
            invitation.
        """
        new_id = await self._database.insert(
            UserInvitationRow.__table_name__, UserInvitationRow.persist(invitation)
        )
        return invitation.set_id(UserInvitationIdentifier(new_id))

    async def update(self, invitation: UserInvitationEntity) -> None:
        """Update the row of the invitation in the user invitation table.

        Args:
            invitation: The invitation to update; the value of its id is
                passed to the database to select the row.
        """
        await self._database.update(
            invitation.id.value,
            UserInvitationRow.__table_name__,
            UserInvitationRow.persist(invitation),
        )

    async def delete(self, invitation: UserInvitationEntity) -> None:
        """Delete the row of the invitation from the user invitation table.

        Args:
            invitation: The invitation to delete; the value of its id is
                passed to the database to select the row.
        """
        await self._database.delete(
            invitation.id.value, UserInvitationRow.__table_name__
        )