Files
walkup/backend/tests/test_api.py
2026-04-24 10:16:02 -05:00

1031 lines
34 KiB
Python

from __future__ import annotations
from io import BytesIO
from math import sin, tau
from wave import open as open_wave
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from app.config import settings
from app.database import Base, SessionLocal, engine
from app.main import app
from app.models import AudioAsset, AudioClip, GameAssignment, UserSession
from app.routes.teamsnap import rewrite_teamsnap_urls
@pytest.fixture(autouse=True)
def override_media_root(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Point ``settings.media_root`` at a per-test temporary directory.

    The override goes through ``monkeypatch`` so the previous value is put
    back on teardown instead of leaking into whatever runs after this test.
    """
    monkeypatch.setattr(settings, "media_root", tmp_path)
@pytest.fixture(autouse=True)
def override_cookie_security(monkeypatch: pytest.MonkeyPatch) -> None:
    """Set ``settings.session_cookie_secure`` to ``False`` for the duration of a test.

    The override goes through ``monkeypatch`` so the previous value is put
    back on teardown instead of leaking into whatever runs after this test.
    """
    monkeypatch.setattr(settings, "session_cookie_secure", False)
@pytest.fixture(autouse=True)
def reset_database() -> None:
    """Drop and recreate every table registered on ``Base.metadata`` before each test."""
    schema = Base.metadata
    # Drop first so rows written by an earlier test cannot survive.
    schema.drop_all(bind=engine)
    schema.create_all(bind=engine)
@pytest.fixture(autouse=True)
def reset_client_cookies() -> None:
    """Empty the shared test client's cookie jar before each test."""
    cookie_jar = client.cookies
    cookie_jar.clear()
# Module-level test client shared by every test in this file; its cookies are
# cleared before each test by the ``reset_client_cookies`` fixture.
client = TestClient(app)
def make_test_wav_bytes(*, duration_seconds: float = 0.25, sample_rate: int = 8000, frequency: float = 440.0) -> bytes:
    """Return the bytes of an in-memory mono, 16-bit PCM WAV file holding a sine tone.

    At least one frame is always written, even when the requested duration
    rounds down to zero frames.
    """
    total_frames = max(1, int(duration_seconds * sample_rate))
    output = BytesIO()
    with open_wave(output, "wb") as writer:
        writer.setnchannels(1)
        writer.setsampwidth(2)
        writer.setframerate(sample_rate)
        # Samples are generated only after the header parameters are accepted,
        # each one encoded as a signed little-endian 16-bit integer.
        pcm = b"".join(
            int(0.7 * 32767 * sin(tau * frequency * n / sample_rate)).to_bytes(
                2, byteorder="little", signed=True
            )
            for n in range(total_frames)
        )
        writer.writeframes(pcm)
    return output.getvalue()
def test_healthcheck() -> None:
    """GET /health must answer 200 with the fixed ``{"status": "ok"}`` body."""
    health = client.get("/health")
    expected_body = {"status": "ok"}
    assert health.status_code == 200
    assert health.json() == expected_body
def test_admin_login_and_session() -> None:
    """Log in as admin, then check that /auth/session reports an authenticated session."""
    credentials = {"username": "admin", "password": "admin"}
    login = client.post("/auth/admin/login", json=credentials)
    assert login.status_code == 200

    current_session = client.get("/auth/session")
    assert current_session.status_code == 200
    assert current_session.json()["authenticated"] is True
def test_rewrite_teamsnap_urls_uses_same_origin_proxy() -> None:
    """TeamSnap API hrefs are rewritten onto the proxy root; hrefs on other hosts are untouched."""
    upstream_root = "https://apiv3.teamsnap.com"
    proxy_root = "https://kif.local.ascorrea.com/api/teamsnap"

    upstream_links = [
        {"rel": "self", "href": "https://apiv3.teamsnap.com/teams/1"},
        {"rel": "avatar", "href": "https://example.com/avatar.png"},
    ]
    payload = {"collection": {"href": "https://apiv3.teamsnap.com", "links": upstream_links}}

    # Built from fresh literals so the expectation never aliases the input payload.
    expected_links = [
        {"rel": "self", "href": "https://kif.local.ascorrea.com/api/teamsnap/teams/1"},
        {"rel": "avatar", "href": "https://example.com/avatar.png"},
    ]
    expected = {
        "collection": {
            "href": "https://kif.local.ascorrea.com/api/teamsnap",
            "links": expected_links,
        }
    }

    assert rewrite_teamsnap_urls(payload, upstream_root, proxy_root) == expected
def test_teamsnap_token_returns_proxy_api_root() -> None:
    """The token endpoint reports the same-origin proxy as ``api_root`` and is not cacheable."""
    db = SessionLocal()
    try:
        db.add(UserSession(session_token="teamsnap-session", provider="teamsnap", access_token="token-value"))
        db.commit()
    finally:
        # Release the session even if the insert fails.
        db.close()

    client.cookies.set(settings.session_cookie_name, "teamsnap-session")
    response = client.post(
        "/auth/teamsnap/token",
        headers={"host": "kif.local.ascorrea.com", "x-forwarded-proto": "https"},
    )

    assert response.status_code == 200
    assert response.json()["api_root"] == "https://kif.local.ascorrea.com/api/teamsnap"
    assert response.headers["cache-control"] == "no-store"
def test_teamsnap_callback_redirects_without_waiting_for_user_lookup(monkeypatch: pytest.MonkeyPatch) -> None:
    """The OAuth callback stores the tokens and redirects without fetching the user profile.

    The profile lookup is replaced by a stub that fails the test if it is
    called, and the stored session is expected to have no ``external_user_id``.
    """
    from app.routes import auth as auth_routes

    async def fake_exchange_code_for_token(code: str) -> dict:
        assert code == "test-code"
        return {
            "access_token": "callback-access-token",
            "refresh_token": "callback-refresh-token",
            "expires_in": 3600,
        }

    async def fail_fetch_teamsnap_user_id(_: str) -> str | None:
        raise AssertionError("callback should not fetch the TeamSnap user profile before redirecting")

    monkeypatch.setattr(auth_routes, "exchange_code_for_token", fake_exchange_code_for_token)
    monkeypatch.setattr(auth_routes, "fetch_teamsnap_user_id", fail_fetch_teamsnap_user_id)

    client.cookies.set(settings.auth_return_cookie_name, "/library")
    response = client.get("/auth/teamsnap/callback", params={"code": "test-code"}, follow_redirects=False)

    assert response.status_code == 303
    assert response.headers["location"] == "/library"
    assert response.headers["cache-control"] == "no-store"
    assert settings.session_cookie_name in response.cookies

    db = SessionLocal()
    try:
        session = db.query(UserSession).filter_by(session_token=response.cookies[settings.session_cookie_name]).one()
        assert session.provider == "teamsnap"
        assert session.access_token == "callback-access-token"
        assert session.refresh_token == "callback-refresh-token"
        assert session.external_user_id is None
    finally:
        # A failing assertion above must not leave the session open.
        db.close()
def test_teamsnap_callback_redirects_to_signin_when_code_is_blank() -> None:
    """A blank ``code`` redirects to /signin with an error and sets no return cookie on the response."""
    expected_location = "/signin?error=TeamSnap+sign-in+did+not+return+an+authorization+code."
    client.cookies.set(settings.auth_return_cookie_name, "/library")

    callback = client.get("/auth/teamsnap/callback", params={"code": ""}, follow_redirects=False)

    assert callback.status_code == 303
    assert callback.headers["location"] == expected_location
    assert settings.auth_return_cookie_name not in callback.cookies
def test_teamsnap_callback_redirects_to_signin_when_teamsnap_returns_error() -> None:
    """An ``error`` query parameter redirects to /signin with that error echoed in the URL."""
    expected_location = "/signin?error=TeamSnap+sign-in+failed%3A+access_denied"
    client.cookies.set(settings.auth_return_cookie_name, "/library")

    callback = client.get("/auth/teamsnap/callback", params={"error": "access_denied"}, follow_redirects=False)

    assert callback.status_code == 303
    assert callback.headers["location"] == expected_location
    assert settings.auth_return_cookie_name not in callback.cookies
def test_teamsnap_token_backfills_external_user_id(monkeypatch: pytest.MonkeyPatch) -> None:
    """Requesting a token stores the looked-up TeamSnap user id on the session row."""
    from app.routes import auth as auth_routes

    async def fake_fetch_teamsnap_user_id(access_token: str) -> str | None:
        assert access_token == "token-value"
        return "user-42"

    monkeypatch.setattr(auth_routes, "fetch_teamsnap_user_id", fake_fetch_teamsnap_user_id)

    db = SessionLocal()
    try:
        db.add(UserSession(session_token="teamsnap-session", provider="teamsnap", access_token="token-value"))
        db.commit()
    finally:
        # Release the session even if the insert fails.
        db.close()

    client.cookies.set(settings.session_cookie_name, "teamsnap-session")
    response = client.post(
        "/auth/teamsnap/token",
        headers={"host": "kif.local.ascorrea.com", "x-forwarded-proto": "https"},
    )
    assert response.status_code == 200

    db = SessionLocal()
    try:
        refreshed_session = db.query(UserSession).filter_by(session_token="teamsnap-session").one()
        assert refreshed_session.external_user_id == "user-42"
    finally:
        # A failing assertion above must not leave the session open.
        db.close()
def test_session_and_clip_reads_use_cache_validators() -> None:
    """/auth/session is ``no-store``; /media/clips carries an ETag and answers 304 on revalidation."""
    login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert login.status_code == 200

    session_response = client.get("/auth/session")
    assert session_response.status_code == 200
    assert session_response.headers["cache-control"] == "no-store"

    db = SessionLocal()
    try:
        asset = AudioAsset(
            external_team_id="team-cache",
            owner_external_player_id="player-cache",
            title="Cache Song",
            original_filename="cache-song.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/cache-song.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clip below
        clip = AudioClip(
            asset_id=asset.id,
            label="Cache clip",
            start_ms=0,
            end_ms=10000,
            normalization_status="ready",
            normalized_path="normalized/cache-clip.mp3",
        )
        db.add(clip)
        db.commit()
        db.refresh(clip)
    finally:
        # Release the session even if seeding fails part-way.
        db.close()

    clip_query = {"external_team_id": "team-cache", "owner_external_player_id": "player-cache"}
    clips = client.get("/media/clips", params=clip_query)
    assert clips.status_code == 200
    assert clips.headers["etag"]
    assert clips.headers["cache-control"] == "private, max-age=0, must-revalidate"
    assert clips.headers["vary"] == "Cookie, Authorization"

    revalidated = client.get(
        "/media/clips",
        params=clip_query,
        headers={"if-none-match": clips.headers["etag"]},
    )
    assert revalidated.status_code == 304
def test_game_prep_uses_stable_etag_for_cached_assignments() -> None:
    """The game prep endpoint returns an ETag and answers 304 when that ETag is sent back."""
    login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert login.status_code == 200

    db = SessionLocal()
    try:
        asset = AudioAsset(
            external_team_id="team-prep",
            owner_external_player_id="player-prep",
            title="Prep Song",
            original_filename="prep-song.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/prep-song.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clip below
        clip = AudioClip(
            asset_id=asset.id,
            label="Prep clip",
            start_ms=0,
            end_ms=10000,
            normalization_status="ready",
            normalized_path="normalized/prep-clip.mp3",
        )
        db.add(clip)
        db.flush()  # assigns clip.id for the assignment below
        assignment = GameAssignment(
            external_team_id="team-prep",
            external_game_id="game-prep",
            external_player_id="player-prep",
            clip_id=clip.id,
            batting_slot=3,
            status="ready",
        )
        db.add(assignment)
        db.commit()
    finally:
        # Release the session even if seeding fails part-way.
        db.close()

    prep = client.get("/games/game-prep/prep")
    assert prep.status_code == 200
    assert prep.headers["etag"]

    revalidated = client.get("/games/game-prep/prep", headers={"if-none-match": prep.headers["etag"]})
    assert revalidated.status_code == 304
def test_normalized_media_files_are_cacheable() -> None:
    """A file under ``normalized/`` is served with a long-lived immutable cache policy and an ETag."""
    normalized_dir = settings.media_root / "normalized"
    normalized_dir.mkdir(parents=True, exist_ok=True)
    (normalized_dir / "cacheable.mp3").write_bytes(make_test_wav_bytes())

    served = client.get("/media/files/normalized/cacheable.mp3")

    assert served.status_code == 200
    assert served.headers["cache-control"] == "public, max-age=31536000, immutable"
    assert served.headers["etag"]
def test_walkup_session_selection_is_persisted_in_session() -> None:
    """Posting a walk-up selection returns the session with the newly chosen player id."""
    db = SessionLocal()
    try:
        db.add(
            UserSession(
                session_token="teamsnap-session",
                provider="teamsnap",
                external_user_id="user-42",
                external_team_id="team-101",
                external_player_id="player-1001",
            )
        )
        db.commit()
    finally:
        # Release the session even if the insert fails.
        db.close()

    client.cookies.set(settings.session_cookie_name, "teamsnap-session")
    response = client.post(
        "/auth/session/walkup",
        json={"external_team_id": "team-101", "external_player_id": "player-1002"},
    )

    assert response.status_code == 200
    body = response.json()
    assert body["external_user_id"] == "user-42"
    assert body["external_team_id"] == "team-101"
    assert body["external_player_id"] == "player-1002"
def test_player_can_pin_a_clip_to_multiple_games_independently() -> None:
    """One clip can be pinned to two games; deleting one pin leaves the other in place."""
    login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert login.status_code == 200

    db = SessionLocal()
    try:
        asset = AudioAsset(
            external_team_id="team-1",
            owner_external_player_id="player-1",
            title="Song",
            original_filename="song.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/song.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clip below
        first_clip = AudioClip(
            asset_id=asset.id,
            label="Intro",
            start_ms=0,
            end_ms=10000,
            normalization_status="ready",
            normalized_path="clips/intro.mp3",
        )
        db.add(first_clip)
        db.commit()
        db.refresh(first_clip)
        # Read the id while the instance is still attached to the session.
        clip_id = first_clip.id
    finally:
        # Release the session even if seeding fails part-way.
        db.close()

    assignment_payload = {
        "external_team_id": "team-1",
        "external_player_id": "player-1",
        "clip_id": clip_id,
        "batting_slot": 1,
        "status": "ready",
    }
    first_response = client.post("/games/game-1/assignments", json=assignment_payload)
    second_game_response = client.post("/games/game-2/assignments", json=assignment_payload)
    assert first_response.status_code == 200
    assert second_game_response.status_code == 200
    assert first_response.json()["start_ms"] == 0
    assert first_response.json()["end_ms"] == 10000

    game_one_assignments = client.get("/games/game-1/assignments")
    game_two_assignments = client.get("/games/game-2/assignments")
    assert game_one_assignments.status_code == 200
    assert game_two_assignments.status_code == 200
    assert [item["clip_id"] for item in game_one_assignments.json()] == [clip_id]
    assert [item["clip_id"] for item in game_two_assignments.json()] == [clip_id]

    # Make sure an "admin-session" row scoped to team-1 / player-1 exists,
    # creating it when the lookup finds nothing.
    db = SessionLocal()
    try:
        session = db.query(UserSession).filter_by(session_token="admin-session").one_or_none()
        if session is None:
            session = UserSession(session_token="admin-session", provider="teamsnap")
            db.add(session)
        session.external_team_id = "team-1"
        session.external_player_id = "player-1"
        db.commit()
    finally:
        db.close()

    client.cookies.set(settings.session_cookie_name, "admin-session")
    pins_before_delete = client.get("/games/pins", params={"external_player_id": "player-1"})
    assert pins_before_delete.status_code == 200
    assert [item["clip_id"] for item in pins_before_delete.json()] == [clip_id, clip_id]

    delete_response = client.delete(
        f"/games/game-1/assignments/{first_response.json()['id']}",
        params={"external_player_id": "player-1"},
    )
    assert delete_response.status_code == 204

    game_one_after_delete = client.get("/games/game-1/assignments")
    game_two_after_delete = client.get("/games/game-2/assignments")
    assert game_one_after_delete.status_code == 200
    assert game_two_after_delete.status_code == 200
    assert game_one_after_delete.json() == []
    assert [item["clip_id"] for item in game_two_after_delete.json()] == [clip_id]

    client.cookies.set(settings.session_cookie_name, "admin-session")
    pins = client.get("/games/pins", params={"external_player_id": "player-1"})
    assert pins.status_code == 200
    assert [item["clip_id"] for item in pins.json()] == [clip_id]
def test_player_cannot_pin_another_players_clip() -> None:
    """A session scoped to player-2 gets 403 when pinning a clip on behalf of player-1."""
    db = SessionLocal()
    try:
        owner_session = UserSession(
            session_token="owner-session",
            provider="teamsnap",
            external_team_id="team-1",
            external_player_id="player-1",
        )
        attacker_session = UserSession(
            session_token="attacker-session",
            provider="teamsnap",
            external_team_id="team-1",
            external_player_id="player-2",
        )
        db.add_all([owner_session, attacker_session])
        db.flush()  # assigns owner_session.id for the asset below
        asset = AudioAsset(
            external_team_id="team-1",
            owner_external_player_id="player-1",
            uploaded_by_session_id=owner_session.id,
            title="Song",
            original_filename="song.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/song.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clip below
        clip = AudioClip(
            asset_id=asset.id,
            label="Intro",
            start_ms=0,
            end_ms=10000,
            normalization_status="ready",
            normalized_path="clips/intro.mp3",
        )
        db.add(clip)
        db.commit()
        db.refresh(clip)
        # Read the id while the instance is still attached to the session.
        clip_id = clip.id
    finally:
        # Release the session even if seeding fails part-way.
        db.close()

    client.cookies.set(settings.session_cookie_name, "attacker-session")
    response = client.post(
        "/games/game-1/assignments",
        json={
            "external_team_id": "team-1",
            "external_player_id": "player-1",
            "clip_id": clip_id,
            "batting_slot": 1,
            "status": "ready",
        },
    )
    assert response.status_code == 403
def test_upload_creates_default_clip_and_clip_ranges_can_be_updated() -> None:
    """An upload yields exactly one ready default clip, whose range can then be patched."""
    credentials = {"username": "admin", "password": "admin"}
    assert client.post("/auth/admin/login", json=credentials).status_code == 200

    scope = {"external_team_id": "team-2", "owner_external_player_id": "player-2"}
    wav_upload = ("fresh-track.wav", BytesIO(make_test_wav_bytes()), "audio/wav")
    upload = client.post(
        "/media/uploads",
        data={**scope, "title": "Fresh track"},
        files={"file": wav_upload},
    )
    assert upload.status_code == 200
    asset_id = upload.json()["id"]

    listing = client.get("/media/clips", params=scope)
    assert listing.status_code == 200
    listed_clips = listing.json()
    assert len(listed_clips) == 1

    default_clip = listed_clips[0]
    assert default_clip["asset_id"] == asset_id
    assert default_clip["label"] == "Fresh track"
    assert default_clip["start_ms"] == 0
    assert default_clip["end_ms"] == 30000
    assert default_clip["normalization_status"] == "ready"
    assert default_clip["waveform_duration_ms"] is not None
    assert len(default_clip["waveform_peaks"]) > 0

    new_range = {"start_ms": 2500, "end_ms": 8750}
    patched = client.patch(f"/media/clips/{default_clip['id']}", json=new_range)
    assert patched.status_code == 200

    patched_clip = patched.json()
    assert patched_clip["start_ms"] == 2500
    assert patched_clip["end_ms"] == 8750
    assert patched_clip["label"] == "Fresh track"
def test_player_can_reorder_clips_in_their_library() -> None:
    """Reordering swaps the listed clip order and renumbers ``sort_order`` from zero."""
    db = SessionLocal()
    try:
        db.add(
            UserSession(
                session_token="player-session",
                provider="teamsnap",
                external_team_id="team-9",
                external_player_id="player-9",
            )
        )
        db.commit()
    finally:
        # Release the session even if the insert fails.
        db.close()
    client.cookies.set(settings.session_cookie_name, "player-session")

    db = SessionLocal()
    try:
        asset = AudioAsset(
            external_team_id="team-9",
            owner_external_player_id="player-9",
            title="Song",
            original_filename="song.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/song.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clips below
        first_clip = AudioClip(
            asset_id=asset.id,
            label="Intro",
            start_ms=0,
            end_ms=10000,
            sort_order=0,
            normalization_status="ready",
            normalized_path="clips/intro.mp3",
        )
        second_clip = AudioClip(
            asset_id=asset.id,
            label="Chorus",
            start_ms=12000,
            end_ms=22000,
            sort_order=1,
            normalization_status="ready",
            normalized_path="clips/chorus.mp3",
        )
        db.add_all([first_clip, second_clip])
        db.commit()
        db.refresh(first_clip)
        db.refresh(second_clip)
        # Read the ids while the instances are still attached to the session.
        first_id, second_id = first_clip.id, second_clip.id
    finally:
        db.close()

    reorder = client.post(
        "/media/clips/reorder",
        json={
            "external_team_id": "team-9",
            "owner_external_player_id": "player-9",
            "clip_ids": [second_id, first_id],
        },
    )
    assert reorder.status_code == 204

    clips = client.get("/media/clips", params={"external_team_id": "team-9", "owner_external_player_id": "player-9"})
    assert clips.status_code == 200
    assert [item["id"] for item in clips.json()] == [second_id, first_id]
    assert [item["sort_order"] for item in clips.json()] == [0, 1]
def test_hidden_clips_are_removed_from_gameday_views_but_remain_pinnable() -> None:
    """Hiding a pinned clip removes it from listings, assignments and prep, but not from pins.

    The clip is still returned by /media/clips when ``include_hidden`` is set.
    """
    login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
    assert login.status_code == 200

    db = SessionLocal()
    try:
        session = UserSession(
            session_token="player-hidden-session",
            provider="teamsnap",
            external_team_id="team-hidden",
            external_player_id="player-hidden",
        )
        db.add(session)
        db.flush()  # assigns session.id for the asset below
        asset = AudioAsset(
            external_team_id="team-hidden",
            owner_external_player_id="player-hidden",
            uploaded_by_session_id=session.id,
            title="Hidden song",
            original_filename="hidden.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/hidden.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clip below
        clip = AudioClip(
            asset_id=asset.id,
            label="Hidden clip",
            start_ms=0,
            end_ms=10000,
            normalization_status="ready",
            normalized_path="clips/hidden.mp3",
        )
        db.add(clip)
        db.commit()
        db.refresh(clip)
        # Read the id while the instance is still attached to the session.
        clip_id = clip.id
    finally:
        # Release the session even if seeding fails part-way.
        db.close()

    client.cookies.set(settings.session_cookie_name, "player-hidden-session")
    pin_response = client.post(
        "/games/game-hidden/assignments",
        json={
            "external_team_id": "team-hidden",
            "external_player_id": "player-hidden",
            "clip_id": clip_id,
            "batting_slot": 1,
            "status": "ready",
        },
    )
    assert pin_response.status_code == 200

    visible_before_hide = client.get(
        "/media/clips",
        params={"external_team_id": "team-hidden", "owner_external_player_id": "player-hidden"},
    )
    assert visible_before_hide.status_code == 200
    assert [item["id"] for item in visible_before_hide.json()] == [clip_id]
    assert visible_before_hide.json()[0]["hidden"] is False

    hide_response = client.patch(
        f"/media/clips/{clip_id}",
        params={"owner_external_player_id": "player-hidden"},
        json={"start_ms": 0, "end_ms": 10000, "hidden": True},
    )
    assert hide_response.status_code == 200
    assert hide_response.json()["hidden"] is True

    visible_after_hide = client.get(
        "/media/clips",
        params={"external_team_id": "team-hidden", "owner_external_player_id": "player-hidden"},
    )
    assert visible_after_hide.status_code == 200
    assert visible_after_hide.json() == []

    hidden_in_library = client.get(
        "/media/clips",
        params={
            "external_team_id": "team-hidden",
            "owner_external_player_id": "player-hidden",
            "include_hidden": True,
        },
    )
    assert hidden_in_library.status_code == 200
    assert [item["id"] for item in hidden_in_library.json()] == [clip_id]
    assert hidden_in_library.json()[0]["hidden"] is True

    game_assignments = client.get("/games/game-hidden/assignments")
    prep = client.get("/games/game-hidden/prep")
    pins = client.get("/games/pins", params={"external_player_id": "player-hidden"})
    assert game_assignments.status_code == 200
    assert prep.status_code == 200
    assert pins.status_code == 200
    assert game_assignments.json() == []
    assert prep.json()["assignments"] == []
    assert [item["clip_id"] for item in pins.json()] == [clip_id]
def test_same_team_player_can_read_another_players_clips() -> None:
    """A viewer on the same team can list clips owned by another player."""
    db = SessionLocal()
    try:
        owner_session = UserSession(
            session_token="owner-library-session",
            provider="teamsnap",
            external_team_id="team-share",
            external_player_id="player-owner",
        )
        viewer_session = UserSession(
            session_token="viewer-library-session",
            provider="teamsnap",
            external_team_id="team-share",
            external_player_id="player-viewer",
        )
        db.add_all([owner_session, viewer_session])
        db.flush()  # assigns owner_session.id for the asset below
        asset = AudioAsset(
            external_team_id="team-share",
            owner_external_player_id="player-owner",
            uploaded_by_session_id=owner_session.id,
            title="Shared song",
            original_filename="shared-song.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/shared-song.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clip below
        clip = AudioClip(
            asset_id=asset.id,
            label="Shared clip",
            start_ms=0,
            end_ms=10000,
            normalization_status="ready",
            normalized_path="normalized/shared-clip.mp3",
        )
        db.add(clip)
        db.commit()
        db.refresh(clip)
        # Read the id before closing: once detached, the instance may no
        # longer be able to load it (depends on the session's expiry settings).
        clip_id = clip.id
    finally:
        # Release the session even if seeding fails part-way.
        db.close()

    client.cookies.set(settings.session_cookie_name, "viewer-library-session")
    response = client.get(
        "/media/clips",
        params={"external_team_id": "team-share", "owner_external_player_id": "player-owner"},
    )
    assert response.status_code == 200
    assert [item["id"] for item in response.json()] == [clip_id]
    assert response.json()[0]["owner_external_player_id"] == "player-owner"
def test_clip_updates_can_use_player_scoped_authorization() -> None:
    """A session scoped to the owning player can patch a clip that another session uploaded."""
    uploader_session = UserSession(session_token="uploader-session", provider="teamsnap")
    editor_session = UserSession(
        session_token="editor-session",
        provider="teamsnap",
        external_team_id="team-3",
        external_player_id="player-3",
    )

    db = SessionLocal()
    try:
        db.add_all([uploader_session, editor_session])
        db.flush()  # assigns uploader_session.id for the asset below
        asset = AudioAsset(
            external_team_id="team-3",
            owner_external_player_id="player-3",
            uploaded_by_session_id=uploader_session.id,
            title="Player track",
            original_filename="player-track.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/player-track.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clip below
        clip = AudioClip(
            asset_id=asset.id,
            label="Player clip",
            start_ms=0,
            end_ms=30000,
            normalization_status="ready",
            normalized_path="clips/player-clip.mp3",
        )
        db.add(clip)
        db.commit()
        db.refresh(clip)
        # Read the id while the instance is still attached to the session.
        clip_id = clip.id
    finally:
        # Release the session even if seeding fails part-way.
        db.close()

    client.cookies.set(settings.session_cookie_name, "editor-session")
    update = client.patch(
        f"/media/clips/{clip_id}",
        params={"owner_external_player_id": "player-3"},
        json={"start_ms": 1500, "end_ms": 9000},
    )
    assert update.status_code == 200

    updated_clip = update.json()
    assert updated_clip["start_ms"] == 1500
    assert updated_clip["end_ms"] == 9000
    assert updated_clip["label"] == "Player clip"
def test_clip_updates_reject_cross_player_scopes() -> None:
    """A session scoped to player-4 gets 403 when patching a clip owned by player-3."""
    uploader_session = UserSession(
        session_token="uploader-session",
        provider="teamsnap",
        external_team_id="team-3",
        external_player_id="player-3",
    )
    editor_session = UserSession(
        session_token="editor-session",
        provider="teamsnap",
        external_team_id="team-3",
        external_player_id="player-4",
    )

    db = SessionLocal()
    try:
        db.add_all([uploader_session, editor_session])
        db.flush()  # assigns uploader_session.id for the asset below
        asset = AudioAsset(
            external_team_id="team-3",
            owner_external_player_id="player-3",
            uploaded_by_session_id=uploader_session.id,
            title="Player track",
            original_filename="player-track.mp3",
            mime_type="audio/mpeg",
            size_bytes=123,
            storage_path="uploads/player-track.mp3",
        )
        db.add(asset)
        db.flush()  # assigns asset.id for the clip below
        clip = AudioClip(
            asset_id=asset.id,
            label="Player clip",
            start_ms=0,
            end_ms=30000,
            normalization_status="ready",
            normalized_path="clips/player-clip.mp3",
        )
        db.add(clip)
        db.commit()
        db.refresh(clip)
        # Read the id while the instance is still attached to the session.
        clip_id = clip.id
    finally:
        # Release the session even if seeding fails part-way.
        db.close()

    client.cookies.set(settings.session_cookie_name, "editor-session")
    update = client.patch(
        f"/media/clips/{clip_id}",
        params={"owner_external_player_id": "player-3"},
        json={"start_ms": 1500, "end_ms": 9000},
    )
    assert update.status_code == 403
def test_create_clip_uses_team_and_player_scope() -> None:
    """A clip can be created on an uploaded asset when the team and player scope are supplied."""
    credentials = {"username": "admin", "password": "admin"}
    assert client.post("/auth/admin/login", json=credentials).status_code == 200

    scope = {"external_team_id": "team-6", "owner_external_player_id": "player-6"}
    wav_upload = ("clip-source.wav", BytesIO(make_test_wav_bytes()), "audio/wav")
    upload = client.post(
        "/media/uploads",
        data={**scope, "title": "Clip source"},
        files={"file": wav_upload},
    )
    assert upload.status_code == 200
    asset_id = upload.json()["id"]

    clip_request = {
        "asset_id": asset_id,
        **scope,
        "label": "New clip",
        "start_ms": 1000,
        "end_ms": 6000,
    }
    created = client.post("/media/clips", json=clip_request)
    assert created.status_code == 200

    new_clip = created.json()
    assert new_clip["asset_id"] == asset_id
    assert new_clip["label"] == "New clip"
    assert new_clip["start_ms"] == 1000
    assert new_clip["end_ms"] == 6000
def test_create_asset_with_default_clip_cleans_up_files_on_failure(monkeypatch: pytest.MonkeyPatch) -> None:
    """When waveform generation raises, no source file, normalized file or database row remains.

    Normalization is stubbed to copy the source file and waveform generation
    is stubbed to raise ``RuntimeError``.
    """
    from app.routes import media as media_routes

    db = SessionLocal()
    try:
        session = UserSession(
            session_token="cleanup-session",
            provider="teamsnap",
            external_team_id="team-clean",
            external_player_id="player-clean",
        )
        db.add(session)
        db.commit()
        db.refresh(session)

        source_relative_path = "uploads/cleanup-source.wav"
        source_path = settings.media_root / source_relative_path
        source_path.parent.mkdir(parents=True, exist_ok=True)
        source_path.write_bytes(make_test_wav_bytes())
        normalized_path = settings.media_root / "normalized" / "cleanup-copy.wav"

        def fake_normalize_clip(source_relative_path_arg: str, clip_name: str) -> str:
            normalized_path.parent.mkdir(parents=True, exist_ok=True)
            normalized_path.write_bytes((settings.media_root / source_relative_path_arg).read_bytes())
            return "normalized/cleanup-copy.wav"

        def fake_generate_waveform(source_relative_path_arg: str, bins: int = media_routes.WAVEFORM_PEAK_COUNT) -> dict[str, int | list[int]]:
            raise RuntimeError("waveform failed")

        monkeypatch.setattr(media_routes.storage, "normalize_clip", fake_normalize_clip)
        monkeypatch.setattr(media_routes.storage, "generate_waveform", fake_generate_waveform)

        with pytest.raises(RuntimeError):
            media_routes.create_asset_with_default_clip(
                db=db,
                session=session,
                external_team_id="team-clean",
                owner_external_player_id="player-clean",
                title="Cleanup track",
                original_filename="cleanup-source.wav",
                mime_type="audio/wav",
                size_bytes=source_path.stat().st_size,
                storage_path=source_relative_path,
            )

        assert not source_path.exists()
        assert not normalized_path.exists()
        assert db.query(AudioAsset).count() == 0
        assert db.query(AudioClip).count() == 0
    finally:
        # A failing assertion or unexpected exception must not leave the session open.
        db.close()
def test_asset_title_can_be_edited() -> None:
    """Patching an asset's title changes it on the asset and on its clip listing."""
    credentials = {"username": "admin", "password": "admin"}
    assert client.post("/auth/admin/login", json=credentials).status_code == 200

    scope = {"external_team_id": "team-5", "owner_external_player_id": "player-5"}
    wav_upload = ("old-title.wav", BytesIO(make_test_wav_bytes()), "audio/wav")
    upload = client.post(
        "/media/uploads",
        data={**scope, "title": "Old title"},
        files={"file": wav_upload},
    )
    assert upload.status_code == 200
    asset_id = upload.json()["id"]

    renamed = client.patch(f"/media/assets/{asset_id}", json={"title": "New title"})
    assert renamed.status_code == 200
    assert renamed.json()["title"] == "New title"

    listing = client.get("/media/clips", params=scope)
    assert listing.status_code == 200
    assert listing.json()[0]["asset_title"] == "New title"
def test_import_url_creates_default_clip(monkeypatch: pytest.MonkeyPatch) -> None:
    """Importing from a URL (download stubbed out) creates an asset with one default clip."""
    credentials = {"username": "admin", "password": "admin"}
    assert client.post("/auth/admin/login", json=credentials).status_code == 200

    # Put the "downloaded" file where the stubbed downloader claims it is.
    uploads_dir = settings.media_root / "uploads"
    uploads_dir.mkdir(parents=True, exist_ok=True)
    source_path = uploads_dir / "imported-source.wav"
    source_path.write_bytes(make_test_wav_bytes())

    from app.routes import media as media_routes

    def fake_download_media_to_storage(url: str) -> tuple[str, int, str, str]:
        size_bytes = source_path.stat().st_size
        return ("uploads/imported-source.wav", size_bytes, "imported-source.wav", "Imported Source")

    monkeypatch.setattr(media_routes, "download_media_to_storage", fake_download_media_to_storage)

    scope = {"external_team_id": "team-3", "owner_external_player_id": "player-3"}
    imported = client.post("/media/imports", json={**scope, "url": "https://example.com/media"})
    assert imported.status_code == 200
    assert imported.json()["title"] == "Imported Source"

    listing = client.get("/media/clips", params=scope)
    assert listing.status_code == 200
    listed_clips = listing.json()
    assert len(listed_clips) == 1

    default_clip = listed_clips[0]
    assert default_clip["label"] == "Imported Source"
    assert default_clip["start_ms"] == 0
    assert default_clip["end_ms"] == 30000
    assert default_clip["waveform_duration_ms"] is not None
    assert len(default_clip["waveform_peaks"]) > 0
def test_import_url_surfaces_download_error(monkeypatch: pytest.MonkeyPatch) -> None:
    """A 422 raised by the (stubbed) downloader reaches the client with its detail text."""
    credentials = {"username": "admin", "password": "admin"}
    assert client.post("/auth/admin/login", json=credentials).status_code == 200

    from app.routes import media as media_routes

    def fake_download_media_to_storage(url: str) -> tuple[str, int, str, str]:
        failure_detail = "Could not download media from that URL: HTTP Error 403: Forbidden"
        raise media_routes.HTTPException(status_code=422, detail=failure_detail)

    monkeypatch.setattr(media_routes, "download_media_to_storage", fake_download_media_to_storage)

    import_request = {
        "external_team_id": "team-4",
        "owner_external_player_id": "player-4",
        "url": "https://example.com/private",
    }
    failed = client.post("/media/imports", json=import_request)

    assert failed.status_code == 422
    assert "HTTP Error 403: Forbidden" in failed.text