Coverage for src/kwai/modules/identity/refresh_access_token.py: 90%
30 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
1"""Module that defines the use case for refreshing an access token."""
3from dataclasses import dataclass
5from kwai.core.domain.value_objects.timestamp import Timestamp
6from kwai.modules.identity.authenticate_user import AuthenticationException
7from kwai.modules.identity.tokens.access_token import AccessTokenEntity
8from kwai.modules.identity.tokens.access_token_repository import AccessTokenRepository
9from kwai.modules.identity.tokens.refresh_token import RefreshTokenEntity
10from kwai.modules.identity.tokens.refresh_token_repository import RefreshTokenRepository
11from kwai.modules.identity.tokens.token_identifier import TokenIdentifier
14@dataclass(kw_only=True, frozen=True, slots=True)
15class RefreshAccessTokenCommand:
16 """Input for the refresh access token use case.
18 Attributes:
19 identifier: The identifier of the refresh token.
20 access_token_expiry_minutes: Minutes before expiring the access token.
21 Default is 2 hours.
22 refresh_token_expiry_minutes: Minutes before expiring the refresh token.
23 Default is 2 months.
24 """
26 identifier: str # The identifier of the refresh token.
27 access_token_expiry_minutes: int = 60 * 2 # 2 hours
28 refresh_token_expiry_minutes: int = 60 * 24 * 60 # 2 months
class RefreshAccessToken:
    """Use case for refreshing an access token.

    Attributes:
        _refresh_token_repo (RefreshTokenRepository): The repo for getting and creating
            a new refresh token.
        _access_token_repo (AccessTokenRepository): The repo for updating and creating
            an access token.

    Note:
        A new access token will also result in a new refresh token.
    """

    def __init__(
        self,
        refresh_token_repo: RefreshTokenRepository,
        access_token_repo: AccessTokenRepository,
    ):
        """Initialize the use case.

        Args:
            refresh_token_repo: The repo for getting and creating a refresh token.
            access_token_repo: The repo for updating and creating an access token.
        """
        self._refresh_token_repo = refresh_token_repo
        self._access_token_repo = access_token_repo

    async def execute(self, command: RefreshAccessTokenCommand) -> RefreshTokenEntity:
        """Execute the use case.

        The old refresh token is revoked and persisted, together with its
        access token, before the user account is checked. A revoked user
        therefore still has the presented tokens invalidated.

        Args:
            command: The input for this use case.

        Returns:
            The newly created refresh token, which references the newly
            created access token.

        Raises:
            RefreshTokenNotFoundException: Raised when the refresh token does not exist.
            AuthenticationException: Raised when the refresh token is expired, the
                refresh token is revoked or the user is revoked.
        """
        refresh_token = await self._refresh_token_repo.get_by_token_identifier(
            TokenIdentifier(hex_string=command.identifier)
        )

        if refresh_token.expired:
            raise AuthenticationException("Refresh token is expired")

        if refresh_token.revoked:
            raise AuthenticationException("Refresh token is revoked")

        # Revoke the old refresh token and access token
        refresh_token.revoke()
        await self._refresh_token_repo.update(refresh_token)
        # The access token is also revoked, so update it
        await self._access_token_repo.update(refresh_token.access_token)

        # Checked only after the revocations above have been persisted.
        user_account = refresh_token.access_token.user_account
        if user_account.revoked:
            raise AuthenticationException("User is revoked")

        # Create a new access and refresh token
        access_token = await self._access_token_repo.create(
            AccessTokenEntity(
                identifier=TokenIdentifier.generate(),
                expiration=Timestamp.create_with_delta(
                    minutes=command.access_token_expiry_minutes
                ),
                user_account=user_account,
            )
        )

        return await self._refresh_token_repo.create(
            RefreshTokenEntity(
                identifier=TokenIdentifier.generate(),
                expiration=Timestamp.create_with_delta(
                    minutes=command.refresh_token_expiry_minutes
                ),
                access_token=access_token,
            )
        )