Files
walkup/backend/app/storage.py
2026-04-22 06:46:23 -05:00

141 lines
4.6 KiB
Python

from __future__ import annotations
import json
import shutil
import subprocess
import sys
from array import array
from pathlib import Path
from fastapi import UploadFile
from .config import settings
# Default number of peak bins per waveform. load_or_generate_waveform() treats
# a cached sidecar holding any other number of peaks as stale and regenerates it.
WAVEFORM_PEAK_COUNT = 1024


class MediaStorage:
    """Filesystem-backed store for uploads, normalized clips, and waveform sidecars.

    Paths accepted and returned by the public methods are relative to ``root``
    (``settings.media_root``).

    NOTE(review): relative paths and file names are joined onto ``root`` as-is.
    An absolute path or one containing ``..`` resolves outside ``root``, so
    callers must only pass values they generated or validated themselves.
    """

    # Bytes read from an upload stream per iteration in save_upload().
    _UPLOAD_CHUNK_SIZE = 1024 * 1024
    # Rate (Hz) ffmpeg resamples to; also used to convert sample count to duration.
    _WAVEFORM_SAMPLE_RATE = 8000
    # Largest positive signed 16-bit sample; peaks are scaled against it.
    _PCM_FULL_SCALE = 32767

    def __init__(self) -> None:
        """Create the storage directories if they do not exist yet."""
        self._ensure_directories()

    @property
    def root(self) -> Path:
        """Base directory of the store, read from ``settings.media_root``."""
        return settings.media_root

    @property
    def uploads_dir(self) -> Path:
        """Directory that receives raw uploads (``root / "uploads"``)."""
        return self.root / "uploads"

    @property
    def normalized_dir(self) -> Path:
        """Directory that receives normalized clips (``root / "normalized"``)."""
        return self.root / "normalized"

    def waveform_sidecar_path(self, relative_path: str) -> Path:
        """Return the sidecar path for a media file.

        The sidecar sits next to the media file and is named
        ``<media file name>.waveform.json``.
        """
        path = self.absolute_path(relative_path)
        return path.with_name(f"{path.name}.waveform.json")

    def _ensure_directories(self) -> None:
        """Create ``uploads_dir`` and ``normalized_dir`` (and parents) if missing."""
        self.uploads_dir.mkdir(parents=True, exist_ok=True)
        self.normalized_dir.mkdir(parents=True, exist_ok=True)

    def save_upload(self, upload: UploadFile, destination_name: str) -> tuple[str, int]:
        """Stream an upload into ``uploads_dir``.

        Args:
            upload: Object whose ``file`` attribute supports ``read(size)``.
            destination_name: File name to create inside ``uploads_dir``; an
                existing file with that name is overwritten.

        Returns:
            ``(path relative to root, number of bytes written)``.
        """
        self._ensure_directories()
        destination = self.uploads_dir / destination_name
        size = 0
        with destination.open("wb") as output:
            # Chunked copy keeps memory bounded regardless of upload size.
            while chunk := upload.file.read(self._UPLOAD_CHUNK_SIZE):
                size += len(chunk)
                output.write(chunk)
        return str(destination.relative_to(self.root)), size

    def normalize_clip(self, source_relative_path: str, clip_name: str) -> str:
        """Copy a stored file into ``normalized_dir`` under ``clip_name``.

        The file contents are copied unchanged.

        Returns:
            The copy's path relative to ``root``.
        """
        self._ensure_directories()
        source = self.absolute_path(source_relative_path)
        destination = self.normalized_dir / clip_name
        shutil.copyfile(source, destination)
        return str(destination.relative_to(self.root))

    def generate_waveform(
        self, source_relative_path: str, bins: int = WAVEFORM_PEAK_COUNT
    ) -> dict[str, int | list[int]]:
        """Decode a media file with ffmpeg and write its waveform sidecar.

        The audio is decoded to mono signed 16-bit little-endian PCM at
        ``_WAVEFORM_SAMPLE_RATE`` Hz, reduced to ``bins`` peaks on a 0-100
        scale, and saved as JSON at ``waveform_sidecar_path()``.

        Args:
            source_relative_path: Media file path relative to ``root``.
            bins: Number of peaks to compute.

        Returns:
            ``{"duration_ms": int, "peaks": list[int]}``.

        Raises:
            FileNotFoundError: The source file does not exist.
            RuntimeError: No ``ffmpeg`` executable was found on ``PATH``.
            subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        """
        self._ensure_directories()
        source = self.absolute_path(source_relative_path)
        if not source.exists():
            raise FileNotFoundError(source)
        ffmpeg_path = shutil.which("ffmpeg")
        if not ffmpeg_path:
            raise RuntimeError("ffmpeg is not installed")

        completed = subprocess.run(
            [
                ffmpeg_path,
                "-v",
                "error",
                "-i",
                str(source),
                "-ac",
                "1",
                "-ar",
                str(self._WAVEFORM_SAMPLE_RATE),
                "-f",
                "s16le",
                "pipe:1",
            ],
            check=True,
            # ffmpeg reads stdin for interactive commands unless told otherwise;
            # do not let it consume the parent process's stdin.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        samples = self._decode_pcm(completed.stdout)
        peaks = self._compute_peaks(samples, bins)
        duration_ms = round((len(samples) / self._WAVEFORM_SAMPLE_RATE) * 1000)

        waveform = {"duration_ms": duration_ms, "peaks": peaks}
        sidecar_path = self.waveform_sidecar_path(source_relative_path)
        sidecar_path.parent.mkdir(parents=True, exist_ok=True)
        sidecar_path.write_text(json.dumps(waveform), encoding="utf-8")
        return waveform

    @staticmethod
    def _decode_pcm(pcm: bytes) -> array:
        """Convert raw s16le bytes into an array of native signed 16-bit samples."""
        samples = array("h")
        # array.frombytes() rejects input that is not a whole number of items;
        # drop a dangling byte left by a truncated stream instead of failing.
        remainder = len(pcm) % samples.itemsize
        if remainder:
            pcm = pcm[:-remainder]
        samples.frombytes(pcm)
        if sys.byteorder != "little":
            samples.byteswap()
        return samples

    @staticmethod
    def _compute_peaks(samples: array, bins: int) -> list[int]:
        """Reduce samples to ``bins`` peak amplitudes scaled to 0-100."""
        sample_count = len(samples)
        if sample_count == 0:
            return [0] * bins

        peaks = []
        for index in range(bins):
            start = index * sample_count // bins
            # With fewer samples than bins a slot can come out empty; widen it
            # to one sample (start is always < sample_count, so this is in range).
            end = max((index + 1) * sample_count // bins, start + 1)
            segment = samples[start:end]
            # Largest absolute value, found with C-level min/max.
            peak = max(max(segment), -min(segment))
            peaks.append(round((peak / MediaStorage._PCM_FULL_SCALE) * 100))
        return peaks

    def load_waveform(self, relative_path: str) -> dict[str, int | list[int]] | None:
        """Read a previously written waveform sidecar.

        Returns:
            The decoded waveform dict, or ``None`` when the sidecar is missing,
            is not valid UTF-8 JSON, or does not contain a JSON object. An
            unreadable sidecar is treated as a cache miss so callers can
            regenerate it.
        """
        sidecar_path = self.waveform_sidecar_path(relative_path)
        try:
            payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        except (FileNotFoundError, NotADirectoryError):
            return None
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None
        return payload if isinstance(payload, dict) else None

    def load_or_generate_waveform(self, relative_path: str) -> dict[str, int | list[int]] | None:
        """Return the cached waveform, regenerating it when missing or stale.

        A cached waveform is reused only if it holds exactly
        ``WAVEFORM_PEAK_COUNT`` peaks.

        Returns:
            The waveform dict, or ``None`` when no usable sidecar exists and
            the media file itself is missing.
        """
        existing = self.load_waveform(relative_path)
        if existing is not None:
            cached_peaks = existing.get("peaks", [])
            if isinstance(cached_peaks, list) and len(cached_peaks) == WAVEFORM_PEAK_COUNT:
                return existing
        if not self.absolute_path(relative_path).exists():
            return None
        return self.generate_waveform(relative_path)

    def absolute_path(self, relative_path: str) -> Path:
        """Join ``relative_path`` onto ``root``.

        NOTE(review): no containment check is performed; an absolute
        ``relative_path`` replaces ``root`` entirely.
        """
        return self.root / relative_path

    def delete_relative_path(self, relative_path: str) -> None:
        """Delete a stored file and its waveform sidecar; missing files are ignored."""
        path = self.absolute_path(relative_path)
        path.unlink(missing_ok=True)
        self.waveform_sidecar_path(relative_path).unlink(missing_ok=True)
# Module-level shared MediaStorage instance. Constructing it runs at import
# time and creates the uploads/ and normalized/ directories under
# settings.media_root if they are missing.
storage = MediaStorage()