from __future__ import annotations from io import BytesIO from math import sin, tau from wave import open as open_wave from pathlib import Path import pytest from fastapi.testclient import TestClient from app.config import settings from app.database import Base, SessionLocal, engine from app.main import app from app.models import AudioAsset, AudioClip, UserSession from app.routes.teamsnap import rewrite_teamsnap_urls @pytest.fixture(autouse=True) def override_media_root(tmp_path: Path) -> None: settings.media_root = tmp_path @pytest.fixture(autouse=True) def override_cookie_security() -> None: settings.session_cookie_secure = False @pytest.fixture(autouse=True) def reset_database() -> None: Base.metadata.drop_all(bind=engine) Base.metadata.create_all(bind=engine) @pytest.fixture(autouse=True) def reset_client_cookies() -> None: client.cookies.clear() client = TestClient(app) def make_test_wav_bytes(*, duration_seconds: float = 0.25, sample_rate: int = 8000, frequency: float = 440.0) -> bytes: frame_count = max(1, int(duration_seconds * sample_rate)) buffer = BytesIO() with open_wave(buffer, "wb") as wav_file: wav_file.setnchannels(1) wav_file.setsampwidth(2) wav_file.setframerate(sample_rate) frames = bytearray() for index in range(frame_count): sample = int(0.7 * 32767 * sin(tau * frequency * index / sample_rate)) frames.extend(sample.to_bytes(2, byteorder="little", signed=True)) wav_file.writeframes(bytes(frames)) return buffer.getvalue() def test_healthcheck() -> None: response = client.get("/health") assert response.status_code == 200 assert response.json() == {"status": "ok"} def test_admin_login_and_session() -> None: response = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"}) assert response.status_code == 200 session_response = client.get("/auth/session") assert session_response.status_code == 200 assert session_response.json()["authenticated"] is True def test_rewrite_teamsnap_urls_uses_same_origin_proxy() -> None: proxy_root = "https://kif.local.ascorrea.com/api/teamsnap" payload = { "collection": { "href": "https://apiv3.teamsnap.com", "links": [ {"rel": "self", "href": "https://apiv3.teamsnap.com/teams/1"}, {"rel": "avatar", "href": "https://example.com/avatar.png"}, ], } } rewritten = rewrite_teamsnap_urls(payload, "https://apiv3.teamsnap.com", proxy_root) assert rewritten == { "collection": { "href": "https://kif.local.ascorrea.com/api/teamsnap", "links": [ {"rel": "self", "href": "https://kif.local.ascorrea.com/api/teamsnap/teams/1"}, {"rel": "avatar", "href": "https://example.com/avatar.png"}, ], } } def test_teamsnap_token_returns_proxy_api_root() -> None: db = SessionLocal() session = UserSession(session_token="teamsnap-session", provider="teamsnap", access_token="token-value") db.add(session) db.commit() db.close() client.cookies.set(settings.session_cookie_name, "teamsnap-session") response = client.post( "/auth/teamsnap/token", headers={"host": "kif.local.ascorrea.com", "x-forwarded-proto": "https"}, ) assert response.status_code == 200 assert response.json()["api_root"] == "https://kif.local.ascorrea.com/api/teamsnap" def test_walkup_session_selection_is_persisted_in_session() -> None: db = SessionLocal() session = UserSession( session_token="teamsnap-session", provider="teamsnap", external_user_id="user-42", external_team_id="team-101", external_player_id="player-1001", ) db.add(session) db.commit() db.close() client.cookies.set(settings.session_cookie_name, "teamsnap-session") response = client.post( "/auth/session/walkup", json={"external_team_id": "team-101", "external_player_id": "player-1002"}, ) assert response.status_code == 200 assert response.json()["external_user_id"] == "user-42" assert response.json()["external_team_id"] == "team-101" assert response.json()["external_player_id"] == "player-1002" def test_player_can_pin_a_clip_to_multiple_games_independently() -> None: login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"}) assert login.status_code == 200 db = SessionLocal() asset = AudioAsset( external_team_id="team-1", owner_external_player_id="player-1", title="Song", original_filename="song.mp3", mime_type="audio/mpeg", size_bytes=123, storage_path="uploads/song.mp3", ) db.add(asset) db.flush() first_clip = AudioClip( asset_id=asset.id, label="Intro", start_ms=0, end_ms=10000, normalization_status="ready", normalized_path="clips/intro.mp3", ) db.add(first_clip) db.commit() db.refresh(first_clip) db.close() first_response = client.post( "/games/game-1/assignments", json={ "external_team_id": "team-1", "external_player_id": "player-1", "clip_id": first_clip.id, "batting_slot": 1, "status": "ready", }, ) second_game_response = client.post( "/games/game-2/assignments", json={ "external_team_id": "team-1", "external_player_id": "player-1", "clip_id": first_clip.id, "batting_slot": 1, "status": "ready", }, ) assert first_response.status_code == 200 assert second_game_response.status_code == 200 assert first_response.json()["start_ms"] == 0 assert first_response.json()["end_ms"] == 10000 game_one_assignments = client.get("/games/game-1/assignments") game_two_assignments = client.get("/games/game-2/assignments") assert game_one_assignments.status_code == 200 assert game_two_assignments.status_code == 200 assert [item["clip_id"] for item in game_one_assignments.json()] == [first_clip.id] assert [item["clip_id"] for item in game_two_assignments.json()] == [first_clip.id] db = SessionLocal() session = db.query(UserSession).filter_by(session_token="admin-session").one_or_none() if session is None: session = UserSession( session_token="admin-session", provider="teamsnap", external_team_id="team-1", external_player_id="player-1", ) db.add(session) db.commit() else: session.external_team_id = "team-1" session.external_player_id = "player-1" db.commit() db.close() client.cookies.set(settings.session_cookie_name, "admin-session") pins_before_delete = client.get("/games/pins", params={"external_player_id": "player-1"}) assert pins_before_delete.status_code == 200 assert [item["clip_id"] for item in pins_before_delete.json()] == [first_clip.id, first_clip.id] delete_response = client.delete( f"/games/game-1/assignments/{first_response.json()['id']}", params={"external_player_id": "player-1"}, ) assert delete_response.status_code == 204 game_one_after_delete = client.get("/games/game-1/assignments") game_two_after_delete = client.get("/games/game-2/assignments") assert game_one_after_delete.status_code == 200 assert game_two_after_delete.status_code == 200 assert game_one_after_delete.json() == [] assert [item["clip_id"] for item in game_two_after_delete.json()] == [first_clip.id] client.cookies.set(settings.session_cookie_name, "admin-session") pins = client.get("/games/pins", params={"external_player_id": "player-1"}) assert pins.status_code == 200 assert [item["clip_id"] for item in pins.json()] == [first_clip.id] def test_upload_creates_default_clip_and_clip_ranges_can_be_updated() -> None: login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"}) assert login.status_code == 200 upload = client.post( "/media/uploads", data={ "external_team_id": "team-2", "owner_external_player_id": "player-2", "title": "Fresh track", }, files={"file": ("fresh-track.wav", BytesIO(make_test_wav_bytes()), "audio/wav")}, ) assert upload.status_code == 200 asset_id = upload.json()["id"] clips = client.get("/media/clips", params={"external_team_id": "team-2", "owner_external_player_id": "player-2"}) assert clips.status_code == 200 assert len(clips.json()) == 1 clip = clips.json()[0] assert clip["asset_id"] == asset_id assert clip["label"] == "Fresh track" assert clip["start_ms"] == 0 assert clip["end_ms"] == 30000 assert clip["normalization_status"] == "ready" assert clip["waveform_duration_ms"] is not None assert len(clip["waveform_peaks"]) > 0 update = client.patch( f"/media/clips/{clip['id']}", json={"start_ms": 2500, "end_ms": 8750}, ) assert update.status_code == 200 updated_clip = update.json() assert updated_clip["start_ms"] == 2500 assert updated_clip["end_ms"] == 8750 assert updated_clip["label"] == "Fresh track" def test_player_can_reorder_clips_in_their_library() -> None: db = SessionLocal() session = UserSession( session_token="player-session", provider="teamsnap", external_team_id="team-9", external_player_id="player-9", ) db.add(session) db.commit() db.close() client.cookies.set(settings.session_cookie_name, "player-session") asset = AudioAsset( external_team_id="team-9", owner_external_player_id="player-9", title="Song", original_filename="song.mp3", mime_type="audio/mpeg", size_bytes=123, storage_path="uploads/song.mp3", ) db = SessionLocal() db.add(asset) db.flush() first_clip = AudioClip( asset_id=asset.id, label="Intro", start_ms=0, end_ms=10000, sort_order=0, normalization_status="ready", normalized_path="clips/intro.mp3", ) second_clip = AudioClip( asset_id=asset.id, label="Chorus", start_ms=12000, end_ms=22000, sort_order=1, normalization_status="ready", normalized_path="clips/chorus.mp3", ) db.add_all([first_clip, second_clip]) db.commit() db.refresh(first_clip) db.refresh(second_clip) db.close() reorder = client.post( "/media/clips/reorder", json={ "external_team_id": "team-9", "owner_external_player_id": "player-9", "clip_ids": [second_clip.id, first_clip.id], }, ) assert reorder.status_code == 204 clips = client.get("/media/clips", params={"external_team_id": "team-9", "owner_external_player_id": "player-9"}) assert clips.status_code == 200 assert [item["id"] for item in clips.json()] == [second_clip.id, first_clip.id] assert [item["sort_order"] for item in clips.json()] == [0, 1] def test_hidden_clips_are_removed_from_gameday_views_but_remain_pinnable() -> None: login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"}) assert login.status_code == 200 db = SessionLocal() session = UserSession( session_token="player-hidden-session", provider="teamsnap", external_team_id="team-hidden", external_player_id="player-hidden", ) db.add(session) db.flush() asset = AudioAsset( external_team_id="team-hidden", owner_external_player_id="player-hidden", uploaded_by_session_id=session.id, title="Hidden song", original_filename="hidden.mp3", mime_type="audio/mpeg", size_bytes=123, storage_path="uploads/hidden.mp3", ) db.add(asset) db.flush() clip = AudioClip( asset_id=asset.id, label="Hidden clip", start_ms=0, end_ms=10000, normalization_status="ready", normalized_path="clips/hidden.mp3", ) db.add(clip) db.commit() db.refresh(clip) db.close() client.cookies.set(settings.session_cookie_name, "player-hidden-session") pin_response = client.post( "/games/game-hidden/assignments", json={ "external_team_id": "team-hidden", "external_player_id": "player-hidden", "clip_id": clip.id, "batting_slot": 1, "status": "ready", }, ) assert pin_response.status_code == 200 visible_before_hide = client.get( "/media/clips", params={"external_team_id": "team-hidden", "owner_external_player_id": "player-hidden"}, ) assert visible_before_hide.status_code == 200 assert [item["id"] for item in visible_before_hide.json()] == [clip.id] assert visible_before_hide.json()[0]["hidden"] is False hide_response = client.patch( f"/media/clips/{clip.id}", params={"owner_external_player_id": "player-hidden"}, json={"start_ms": 0, "end_ms": 10000, "hidden": True}, ) assert hide_response.status_code == 200 assert hide_response.json()["hidden"] is True visible_after_hide = client.get( "/media/clips", params={"external_team_id": "team-hidden", "owner_external_player_id": "player-hidden"}, ) assert visible_after_hide.status_code == 200 assert visible_after_hide.json() == [] hidden_in_library = client.get( "/media/clips", params={ "external_team_id": "team-hidden", "owner_external_player_id": "player-hidden", "include_hidden": True, }, ) assert hidden_in_library.status_code == 200 assert [item["id"] for item in hidden_in_library.json()] == [clip.id] assert hidden_in_library.json()[0]["hidden"] is True game_assignments = client.get("/games/game-hidden/assignments") prep = client.get("/games/game-hidden/prep") pins = client.get("/games/pins", params={"external_player_id": "player-hidden"}) assert game_assignments.status_code == 200 assert prep.status_code == 200 assert pins.status_code == 200 assert game_assignments.json() == [] assert prep.json()["assignments"] == [] assert [item["clip_id"] for item in pins.json()] == [clip.id] def test_clip_updates_can_use_player_scoped_authorization() -> None: uploader_session = UserSession(session_token="uploader-session", provider="teamsnap") editor_session = UserSession(session_token="editor-session", provider="teamsnap") db = SessionLocal() db.add_all([uploader_session, editor_session]) db.flush() asset = AudioAsset( external_team_id="team-3", owner_external_player_id="player-3", uploaded_by_session_id=uploader_session.id, title="Player track", original_filename="player-track.mp3", mime_type="audio/mpeg", size_bytes=123, storage_path="uploads/player-track.mp3", ) db.add(asset) db.flush() clip = AudioClip( asset_id=asset.id, label="Player clip", start_ms=0, end_ms=30000, normalization_status="ready", normalized_path="clips/player-clip.mp3", ) db.add(clip) db.commit() db.refresh(clip) db.close() client.cookies.set(settings.session_cookie_name, "editor-session") update = client.patch( f"/media/clips/{clip.id}", params={"owner_external_player_id": "player-3"}, json={"start_ms": 1500, "end_ms": 9000}, ) assert update.status_code == 200 updated_clip = update.json() assert updated_clip["start_ms"] == 1500 assert updated_clip["end_ms"] == 9000 assert updated_clip["label"] == "Player clip" def test_create_clip_uses_team_and_player_scope() -> None: login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"}) assert login.status_code == 200 upload = client.post( "/media/uploads", data={ "external_team_id": "team-6", "owner_external_player_id": "player-6", "title": "Clip source", }, files={"file": ("clip-source.wav", BytesIO(make_test_wav_bytes()), "audio/wav")}, ) assert upload.status_code == 200 asset_id = upload.json()["id"] response = client.post( "/media/clips", json={ "asset_id": asset_id, "external_team_id": "team-6", "owner_external_player_id": "player-6", "label": "New clip", "start_ms": 1000, "end_ms": 6000, }, ) assert response.status_code == 200 clip = response.json() assert clip["asset_id"] == asset_id assert clip["label"] == "New clip" assert clip["start_ms"] == 1000 assert clip["end_ms"] == 6000 def test_asset_title_can_be_edited() -> None: login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"}) assert login.status_code == 200 upload = client.post( "/media/uploads", data={ "external_team_id": "team-5", "owner_external_player_id": "player-5", "title": "Old title", }, files={"file": ("old-title.wav", BytesIO(make_test_wav_bytes()), "audio/wav")}, ) assert upload.status_code == 200 asset_id = upload.json()["id"] update = client.patch( f"/media/assets/{asset_id}", json={"title": "New title"}, ) assert update.status_code == 200 assert update.json()["title"] == "New title" clips = client.get("/media/clips", params={"external_team_id": "team-5", "owner_external_player_id": "player-5"}) assert clips.status_code == 200 assert clips.json()[0]["asset_title"] == "New title" def test_import_url_creates_default_clip(monkeypatch: pytest.MonkeyPatch) -> None: login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"}) assert login.status_code == 200 source_path = settings.media_root / "uploads" / "imported-source.wav" source_path.parent.mkdir(parents=True, exist_ok=True) source_path.write_bytes(make_test_wav_bytes()) from app.routes import media as media_routes def fake_download_media_to_storage(url: str) -> tuple[str, int, str, str]: return ("uploads/imported-source.wav", source_path.stat().st_size, "imported-source.wav", "Imported Source") monkeypatch.setattr(media_routes, "download_media_to_storage", fake_download_media_to_storage) response = client.post( "/media/imports", json={ "external_team_id": "team-3", "owner_external_player_id": "player-3", "url": "https://example.com/media", }, ) assert response.status_code == 200 assert response.json()["title"] == "Imported Source" clips = client.get("/media/clips", params={"external_team_id": "team-3", "owner_external_player_id": "player-3"}) assert clips.status_code == 200 assert len(clips.json()) == 1 clip = clips.json()[0] assert clip["label"] == "Imported Source" assert clip["start_ms"] == 0 assert clip["end_ms"] == 30000 assert clip["waveform_duration_ms"] is not None assert len(clip["waveform_peaks"]) > 0 def test_import_url_surfaces_download_error(monkeypatch: pytest.MonkeyPatch) -> None: login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"}) assert login.status_code == 200 from app.routes import media as media_routes def fake_download_media_to_storage(url: str) -> tuple[str, int, str, str]: raise media_routes.HTTPException( status_code=422, detail="Could not download media from that URL: HTTP Error 403: Forbidden", ) monkeypatch.setattr(media_routes, "download_media_to_storage", fake_download_media_to_storage) response = client.post( "/media/imports", json={ "external_team_id": "team-4", "owner_external_player_id": "player-4", "url": "https://example.com/private", }, ) assert response.status_code == 422 assert "HTTP Error 403: Forbidden" in response.text