Coverage for src/kwai/modules/identity/log_user_login.py: 0%
19 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
1"""Module for a use case that logs a user login."""
3from dataclasses import dataclass
5from kwai.modules.identity.tokens.refresh_token import (
6 RefreshTokenEntity,
7)
8from kwai.modules.identity.tokens.user_log import UserLogEntity
9from kwai.modules.identity.tokens.user_log_repository import UserLogRepository
10from kwai.modules.identity.tokens.value_objects import IpAddress
13@dataclass(frozen=True, kw_only=True, slots=True)
14class LogUserLoginCommand:
15 """Input for the use case."""
17 success: bool
18 email: str = ""
19 client_ip: str
20 user_agent: str
21 openid_sub: str | None = None
22 openid_provider: str | None = None
class LogUserLogin:
    """Use case for logging the login of a user."""

    def __init__(
        self,
        user_log_repository: UserLogRepository,
        refresh_token: RefreshTokenEntity | None,
    ) -> None:
        """Initialize the use case.

        Args:
            user_log_repository: Repository in which the log entry is created.
            refresh_token: Refresh token to attach to the log entry, or
                ``None`` when there is none.
        """
        self._repo = user_log_repository
        self._refresh_token = refresh_token

    async def execute(self, command: LogUserLoginCommand) -> None:
        """Execute the use case.

        Builds a ``UserLogEntity`` from the command and the refresh token
        given at construction, then hands it to the repository.

        Args:
            command: The input for this use case.
        """
        # NOTE(review): command.openid_sub and command.openid_provider are not
        # forwarded to the log entry here. Confirm whether UserLogEntity
        # should receive them.
        await self._repo.create(
            UserLogEntity(
                success=command.success,
                email=command.email,
                refresh_token=self._refresh_token,
                # The command carries the address as plain text; the entity
                # is given an IpAddress value object.
                client_ip=IpAddress.create(command.client_ip),
                user_agent=command.user_agent,
            )
        )