Files
walkup/backend/app/routes/games.py
2026-04-22 16:10:40 -05:00

218 lines
8.9 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, update
from sqlalchemy.orm import Session
from ..auth import require_session
from ..database import get_db
from ..models import AudioClip, GameAssignment, PlaybackSession, UserSession
from ..schemas import (
GameAssignmentCreate,
GameAssignmentResponse,
GamePrepResponse,
PlaybackAction,
PlaybackSessionCreate,
PlaybackSessionResponse,
)
# Router for the game endpoints: every route registered on it is served under
# the "/games" prefix and carries the "games" tag.
router = APIRouter(prefix="/games", tags=["games"])
def assignment_to_response(assignment: GameAssignment) -> GameAssignmentResponse:
    """Flatten a pin and its clip into the API response schema.

    ``normalized_url`` points at ``/media/files/<normalized_path>`` when the
    clip has a normalized path, and is ``None`` otherwise.
    """
    if assignment.clip.normalized_path:
        media_url = f"/media/files/{assignment.clip.normalized_path}"
    else:
        media_url = None

    # Field order mirrors the response schema: pin identity first, then the
    # clip details pulled through the relationship, then pin state.
    fields = {
        "id": assignment.id,
        "external_team_id": assignment.external_team_id,
        "external_game_id": assignment.external_game_id,
        "external_player_id": assignment.external_player_id,
        "clip_id": assignment.clip_id,
        "clip_label": assignment.clip.label,
        "asset_title": assignment.clip.asset.title,
        "start_ms": assignment.clip.start_ms,
        "end_ms": assignment.clip.end_ms,
        "batting_slot": assignment.batting_slot,
        "status": assignment.status,
        "normalized_url": media_url,
        "updated_at": assignment.updated_at,
    }
    return GameAssignmentResponse(**fields)
@router.get("/pins", response_model=list[GameAssignmentResponse])
def list_pins(
    external_player_id: str | None = Query(default=None),
    session: UserSession = Depends(require_session),
    db: Session = Depends(get_db),
) -> list[GameAssignmentResponse]:
    """List one player's pins across games, scoped to the session's team.

    The player is taken from the ``external_player_id`` query parameter and
    falls back to the session's player when that is missing or empty.
    Results are ordered by game id, then by the clip's sort order. Hidden
    clips are not filtered out by this endpoint.

    Raises:
        HTTPException: 422 when no player can be resolved or the session has
            no team.
    """
    player_id = external_player_id or session.external_player_id
    if not (player_id and session.external_team_id):
        raise HTTPException(status_code=422, detail="Provide a player to list pins")

    stmt = (
        select(GameAssignment)
        .join(GameAssignment.clip)
        .where(
            GameAssignment.external_team_id == session.external_team_id,
            GameAssignment.external_player_id == player_id,
        )
        .order_by(GameAssignment.external_game_id.asc(), AudioClip.sort_order.asc())
    )
    rows = db.scalars(stmt).all()
    return [assignment_to_response(row) for row in rows]
@router.get("/{external_game_id}/assignments", response_model=list[GameAssignmentResponse])
def list_assignments(
    external_game_id: str,
    external_player_id: str | None = Query(default=None),
    _: UserSession = Depends(require_session),
    db: Session = Depends(get_db),
) -> list[GameAssignmentResponse]:
    """List the pins of a game whose clips are not hidden.

    When ``external_player_id`` is given (and non-empty) the list is narrowed
    to that player's pins. Results are ordered by clip sort order, with the
    most recently updated pin first among equal sort orders.
    """
    conditions = [
        GameAssignment.external_game_id == external_game_id,
        AudioClip.hidden.is_(False),
    ]
    if external_player_id:
        conditions.append(GameAssignment.external_player_id == external_player_id)

    stmt = (
        select(GameAssignment)
        .join(GameAssignment.clip)
        .where(*conditions)
        .order_by(AudioClip.sort_order.asc(), GameAssignment.updated_at.desc())
    )
    return [assignment_to_response(row) for row in db.scalars(stmt).all()]
@router.post("/{external_game_id}/assignments", response_model=GameAssignmentResponse)
def create_assignment(
    external_game_id: str,
    payload: GameAssignmentCreate,
    _: UserSession = Depends(require_session),
    db: Session = Depends(get_db),
) -> GameAssignmentResponse:
    """Pin a clip to a game, creating the pin or updating the existing one.

    A pin is identified by ``(external_game_id, clip_id)``. If one already
    exists its team, player, batting slot and status are overwritten from the
    payload; otherwise a new pin is inserted.

    Raises:
        HTTPException: 404 when the clip does not exist or is hidden; 422
            when its normalization status is not ``"ready"`` or its asset
            belongs to a different team than the payload's; 403 when the
            asset's owner is not the player named in the payload.
    """
    clip = db.get(AudioClip, payload.clip_id)
    # Existence is checked before readiness so that a missing clip is
    # reported as "not found" rather than "not ready", and a hidden clip
    # never reveals its processing state.
    if clip is None or clip.hidden:
        raise HTTPException(status_code=404, detail="Clip not found")
    if clip.normalization_status != "ready":
        raise HTTPException(status_code=422, detail="Clip is not ready")

    asset = clip.asset
    if asset.external_team_id != payload.external_team_id:
        raise HTTPException(status_code=422, detail="Clip does not belong to this team")
    if asset.owner_external_player_id != payload.external_player_id:
        raise HTTPException(status_code=403, detail="You can only pin clips owned by that player")

    assignment = db.scalar(
        select(GameAssignment).where(
            GameAssignment.external_game_id == external_game_id,
            GameAssignment.clip_id == payload.clip_id,
        )
    )
    if assignment is None:
        assignment = GameAssignment(
            external_team_id=payload.external_team_id,
            external_game_id=external_game_id,
            external_player_id=payload.external_player_id,
            clip_id=payload.clip_id,
            batting_slot=payload.batting_slot,
            status=payload.status,
        )
        db.add(assignment)
    else:
        # clip_id is not reassigned: the row was selected by that same clip id.
        assignment.external_team_id = payload.external_team_id
        assignment.external_player_id = payload.external_player_id
        assignment.batting_slot = payload.batting_slot
        assignment.status = payload.status

    db.commit()
    # Reload the row after the commit so the response is built from the
    # stored values.
    db.refresh(assignment)
    return assignment_to_response(assignment)
@router.delete("/{external_game_id}/assignments/{assignment_id}", status_code=204)
def delete_assignment(
    external_game_id: str,
    assignment_id: int,
    external_player_id: str | None = Query(default=None),
    _: UserSession = Depends(require_session),
    db: Session = Depends(get_db),
) -> None:
    """Delete a pin from a game.

    Any playback session currently pointing at the pin has its
    ``current_assignment_id`` cleared before the pin row is removed, all in
    the same commit. The player ownership check only runs when
    ``external_player_id`` is supplied.

    Raises:
        HTTPException: 404 when the pin does not exist or belongs to another
            game; 403 when ``external_player_id`` is given and does not match
            the pin's player.
    """
    pin = db.get(GameAssignment, assignment_id)
    belongs_to_game = pin is not None and pin.external_game_id == external_game_id
    if not belongs_to_game:
        raise HTTPException(status_code=404, detail="Pin not found")

    if external_player_id is not None:
        if pin.external_player_id != external_player_id:
            raise HTTPException(status_code=403, detail="Pin does not belong to that player")

    detach_sessions = (
        update(PlaybackSession)
        .where(PlaybackSession.current_assignment_id == pin.id)
        .values(current_assignment_id=None)
    )
    db.execute(detach_sessions)
    db.delete(pin)
    db.commit()
@router.get("/{external_game_id}/prep", response_model=GamePrepResponse)
def prepare_game(
    external_game_id: str,
    _: UserSession = Depends(require_session),
    db: Session = Depends(get_db),
) -> GamePrepResponse:
    """Build the prep payload for a game: its non-hidden pins plus a timestamp.

    The team id in the response is copied from the first pin returned, or is
    an empty string when the game has no visible pins. ``prepared_at`` is the
    current time in UTC.
    """
    stmt = (
        select(GameAssignment)
        .join(GameAssignment.clip)
        .where(GameAssignment.external_game_id == external_game_id, AudioClip.hidden.is_(False))
        .order_by(AudioClip.sort_order.asc(), GameAssignment.updated_at.desc())
    )
    pins = db.scalars(stmt).all()

    if pins:
        team_id = pins[0].external_team_id
    else:
        team_id = ""

    return GamePrepResponse(
        external_game_id=external_game_id,
        external_team_id=team_id,
        prepared_at=datetime.now(timezone.utc),
        assignments=[assignment_to_response(pin) for pin in pins],
    )
@router.post("/{external_game_id}/gameday/session", response_model=PlaybackSessionResponse)
def create_gameday_session(
    external_game_id: str,
    payload: PlaybackSessionCreate,
    session: UserSession = Depends(require_session),
    db: Session = Depends(get_db),
) -> PlaybackSessionResponse:
    """Create a playback session for a game, starting in the ``"idle"`` state.

    The team id comes from the payload, the game id from the path, and the
    new row is linked to the caller's session id.
    """
    attrs = {
        "external_team_id": payload.external_team_id,
        "external_game_id": external_game_id,
        "gameday_session_id": session.id,
        "state": "idle",
    }
    record = PlaybackSession(**attrs)
    db.add(record)
    db.commit()
    # Reload after the commit so the response is built from the stored row.
    db.refresh(record)
    return PlaybackSessionResponse.model_validate(record, from_attributes=True)
@router.post("/{external_game_id}/gameday/session/{playback_session_id}/trigger", response_model=PlaybackSessionResponse)
def trigger_gameday(
    external_game_id: str,
    playback_session_id: int,
    payload: PlaybackAction,
    _: UserSession = Depends(require_session),
    db: Session = Depends(get_db),
) -> PlaybackSessionResponse:
    """Record a trigger on a playback session, by pin or by bare clip.

    When the payload names a pin (``assignment_id``), the pin must belong to
    this game and its clip must not be hidden; the session then points at
    that pin. Otherwise the payload's ``clip_id`` is validated against the
    session's team (and, if given, the payload's player) and the session's
    current pin is cleared. In both cases the session's state is set from the
    payload and ``last_triggered_at`` is set to the current UTC time.

    Raises:
        HTTPException: 404 when the playback session, pin or clip cannot be
            used (missing, wrong game/team, or hidden); 422 when the payload
            names neither a pin nor a clip; 403 when the clip's asset is not
            owned by the player named in the payload.
    """
    playback = db.get(PlaybackSession, playback_session_id)
    if playback is None or playback.external_game_id != external_game_id:
        raise HTTPException(status_code=404, detail="Playback session not found")

    by_pin = payload.assignment_id is not None
    if not by_pin and payload.clip_id is None:
        raise HTTPException(status_code=422, detail="Provide a pin or clip to trigger")

    if by_pin:
        pin = db.get(GameAssignment, payload.assignment_id)
        # Missing, foreign-game and hidden-clip pins all look the same to the caller.
        if pin is None or pin.external_game_id != external_game_id or pin.clip.hidden:
            raise HTTPException(status_code=404, detail="Pin not found")
        target_assignment_id = pin.id
    else:
        clip = db.get(AudioClip, payload.clip_id)
        # Missing, other-team and hidden clips all look the same to the caller.
        if (
            clip is None
            or clip.asset.external_team_id != playback.external_team_id
            or clip.hidden
        ):
            raise HTTPException(status_code=404, detail="Clip not found")
        wanted_player = payload.external_player_id
        if wanted_player and clip.asset.owner_external_player_id != wanted_player:
            raise HTTPException(status_code=403, detail="Clip does not belong to that player")
        # A bare clip trigger is not tied to any pin.
        target_assignment_id = None

    playback.current_assignment_id = target_assignment_id
    playback.state = payload.state
    playback.last_triggered_at = datetime.now(timezone.utc)
    db.commit()
    db.refresh(playback)
    return PlaybackSessionResponse.model_validate(playback, from_attributes=True)