from django.core.cache import cache import json from datetime import datetime, timedelta from boxofficefantasy.models import Movie from django.contrib.auth.models import User from draft.constants import DraftPhase from draft.models import DraftSession import time from dataclasses import dataclass from typing import Any, Dict, List, Literal, Optional, Sequence, Tuple import random class DraftStateException(Exception): """Raised when an action is not allowed due to the current draft state or phase.""" pass class DraftCacheKeys: def __init__(self, id): self.prefix = f"draft:{id}" @property def admins(self): return f"{self.prefix}:admins" @property def participants(self): return f"{self.prefix}:participants" @property def users(self): return f"{self.prefix}:users" @property def connected_users(self): return f"{self.prefix}:connected_users" @property def phase(self): return f"{self.prefix}:phase" @property def draft_order(self): return f"{self.prefix}:draft_order" @property def draft_index(self): return f"{self.prefix}:draft_index" @property def current_movie(self): return f"{self.prefix}:current_movie" # @property # def state(self): # return f"{self.prefix}:state" # @property # def current_movie(self): # return f"{self.prefix}:current_movie" @property def bids(self): return f"{self.prefix}:bids" # @property # def participants(self): # return f"{self.prefix}:participants" @property def bid_timer_end(self): return f"{self.prefix}:bid_timer_end" @property def bid_timer_start(self): return f"{self.prefix}:bid_timer_start" # def user_status(self, user_id): # return f"{self.prefix}:user:{user_id}:status" # def user_channel(self, user_id): # return f"{self.prefix}:user:{user_id}:channel" class DraftStateManager: def __init__(self, session: DraftSession): self.session_id = session.hashid self.cache = cache self.cache_keys = DraftCacheKeys(self.session_id) self._initial_phase = self.cache.get(self.cache_keys.phase, DraftPhase.WAITING.value) self.settings = session.settings self.participants = list(session.participants.all()) # === Phase Management === @property def phase(self) -> str: return str(self.cache.get(self.cache_keys.phase, self._initial_phase)) @phase.setter def phase(self, new_phase: DraftPhase): self.cache.set(self.cache_keys.phase, new_phase.value) # === Connected Users === @property def connected_participants(self) -> list[str]: return json.loads(self.cache.get(self.cache_keys.connected_users) or "[]") def connect_participant(self, username: str): users = set(self.connected_participants) users.add(username) self.cache.set(self.cache_keys.connected_users, json.dumps(list(users))) def disconnect_participant(self, username: str): users = set(self.connected_participants) users.discard(username) self.cache.set(self.cache_keys.connected_users, json.dumps(list(users))) # === Draft Order === @property def draft_order(self): return json.loads(self.cache.get(self.cache_keys.draft_order,"[]")) @draft_order.setter def draft_order(self, draft_order: list[str]): if not isinstance(draft_order, list): return self.cache.set(self.cache_keys.draft_order,json.dumps(draft_order)) def determine_draft_order(self) -> List[User]: self.phase = DraftPhase.DETERMINE_ORDER self.draft_index = 0 draft_order = random.sample( self.participants, len(self.participants) ) self.draft_order = [user.username for user in draft_order] return self.draft_order @property def draft_index(self): return self.cache.get(self.cache_keys.draft_index,0) @draft_index.setter def draft_index(self, draft_index: int): self.cache.set(self.cache_keys.draft_index, int(draft_index)) def draft_index_advance(self, n: int = 1): self.draft_index += n return self.draft_index def next_picks( self, *, from_overall: int | None = None, count: int | None = None, include_current: bool = False, ) -> List[dict]: """ Convenience: return the next `count` picks starting after `from_overall` (or after current draft_index if omitted). Each item: {overall, round, pick_in_round, participant} """ if not self.draft_order: return [] n = len(self.draft_order) count = count if count else len(self.draft_order) start = self.draft_index if from_overall is None else int(from_overall) start = start if include_current else start + 1 out: List[dict] = [] for overall in range(start, start + count): r, p = _round_and_pick(overall, n) order_type = "snake" order = _round_order(r, order_type, self.draft_order) out.append({ "overall": overall, "round": r, "pick_in_round": p, "participant": order[p - 1], }) return out # === Current Nomination / Bid === def start_nomination(self, movie_id: int): self.cache.set(self.cache_keys.current_movie, movie_id) self.cache.delete(self.cache_keys.bids) def place_bid(self, user: User, amount: int|str): if isinstance(amount, str): amount = int(amount) bids = self.get_bids() bids.append({"user":user.username, "amount":amount}) self.cache.set(self.cache_keys.bids, json.dumps(bids)) def get_bids(self) -> dict: return json.loads(self.cache.get(self.cache_keys.bids) or "[]") def current_movie(self) -> Movie | None: movie_id = self.cache.get(self.cache_keys.current_movie) return Movie.objects.filter(pk=movie_id).first() if movie_id else None def start_bidding(self): seconds = self.settings.bidding_duration start_time = time.time() end_time = start_time + seconds self.cache.set(self.cache_keys.bid_timer_end, end_time) self.cache.set(self.cache_keys.bid_timer_start, start_time) def get_timer_end(self) -> str | None: return self.cache.get(self.cache_keys.bid_timer_end) def get_timer_start(self) -> str | None: return self.cache.get(self.cache_keys.bid_timer_start) # === Sync Snapshot === def to_dict(self) -> dict: picks = self.next_picks(include_current=True) return { "phase": self.phase, "draft_order": self.draft_order, "draft_index": self.draft_index, "connected_participants": self.connected_participants, "current_movie": self.cache.get(self.cache_keys.current_movie), "bids": self.get_bids(), "bidding_timer_end": self.get_timer_end(), "bidding_timer_start": self.get_timer_start(), "current_pick": picks[0] if picks else None, "next_picks": picks[1:] if picks else [] } # def __dict__(self): # return self.get_summary() def keys(self): # return an iterable of keys return self.to_dict().keys() def __getitem__(self, key): return self.to_dict()[key] def __iter__(self): # used for `dict(self.draft_state)` and iteration return iter(self.to_dict()) def __len__(self): return len(self.to_dict()) OrderType = Literal["snake", "linear"] def _round_and_pick(overall: int, n: int) -> Tuple[int, int]: """overall -> (round_1_based, pick_in_round_1_based)""" r = overall // n + 1 p = overall % n + 1 return r, p def _round_order(round_num: int, order_type: OrderType, r1: Sequence[Any]) -> Sequence[Any]: if order_type == "linear" or (round_num % 2 == 1): return r1 return list(reversed(r1)) # even rounds in snake