Coverage for src/kwai/modules/identity/revoke_user.py: 100%

20 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that defines the use case for revoking a user.""" 

2 

3from dataclasses import dataclass 

4 

5from kwai.core.domain.presenter import Presenter 

6from kwai.core.domain.value_objects.unique_id import UniqueId 

7from kwai.modules.identity.tokens.user_token_repository import UserTokenRepository 

8from kwai.modules.identity.users.user_account import UserAccountEntity 

9from kwai.modules.identity.users.user_account_repository import UserAccountRepository 

10 

11 

12@dataclass(frozen=True, kw_only=True, slots=True) 

13class RevokeUserCommand: 

14 """Input for the RevokeUser use case. 

15 

16 Attributes: 

17 uuid: The UUID of the user to revoke. 

18 """ 

19 

20 uuid: str 

21 

22 

23class RevokeUser: 

24 """Use case for revoking a user.""" 

25 

26 def __init__( 

27 self, 

28 repo: UserAccountRepository, 

29 user_token_repo: UserTokenRepository, 

30 presenter: Presenter[UserAccountEntity], 

31 ): 

32 """Initialize the use case. 

33 

34 Args: 

35 repo: The user account repository to use. 

36 user_token_repo: The user token repository to use. 

37 presenter: The presenter that will be used to return the user. 

38 """ 

39 self._repo = repo 

40 self._user_token_repo = user_token_repo 

41 self._presenter = presenter 

42 

43 async def execute(self, command: RevokeUserCommand) -> None: 

44 """Execute the use case. 

45 

46 Raises: 

47 UserAccountNotFoundException: If the user does not exist. 

48 """ 

49 user_account = await self._repo.get_user_by_uuid( 

50 UniqueId.create_from_string(command.uuid) 

51 ) 

52 user_account = user_account.revoke() 

53 await self._repo.update(user_account) 

54 await self._user_token_repo.revoke(user_account) 

55 self._presenter.present(user_account)