Coverage for src/kwai/modules/identity/refresh_access_token.py: 90%

30 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that defines the use case for refreshing an access token.""" 

2 

3from dataclasses import dataclass 

4 

5from kwai.core.domain.value_objects.timestamp import Timestamp 

6from kwai.modules.identity.authenticate_user import AuthenticationException 

7from kwai.modules.identity.tokens.access_token import AccessTokenEntity 

8from kwai.modules.identity.tokens.access_token_repository import AccessTokenRepository 

9from kwai.modules.identity.tokens.refresh_token import RefreshTokenEntity 

10from kwai.modules.identity.tokens.refresh_token_repository import RefreshTokenRepository 

11from kwai.modules.identity.tokens.token_identifier import TokenIdentifier 

12 

13 

14@dataclass(kw_only=True, frozen=True, slots=True) 

15class RefreshAccessTokenCommand: 

16 """Input for the refresh access token use case. 

17 

18 Attributes: 

19 identifier: The identifier of the refresh token. 

20 access_token_expiry_minutes: Minutes before expiring the access token. 

21 Default is 2 hours. 

22 refresh_token_expiry_minutes: Minutes before expiring the refresh token. 

23 Default is 2 months. 

24 """ 

25 

26 identifier: str # The identifier of the refresh token. 

27 access_token_expiry_minutes: int = 60 * 2 # 2 hours 

28 refresh_token_expiry_minutes: int = 60 * 24 * 60 # 2 months 

29 

30 

31class RefreshAccessToken: 

32 """Use case for refreshing an access token. 

33 

34 Attributes: 

35 _refresh_token_repo (RefreshTokenRepository): The repo for getting and creating 

36 a new refresh token. 

37 _access_token_repo (AccessTokenRepository): The repo for updating and creating 

38 an access token. 

39 

40 Note: 

41 A new access token will also result in a new refresh token. 

42 """ 

43 

44 def __init__( 

45 self, 

46 refresh_token_repo: RefreshTokenRepository, 

47 access_token_repo: AccessTokenRepository, 

48 ): 

49 self._refresh_token_repo = refresh_token_repo 

50 self._access_token_repo = access_token_repo 

51 

52 async def execute(self, command: RefreshAccessTokenCommand) -> RefreshTokenEntity: 

53 """Execute the use case. 

54 

55 Args: 

56 command: The input for this use case. 

57 

58 Raises: 

59 RefreshTokenNotFoundException: Raised when the refresh token does not exist. 

60 AuthenticationException: Raised when the refresh token is expired, the 

61 refresh token is revoked or the user is revoked. 

62 """ 

63 refresh_token = await self._refresh_token_repo.get_by_token_identifier( 

64 TokenIdentifier(hex_string=command.identifier) 

65 ) 

66 

67 if refresh_token.expired: 

68 raise AuthenticationException("Refresh token is expired") 

69 

70 if refresh_token.revoked: 

71 raise AuthenticationException("Refresh token is revoked") 

72 

73 # Revoke the old refresh token and access token 

74 refresh_token.revoke() 

75 await self._refresh_token_repo.update(refresh_token) 

76 # The access token is also revoked, so update it 

77 await self._access_token_repo.update(refresh_token.access_token) 

78 

79 if refresh_token.access_token.user_account.revoked: 

80 raise AuthenticationException("User is revoked") 

81 

82 # Create a new access and refresh token 

83 access_token = await self._access_token_repo.create( 

84 AccessTokenEntity( 

85 identifier=TokenIdentifier.generate(), 

86 expiration=Timestamp.create_with_delta( 

87 minutes=command.access_token_expiry_minutes 

88 ), 

89 user_account=refresh_token.access_token.user_account, 

90 ) 

91 ) 

92 

93 return await self._refresh_token_repo.create( 

94 RefreshTokenEntity( 

95 identifier=TokenIdentifier.generate(), 

96 expiration=Timestamp.create_with_delta( 

97 minutes=command.refresh_token_expiry_minutes 

98 ), 

99 access_token=access_token, 

100 ) 

101 )