Files
walkup/backend/tests/test_api.py
2026-04-22 07:48:12 -05:00

526 lines
17 KiB
Python

from __future__ import annotations
from io import BytesIO
from math import sin, tau
from wave import open as open_wave
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from app.config import settings
from app.database import Base, SessionLocal, engine
from app.main import app
from app.models import AudioAsset, AudioClip, UserSession
from app.routes.teamsnap import rewrite_teamsnap_urls
@pytest.fixture(autouse=True)
def override_media_root(tmp_path: Path) -> None:
settings.media_root = tmp_path
@pytest.fixture(autouse=True)
def override_cookie_security() -> None:
settings.session_cookie_secure = False
@pytest.fixture(autouse=True)
def reset_database() -> None:
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
@pytest.fixture(autouse=True)
def reset_client_cookies() -> None:
client.cookies.clear()
client = TestClient(app)
def make_test_wav_bytes(*, duration_seconds: float = 0.25, sample_rate: int = 8000, frequency: float = 440.0) -> bytes:
frame_count = max(1, int(duration_seconds * sample_rate))
buffer = BytesIO()
with open_wave(buffer, "wb") as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate)
frames = bytearray()
for index in range(frame_count):
sample = int(0.7 * 32767 * sin(tau * frequency * index / sample_rate))
frames.extend(sample.to_bytes(2, byteorder="little", signed=True))
wav_file.writeframes(bytes(frames))
return buffer.getvalue()
def test_healthcheck() -> None:
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
def test_admin_login_and_session() -> None:
response = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
assert response.status_code == 200
session_response = client.get("/auth/session")
assert session_response.status_code == 200
assert session_response.json()["authenticated"] is True
def test_rewrite_teamsnap_urls_uses_same_origin_proxy() -> None:
proxy_root = "https://kif.local.ascorrea.com/api/teamsnap"
payload = {
"collection": {
"href": "https://apiv3.teamsnap.com",
"links": [
{"rel": "self", "href": "https://apiv3.teamsnap.com/teams/1"},
{"rel": "avatar", "href": "https://example.com/avatar.png"},
],
}
}
rewritten = rewrite_teamsnap_urls(payload, "https://apiv3.teamsnap.com", proxy_root)
assert rewritten == {
"collection": {
"href": "https://kif.local.ascorrea.com/api/teamsnap",
"links": [
{"rel": "self", "href": "https://kif.local.ascorrea.com/api/teamsnap/teams/1"},
{"rel": "avatar", "href": "https://example.com/avatar.png"},
],
}
}
def test_teamsnap_token_returns_proxy_api_root() -> None:
db = SessionLocal()
session = UserSession(session_token="teamsnap-session", provider="teamsnap", access_token="token-value")
db.add(session)
db.commit()
db.close()
client.cookies.set(settings.session_cookie_name, "teamsnap-session")
response = client.post(
"/auth/teamsnap/token",
headers={"host": "kif.local.ascorrea.com", "x-forwarded-proto": "https"},
)
assert response.status_code == 200
assert response.json()["api_root"] == "https://kif.local.ascorrea.com/api/teamsnap"
def test_walkup_session_selection_is_persisted_in_session() -> None:
db = SessionLocal()
session = UserSession(
session_token="teamsnap-session",
provider="teamsnap",
external_user_id="user-42",
external_team_id="team-101",
external_player_id="player-1001",
)
db.add(session)
db.commit()
db.close()
client.cookies.set(settings.session_cookie_name, "teamsnap-session")
response = client.post(
"/auth/session/walkup",
json={"external_team_id": "team-101", "external_player_id": "player-1002"},
)
assert response.status_code == 200
assert response.json()["external_user_id"] == "user-42"
assert response.json()["external_team_id"] == "team-101"
assert response.json()["external_player_id"] == "player-1002"
def test_player_can_pin_a_clip_to_multiple_games_independently() -> None:
login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
assert login.status_code == 200
db = SessionLocal()
asset = AudioAsset(
external_team_id="team-1",
owner_external_player_id="player-1",
title="Song",
original_filename="song.mp3",
mime_type="audio/mpeg",
size_bytes=123,
storage_path="uploads/song.mp3",
)
db.add(asset)
db.flush()
first_clip = AudioClip(
asset_id=asset.id,
label="Intro",
start_ms=0,
end_ms=10000,
normalization_status="ready",
normalized_path="clips/intro.mp3",
)
db.add(first_clip)
db.commit()
db.refresh(first_clip)
db.close()
first_response = client.post(
"/games/game-1/assignments",
json={
"external_team_id": "team-1",
"external_player_id": "player-1",
"clip_id": first_clip.id,
"batting_slot": 1,
"status": "ready",
},
)
second_game_response = client.post(
"/games/game-2/assignments",
json={
"external_team_id": "team-1",
"external_player_id": "player-1",
"clip_id": first_clip.id,
"batting_slot": 1,
"status": "ready",
},
)
assert first_response.status_code == 200
assert second_game_response.status_code == 200
assert first_response.json()["start_ms"] == 0
assert first_response.json()["end_ms"] == 10000
game_one_assignments = client.get("/games/game-1/assignments")
game_two_assignments = client.get("/games/game-2/assignments")
assert game_one_assignments.status_code == 200
assert game_two_assignments.status_code == 200
assert [item["clip_id"] for item in game_one_assignments.json()] == [first_clip.id]
assert [item["clip_id"] for item in game_two_assignments.json()] == [first_clip.id]
db = SessionLocal()
session = db.query(UserSession).filter_by(session_token="admin-session").one_or_none()
if session is None:
session = UserSession(
session_token="admin-session",
provider="teamsnap",
external_team_id="team-1",
external_player_id="player-1",
)
db.add(session)
db.commit()
else:
session.external_team_id = "team-1"
session.external_player_id = "player-1"
db.commit()
db.close()
client.cookies.set(settings.session_cookie_name, "admin-session")
pins_before_delete = client.get("/games/pins", params={"external_player_id": "player-1"})
assert pins_before_delete.status_code == 200
assert [item["clip_id"] for item in pins_before_delete.json()] == [first_clip.id, first_clip.id]
delete_response = client.delete(
f"/games/game-1/assignments/{first_response.json()['id']}",
params={"external_player_id": "player-1"},
)
assert delete_response.status_code == 204
game_one_after_delete = client.get("/games/game-1/assignments")
game_two_after_delete = client.get("/games/game-2/assignments")
assert game_one_after_delete.status_code == 200
assert game_two_after_delete.status_code == 200
assert game_one_after_delete.json() == []
assert [item["clip_id"] for item in game_two_after_delete.json()] == [first_clip.id]
client.cookies.set(settings.session_cookie_name, "admin-session")
pins = client.get("/games/pins", params={"external_player_id": "player-1"})
assert pins.status_code == 200
assert [item["clip_id"] for item in pins.json()] == [first_clip.id]
def test_upload_creates_default_clip_and_clip_ranges_can_be_updated() -> None:
login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
assert login.status_code == 200
upload = client.post(
"/media/uploads",
data={
"external_team_id": "team-2",
"owner_external_player_id": "player-2",
"title": "Fresh track",
},
files={"file": ("fresh-track.wav", BytesIO(make_test_wav_bytes()), "audio/wav")},
)
assert upload.status_code == 200
asset_id = upload.json()["id"]
clips = client.get("/media/clips", params={"external_team_id": "team-2", "owner_external_player_id": "player-2"})
assert clips.status_code == 200
assert len(clips.json()) == 1
clip = clips.json()[0]
assert clip["asset_id"] == asset_id
assert clip["label"] == "Fresh track"
assert clip["start_ms"] == 0
assert clip["end_ms"] == 30000
assert clip["normalization_status"] == "ready"
assert clip["waveform_duration_ms"] is not None
assert len(clip["waveform_peaks"]) > 0
update = client.patch(
f"/media/clips/{clip['id']}",
json={"start_ms": 2500, "end_ms": 8750},
)
assert update.status_code == 200
updated_clip = update.json()
assert updated_clip["start_ms"] == 2500
assert updated_clip["end_ms"] == 8750
assert updated_clip["label"] == "Fresh track"
def test_player_can_reorder_clips_in_their_library() -> None:
db = SessionLocal()
session = UserSession(
session_token="player-session",
provider="teamsnap",
external_team_id="team-9",
external_player_id="player-9",
)
db.add(session)
db.commit()
db.close()
client.cookies.set(settings.session_cookie_name, "player-session")
asset = AudioAsset(
external_team_id="team-9",
owner_external_player_id="player-9",
title="Song",
original_filename="song.mp3",
mime_type="audio/mpeg",
size_bytes=123,
storage_path="uploads/song.mp3",
)
db = SessionLocal()
db.add(asset)
db.flush()
first_clip = AudioClip(
asset_id=asset.id,
label="Intro",
start_ms=0,
end_ms=10000,
sort_order=0,
normalization_status="ready",
normalized_path="clips/intro.mp3",
)
second_clip = AudioClip(
asset_id=asset.id,
label="Chorus",
start_ms=12000,
end_ms=22000,
sort_order=1,
normalization_status="ready",
normalized_path="clips/chorus.mp3",
)
db.add_all([first_clip, second_clip])
db.commit()
db.refresh(first_clip)
db.refresh(second_clip)
db.close()
reorder = client.post(
"/media/clips/reorder",
json={
"external_team_id": "team-9",
"owner_external_player_id": "player-9",
"clip_ids": [second_clip.id, first_clip.id],
},
)
assert reorder.status_code == 204
clips = client.get("/media/clips", params={"external_team_id": "team-9", "owner_external_player_id": "player-9"})
assert clips.status_code == 200
assert [item["id"] for item in clips.json()] == [second_clip.id, first_clip.id]
assert [item["sort_order"] for item in clips.json()] == [0, 1]
def test_clip_updates_can_use_player_scoped_authorization() -> None:
uploader_session = UserSession(session_token="uploader-session", provider="teamsnap")
editor_session = UserSession(session_token="editor-session", provider="teamsnap")
db = SessionLocal()
db.add_all([uploader_session, editor_session])
db.flush()
asset = AudioAsset(
external_team_id="team-3",
owner_external_player_id="player-3",
uploaded_by_session_id=uploader_session.id,
title="Player track",
original_filename="player-track.mp3",
mime_type="audio/mpeg",
size_bytes=123,
storage_path="uploads/player-track.mp3",
)
db.add(asset)
db.flush()
clip = AudioClip(
asset_id=asset.id,
label="Player clip",
start_ms=0,
end_ms=30000,
normalization_status="ready",
normalized_path="clips/player-clip.mp3",
)
db.add(clip)
db.commit()
db.refresh(clip)
db.close()
client.cookies.set(settings.session_cookie_name, "editor-session")
update = client.patch(
f"/media/clips/{clip.id}",
params={"owner_external_player_id": "player-3"},
json={"start_ms": 1500, "end_ms": 9000},
)
assert update.status_code == 200
updated_clip = update.json()
assert updated_clip["start_ms"] == 1500
assert updated_clip["end_ms"] == 9000
assert updated_clip["label"] == "Player clip"
def test_create_clip_uses_team_and_player_scope() -> None:
login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
assert login.status_code == 200
upload = client.post(
"/media/uploads",
data={
"external_team_id": "team-6",
"owner_external_player_id": "player-6",
"title": "Clip source",
},
files={"file": ("clip-source.wav", BytesIO(make_test_wav_bytes()), "audio/wav")},
)
assert upload.status_code == 200
asset_id = upload.json()["id"]
response = client.post(
"/media/clips",
json={
"asset_id": asset_id,
"external_team_id": "team-6",
"owner_external_player_id": "player-6",
"label": "New clip",
"start_ms": 1000,
"end_ms": 6000,
},
)
assert response.status_code == 200
clip = response.json()
assert clip["asset_id"] == asset_id
assert clip["label"] == "New clip"
assert clip["start_ms"] == 1000
assert clip["end_ms"] == 6000
def test_asset_title_can_be_edited() -> None:
login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
assert login.status_code == 200
upload = client.post(
"/media/uploads",
data={
"external_team_id": "team-5",
"owner_external_player_id": "player-5",
"title": "Old title",
},
files={"file": ("old-title.wav", BytesIO(make_test_wav_bytes()), "audio/wav")},
)
assert upload.status_code == 200
asset_id = upload.json()["id"]
update = client.patch(
f"/media/assets/{asset_id}",
json={"title": "New title"},
)
assert update.status_code == 200
assert update.json()["title"] == "New title"
clips = client.get("/media/clips", params={"external_team_id": "team-5", "owner_external_player_id": "player-5"})
assert clips.status_code == 200
assert clips.json()[0]["asset_title"] == "New title"
def test_import_url_creates_default_clip(monkeypatch: pytest.MonkeyPatch) -> None:
login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
assert login.status_code == 200
source_path = settings.media_root / "uploads" / "imported-source.wav"
source_path.parent.mkdir(parents=True, exist_ok=True)
source_path.write_bytes(make_test_wav_bytes())
from app.routes import media as media_routes
def fake_download_media_to_storage(url: str) -> tuple[str, int, str, str]:
return ("uploads/imported-source.wav", source_path.stat().st_size, "imported-source.wav", "Imported Source")
monkeypatch.setattr(media_routes, "download_media_to_storage", fake_download_media_to_storage)
response = client.post(
"/media/imports",
json={
"external_team_id": "team-3",
"owner_external_player_id": "player-3",
"url": "https://example.com/media",
},
)
assert response.status_code == 200
assert response.json()["title"] == "Imported Source"
clips = client.get("/media/clips", params={"external_team_id": "team-3", "owner_external_player_id": "player-3"})
assert clips.status_code == 200
assert len(clips.json()) == 1
clip = clips.json()[0]
assert clip["label"] == "Imported Source"
assert clip["start_ms"] == 0
assert clip["end_ms"] == 30000
assert clip["waveform_duration_ms"] is not None
assert len(clip["waveform_peaks"]) > 0
def test_import_url_surfaces_download_error(monkeypatch: pytest.MonkeyPatch) -> None:
login = client.post("/auth/admin/login", json={"username": "admin", "password": "admin"})
assert login.status_code == 200
from app.routes import media as media_routes
def fake_download_media_to_storage(url: str) -> tuple[str, int, str, str]:
raise media_routes.HTTPException(
status_code=422,
detail="Could not download media from that URL: HTTP Error 403: Forbidden",
)
monkeypatch.setattr(media_routes, "download_media_to_storage", fake_download_media_to_storage)
response = client.post(
"/media/imports",
json={
"external_team_id": "team-4",
"owner_external_player_id": "player-4",
"url": "https://example.com/private",
},
)
assert response.status_code == 422
assert "HTTP Error 403: Forbidden" in response.text