Files
walkup/backend/app/routes/auth.py
2026-04-23 13:55:15 -05:00

192 lines
6.8 KiB
Python

from __future__ import annotations
import secrets
import time
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, status
from fastapi.responses import JSONResponse, RedirectResponse
from sqlalchemy import select
from sqlalchemy.orm import Session
from ..auth import (
build_teamsnap_authorize_url,
clear_session_cookie,
create_session_token,
exchange_code_for_token,
fetch_teamsnap_user_id,
get_current_session,
refresh_access_token,
require_admin,
require_session,
set_session_cookie,
update_session_tokens,
)
from ..config import settings
from ..database import get_db
from ..http_cache import set_no_store
from ..models import UserSession
from .teamsnap import build_proxy_api_root
from ..schemas import (
AdminLoginRequest,
SessionResponse,
TeamSnapTokenResponse,
WalkupSessionSelectionUpdate,
)
router = APIRouter(prefix="/auth", tags=["auth"])
def normalize_return_to(return_to: str | None) -> str:
if not return_to or not return_to.startswith("/") or return_to.startswith("//"):
return "/"
return return_to
@router.get("/teamsnap/start")
def teamsnap_start(return_to: str | None = Query(default="/")) -> Response:
if not settings.teamsnap_client_id:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="TeamSnap is not configured")
state = secrets.token_urlsafe(24)
response = JSONResponse({"authorize_url": build_teamsnap_authorize_url(state), "state": state})
set_no_store(response)
response.set_cookie(
settings.auth_return_cookie_name,
normalize_return_to(return_to),
httponly=True,
secure=settings.session_cookie_secure,
samesite="lax",
max_age=60 * 10,
)
return response
@router.get("/teamsnap/callback")
async def teamsnap_callback(
request: Request,
code: str = Query(...),
db: Session = Depends(get_db),
) -> Response:
token_payload = await exchange_code_for_token(code)
session = UserSession(session_token=create_session_token(), provider="teamsnap")
update_session_tokens(session, token_payload)
if session.access_token:
session.external_user_id = await fetch_teamsnap_user_id(session.access_token)
db.add(session)
db.commit()
redirect_target = normalize_return_to(request.cookies.get(settings.auth_return_cookie_name))
redirect = RedirectResponse(url=redirect_target, status_code=status.HTTP_303_SEE_OTHER)
set_no_store(redirect)
set_session_cookie(redirect, session.session_token)
redirect.delete_cookie(settings.auth_return_cookie_name)
return redirect
@router.get("/session", response_model=SessionResponse)
def session_status(
response: Response,
session: UserSession | None = Depends(get_current_session),
) -> SessionResponse:
set_no_store(response)
if session is None:
return SessionResponse(authenticated=False)
return SessionResponse(
authenticated=True,
provider=session.provider,
is_admin=session.is_admin,
external_user_id=session.external_user_id,
external_team_id=session.external_team_id,
external_player_id=session.external_player_id,
token_expires_at=session.token_expires_at,
)
@router.post("/teamsnap/token", response_model=TeamSnapTokenResponse)
async def teamsnap_token(
request: Request,
response: Response,
session: UserSession = Depends(require_session),
db: Session = Depends(get_db),
) -> TeamSnapTokenResponse:
if session.provider != "teamsnap":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Session is not TeamSnap-backed")
if not session.access_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing TeamSnap access token")
expires_soon = session.token_expires_at is None or session.token_expires_at.timestamp() <= (time.time() + 60)
if expires_soon and session.refresh_token:
token_payload = await refresh_access_token(session.refresh_token)
update_session_tokens(session, token_payload)
if not session.external_user_id and session.access_token:
session.external_user_id = await fetch_teamsnap_user_id(session.access_token)
db.add(session)
db.commit()
db.refresh(session)
set_no_store(response)
return TeamSnapTokenResponse(
access_token=session.access_token,
expires_at=session.token_expires_at,
api_root=build_proxy_api_root(request),
auth_url=settings.teamsnap_auth_url.removesuffix("/oauth/authorize"),
)
@router.post("/session/walkup", response_model=SessionResponse)
def update_walkup_session_selection(
payload: WalkupSessionSelectionUpdate,
response: Response,
session: UserSession = Depends(require_session),
db: Session = Depends(get_db),
) -> SessionResponse:
if session.provider != "teamsnap":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Session is not TeamSnap-backed")
session.external_team_id = payload.external_team_id
session.external_player_id = payload.external_player_id
db.add(session)
db.commit()
db.refresh(session)
set_no_store(response)
return SessionResponse(
authenticated=True,
provider=session.provider,
is_admin=session.is_admin,
external_user_id=session.external_user_id,
external_team_id=session.external_team_id,
external_player_id=session.external_player_id,
token_expires_at=session.token_expires_at,
)
@router.post("/admin/login", response_model=SessionResponse)
def admin_login(payload: AdminLoginRequest, response: Response, db: Session = Depends(get_db)) -> SessionResponse:
if payload.username != settings.local_admin_username or payload.password != settings.local_admin_password:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
session = UserSession(session_token=create_session_token(), provider="local", is_admin=True)
db.add(session)
db.commit()
set_session_cookie(response, session.session_token)
set_no_store(response)
return SessionResponse(authenticated=True, provider="local", is_admin=True)
@router.post("/logout")
def logout(
response: Response,
session: UserSession | None = Depends(get_current_session),
db: Session = Depends(get_db),
) -> dict[str, bool]:
if session is not None:
db.delete(session)
db.commit()
clear_session_cookie(response)
set_no_store(response)
return {"ok": True}
@router.get("/admin/check", response_model=SessionResponse)
def admin_check(response: Response, _: UserSession = Depends(require_admin)) -> SessionResponse:
set_no_store(response)
return SessionResponse(authenticated=True, provider="local", is_admin=True)