Coverage for src/kwai/api/v1/auth/endpoints/sso.py: 54%
41 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-01-01 00:00 +0000
1"""Module that defines endpoints for SSO logins."""
3from typing import Annotated
5from fastapi import APIRouter, HTTPException, Request, status
6from fastapi.params import Depends, Header
7from fastapi.responses import RedirectResponse
8from fastapi_sso import GoogleSSO
10from kwai.api.dependencies import create_database
11from kwai.api.v1.auth.cookies import create_cookies
12from kwai.core.db.database import Database
13from kwai.core.db.uow import UnitOfWork
14from kwai.core.settings import Settings, get_settings
15from kwai.modules.identity.authenticate_user import (
16 AuthenticateUser,
17 AuthenticateUserCommand,
18)
19from kwai.modules.identity.tokens.access_token_db_repository import (
20 AccessTokenDbRepository,
21)
22from kwai.modules.identity.tokens.log_user_login_db_service import LogUserLoginDbService
23from kwai.modules.identity.tokens.refresh_token_db_repository import (
24 RefreshTokenDbRepository,
25)
26from kwai.modules.identity.users.user_account_db_repository import (
27 UserAccountDbRepository,
28)
29from kwai.modules.identity.users.user_account_repository import (
30 UserAccountNotFoundException,
31)
# Router on which the SSO endpoints of this module are registered.
router = APIRouter()
async def get_google_sso(
    settings: Annotated[Settings, Depends(get_settings)],
) -> GoogleSSO:
    """Create the Google SSO client from the application settings.

    Args:
        settings: The application settings.

    Returns:
        A GoogleSSO instance whose redirect URI points to the Google callback
        endpoint below the configured website URL.

    Raises:
        HTTPException: With status 501 when the security settings contain no
            Google configuration.
    """
    google_settings = settings.security.google
    if google_settings is None:
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail="Google SSO is not configured",
        )

    callback_url = f"{settings.website.url}/api/v1/auth/sso/google/callback"
    return GoogleSSO(
        google_settings.client_id,
        google_settings.client_secret,
        redirect_uri=callback_url,
        # The frontend "test" flag decides whether plain http is accepted.
        allow_insecure_http=settings.frontend.test,
    )
@router.get("/google/login")
async def google_login(
    google_sso: Annotated[GoogleSSO, Depends(get_google_sso)],
    return_url: str | None = None,
):
    """Start the Google login flow.

    Args:
        google_sso: The Google SSO client.
        return_url: Optional value that is handed to the login redirect as its
            ``state`` argument.

    Returns:
        The login redirect produced by the SSO client.
    """
    async with google_sso:
        login_redirect = await google_sso.get_login_redirect(state=return_url)
    return login_redirect
@router.get("/google/callback")
async def google_callback(
    request: Request,
    google_sso: Annotated[GoogleSSO, Depends(get_google_sso)],
    db: Annotated[Database, Depends(create_database)],
    settings: Annotated[Settings, Depends(get_settings)],
    state: str | None = None,
    x_forwarded_for: Annotated[str | None, Header()] = None,
    user_agent: Annotated[str | None, Header()] = "",
):
    """Implement the Google login callback.

    The request is verified with the SSO client. The email address of the
    resulting OpenID information is then used as username to authenticate the
    user, and the login is logged together with the client IP, the user agent
    and the OpenID subject/provider.

    Args:
        request: The incoming callback request.
        google_sso: The Google SSO client.
        db: The database connection.
        settings: The application settings.
        state: Optional URL to redirect to after a successful login. When not
            set, the configured website URL is used.
        x_forwarded_for: Value of the X-Forwarded-For header. When present, it
            is logged as the client IP instead of the address of the peer.
        user_agent: Value of the User-Agent header.

    Returns:
        A redirect response that was passed through ``create_cookies`` with
        the refresh token.

    Raises:
        HTTPException: With status 401 when the SSO verification yields no
            OpenID information or when no user account exists for the email
            address.
    """
    async with google_sso:
        openid = await google_sso.verify_and_process(request)
        if not openid:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication failed"
            )

    # request.client can be None (e.g. when the ASGI server does not provide
    # the peer address). Fall back to an empty string instead of failing with
    # an AttributeError, in line with the other "" fallbacks used here.
    if x_forwarded_for is not None:
        client_ip = x_forwarded_for
    elif request.client is not None:
        client_ip = request.client.host
    else:
        client_ip = ""

    async with UnitOfWork(db, always_commit=True):
        try:
            refresh_token = await AuthenticateUser(
                UserAccountDbRepository(db),
                AccessTokenDbRepository(db),
                RefreshTokenDbRepository(db),
                LogUserLoginDbService(
                    db,
                    email=str(openid.email),
                    user_agent=user_agent,
                    client_ip=client_ip,
                    open_id_sub=openid.id if openid.id else "",
                    open_id_provider=openid.provider,
                ),
            ).execute(AuthenticateUserCommand(username=str(openid.email)))
        except UserAccountNotFoundException as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Unknown user account"
            ) from exc

    # NOTE(review): state is a query parameter and is used as redirect target
    # without validation. This looks like a possible open redirect; consider
    # restricting it to known hosts. Behaviour is left unchanged here.
    if state is not None:
        response = RedirectResponse(state)
    else:
        response = RedirectResponse(settings.website.url)

    create_cookies(response, refresh_token, settings)

    return response