Coverage for src/kwai/modules/identity/authenticate_user.py: 90%

40 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that implements the use case: authenticate user.""" 

2 

3from dataclasses import dataclass 

4 

5from kwai.core.domain.value_objects.email_address import EmailAddress 

6from kwai.core.domain.value_objects.timestamp import Timestamp 

7from kwai.modules.identity.exceptions import AuthenticationException 

8from kwai.modules.identity.tokens.access_token import AccessTokenEntity 

9from kwai.modules.identity.tokens.access_token_repository import AccessTokenRepository 

10from kwai.modules.identity.tokens.log_user_login_service import LogUserLoginService 

11from kwai.modules.identity.tokens.refresh_token import RefreshTokenEntity 

12from kwai.modules.identity.tokens.refresh_token_repository import RefreshTokenRepository 

13from kwai.modules.identity.users.user_account_repository import UserAccountRepository 

14 

15 

16@dataclass(kw_only=True, frozen=True) 

17class AuthenticateUserCommand: 

18 """Input for the (AuthenticateUser) use case. 

19 

20 Attributes: 

21 username: The email address of the user. 

22 password: The password of the user. 

23 access_token_expiry_minutes: Minutes before expiring the access token. 

24 Default is 2 hours. 

25 refresh_token_expiry_minutes: Minutes before expiring the refresh token. 

26 Default is 2 months. 

27 """ 

28 

29 username: str 

30 password: str | None = None 

31 access_token_expiry_minutes: int = 60 * 2 # 2 hours 

32 refresh_token_expiry_minutes: int = 60 * 24 * 60 # 2 months 

33 

34 

35class AuthenticateUser: 

36 """Use case to authenticate a user.""" 

37 

38 def __init__( 

39 self, 

40 user_account_repo: UserAccountRepository, 

41 access_token_repo: AccessTokenRepository, 

42 refresh_token_repo: RefreshTokenRepository, 

43 log_user_login_service: LogUserLoginService, 

44 ): 

45 self._user_account_repo = user_account_repo 

46 self._access_token_repo = access_token_repo 

47 self._refresh_token_repo = refresh_token_repo 

48 self._log_user_login_service = log_user_login_service 

49 

50 async def execute(self, command: AuthenticateUserCommand) -> RefreshTokenEntity: 

51 """Execute the use case. 

52 

53 Args: 

54 command: The input for this use case. 

55 

56 Returns: 

57 RefreshTokenEntity: On success, a refresh token entity will be returned. 

58 

59 Raises: 

60 InvalidEmailException: Raised when the username 

61 contains an invalid email address. 

62 UserAccountNotFoundException: Raised when the user with the given email 

63 address doesn't exist. 

64 """ 

65 user_account = await self._user_account_repo.get_user_by_email( 

66 EmailAddress(command.username) 

67 ) 

68 if user_account.revoked: 

69 # save the last unsuccessful login 

70 await self._user_account_repo.update(user_account) 

71 

72 message = "User account is revoked" 

73 await self._log_user_login_service.notify_failure( 

74 message, user_account=user_account 

75 ) 

76 raise AuthenticationException(message) 

77 

78 user_account = user_account.login(command.password) 

79 if not user_account.logged_in: 

80 # save the last unsuccessful login 

81 await self._user_account_repo.update(user_account) 

82 

83 message = "Invalid password" 

84 await self._log_user_login_service.notify_failure( 

85 message, user_account=user_account 

86 ) 

87 raise AuthenticationException(message) 

88 

89 access_token = await self._access_token_repo.create( 

90 AccessTokenEntity( 

91 expiration=Timestamp.create_with_delta( 

92 minutes=command.access_token_expiry_minutes 

93 ), 

94 user_account=user_account, 

95 ) 

96 ) 

97 

98 refresh_token = await self._refresh_token_repo.create( 

99 RefreshTokenEntity( 

100 expiration=Timestamp.create_with_delta( 

101 minutes=command.refresh_token_expiry_minutes 

102 ), 

103 access_token=access_token, 

104 ) 

105 ) 

106 

107 # save the last successful login 

108 await self._user_account_repo.update(user_account) 

109 

110 await self._log_user_login_service.notify_success( 

111 user_account=user_account, refresh_token=refresh_token 

112 ) 

113 

114 return refresh_token