Coverage for src/kwai/api/v1/auth/endpoints/sso.py: 54%

41 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that defines endpoints for SSO logins.""" 

2 

3from typing import Annotated 

4 

5from fastapi import APIRouter, HTTPException, Request, status 

6from fastapi.params import Depends, Header 

7from fastapi.responses import RedirectResponse 

8from fastapi_sso import GoogleSSO 

9 

10from kwai.api.dependencies import create_database 

11from kwai.api.v1.auth.cookies import create_cookies 

12from kwai.core.db.database import Database 

13from kwai.core.db.uow import UnitOfWork 

14from kwai.core.settings import Settings, get_settings 

15from kwai.modules.identity.authenticate_user import ( 

16 AuthenticateUser, 

17 AuthenticateUserCommand, 

18) 

19from kwai.modules.identity.tokens.access_token_db_repository import ( 

20 AccessTokenDbRepository, 

21) 

22from kwai.modules.identity.tokens.log_user_login_db_service import LogUserLoginDbService 

23from kwai.modules.identity.tokens.refresh_token_db_repository import ( 

24 RefreshTokenDbRepository, 

25) 

26from kwai.modules.identity.users.user_account_db_repository import ( 

27 UserAccountDbRepository, 

28) 

29from kwai.modules.identity.users.user_account_repository import ( 

30 UserAccountNotFoundException, 

31) 

32 

33 

34router = APIRouter() 

35 

36 

async def get_google_sso(
    settings: Annotated[Settings, Depends(get_settings)],
) -> GoogleSSO:
    """Provide a configured Google SSO client as a FastAPI dependency.

    Args:
        settings: The application settings.

    Returns:
        A GoogleSSO instance built from the Google client id/secret in the
        security settings, with the callback endpoint of this API as redirect URI.

    Raises:
        HTTPException: With status 501 when the settings contain no Google
            configuration.
    """
    google_settings = settings.security.google
    if google_settings is None:
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail="Google SSO is not configured",
        )

    # The redirect URI is always derived from the configured website URL.
    callback_url = f"{settings.website.url}/api/v1/auth/sso/google/callback"
    return GoogleSSO(
        google_settings.client_id,
        google_settings.client_secret,
        redirect_uri=callback_url,
        allow_insecure_http=settings.frontend.test,
    )

53 

54 

@router.get("/google/login")
async def google_login(
    google_sso: Annotated[GoogleSSO, Depends(get_google_sso)],
    return_url: str | None = None,
):
    """Start the Google login flow.

    Args:
        google_sso: The Google SSO client.
        return_url: Optional value that is handed to the SSO client as the
            ``state`` of the login redirect.

    Returns:
        The login redirect produced by the SSO client.
    """
    async with google_sso:
        login_redirect = await google_sso.get_login_redirect(state=return_url)
    return login_redirect

63 

64 

@router.get("/google/callback")
async def google_callback(
    request: Request,
    google_sso: Annotated[GoogleSSO, Depends(get_google_sso)],
    db: Annotated[Database, Depends(create_database)],
    settings: Annotated[Settings, Depends(get_settings)],
    state: str | None = None,
    x_forwarded_for: Annotated[str | None, Header()] = None,
    user_agent: Annotated[str | None, Header()] = "",
):
    """Implement the Google login callback.

    The incoming request is verified by the SSO client. The email address of the
    resulting OpenID is then used as username to authenticate a user account.
    On success the response is a redirect to ``state`` (or to the configured
    website URL when ``state`` is absent) on which ``create_cookies`` is applied.

    Args:
        request: The callback request.
        google_sso: The Google SSO client.
        db: The database.
        settings: The application settings.
        state: The state value returned with the callback; used as redirect target.
        x_forwarded_for: Value of the X-Forwarded-For header, when present.
        user_agent: Value of the User-Agent header.

    Raises:
        HTTPException: With status 401 when the SSO verification yields no
            OpenID, or when no user account exists for the email address.
    """
    async with google_sso:
        openid = await google_sso.verify_and_process(request)
        if not openid:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication failed"
            )

    # Prefer the forwarded address; otherwise use the peer address of the
    # connection. ``request.client`` can be None, so it must not be
    # dereferenced blindly.
    if x_forwarded_for is not None:
        client_ip = x_forwarded_for
    elif request.client is not None:
        client_ip = request.client.host
    else:
        # NOTE(review): an empty string is assumed to be an acceptable
        # "unknown" value for the login log -- confirm.
        client_ip = ""

    email = str(openid.email)

    # NOTE(review): always_commit presumably makes sure the login attempt is
    # stored even when authentication fails -- confirm against UnitOfWork.
    async with UnitOfWork(db, always_commit=True):
        try:
            refresh_token = await AuthenticateUser(
                UserAccountDbRepository(db),
                AccessTokenDbRepository(db),
                RefreshTokenDbRepository(db),
                LogUserLoginDbService(
                    db,
                    email=email,
                    user_agent=user_agent,
                    client_ip=client_ip,
                    open_id_sub=openid.id if openid.id else "",
                    open_id_provider=openid.provider,
                ),
            ).execute(AuthenticateUserCommand(username=email))
        except UserAccountNotFoundException as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Unknown user account"
            ) from exc

    # NOTE(review): ``state`` is supplied with the request and is used as
    # redirect target without validation; consider restricting it to trusted
    # locations to avoid an open redirect.
    target_url = state if state is not None else settings.website.url
    response = RedirectResponse(target_url)

    create_cookies(response, refresh_token, settings)

    return response