Files
walkup/backend/tests/test_api.py
2026-04-22 06:46:23 -05:00

428 lines
14 KiB
Python

from __future__ import annotations
from io import BytesIO
from math import sin, tau
from wave import open as open_wave
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from app.config import settings
from app.database import Base, SessionLocal, engine
from app.main import app
from app.models import AudioAsset, AudioClip, UserSession
from app.routes.teamsnap import rewrite_teamsnap_urls
@pytest.fixture(autouse=True)
def override_media_root(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Point ``settings.media_root`` at this test's ``tmp_path``.

    The override goes through ``monkeypatch`` so the previous value is put
    back at teardown, instead of leaving a stale temporary directory
    configured for whatever runs after this test.
    """
    monkeypatch.setattr(settings, "media_root", tmp_path)
@pytest.fixture(autouse=True)
def override_cookie_security(monkeypatch: pytest.MonkeyPatch) -> None:
    """Turn off ``settings.session_cookie_secure`` for the duration of a test.

    The override goes through ``monkeypatch`` so the original setting is
    restored at teardown rather than being permanently changed for the
    rest of the process.
    """
    monkeypatch.setattr(settings, "session_cookie_secure", False)
@pytest.fixture(autouse=True)
def reset_database() -> None:
    """Drop and recreate every table registered on ``Base`` before each test."""
    schema = Base.metadata
    schema.drop_all(bind=engine)
    schema.create_all(bind=engine)
@pytest.fixture(autouse=True)
def reset_client_cookies() -> None:
    """Empty the shared test client's cookie jar before each test."""
    cookie_jar = client.cookies
    cookie_jar.clear()
# Single TestClient shared by every test in this module; the autouse
# reset_client_cookies fixture clears its cookies before each test.
client = TestClient(app)
def make_test_wav_bytes(*, duration_seconds: float = 0.25, sample_rate: int = 8000, frequency: float = 440.0) -> bytes:
    """Render a sine tone as an in-memory mono, 16-bit PCM WAV file.

    The tone is written at 70% of full scale. At least one frame is always
    produced, even when ``duration_seconds * sample_rate`` truncates to zero.

    Returns:
        The complete WAV file contents as bytes.
    """
    total_frames = max(1, int(duration_seconds * sample_rate))
    out = BytesIO()
    with open_wave(out, "wb") as writer:
        writer.setnchannels(1)
        writer.setsampwidth(2)
        writer.setframerate(sample_rate)
        # Samples are generated only after the header parameters are accepted,
        # so an invalid sample rate is rejected by the writer first.
        pcm = b"".join(
            int(0.7 * 32767 * sin(tau * frequency * step / sample_rate)).to_bytes(
                2, byteorder="little", signed=True
            )
            for step in range(total_frames)
        )
        writer.writeframes(pcm)
    return out.getvalue()
def test_healthcheck() -> None:
    """``GET /health`` answers 200 with the fixed ``{"status": "ok"}`` body."""
    health = client.get("/health")
    assert health.status_code == 200
    assert health.json() == {"status": "ok"}
def test_admin_login_and_session() -> None:
    """After an admin login, the session endpoint reports ``authenticated``."""
    credentials = {"username": "admin", "password": "admin"}
    login = client.post("/auth/admin/login", json=credentials)
    assert login.status_code == 200

    session_info = client.get("/auth/session")
    assert session_info.status_code == 200
    assert session_info.json()["authenticated"] is True
def test_rewrite_teamsnap_urls_uses_same_origin_proxy() -> None:
    """TeamSnap API hrefs move onto the proxy root; foreign hosts stay as-is."""
    proxy_root = "https://kif.local.ascorrea.com/api/teamsnap"

    upstream_links = [
        {"rel": "self", "href": "https://apiv3.teamsnap.com/teams/1"},
        {"rel": "avatar", "href": "https://example.com/avatar.png"},
    ]
    payload = {"collection": {"href": "https://apiv3.teamsnap.com", "links": upstream_links}}

    # Built from separate literals so the expectation shares no objects with
    # the payload handed to the function under test.
    expected_links = [
        {"rel": "self", "href": "https://kif.local.ascorrea.com/api/teamsnap/teams/1"},
        {"rel": "avatar", "href": "https://example.com/avatar.png"},
    ]
    expected = {
        "collection": {
            "href": "https://kif.local.ascorrea.com/api/teamsnap",
            "links": expected_links,
        }
    }

    rewritten = rewrite_teamsnap_urls(payload, "https://apiv3.teamsnap.com", proxy_root)

    assert rewritten == expected
def test_teamsnap_token_returns_proxy_api_root() -> None:
    """The token endpoint reports an ``api_root`` built from the forwarded host and scheme.

    A TeamSnap-backed session row is seeded directly in the database and
    selected through the session cookie before the request is made.
    """
    db = SessionLocal()
    try:
        db.add(
            UserSession(
                session_token="teamsnap-session",
                provider="teamsnap",
                access_token="token-value",
            )
        )
        db.commit()
    finally:
        # Close the session even when seeding fails, so a broken test does
        # not leave it open for the tests that follow.
        db.close()

    client.cookies.set(settings.session_cookie_name, "teamsnap-session")
    response = client.post(
        "/auth/teamsnap/token",
        headers={"host": "kif.local.ascorrea.com", "x-forwarded-proto": "https"},
    )

    assert response.status_code == 200
    assert response.json()["api_root"] == "https://kif.local.ascorrea.com/api/teamsnap"
def test_walkup_session_selection_is_persisted_in_session() -> None:
    """Posting a walkup selection changes the session's player but keeps user and team.

    The seeded session starts on ``player-1001``; the request selects
    ``player-1002`` for the same team and the response must reflect that.
    """
    db = SessionLocal()
    try:
        db.add(
            UserSession(
                session_token="teamsnap-session",
                provider="teamsnap",
                external_user_id="user-42",
                external_team_id="team-101",
                external_player_id="player-1001",
            )
        )
        db.commit()
    finally:
        # Close the session even when seeding fails, so a broken test does
        # not leave it open for the tests that follow.
        db.close()

    client.cookies.set(settings.session_cookie_name, "teamsnap-session")
    response = client.post(
        "/auth/session/walkup",
        json={"external_team_id": "team-101", "external_player_id": "player-1002"},
    )

    assert response.status_code == 200
    body = response.json()
    assert body["external_user_id"] == "user-42"
    assert body["external_team_id"] == "team-101"
    assert body["external_player_id"] == "player-1002"
def test_player_can_attach_multiple_clips_to_same_game() -> None:
    """Two clips of one asset can be assigned to the same player, game and batting slot.

    Both assignment requests must succeed and echo their clip's range, and
    the listing is expected to return the second assignment before the first.
    """
    login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert login.status_code == 200

    db = SessionLocal()
    try:
        asset = AudioAsset(
            external_team_id="team-1",
            owner_external_player_id="player-1",
            title="Song",
            original_filename="song.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/song.mp3",
        )
        db.add(asset)
        # Flush so asset.id is populated before the clips reference it.
        db.flush()

        first_clip = AudioClip(
            asset_id=asset.id,
            label="Intro",
            start_ms=0,
            end_ms=10000,
            normalization_status="ready",
            normalized_path="clips/intro.mp3",
        )
        second_clip = AudioClip(
            asset_id=asset.id,
            label="Chorus",
            start_ms=12000,
            end_ms=22000,
            normalization_status="ready",
            normalized_path="clips/chorus.mp3",
        )
        db.add_all([first_clip, second_clip])
        db.commit()

        # Capture the primary keys while the rows are still attached to the
        # session, so nothing below touches ORM instances after close().
        first_clip_id = first_clip.id
        second_clip_id = second_clip.id
    finally:
        # Close the session even when seeding fails.
        db.close()

    def assignment_payload(clip_id: int) -> dict[str, object]:
        # Identical request for both clips; only the clip id differs.
        return {
            "external_team_id": "team-1",
            "external_player_id": "player-1",
            "clip_id": clip_id,
            "batting_slot": 1,
            "status": "ready",
        }

    first_response = client.post("/games/game-1/assignments", json=assignment_payload(first_clip_id))
    second_response = client.post("/games/game-1/assignments", json=assignment_payload(second_clip_id))

    assert first_response.status_code == 200
    assert second_response.status_code == 200
    assert first_response.json()["start_ms"] == 0
    assert first_response.json()["end_ms"] == 10000
    assert second_response.json()["start_ms"] == 12000
    assert second_response.json()["end_ms"] == 22000

    assignments = client.get("/games/game-1/assignments")
    assert assignments.status_code == 200
    assignment_ids = [item["clip_id"] for item in assignments.json()]
    assert assignment_ids == [second_clip_id, first_clip_id]
def test_upload_creates_default_clip_and_clip_ranges_can_be_updated() -> None:
    """An upload yields exactly one ready default clip whose range can then be patched."""
    admin_login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert admin_login.status_code == 200

    form_fields = {
        "external_team_id": "team-2",
        "owner_external_player_id": "player-2",
        "title": "Fresh track",
    }
    wav_part = ("fresh-track.wav", BytesIO(make_test_wav_bytes()), "audio/wav")
    upload = client.post("/media/uploads", data=form_fields, files={"file": wav_part})
    assert upload.status_code == 200
    asset_id = upload.json()["id"]

    listing = client.get(
        "/media/clips",
        params={"external_team_id": "team-2", "owner_external_player_id": "player-2"},
    )
    assert listing.status_code == 200
    listed_clips = listing.json()
    assert len(listed_clips) == 1

    default_clip = listed_clips[0]
    assert default_clip["asset_id"] == asset_id
    assert default_clip["label"] == "Fresh track"
    assert default_clip["start_ms"] == 0
    assert default_clip["end_ms"] == 30000
    assert default_clip["normalization_status"] == "ready"
    assert default_clip["waveform_duration_ms"] is not None
    assert len(default_clip["waveform_peaks"]) > 0

    new_range = {"start_ms": 2500, "end_ms": 8750}
    update = client.patch(f"/media/clips/{default_clip['id']}", json=new_range)
    assert update.status_code == 200

    patched = update.json()
    assert patched["start_ms"] == 2500
    assert patched["end_ms"] == 8750
    assert patched["label"] == "Fresh track"
def test_clip_updates_can_use_player_scoped_authorization() -> None:
    """A session other than the uploader's can patch a clip when scoped by owning player.

    The asset is recorded as uploaded by ``uploader-session``; the request
    is made with the ``editor-session`` cookie and passes the asset's
    ``owner_external_player_id`` as a query parameter.
    """
    uploader_session = UserSession(session_token="uploader-session", provider="teamsnap")
    editor_session = UserSession(session_token="editor-session", provider="teamsnap")

    db = SessionLocal()
    try:
        db.add_all([uploader_session, editor_session])
        # Flush so uploader_session.id is populated before the asset references it.
        db.flush()

        asset = AudioAsset(
            external_team_id="team-3",
            owner_external_player_id="player-3",
            uploaded_by_session_id=uploader_session.id,
            title="Player track",
            original_filename="player-track.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/player-track.mp3",
        )
        db.add(asset)
        # Flush again so asset.id is populated before the clip references it.
        db.flush()

        clip = AudioClip(
            asset_id=asset.id,
            label="Player clip",
            start_ms=0,
            end_ms=30000,
            normalization_status="ready",
            normalized_path="clips/player-clip.mp3",
        )
        db.add(clip)
        db.commit()

        # Capture the primary key while the row is still attached to the
        # session, so nothing below touches the ORM instance after close().
        clip_id = clip.id
    finally:
        # Close the session even when seeding fails.
        db.close()

    client.cookies.set(settings.session_cookie_name, "editor-session")
    update = client.patch(
        f"/media/clips/{clip_id}",
        params={"owner_external_player_id": "player-3"},
        json={"start_ms": 1500, "end_ms": 9000},
    )

    assert update.status_code == 200
    updated_clip = update.json()
    assert updated_clip["start_ms"] == 1500
    assert updated_clip["end_ms"] == 9000
    assert updated_clip["label"] == "Player clip"
def test_create_clip_uses_team_and_player_scope() -> None:
    """A clip can be created on an uploaded asset when team and player are supplied."""
    admin_login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert admin_login.status_code == 200

    form_fields = {
        "external_team_id": "team-6",
        "owner_external_player_id": "player-6",
        "title": "Clip source",
    }
    wav_part = ("clip-source.wav", BytesIO(make_test_wav_bytes()), "audio/wav")
    upload = client.post("/media/uploads", data=form_fields, files={"file": wav_part})
    assert upload.status_code == 200
    asset_id = upload.json()["id"]

    clip_request = {
        "asset_id": asset_id,
        "external_team_id": "team-6",
        "owner_external_player_id": "player-6",
        "label": "New clip",
        "start_ms": 1000,
        "end_ms": 6000,
    }
    response = client.post("/media/clips", json=clip_request)
    assert response.status_code == 200

    created = response.json()
    assert created["asset_id"] == asset_id
    assert created["label"] == "New clip"
    assert created["start_ms"] == 1000
    assert created["end_ms"] == 6000
def test_asset_title_can_be_edited() -> None:
    """Patching an asset's title is reflected in the asset and in its clip listing."""
    admin_login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert admin_login.status_code == 200

    form_fields = {
        "external_team_id": "team-5",
        "owner_external_player_id": "player-5",
        "title": "Old title",
    }
    wav_part = ("old-title.wav", BytesIO(make_test_wav_bytes()), "audio/wav")
    upload = client.post("/media/uploads", data=form_fields, files={"file": wav_part})
    assert upload.status_code == 200
    asset_id = upload.json()["id"]

    rename = client.patch(f"/media/assets/{asset_id}", json={"title": "New title"})
    assert rename.status_code == 200
    assert rename.json()["title"] == "New title"

    listing = client.get(
        "/media/clips",
        params={"external_team_id": "team-5", "owner_external_player_id": "player-5"},
    )
    assert listing.status_code == 200
    assert listing.json()[0]["asset_title"] == "New title"
def test_import_url_creates_default_clip(monkeypatch: pytest.MonkeyPatch) -> None:
    """Importing from a URL creates an asset plus one default clip with waveform data.

    The real downloader is replaced with a stub that points at a WAV file
    written into the media root beforehand, so no network access happens.
    """
    admin_login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert admin_login.status_code == 200

    uploads_dir = settings.media_root / "uploads"
    source_path = uploads_dir / "imported-source.wav"
    uploads_dir.mkdir(parents=True, exist_ok=True)
    source_path.write_bytes(make_test_wav_bytes())

    from app.routes import media as media_routes

    def fake_download_media_to_storage(url: str) -> tuple[str, int, str, str]:
        # Size is read when the stub is called, not when it is defined.
        size_bytes = source_path.stat().st_size
        return ("uploads/imported-source.wav", size_bytes, "imported-source.wav", "Imported Source")

    monkeypatch.setattr(media_routes, "download_media_to_storage", fake_download_media_to_storage)

    import_request = {
        "external_team_id": "team-3",
        "owner_external_player_id": "player-3",
        "url": "https://example.com/media",
    }
    response = client.post("/media/imports", json=import_request)
    assert response.status_code == 200
    assert response.json()["title"] == "Imported Source"

    listing = client.get(
        "/media/clips",
        params={"external_team_id": "team-3", "owner_external_player_id": "player-3"},
    )
    assert listing.status_code == 200
    listed_clips = listing.json()
    assert len(listed_clips) == 1

    default_clip = listed_clips[0]
    assert default_clip["label"] == "Imported Source"
    assert default_clip["start_ms"] == 0
    assert default_clip["end_ms"] == 30000
    assert default_clip["waveform_duration_ms"] is not None
    assert len(default_clip["waveform_peaks"]) > 0
def test_import_url_surfaces_download_error(monkeypatch: pytest.MonkeyPatch) -> None:
    """A 422 raised by the downloader reaches the client with its detail text intact."""
    admin_login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert admin_login.status_code == 200

    from app.routes import media as media_routes

    def fake_download_media_to_storage(url: str) -> tuple[str, int, str, str]:
        # Stands in for the real downloader and always fails.
        failure = media_routes.HTTPException(
            status_code=422,
            detail="Could not download media from that URL: HTTP Error 403: Forbidden",
        )
        raise failure

    monkeypatch.setattr(media_routes, "download_media_to_storage", fake_download_media_to_storage)

    import_request = {
        "external_team_id": "team-4",
        "owner_external_player_id": "player-4",
        "url": "https://example.com/private",
    }
    response = client.post("/media/imports", json=import_request)

    assert response.status_code == 422
    assert "HTTP Error 403: Forbidden" in response.text