from django.core.cache import cache, BaseCache import json from datetime import datetime, timedelta from boxofficefantasy.models import Movie from django.contrib.auth.models import User from draft.constants import DraftPhase from draft.models import DraftSession, DraftSessionSettings import time from dataclasses import dataclass from typing import Any, Dict, List, Literal, Optional, Sequence, Tuple import random class DraftStateException(Exception): """Raised when an action is not allowed due to the current draft state or phase.""" pass class DraftCache: phase: str draft_order: str draft_index: str current_movie: str bids: str bid_timer_start: str bid_timer_end: str _cached_properties = { "participants", "phase", "draft_order", "draft_index", "current_movie", "bids", "bid_timer_start", "bid_timer_end", } def __init__(self, draft_id: str, cache: BaseCache = cache): super().__setattr__("_cache", self._load_cache(cache)) super().__setattr__("_prefix", f"draft:{draft_id}:") def _load_cache(self, cache) -> BaseCache: return cache def _save_cache(self) -> None: # Django cache saves itself return def __getattr__(self, name: str) -> Any: if name == "_prefix": return super().__getattribute__('_prefix') if name in self._cached_properties: return self._cache.get(self._prefix+name, None) raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def __setattr__(self, name: str, value: Any): if name in self._cached_properties: self._cache.set(self._prefix+name, value) self._save_cache() else: super().__setattr__(name, value) def __delattr__(self, name): if name in self._cached_properties: self._cache.delete(name) raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") class DraftStateManager: _initial_phase: DraftPhase = DraftPhase.WAITING.value def __init__(self, session: DraftSession): self.session_id: str = session.hashid self.cache: DraftCache = DraftCache(self.session_id, cache) self.settings: DraftSessionSettings = session.settings self.participants: set[User] = set(session.participants.all()) self.connected_participants: set[User] = set() # === Phase Management === @property def phase(self) -> str: return self.cache.phase or self._initial_phase @phase.setter def phase(self, new_phase: DraftPhase) -> None: self.cache.phase = new_phase # === Connected Users === def connect_participant(self, username: str): self.connected_participants.add(username) return self.connected_participants def disconnect_participant(self, username: str): self.connected_participants.discard(username) # === Draft Order === @property def draft_order(self): return json.loads(self.cache.draft_order or "[]") @draft_order.setter def draft_order(self, draft_order: list[str]): if not isinstance(draft_order, list): return self.cache.draft_order = json.dumps(draft_order) def determine_draft_order(self) -> List[User]: self.phase = DraftPhase.DETERMINE_ORDER self.draft_index = 0 draft_order = random.sample( list(self.participants), len(self.participants) ) self.draft_order = [user.username for user in draft_order] return self.draft_order @property def draft_index(self): draft_index = self.cache.draft_index if not draft_index: draft_index = 0 self.cache.draft_index = draft_index return self.cache.draft_index @draft_index.setter def draft_index(self, draft_index: int): self.cache.draft_index = draft_index def draft_index_advance(self, n: int = 1): self.draft_index += n return self.draft_index def next_picks( self, *, from_overall: int | None = None, count: int | None = None, include_current: bool = False, ) -> List[dict]: """ Convenience: return the next `count` picks starting after `from_overall` (or after current draft_index if omitted). Each item: {overall, round, pick_in_round, participant} """ if not self.draft_order: return [] n = len(self.draft_order) count = count if count else len(self.draft_order) start = self.draft_index if from_overall is None else int(from_overall) start = start if include_current else start + 1 out: List[dict] = [] for overall in range(start, start + count): r, p = _round_and_pick(overall, n) order_type = "snake" order = _round_order(r, order_type, self.draft_order) out.append({ "overall": overall, "round": r, "pick_in_round": p, "participant": order[p - 1], }) return out # === Current Nomination / Bid === def start_nomination(self, movie_id: int): self.cache.current_movie = movie_id self.cache.bids = [] def place_bid(self, user: User, amount: int|str): if isinstance(amount, str): amount = int(amount) bids = self.get_bids() bids.append({"user":user.username, "amount":amount}) self.cache.bids = json.dumps(bids) def get_bids(self) -> dict: return json.loads(self.cache.bids or "[]") def current_movie(self) -> Movie | None: movie_id = self.current_movie return Movie.objects.filter(pk=movie_id).first() if movie_id else None def start_bidding(self): seconds = self.settings.bidding_duration start_time = time.time() end_time = start_time + seconds self.cache.bid_timer_end = end_time self.cache.bid_timer_start = start_time def get_timer_end(self) -> str | None: return self.cache.bid_timer_end def get_timer_start(self) -> str | None: return self.cache.bid_timer_start # === Sync Snapshot === def to_dict(self) -> dict: picks = self.next_picks(include_current=True) return { "phase": self.phase, "draft_order": self.draft_order, "draft_index": self.draft_index, "connected_participants": list(self.connected_participants), "current_movie": self.cache.current_movie, "bids": self.get_bids(), "bidding_timer_end": self.get_timer_end(), "bidding_timer_start": self.get_timer_start(), "current_pick": picks[0] if picks else None, "next_picks": picks[1:] if picks else [] } # def __dict__(self): # return self.get_summary() def keys(self): # return an iterable of keys return self.to_dict().keys() def __getitem__(self, key): return self.to_dict()[key] def __iter__(self): # used for `dict(self.draft_state)` and iteration return iter(self.to_dict()) def __len__(self): return len(self.to_dict()) OrderType = Literal["snake", "linear"] def _round_and_pick(overall: int, n: int) -> Tuple[int, int]: """overall -> (round_1_based, pick_in_round_1_based)""" r = overall // n + 1 p = overall % n + 1 return r, p def _round_order(round_num: int, order_type: OrderType, r1: Sequence[Any]) -> Sequence[Any]: if order_type == "linear" or (round_num % 2 == 1): return r1 return list(reversed(r1)) # even rounds in snake