141 lines
4.6 KiB
Python
141 lines
4.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from array import array
|
|
from pathlib import Path
|
|
|
|
from fastapi import UploadFile
|
|
|
|
from .config import settings
|
|
|
|
# Number of peak values in a waveform: the default bin count used when a
# waveform is generated, and the length a cached sidecar's "peaks" list must
# have for the cached copy to be reused.
WAVEFORM_PEAK_COUNT = 1024
|
|
|
|
|
|
class MediaStorage:
    """Filesystem store for uploaded media, normalized clips and waveform caches.

    Paths accepted and returned by the public methods are relative to
    ``settings.media_root``. A waveform computed for a media file is cached
    beside it in a ``<file name>.waveform.json`` sidecar.
    """

    # Sample rate (Hz) that ffmpeg resamples audio to before peaks are taken.
    _WAVEFORM_SAMPLE_RATE = 8000
    # Divisor used to scale a 16-bit sample magnitude to a 0-100 percentage.
    _PCM_FULL_SCALE = 32767
    # Read size used when streaming an upload to disk.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self) -> None:
        """Create the storage directories if they do not exist yet."""
        self._ensure_directories()

    @property
    def root(self) -> Path:
        """Return the media root directory taken from the settings."""
        return settings.media_root

    @property
    def uploads_dir(self) -> Path:
        """Return the directory that receives raw uploads."""
        return self.root / "uploads"

    @property
    def normalized_dir(self) -> Path:
        """Return the directory that holds normalized clips."""
        return self.root / "normalized"

    def waveform_sidecar_path(self, relative_path: str) -> Path:
        """Return the path of the waveform sidecar that belongs to a media file.

        The sidecar sits next to the media file and is named
        ``<file name>.waveform.json``.
        """
        path = self.absolute_path(relative_path)
        return path.with_name(f"{path.name}.waveform.json")

    def _ensure_directories(self) -> None:
        """Create the uploads and normalized directories (and their parents)."""
        self.uploads_dir.mkdir(parents=True, exist_ok=True)
        self.normalized_dir.mkdir(parents=True, exist_ok=True)

    def save_upload(self, upload: UploadFile, destination_name: str) -> tuple[str, int]:
        """Stream an uploaded file into the uploads directory.

        Args:
            upload: The upload whose ``file`` object is read in chunks.
            destination_name: File name to store the upload under.

        Returns:
            The stored file's path relative to the media root, and the number
            of bytes written.

        Raises:
            ValueError: If the destination does not lie under the media root.
        """
        self._ensure_directories()
        destination = self.uploads_dir / destination_name
        # Resolve the relative path first so a destination outside the media
        # root is rejected before anything is written, not afterwards.
        relative = destination.relative_to(self.root)
        size = 0
        with destination.open("wb") as output:
            try:
                while chunk := upload.file.read(self._UPLOAD_CHUNK_SIZE):
                    size += len(chunk)
                    output.write(chunk)
            except BaseException:
                # Do not leave a truncated upload behind. Close before
                # unlinking because some platforms refuse to delete open files.
                output.close()
                destination.unlink(missing_ok=True)
                raise
        return str(relative), size

    def normalize_clip(self, source_relative_path: str, clip_name: str) -> str:
        """Copy a stored file into the normalized directory.

        Args:
            source_relative_path: Source file, relative to the media root.
            clip_name: File name to give the copy.

        Returns:
            The copy's path relative to the media root.
        """
        self._ensure_directories()
        source = self.absolute_path(source_relative_path)
        destination = self.normalized_dir / clip_name
        shutil.copyfile(source, destination)
        return str(destination.relative_to(self.root))

    def generate_waveform(
        self,
        source_relative_path: str,
        bins: int = WAVEFORM_PEAK_COUNT,
    ) -> dict[str, int | list[int]]:
        """Decode a media file with ffmpeg and write its waveform sidecar.

        The audio is decoded to mono signed 16-bit PCM, split into ``bins``
        consecutive segments, and each segment's largest magnitude is stored
        as a percentage of full scale.

        Args:
            source_relative_path: Media file, relative to the media root.
            bins: Number of peak values to produce.

        Returns:
            A dict with ``duration_ms`` (int) and ``peaks`` (list of ints);
            the same dict is written to the sidecar as JSON.

        Raises:
            FileNotFoundError: If the source file does not exist.
            RuntimeError: If no ``ffmpeg`` executable is found on PATH.
            subprocess.CalledProcessError: If ffmpeg exits with an error.
        """
        self._ensure_directories()
        source = self.absolute_path(source_relative_path)
        if not source.exists():
            raise FileNotFoundError(source)

        ffmpeg_path = shutil.which("ffmpeg")
        if not ffmpeg_path:
            raise RuntimeError("ffmpeg is not installed")

        completed = subprocess.run(
            [
                ffmpeg_path,
                "-v",
                "error",
                "-i",
                str(source),
                "-ac",
                "1",
                "-ar",
                str(self._WAVEFORM_SAMPLE_RATE),
                "-f",
                "s16le",
                "pipe:1",
            ],
            check=True,
            # Keep ffmpeg away from the parent's stdin; it has nothing to read.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        samples = self._decode_pcm(completed.stdout)
        waveform = {
            "duration_ms": round((len(samples) / self._WAVEFORM_SAMPLE_RATE) * 1000),
            "peaks": self._compute_peaks(samples, bins),
        }

        sidecar_path = self.waveform_sidecar_path(source_relative_path)
        sidecar_path.parent.mkdir(parents=True, exist_ok=True)
        sidecar_path.write_text(json.dumps(waveform), encoding="utf-8")
        return waveform

    @staticmethod
    def _decode_pcm(pcm: bytes) -> array:
        """Convert raw little-endian signed 16-bit PCM bytes into samples."""
        samples = array("h")
        # frombytes() rejects input that is not a whole number of samples, so
        # drop a trailing partial sample instead of failing on it.
        remainder = len(pcm) % samples.itemsize
        if remainder:
            pcm = pcm[:-remainder]
        samples.frombytes(pcm)
        if sys.byteorder != "little":
            # The data is little-endian; swap on big-endian hosts.
            samples.byteswap()
        return samples

    @classmethod
    def _compute_peaks(cls, samples: array, bins: int) -> list[int]:
        """Return one 0-100 peak value for each of ``bins`` slices of ``samples``."""
        sample_count = len(samples)
        if sample_count == 0:
            return [0] * bins

        peaks: list[int] = []
        for index in range(bins):
            start = index * sample_count // bins
            end = (index + 1) * sample_count // bins
            if end <= start:
                # Fewer samples than bins: still look at one sample per bin.
                end = min(sample_count, start + 1)
            segment = samples[start:end]
            # max(max, -min) equals max(abs(v)) but runs in C over the array.
            peak = max(max(segment), -min(segment)) if segment else 0
            peaks.append(round((peak / cls._PCM_FULL_SCALE) * 100))
        return peaks

    def load_waveform(self, relative_path: str) -> dict[str, int | list[int]] | None:
        """Load the cached waveform for a media file.

        Returns:
            The parsed sidecar, or ``None`` when the sidecar is missing, is
            not valid UTF-8 JSON, or does not contain a JSON object.
        """
        sidecar_path = self.waveform_sidecar_path(relative_path)
        try:
            payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        except (FileNotFoundError, NotADirectoryError):
            return None
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError: a damaged sidecar
            # is treated as absent so that it can be regenerated.
            return None
        return payload if isinstance(payload, dict) else None

    def load_or_generate_waveform(self, relative_path: str) -> dict[str, int | list[int]] | None:
        """Return the cached waveform, regenerating it when it is unusable.

        A cached waveform is reused only when its ``peaks`` list has exactly
        ``WAVEFORM_PEAK_COUNT`` entries.

        Returns:
            The waveform dict, or ``None`` when the media file does not exist.
        """
        existing = self.load_waveform(relative_path)
        if existing is not None:
            peaks = existing.get("peaks")
            if isinstance(peaks, list) and len(peaks) == WAVEFORM_PEAK_COUNT:
                return existing
        if not self.absolute_path(relative_path).exists():
            return None
        return self.generate_waveform(relative_path)

    def absolute_path(self, relative_path: str) -> Path:
        """Return ``relative_path`` joined onto the media root."""
        return self.root / relative_path

    def delete_relative_path(self, relative_path: str) -> None:
        """Delete a media file and its waveform sidecar; missing files are ignored."""
        path = self.absolute_path(relative_path)
        path.unlink(missing_ok=True)
        self.waveform_sidecar_path(relative_path).unlink(missing_ok=True)
|
|
|
|
|
|
# Shared module-level MediaStorage instance. Constructing it creates the
# uploads and normalized directories under settings.media_root, so this runs
# at import time.
storage = MediaStorage()
|