Files
boxofficefantasy/draft/state.py
Anthony Correa 71f0f01abc Improve draft UI state handling, layout, and order logic
- Added current/next pick info, updated server draft logic for order/snake
- Refactored WebSocketContext, removed dead code, improved CSS/layout
- Cleaned up template blocks, admin, and participant panel structure
2025-08-12 21:34:02 -05:00

228 lines
7.2 KiB
Python

from django.core.cache import cache
import json
from datetime import datetime, timedelta
from boxofficefantasy.models import Movie
from django.contrib.auth.models import User
from draft.constants import DraftPhase
from draft.models import DraftSession
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, Sequence, Tuple
import random
class DraftCacheKeys:
def __init__(self, id):
self.prefix = f"draft:{id}"
@property
def admins(self):
return f"{self.prefix}:admins"
@property
def participants(self):
return f"{self.prefix}:participants"
@property
def users(self):
return f"{self.prefix}:users"
@property
def connected_users(self):
return f"{self.prefix}:connected_users"
@property
def phase(self):
return f"{self.prefix}:phase"
@property
def draft_order(self):
return f"{self.prefix}:draft_order"
@property
def draft_index(self):
return f"{self.prefix}:draft_index"
@property
def current_movie(self):
return f"{self.prefix}:current_movie"
# @property
# def state(self):
# return f"{self.prefix}:state"
# @property
# def current_movie(self):
# return f"{self.prefix}:current_movie"
@property
def bids(self):
return f"{self.prefix}:bids"
# @property
# def participants(self):
# return f"{self.prefix}:participants"
@property
def bid_timer_end(self):
return f"{self.prefix}:bid_timer_end"
@property
def bid_timer_start(self):
return f"{self.prefix}:bid_timer_start"
# def user_status(self, user_id):
# return f"{self.prefix}:user:{user_id}:status"
# def user_channel(self, user_id):
# return f"{self.prefix}:user:{user_id}:channel"
class DraftStateManager:
def __init__(self, session: DraftSession):
self.session_id = session.hashid
self.cache = cache
self.keys = DraftCacheKeys(self.session_id)
self._initial_phase = self.cache.get(self.keys.phase, DraftPhase.WAITING.value)
self.settings = session.settings
# === Phase Management ===
@property
def phase(self) -> str:
return str(self.cache.get(self.keys.phase, self._initial_phase))
@phase.setter
def phase(self, new_phase: DraftPhase):
self.cache.set(self.keys.phase, new_phase.value)
# === Connected Users ===
@property
def connected_participants(self) -> list[str]:
return json.loads(self.cache.get(self.keys.connected_users) or "[]")
def connect_participant(self, username: str):
users = set(self.connected_participants)
users.add(username)
self.cache.set(self.keys.connected_users, json.dumps(list(users)))
def disconnect_participant(self, username: str):
users = set(self.connected_participants)
users.discard(username)
self.cache.set(self.keys.connected_users, json.dumps(list(users)))
# === Draft Order ===
@property
def draft_order(self):
return json.loads(self.cache.get(self.keys.draft_order,"[]"))
@draft_order.setter
def draft_order(self, draft_order: list[str]):
if not isinstance(draft_order, list):
return
self.cache.set(self.keys.draft_order,json.dumps(draft_order))
def determine_draft_order(self, users: list[User]):
draft_order = random.sample(
users, len(users)
)
self.draft_order = [user.username for user in draft_order]
return self.draft_order
@property
def draft_index(self):
return self.cache.get(self.keys.draft_index,0)
@draft_index.setter
def draft_index(self, draft_index: int):
self.cache.set(self.keys.draft_index, int(draft_index))
def draft_index_advance(self, n: int = 1):
self.draft_index += n
return self.draft_index
def next_picks(
self,
*,
from_overall: int | None = None,
count: int | None = None,
include_current: bool = False,
) -> List[dict]:
"""
Convenience: return the next `count` picks starting after `from_overall`
(or after current draft_index if omitted). Each item:
{overall, round, pick_in_round, participant}
"""
if not self.draft_order:
return []
n = len(self.draft_order)
count = count if count else len(self.draft_order)
start = self.draft_index if from_overall is None else int(from_overall)
start = start if include_current else start + 1
out: List[dict] = []
for overall in range(start, start + count):
r, p = _round_and_pick(overall, n)
order_type = "snake"
order = _round_order(r, order_type, self.draft_order)
out.append({
"overall": overall,
"round": r,
"pick_in_round": p,
"participant": order[p - 1],
})
return out
# === Current Nomination / Bid ===
def start_nomination(self, movie_id: int):
self.cache.set(self.keys.current_movie, movie_id)
self.cache.delete(self.keys.bids)
def place_bid(self, user_id: int, amount: int):
bids = self.get_bids()
bids[user_id] = amount
self.cache.set(self.keys.bids, json.dumps(bids))
def get_bids(self) -> dict:
return json.loads(self.cache.get(self.keys.bids) or "{}")
def current_movie(self) -> Movie | None:
movie_id = self.cache.get(self.keys.current_movie)
return Movie.objects.filter(pk=movie_id).first() if movie_id else None
def start_timer(self):
seconds = self.settings.bidding_duration
start_time = time.time()
end_time = start_time + seconds
self.cache.set(self.keys.bid_timer_end, end_time)
self.cache.set(self.keys.bid_timer_start, start_time)
def get_timer_end(self) -> str | None:
return self.cache.get(self.keys.bid_timer_end)
def get_timer_start(self) -> str | None:
return self.cache.get(self.keys.bid_timer_start)
# === Sync Snapshot ===
def get_summary(self) -> dict:
picks = self.next_picks(include_current=True)
return {
"phase": self.phase,
"draft_order": self.draft_order,
"draft_index": self.draft_index,
"connected_participants": self.connected_participants,
"current_movie": self.cache.get(self.keys.current_movie),
# "bids": self.get_bids(),
"bidding_timer_end": self.get_timer_end(),
"bidding_timer_start": self.get_timer_start(),
"current_pick": picks[0] if picks else None,
"next_picks": picks[1:] if picks else []
}
OrderType = Literal["snake", "linear"]
def _round_and_pick(overall: int, n: int) -> Tuple[int, int]:
"""overall -> (round_1_based, pick_in_round_1_based)"""
r = overall // n + 1
p = overall % n + 1
return r, p
def _round_order(round_num: int, order_type: OrderType, r1: Sequence[Any]) -> Sequence[Any]:
if order_type == "linear" or (round_num % 2 == 1):
return r1
return list(reversed(r1)) # even rounds in snake