Coverage for kwai/modules/identity/user_invitations/user_invitation_db_repository.py: 100%
42 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-09-05 17:55 +0000
« prev ^ index » next coverage.py v7.3.0, created at 2023-09-05 17:55 +0000
1"""Module that implements a user invitation repository for a database."""
2from typing import AsyncIterator
4from kwai.core.db.database import Database
5from kwai.core.domain.entity import Entity
6from kwai.core.domain.value_objects.unique_id import UniqueId
7from kwai.modules.identity.user_invitations.user_invitation import (
8 UserInvitationEntity,
9 UserInvitationIdentifier,
10)
11from kwai.modules.identity.user_invitations.user_invitation_db_query import (
12 UserInvitationDbQuery,
13)
14from kwai.modules.identity.user_invitations.user_invitation_query import (
15 UserInvitationQuery,
16)
17from kwai.modules.identity.user_invitations.user_invitation_repository import (
18 UserInvitationNotFoundException,
19 UserInvitationRepository,
20)
21from kwai.modules.identity.user_invitations.user_invitation_tables import (
22 UserInvitationRow,
23 UserInvitationsTable,
24)
25from kwai.modules.identity.users.user_tables import UsersTable
def _create_entity(row) -> UserInvitationEntity:
    """Build a user invitation entity from a database row.

    The same row is handed to both table wrappers: the users table produces
    the user entity, which is then passed on when the invitations table
    creates the invitation entity.
    """
    invitations_table = UserInvitationsTable(row)
    user = UsersTable(row).create_entity()
    return invitations_table.create_entity(user)
class UserInvitationDbRepository(UserInvitationRepository):
    """A user invitation repository for a database.

    Attributes:
        _database(Database): the database for this repository.
    """

    def __init__(self, database: Database):
        """Initialize the repository.

        Args:
            database: the database used by this repository.
        """
        self._database = database

    def create_query(self) -> UserInvitationQuery:
        """Create a user invitation query bound to this repository's database."""
        return UserInvitationDbQuery(self._database)

    async def get_all(
        self,
        query: UserInvitationQuery,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncIterator[UserInvitationEntity]:
        """Yield a user invitation entity for each row fetched by the query.

        Args:
            query: the query used to fetch the rows.
            limit: passed to the fetch of the query.
            offset: passed to the fetch of the query.

        Yields:
            A user invitation entity for each fetched row.
        """
        async for row in query.fetch(limit, offset):
            yield _create_entity(row)

    async def get_invitation_by_id(
        self, id_: UserInvitationIdentifier
    ) -> UserInvitationEntity:
        """Get the user invitation with the given id.

        Args:
            id_: the identifier of the user invitation.

        Returns:
            The user invitation entity created from the fetched row.

        Raises:
            UserInvitationNotFoundException: when the query returns no row.
        """
        query = self.create_query()
        query.filter_by_id(id_)

        if row := await query.fetch_one():
            return _create_entity(row)

        # Interpolate the id_ argument; the bare name `id` is the builtin
        # function and would leak its repr into the message.
        raise UserInvitationNotFoundException(
            f"User invitation with id {id_} does not exist."
        )

    async def get_invitation_by_uuid(self, uuid: UniqueId) -> UserInvitationEntity:
        """Get the user invitation with the given unique id.

        Args:
            uuid: the unique id of the user invitation.

        Returns:
            The user invitation entity created from the fetched row.

        Raises:
            UserInvitationNotFoundException: when the query returns no row.
        """
        query = self.create_query()
        query.filter_by_uuid(uuid)

        if row := await query.fetch_one():
            return _create_entity(row)

        raise UserInvitationNotFoundException(
            f"User invitation with uuid {uuid} does not exist."
        )

    async def create(self, invitation: UserInvitationEntity) -> UserInvitationEntity:
        """Insert the user invitation and commit.

        Args:
            invitation: the user invitation to insert.

        Returns:
            A copy of the invitation with its id set from the value returned
            by the insert.
        """
        new_id = await self._database.insert(
            UserInvitationsTable.table_name, UserInvitationRow.persist(invitation)
        )
        await self._database.commit()
        return Entity.replace(invitation, id_=UserInvitationIdentifier(new_id))

    async def update(self, invitation: UserInvitationEntity) -> None:
        """Update the row of the user invitation and commit.

        Args:
            invitation: the user invitation to update; the value of its id
                selects the row.
        """
        await self._database.update(
            invitation.id.value,
            UserInvitationsTable.table_name,
            UserInvitationRow.persist(invitation),
        )
        await self._database.commit()

    async def delete(self, invitation: UserInvitationEntity) -> None:
        """Delete the row of the user invitation and commit.

        Args:
            invitation: the user invitation to delete; the value of its id
                selects the row.
        """
        await self._database.delete(
            invitation.id.value, UserInvitationsTable.table_name
        )
        await self._database.commit()