from __future__ import annotations import secrets import time from urllib.parse import urlencode from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, status from fastapi.responses import JSONResponse, RedirectResponse from sqlalchemy import select from sqlalchemy.orm import Session from ..auth import ( build_teamsnap_authorize_url, clear_session_cookie, create_session_token, exchange_code_for_token, fetch_teamsnap_user_id, get_current_session, refresh_access_token, require_admin, require_session, set_session_cookie, update_session_tokens, ) from ..config import settings from ..database import get_db from ..http_cache import set_no_store from ..models import UserSession from .teamsnap import build_proxy_api_root from ..schemas import ( AdminLoginRequest, SessionResponse, TeamSnapTokenResponse, WalkupSessionSelectionUpdate, ) router = APIRouter(prefix="/auth", tags=["auth"]) def normalize_return_to(return_to: str | None) -> str: if not return_to or not return_to.startswith("/") or return_to.startswith("//"): return "/" return return_to def build_signin_error_redirect_url(message: str) -> str: return f"/signin?{urlencode({'error': message})}" @router.get("/teamsnap/start") def teamsnap_start(return_to: str | None = Query(default="/")) -> Response: if not settings.teamsnap_client_id: raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="TeamSnap is not configured") state = secrets.token_urlsafe(24) response = JSONResponse({"authorize_url": build_teamsnap_authorize_url(state), "state": state}) set_no_store(response) response.set_cookie( settings.auth_return_cookie_name, normalize_return_to(return_to), httponly=True, secure=settings.session_cookie_secure, samesite="lax", max_age=60 * 10, ) return response @router.get("/teamsnap/callback") async def teamsnap_callback( request: Request, code: str | None = Query(default=None), error: str | None = Query(default=None), db: Session = Depends(get_db), ) -> Response: if error: redirect = RedirectResponse( url=build_signin_error_redirect_url(f"TeamSnap sign-in failed: {error}"), status_code=status.HTTP_303_SEE_OTHER, ) set_no_store(redirect) redirect.delete_cookie(settings.auth_return_cookie_name) return redirect if not code: redirect = RedirectResponse( url=build_signin_error_redirect_url("TeamSnap sign-in did not return an authorization code."), status_code=status.HTTP_303_SEE_OTHER, ) set_no_store(redirect) redirect.delete_cookie(settings.auth_return_cookie_name) return redirect token_payload = await exchange_code_for_token(code) session = UserSession(session_token=create_session_token(), provider="teamsnap") update_session_tokens(session, token_payload) db.add(session) db.commit() redirect_target = normalize_return_to(request.cookies.get(settings.auth_return_cookie_name)) redirect = RedirectResponse(url=redirect_target, status_code=status.HTTP_303_SEE_OTHER) set_no_store(redirect) set_session_cookie(redirect, session.session_token) redirect.delete_cookie(settings.auth_return_cookie_name) return redirect @router.get("/session", response_model=SessionResponse) def session_status( response: Response, session: UserSession | None = Depends(get_current_session), ) -> SessionResponse: set_no_store(response) if session is None: return SessionResponse(authenticated=False) return SessionResponse( authenticated=True, provider=session.provider, is_admin=session.is_admin, external_user_id=session.external_user_id, external_team_id=session.external_team_id, external_player_id=session.external_player_id, token_expires_at=session.token_expires_at, ) @router.post("/teamsnap/token", response_model=TeamSnapTokenResponse) async def teamsnap_token( request: Request, response: Response, session: UserSession = Depends(require_session), db: Session = Depends(get_db), ) -> TeamSnapTokenResponse: if session.provider != "teamsnap": raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Session is not TeamSnap-backed") if not session.access_token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing TeamSnap access token") session_updated = False if not session.external_user_id: session.external_user_id = await fetch_teamsnap_user_id(session.access_token) session_updated = session.external_user_id is not None expires_soon = session.token_expires_at is None or session.token_expires_at.timestamp() <= (time.time() + 60) if expires_soon and session.refresh_token: token_payload = await refresh_access_token(session.refresh_token) update_session_tokens(session, token_payload) if not session.external_user_id and session.access_token: session.external_user_id = await fetch_teamsnap_user_id(session.access_token) session_updated = True if session_updated or expires_soon: db.add(session) db.commit() db.refresh(session) set_no_store(response) return TeamSnapTokenResponse( access_token=session.access_token, expires_at=session.token_expires_at, api_root=build_proxy_api_root(request), auth_url=settings.teamsnap_auth_url.removesuffix("/oauth/authorize"), ) @router.post("/session/walkup", response_model=SessionResponse) def update_walkup_session_selection( payload: WalkupSessionSelectionUpdate, response: Response, session: UserSession = Depends(require_session), db: Session = Depends(get_db), ) -> SessionResponse: if session.provider != "teamsnap": raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Session is not TeamSnap-backed") session.external_team_id = payload.external_team_id session.external_player_id = payload.external_player_id db.add(session) db.commit() db.refresh(session) set_no_store(response) return SessionResponse( authenticated=True, provider=session.provider, is_admin=session.is_admin, external_user_id=session.external_user_id, external_team_id=session.external_team_id, external_player_id=session.external_player_id, token_expires_at=session.token_expires_at, ) @router.post("/admin/login", response_model=SessionResponse) def admin_login(payload: AdminLoginRequest, response: Response, db: Session = Depends(get_db)) -> SessionResponse: if payload.username != settings.local_admin_username or payload.password != settings.local_admin_password: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") session = UserSession(session_token=create_session_token(), provider="local", is_admin=True) db.add(session) db.commit() set_session_cookie(response, session.session_token) set_no_store(response) return SessionResponse(authenticated=True, provider="local", is_admin=True) @router.post("/logout") def logout( response: Response, session: UserSession | None = Depends(get_current_session), db: Session = Depends(get_db), ) -> dict[str, bool]: if session is not None: db.delete(session) db.commit() clear_session_cookie(response) set_no_store(response) return {"ok": True} @router.get("/admin/check", response_model=SessionResponse) def admin_check(response: Response, _: UserSession = Depends(require_admin)) -> SessionResponse: set_no_store(response) return SessionResponse(authenticated=True, provider="local", is_admin=True)