Coverage for kwai/modules/identity/refresh_access_token.py: 90%

30 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-09-05 17:55 +0000

1"""Module that defines the use case for refreshing an access token.""" 

2from dataclasses import dataclass 

3from datetime import datetime, timedelta 

4 

5from kwai.modules.identity.authenticate_user import AuthenticationException 

6from kwai.modules.identity.tokens.access_token import AccessTokenEntity 

7from kwai.modules.identity.tokens.access_token_repository import AccessTokenRepository 

8from kwai.modules.identity.tokens.refresh_token import RefreshTokenEntity 

9from kwai.modules.identity.tokens.refresh_token_repository import RefreshTokenRepository 

10from kwai.modules.identity.tokens.token_identifier import TokenIdentifier 

11 

12 

13@dataclass(kw_only=True, frozen=True, slots=True) 

14class RefreshAccessTokenCommand: 

15 """Input for the refresh access token use case. 

16 

17 Attributes: 

18 identifier: The identifier of the refresh token. 

19 access_token_expiry_minutes: Minutes before expiring the access token. 

20 Default is 2 hours. 

21 refresh_token_expiry_minutes: Minutes before expiring the refresh token. 

22 Default is 2 months. 

23 """ 

24 

25 identifier: str # The identifier of the refresh token. 

26 access_token_expiry_minutes: int = 60 * 2 # 2 hours 

27 refresh_token_expiry_minutes: int = 60 * 24 * 60 # 2 months 

28 

29 

class RefreshAccessToken:
    """Use case that exchanges a refresh token for a fresh token pair.

    Attributes:
        _refresh_token_repo (RefreshTokenRepository): Repository used to look up
            and update the presented refresh token and to store the new one.
        _access_token_repo (AccessTokenRepository): Repository used to update
            the old access token and to store the new one.

    Note:
        Every successful refresh yields a new access token *and* a new refresh
        token; the presented refresh token is revoked.
    """

    def __init__(
        self,
        refresh_token_repo: RefreshTokenRepository,
        access_token_repo: AccessTokenRepository,
    ):
        self._refresh_token_repo = refresh_token_repo
        self._access_token_repo = access_token_repo

    @staticmethod
    def _expiration_after(minutes: int) -> datetime:
        """Return the current (naive) UTC time shifted forward by `minutes`."""
        return datetime.utcnow() + timedelta(minutes=minutes)

    async def execute(self, command: RefreshAccessTokenCommand) -> RefreshTokenEntity:
        """Run the use case.

        Args:
            command: The input for this use case.

        Returns:
            The newly created refresh token, which references the newly created
            access token.

        Raises:
            RefreshTokenNotFoundException: Raised when the refresh token does not exist.
            AuthenticationException: Raised when the refresh token is expired, the
                refresh token is revoked or the user is revoked.
        """
        presented_id = TokenIdentifier(hex_string=command.identifier)
        old_refresh_token = await self._refresh_token_repo.get_by_token_identifier(
            presented_id
        )

        # Guard clauses: an expired or already revoked token can't be exchanged.
        if old_refresh_token.expired:
            raise AuthenticationException("Refresh token is expired")
        if old_refresh_token.revoked:
            raise AuthenticationException("Refresh token is revoked")

        # Invalidate the presented token before anything new is issued. The
        # linked access token is persisted as well, because revoking the
        # refresh token is expected to revoke it too.
        old_refresh_token.revoke()
        await self._refresh_token_repo.update(old_refresh_token)
        await self._access_token_repo.update(old_refresh_token.access_token)

        # This check deliberately comes after the revocation above, so the old
        # tokens are invalidated even when the user is no longer allowed in.
        owner = old_refresh_token.access_token.user_account
        if owner.revoked:
            raise AuthenticationException("User is revoked")

        # Issue the replacement access token ...
        new_access_token_id = TokenIdentifier.generate()
        new_access_token_expiration = self._expiration_after(
            command.access_token_expiry_minutes
        )
        new_access_token = await self._access_token_repo.create(
            AccessTokenEntity(
                identifier=new_access_token_id,
                expiration=new_access_token_expiration,
                user_account=owner,
            )
        )

        # ... and the replacement refresh token that wraps it.
        new_refresh_token_id = TokenIdentifier.generate()
        new_refresh_token_expiration = self._expiration_after(
            command.refresh_token_expiry_minutes
        )
        new_refresh_token = RefreshTokenEntity(
            identifier=new_refresh_token_id,
            expiration=new_refresh_token_expiration,
            access_token=new_access_token,
        )
        return await self._refresh_token_repo.create(new_refresh_token)