Compare commits
1 Commits
master
...
db-refacto
| Author | SHA1 | Date | |
|---|---|---|---|
|
7ea5fd15df
|
31
.vscode/tasks.json
vendored
Normal file
31
.vscode/tasks.json
vendored
Normal file
@@ -0,0 +1,31 @@
|
||||
{
|
||||
"version": "2.0.0",
|
||||
"tasks": [
|
||||
{
|
||||
"label": "Truncate Database Tables",
|
||||
"type": "shell",
|
||||
"command": "${workspaceFolder}/.venv/bin/python", // Use this for macOS/Linux
|
||||
"args": [
|
||||
"${workspaceFolder}/src/scripts/truncate_tables.py"
|
||||
],
|
||||
"group": {
|
||||
"kind": "build",
|
||||
"isDefault": true
|
||||
},
|
||||
"problemMatcher": []
|
||||
},
|
||||
{
|
||||
"label": "Clear Database",
|
||||
"type": "shell",
|
||||
"command": "${workspaceFolder}/.venv/bin/python", // Use this for macOS/Linux
|
||||
"args": [
|
||||
"${workspaceFolder}/src/scripts/clear_database.py"
|
||||
],
|
||||
"group": {
|
||||
"kind": "build",
|
||||
"isDefault": true
|
||||
},
|
||||
"problemMatcher": []
|
||||
}
|
||||
]
|
||||
}
|
||||
0
alembic/__init__.py
Normal file
0
alembic/__init__.py
Normal file
0
alembic/alembic.ini
Normal file
0
alembic/alembic.ini
Normal file
0
alembic/env.py
Normal file
0
alembic/env.py
Normal file
@@ -35,10 +35,16 @@ potential_keys = ["Field", "Location", "Venue"]
|
||||
original = ["Winnemac"]
|
||||
normalized = "Winnemac Park"
|
||||
[[field.values]]
|
||||
original = ["Taft HS"]
|
||||
original = ["Maywood", "MAYWOOD"]
|
||||
normalized = "Maywood Park"
|
||||
[[field.values]]
|
||||
original = ["Taft HS", "Taft Hs"]
|
||||
normalized = "Taft High School"
|
||||
[[field.values]]
|
||||
original = ["Southwest"]
|
||||
original = ["Ridgewood Hs"]
|
||||
normalized = "Ridgewood High School"
|
||||
[[field.values]]
|
||||
original = ["Southwest", "SW Park", "SOUTHWEST"]
|
||||
normalized = "Southwest Park"
|
||||
[[field.values]]
|
||||
original = ["Comed", "COMED", "ComEd"]
|
||||
@@ -61,13 +67,19 @@ potential_keys = ["Final Score", "Score", "Result", "Outcome"]
|
||||
original = ["Hounds", "Chicago Hounds", "Winnemac Hounds", "Hound"]
|
||||
normalized = "Hounds"
|
||||
[[team.values]]
|
||||
original = ["Chicago Red Sox"]
|
||||
original = ["Ramirez Bb", "Ramirez"]
|
||||
normalized = "Ramirez Baseball"
|
||||
[[team.values]]
|
||||
original = ["Degeneratex"]
|
||||
normalized = "DegenerateX"
|
||||
[[team.values]]
|
||||
original = ["Chicago Red Sox", "Redsox"]
|
||||
normalized = "Red Sox"
|
||||
[[team.values]]
|
||||
original = ["NorthSide White Sox"]
|
||||
normalized = "North Side White Sox"
|
||||
[[team.values]]
|
||||
original = ["Chicago Rebels", "CHICAGO REBELS"]
|
||||
original = ["Chicago Rebels"]
|
||||
normalized = "Rebels"
|
||||
[[team.values]]
|
||||
original = ["Lombard Expors", "LOMBARD EXPORS"]
|
||||
|
||||
0
pyproject.toml
Normal file
0
pyproject.toml
Normal file
@@ -1,5 +1,20 @@
|
||||
typer[all]==0.9.0
|
||||
python-dateutil==2.8.2
|
||||
toml==0.10.2
|
||||
# CLI and display
|
||||
typer[all]
|
||||
rich
|
||||
|
||||
# Database
|
||||
sqlmodel
|
||||
alembic
|
||||
python-dotenv
|
||||
|
||||
# Data processing
|
||||
python-dateutil
|
||||
pandas # Optional but useful for data analysis
|
||||
openpyxl # For Excel file support
|
||||
|
||||
# File parsing
|
||||
toml
|
||||
xlsx2csv
|
||||
|
||||
# Image generation (for calendar features)
|
||||
pillow
|
||||
xlsx2csv
|
||||
22
sql_scripts/get_game_details.sql
Normal file
22
sql_scripts/get_game_details.sql
Normal file
@@ -0,0 +1,22 @@
|
||||
SELECT
|
||||
game.id,
|
||||
game.date,
|
||||
team_home.name AS home_team,
|
||||
team_visitor.name AS visitor_team,
|
||||
venue.name AS venue_name,
|
||||
gameresult.home_runs_for,
|
||||
gameresult.visitor_runs_for,
|
||||
gameresult.home_outcome,
|
||||
gameresult.visitor_outcome
|
||||
FROM
|
||||
game
|
||||
JOIN
|
||||
team AS team_home ON game.home_team_id = team_home.id
|
||||
JOIN
|
||||
team AS team_visitor ON game.visitor_team_id = team_visitor.id
|
||||
JOIN
|
||||
venue ON game.venue_id = venue.id
|
||||
LEFT JOIN
|
||||
gameresult ON game.id = gameresult.game_id
|
||||
ORDER BY
|
||||
game.date;
|
||||
26
sql_scripts/get_standings.sql
Normal file
26
sql_scripts/get_standings.sql
Normal file
@@ -0,0 +1,26 @@
|
||||
SELECT
|
||||
strftime('%Y', game.date) AS year, -- Extracts year from date
|
||||
team.id AS team_id,
|
||||
team.name AS team_name,
|
||||
SUM(CASE
|
||||
WHEN gameresult.home_outcome = 'WIN' AND game.home_team_id = team.id
|
||||
OR gameresult.visitor_outcome = 'WIN' AND game.visitor_team_id = team.id
|
||||
THEN 1 ELSE 0 END) AS wins,
|
||||
SUM(CASE
|
||||
WHEN gameresult.home_outcome = 'LOSS' AND game.home_team_id = team.id
|
||||
OR gameresult.visitor_outcome = 'LOSS' AND game.visitor_team_id = team.id
|
||||
THEN 1 ELSE 0 END) AS losses,
|
||||
SUM(CASE
|
||||
WHEN gameresult.home_outcome = 'TIE' AND game.home_team_id = team.id
|
||||
OR gameresult.visitor_outcome = 'TIE' AND game.visitor_team_id = team.id
|
||||
THEN 1 ELSE 0 END) AS ties
|
||||
FROM
|
||||
team
|
||||
LEFT JOIN
|
||||
game ON (game.home_team_id = team.id OR game.visitor_team_id = team.id)
|
||||
LEFT JOIN
|
||||
gameresult ON game.id = gameresult.game_id
|
||||
GROUP BY
|
||||
year, team.id
|
||||
ORDER BY
|
||||
year, wins DESC, losses ASC, ties DESC;
|
||||
@@ -1,15 +0,0 @@
|
||||
from .apps.convert import app as convert_app
|
||||
from .apps.clean import app as clean_app
|
||||
from .apps.read import app as read_app
|
||||
from .apps.generate import app as generate_app
|
||||
import typer
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
app.add_typer(convert_app, name="convert")
|
||||
app.add_typer(clean_app, name="clean")
|
||||
app.add_typer(read_app, name="read")
|
||||
app.add_typer(generate_app, name="generate")
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
@@ -1 +0,0 @@
|
||||
from .clean import app
|
||||
@@ -1,109 +0,0 @@
|
||||
import typer
|
||||
from rich.table import Table, Column
|
||||
from rich.console import Console
|
||||
from rich.columns import Columns
|
||||
from rich.panel import Panel
|
||||
from pathlib import Path
|
||||
import csv
|
||||
from ...utils.common import list_key_values, read_and_normalize_csv_or_xlsx
|
||||
from ...utils.normalize import normalize_header_key, replace_key_values, DEFAULT_NORMALIZATION_PATH
|
||||
from typing import Annotated, List
|
||||
import re
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
@app.command("replace")
|
||||
def replace_values_for_key(
|
||||
input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")],
|
||||
output_file: Annotated[List[typer.FileText], typer.Option(..., "--output-file", "-o", help="Specify output file.")],
|
||||
key: str = typer.Argument(..., help=""),
|
||||
match: str = typer.Argument(..., help=""),
|
||||
replace: str = typer.Argument(..., help=""),
|
||||
in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
|
||||
match_is_regex: bool = typer.Option(False, "--regex", "-p", help="Match is a regex pattern.")
|
||||
):
|
||||
|
||||
# normalized_key = normalize_header_key(key)
|
||||
normalized_key = key
|
||||
|
||||
console = Console()
|
||||
|
||||
# Read CSV data
|
||||
for f in input_file:
|
||||
data = read_and_normalize_csv_or_xlsx(f)
|
||||
|
||||
before_table = Table(Column(), show_header=False, title="Before")
|
||||
for value in sorted(list_key_values(data, key)):
|
||||
before_table.add_row(value)
|
||||
|
||||
|
||||
after_table = Table( Column(), show_header=False, title="After")
|
||||
|
||||
if normalized_key != "team" or "team" in data[0].keys():
|
||||
data = replace_key_values(data, normalized_key, match, replace, match_is_regex)
|
||||
else:
|
||||
data=replace_key_values(data, "home", match, replace, match_is_regex)
|
||||
data=replace_key_values(data, "visitor", match, replace, match_is_regex)
|
||||
|
||||
for value in sorted(list_key_values(data, key)):
|
||||
after_table.add_row(value)
|
||||
|
||||
panel = Panel(
|
||||
Columns([before_table, after_table]),
|
||||
title="Replace"
|
||||
)
|
||||
|
||||
console.print(panel)
|
||||
|
||||
if in_place and typer.confirm("Perform Replacement in-place?"):
|
||||
fieldnames = data[0].keys()
|
||||
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
writer.writerows(data)
|
||||
|
||||
elif output_file:
|
||||
if output_file.is_dir():
|
||||
output_file = output_file.joinpath(f.name)
|
||||
if typer.confirm(f"Write to {output_file}?"):
|
||||
with output_file.open('w') as f:
|
||||
fieldnames = data[0].keys()
|
||||
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
writer.writerows(data)
|
||||
|
||||
@app.command("add-key")
|
||||
def add_values_for_key(
|
||||
file_path: Path = typer.Argument(..., help="Path to the CSV or XLSX file"),
|
||||
key: str = typer.Argument(..., help=""),
|
||||
value: str = typer.Argument("", help=""),
|
||||
in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
|
||||
output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."),
|
||||
):
|
||||
|
||||
if in_place and output_file:
|
||||
typer.echo("Error: Only one of --in-place or --output-file should be provided, not both.")
|
||||
raise typer.Abort()
|
||||
|
||||
console = Console()
|
||||
|
||||
# Read CSV data
|
||||
data = read_and_normalize_csv_or_xlsx(file_path)
|
||||
|
||||
# data = add_key_values(data, key, value)
|
||||
|
||||
if in_place and typer.confirm("Perform Replacement in-place?"):
|
||||
with file_path.open('w') as f:
|
||||
fieldnames = data[0].keys()
|
||||
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
writer.writerows(data)
|
||||
|
||||
elif output_file:
|
||||
if output_file.is_dir():
|
||||
output_file = output_file.joinpath(file_path.name)
|
||||
if typer.confirm(f"Write to {output_file}?"):
|
||||
with output_file.open('w') as f:
|
||||
fieldnames = data[0].keys()
|
||||
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
writer.writerows(data)
|
||||
@@ -1 +0,0 @@
|
||||
from .convert import app
|
||||
@@ -1,44 +0,0 @@
|
||||
import typer
|
||||
from typing import Annotated
|
||||
from pathlib import Path
|
||||
from ...utils.sportspress import validate_keys
|
||||
from ...utils.normalize import normalize_header_key, load_config
|
||||
from ...utils.common import read_and_normalize_csv_or_xlsx, is_visitor_home_order_reversed, import_gamebygame
|
||||
from ...utils.sportspress import write_sportspress_csv
|
||||
import csv
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
@app.command(name="sportspress")
|
||||
def sportspress_csv(
|
||||
input_file: Annotated[Path, typer.Argument(..., help="Path to the or XLSX file")],
|
||||
file_output_path: Annotated[typer.FileTextWrite, typer.Argument(..., help="Path to the output CSV file")],
|
||||
only_with_outcome: bool = typer.Option(default=False, is_flag=True, help="")
|
||||
):
|
||||
|
||||
# Read CSV data
|
||||
data = import_gamebygame(input_file)
|
||||
|
||||
try:
|
||||
write_sportspress_csv(data, file_output_path, only_with_outcome)
|
||||
except KeyError as e:
|
||||
typer.echo(f"Error: {e}")
|
||||
|
||||
typer.echo(f"Output to {file_output_path.name}")
|
||||
|
||||
@app.command(name="teamsnap")
|
||||
def sportspress_csv(
|
||||
input_file: Annotated[Path, typer.Argument(..., help="Path to the CSV or XLSX file")],
|
||||
file_output_path: Annotated[typer.FileTextWrite, typer.Argument(..., help="Path to the output CSV file")],
|
||||
only_with_outcome: bool = typer.Option(default=False, is_flag=True, help="")
|
||||
):
|
||||
|
||||
# Read CSV data
|
||||
data = import_gamebygame(input_file)
|
||||
|
||||
try:
|
||||
write_sportspress_csv(data, file_output_path, only_with_outcome)
|
||||
except KeyError as e:
|
||||
typer.echo(f"Error: {e}")
|
||||
|
||||
typer.echo(f"Output to {file_output_path.name}")
|
||||
@@ -1 +0,0 @@
|
||||
from .calendar import app
|
||||
@@ -1,55 +0,0 @@
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from typing import Annotated, List, Optional
|
||||
from pathlib import Path
|
||||
from ...utils.sportspress import validate_keys
|
||||
from ...utils.normalize import normalize_header_key, load_config
|
||||
from ...utils.common import read_and_normalize_csv_or_xlsx, is_visitor_home_order_reversed, import_gamebygame, parse_datetime, personalize_data_for_team
|
||||
from ...utils.sportspress import write_sportspress_csv
|
||||
from .calendar_utils import generate_calendar
|
||||
from collections import defaultdict
|
||||
import toml
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
@app.command(name="calendar")
|
||||
def generate_calendar_app(
|
||||
input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV file")],
|
||||
config_file: Annotated[Optional[typer.FileText], typer.Option(..., "--config", "-c", help="Path to a config file")]=None
|
||||
):
|
||||
# Read CSV data
|
||||
data = read_and_normalize_csv_or_xlsx(input_file)
|
||||
data = personalize_data_for_team(data, "Hounds")
|
||||
# data = parse_datetime(data)
|
||||
|
||||
generate_calendar(data, config_file)
|
||||
pass
|
||||
|
||||
@app.command(name="calendar-config")
|
||||
def generate_calendar_configs(
|
||||
input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV file")],
|
||||
output_file: Annotated[Path, typer.Argument(..., help="Path(s) to the output config file")]
|
||||
):
|
||||
data = read_and_normalize_csv_or_xlsx(input_file)
|
||||
teams = {row.get('visitor') for row in data}
|
||||
teams.update({row.get('home') for row in data})
|
||||
fields = {row.get('field') for row in data}
|
||||
config = defaultdict(dict)
|
||||
config['fields']['default'] = {
|
||||
'bg_color': (0, 0, 0, 256)
|
||||
}
|
||||
config['teams']['default'] = {
|
||||
'logo': ''
|
||||
}
|
||||
for field in fields:
|
||||
config['fields'][field] = config['fields']['default']
|
||||
for team in teams:
|
||||
config['teams'][team] = config['teams']['default']
|
||||
|
||||
if output_file.is_dir:
|
||||
output_file = output_file.joinpath('calendar_config.toml')
|
||||
|
||||
with output_file.open('w') as f:
|
||||
toml.dump(config, f)
|
||||
|
||||
pass
|
||||
@@ -1,204 +0,0 @@
|
||||
|
||||
from calendar import Calendar
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
from typing import Tuple
|
||||
from pathlib import Path
|
||||
import toml
|
||||
|
||||
calendar_cell_size = (400, 500)
|
||||
calendar_cell_width, calendar_cell_height = calendar_cell_size
|
||||
|
||||
def textsize(text, font):
|
||||
im = Image.new(mode="P", size=(0, 0))
|
||||
draw = ImageDraw.Draw(im)
|
||||
_, _, width, height = draw.textbbox((0, 0), text=text, font=font)
|
||||
return width, height
|
||||
|
||||
def corner_image():
|
||||
return Image.new()
|
||||
|
||||
def text_rectangle(text:str, font: str, font_size: int, foreground_color: Tuple[int, int, int, int]=(0,0,0,255), background_color: Tuple[int, int, int, int]=(0,0,0,0), height: int=400, width: int=500) -> Image:
|
||||
font_obj = ImageFont.truetype(font,font_size)
|
||||
img = Image.new('RGBA', (int(width),int(height)), background_color)
|
||||
draw = ImageDraw.Draw(img)
|
||||
text = str(text)
|
||||
text_width, text_height = textsize(text, font=font_obj)
|
||||
x = (width - text_width) / 2
|
||||
y = (height - text_height) / 2
|
||||
text_position = (x,y)
|
||||
draw.text(text_position, text, font=font_obj, fill=foreground_color)
|
||||
return img
|
||||
|
||||
def calendar_cell(
|
||||
height: int=calendar_cell_height, width: int=calendar_cell_width,
|
||||
background_color: Tuple[int, int, int, int]=(0,0,0,0),
|
||||
left_top_corner = None,
|
||||
right_top_corner = None,
|
||||
top_center = None,
|
||||
right_bottom_corner = None,
|
||||
bottom_center = None,
|
||||
left_bottom_corner = None,
|
||||
center = None
|
||||
):
|
||||
# Create a blank rectangle image
|
||||
cell_img = Image.new('RGBA', (width, height), background_color)
|
||||
|
||||
# Left top corner
|
||||
if left_top_corner:
|
||||
paste_position = (0, 0)
|
||||
cell_img.paste(left_top_corner, paste_position, left_top_corner)
|
||||
|
||||
# Right top corner
|
||||
if right_top_corner:
|
||||
paste_position = (width - right_top_corner.width, 0)
|
||||
cell_img.paste(right_top_corner, paste_position, right_top_corner)
|
||||
|
||||
if top_center:
|
||||
raise NotImplementedError
|
||||
|
||||
if right_bottom_corner:
|
||||
paste_position = (width - right_bottom_corner.width, height - right_bottom_corner.height)
|
||||
cell_img.paste(right_bottom_corner, paste_position, right_bottom_corner)
|
||||
|
||||
if bottom_center:
|
||||
paste_position = ((width - bottom_center.width)//2, (height - bottom_center.height))
|
||||
cell_img.paste(bottom_center, paste_position, bottom_center)
|
||||
|
||||
if left_bottom_corner:
|
||||
raise NotImplementedError
|
||||
|
||||
if center:
|
||||
paste_position = ((width - center.width)//2, (height - center.height)//2)
|
||||
cell_img.paste(center, paste_position, center)
|
||||
|
||||
return cell_img
|
||||
|
||||
def generate_calendar(data, config_file=None):
|
||||
result_calendar = Calendar()
|
||||
result_calendar.setfirstweekday(6)
|
||||
baseball_bat = Image.open(f"data/logos/baseball_bat_2.png")
|
||||
if config_file:
|
||||
config = toml.load(config_file)
|
||||
|
||||
baseball_bat = baseball_bat.resize((90, 90))
|
||||
for year, month in {(row['datetime'].year, row['datetime'].month) for row in data}:
|
||||
month_days=list(result_calendar.monthdayscalendar(year, month))
|
||||
month_image = Image.new('RGBA', (calendar_cell_width*7, calendar_cell_height*len(month_days)), (0, 0, 0, 0))
|
||||
first_thursday=(month, [w[4] for w in month_days if w[4] != 0][0])
|
||||
|
||||
colors = {
|
||||
'default': (128, 128, 128, 256),
|
||||
}
|
||||
team_logos={}
|
||||
|
||||
if config:
|
||||
for field, field_options in config['fields'].items():
|
||||
colors[field] = tuple(field_options.get('bg_color', colors.get('default')))
|
||||
for team, team_options in config['teams'].items():
|
||||
team_logos[team] = team_options.get('logo')
|
||||
|
||||
for week_num, week in enumerate(month_days):
|
||||
for day_num, date in enumerate(week):
|
||||
date_text_image = text_rectangle(date,
|
||||
"data/fonts/refrigerator-deluxe-bold.otf",
|
||||
100,
|
||||
foreground_color='white',
|
||||
height=calendar_cell_height*.25,
|
||||
width=calendar_cell_width*.25)
|
||||
if filtered_data := [row for row in data if row['datetime'].month == month and row['datetime'].day == date]:
|
||||
# Gen square that has one game
|
||||
if len (filtered_data) == 1:
|
||||
game = filtered_data[0]
|
||||
opponent_logo_path = team_logos.get(game['opponent'])
|
||||
if opponent_logo_path and (opponent_logo_path := Path(opponent_logo_path)) and opponent_logo_path.exists():
|
||||
opponent_logo = Image.open(opponent_logo_path)
|
||||
else:
|
||||
opponent_logo = text_rectangle(text=game['opponent'][0].upper(),width=500, height=500, font_size=400, font="data/fonts/college.ttf")
|
||||
is_home_game = game['homevisitor'] == "home"
|
||||
if game.get('wood','').lower() == 'yes':
|
||||
right_bottom_corner = baseball_bat
|
||||
else:
|
||||
right_bottom_corner = None
|
||||
img = calendar_cell(
|
||||
height=calendar_cell_height,
|
||||
width=calendar_cell_width,
|
||||
background_color=colors.get(game['field'], colors['default']),
|
||||
left_top_corner = text_rectangle('H' if is_home_game else "A",
|
||||
"data/fonts/refrigerator-deluxe-bold.otf",
|
||||
80,
|
||||
foreground_color='black' if is_home_game else 'white',
|
||||
background_color='white' if is_home_game else 'black',
|
||||
height=calendar_cell_height*.2,
|
||||
width=calendar_cell_width*.2),
|
||||
right_top_corner = date_text_image,
|
||||
center = opponent_logo.resize((int(opponent_logo.width*.5), int(opponent_logo.height*.5))),
|
||||
bottom_center = text_rectangle(f"{game['time']:%-I:%M}" if game.get('time') else "",
|
||||
"data/fonts/refrigerator-deluxe-bold.otf",
|
||||
120,
|
||||
foreground_color='white',
|
||||
height=calendar_cell_height*.25,
|
||||
width=calendar_cell_width),
|
||||
right_bottom_corner=right_bottom_corner
|
||||
)
|
||||
# img.show()
|
||||
elif len(filtered_data) == 2:
|
||||
game1, game2 = filtered_data[:2]
|
||||
opponent_logo_path = team_logos.get(game1['opponent'])
|
||||
if opponent_logo_path and (opponent_logo_path := Path(opponent_logo_path)) and opponent_logo_path.exists():
|
||||
opponent_logo = Image.open(opponent_logo_path)
|
||||
else:
|
||||
opponent_logo = text_rectangle(text=game1['opponent'][0].upper(),width=500, height=500, font_size=400, font="data/fonts/college.ttf")
|
||||
|
||||
img = calendar_cell(
|
||||
height=calendar_cell_height,
|
||||
width=calendar_cell_width,
|
||||
background_color=colors.get(game1['field'], colors['default']),
|
||||
left_top_corner = text_rectangle('DH',
|
||||
"data/fonts/refrigerator-deluxe-bold.otf",
|
||||
80,
|
||||
foreground_color='black',
|
||||
background_color='white',
|
||||
height=calendar_cell_height*.2,
|
||||
width=calendar_cell_width*.2),
|
||||
right_top_corner = date_text_image,
|
||||
center = opponent_logo.resize((int(opponent_logo.width*.5), int(opponent_logo.height*.5))),
|
||||
bottom_center = text_rectangle(f"{game1['time']:%-I:%M} & {game2['time']:%-I:%M}",
|
||||
"data/fonts/refrigerator-deluxe-bold.otf",
|
||||
80,
|
||||
foreground_color='white',
|
||||
height=calendar_cell_height*.2,
|
||||
width=calendar_cell_width),
|
||||
)
|
||||
pass
|
||||
else:
|
||||
img=calendar_cell(
|
||||
height=calendar_cell_height,
|
||||
width=calendar_cell_width,
|
||||
background_color=(204,204,204,int(256*.85)),
|
||||
right_top_corner = text_rectangle(date,
|
||||
"data/fonts/refrigerator-deluxe-bold.otf",
|
||||
100,
|
||||
foreground_color='black',
|
||||
height=calendar_cell_height*.25,
|
||||
width=calendar_cell_width*.25)
|
||||
)
|
||||
pass
|
||||
|
||||
if date: month_image.paste(img, (img.size[0]*day_num, img.size[1]*week_num), img)
|
||||
month_image.save(f'data/output/{year}-{month}.png')
|
||||
|
||||
# if (month, date) in games_lookup.keys() and not (month, date) == first_thursday:
|
||||
# background_image = game_square([g for g in games if g['dtstart'].month==month and g['dtstart'].day==date])
|
||||
# elif (month, date) in games_lookup.keys() and (month, date) == first_thursday:
|
||||
# background_image = game_square(
|
||||
# [g for g in games if g['dtstart'].month == month and g['dtstart'].day == date], special='open-mic')
|
||||
# elif (month, date) == first_thursday:
|
||||
# background_image = openmic_square(date)
|
||||
# else:
|
||||
# background_image = blank_square(date)
|
||||
|
||||
# if date: month_image.paste(background_image, (background_image.size[0]*day_num, background_image.size[1]*week_num), background_image)
|
||||
|
||||
# month_image.thumbnail((1000,1000))
|
||||
# month_image.save(f'output/{year}-{month}.png')
|
||||
# month_image.show()
|
||||
@@ -1 +0,0 @@
|
||||
from .read import app
|
||||
@@ -1,158 +0,0 @@
|
||||
import typer
|
||||
from rich.table import Table, Column
|
||||
from rich.console import Console
|
||||
from rich.columns import Columns
|
||||
from pathlib import Path
|
||||
import csv
|
||||
from ...utils.common import list_key_values, read_and_normalize_csv_or_xlsx, import_gamebygame, aggregate_teams, aggregate_teams_by_season
|
||||
from ...utils.normalize import normalize_header_key
|
||||
from typing import Annotated, List
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
@app.command("list-values")
|
||||
def print_values_for_key(
|
||||
input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")],
|
||||
key: str = typer.Argument(..., help="The key to retrieve to generate list.")
|
||||
):
|
||||
# Read CSV data
|
||||
data = []
|
||||
for f in input_file:
|
||||
data.extend(read_and_normalize_csv_or_xlsx(f))
|
||||
values = list_key_values(data, key)
|
||||
|
||||
console = Console()
|
||||
table = Table(show_header=False, title=f'Values for "{key.title()}" ({len(values)})')
|
||||
table.add_column("Values")
|
||||
|
||||
# breakpoint()
|
||||
for value in sorted(values):
|
||||
table.add_row(value)
|
||||
|
||||
console.print(table)
|
||||
|
||||
@app.command("print")
|
||||
def print_table(
|
||||
input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")]
|
||||
):
|
||||
# Read CSV data
|
||||
data = []
|
||||
for f in input_file:
|
||||
data.extend(read_and_normalize_csv_or_xlsx(f))
|
||||
|
||||
console = Console()
|
||||
table = Table()
|
||||
|
||||
keys = data[0].keys()
|
||||
|
||||
for key in keys:
|
||||
table.add_column(key)
|
||||
|
||||
# breakpoint()
|
||||
for row in data:
|
||||
table.add_row(*[str(row[key]) for key in keys])
|
||||
|
||||
console.print(table)
|
||||
|
||||
@app.command()
|
||||
def check(
|
||||
input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")]
|
||||
):
|
||||
# Read CSV data
|
||||
data = []
|
||||
for f in input_file:
|
||||
data.extend(read_and_normalize_csv_or_xlsx(f))
|
||||
teams = set([row['visitor'] for row in data] + [row['home'] for row in data])
|
||||
fields = set([row['field'] for row in data])
|
||||
console = Console()
|
||||
|
||||
table = Table("Team", "Number of Games")
|
||||
for team in teams:
|
||||
rows = [row for row in data if row['visitor']==team or row['home']==team]
|
||||
table.add_row(team, str(len(rows)))
|
||||
console.print(table)
|
||||
|
||||
table = Table("Field", "Number of Games")
|
||||
for field in fields:
|
||||
rows = [row for row in data if row['field']==field]
|
||||
table.add_row(field, str(len(rows)))
|
||||
console.print(table)
|
||||
|
||||
table = Table("Field", "Datetime", "Games")
|
||||
field_times = [(row['field'], row['datetime']) for row in data]
|
||||
for field, datetime in field_times:
|
||||
rows = [row for row in data if row['field'] == field and row['datetime'] == datetime]
|
||||
if len(rows) != 1:
|
||||
table.add_row(str(field), str(datetime), str(len(rows)))
|
||||
console.print(table)
|
||||
|
||||
matchups = set([tuple([*sorted((row['home'], row['visitor']))]) for row in data])
|
||||
table =Table("Team 1", "Team 2", "Games")
|
||||
for team1, team2 in matchups:
|
||||
rows = [row for row in data if (row['visitor']==team1 or row['home']==team1) and (row['visitor']==team2 or row['home']==team2)]
|
||||
table.add_row(str(team1), str(team2), str(len(rows)))
|
||||
console.print(table)
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@app.command()
|
||||
def standings(
|
||||
input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")],
|
||||
):
|
||||
|
||||
# Read CSV data
|
||||
data=[]
|
||||
for f in input_file:
|
||||
data.extend(import_gamebygame(f))
|
||||
|
||||
aggregate_team_data = aggregate_teams(data)
|
||||
|
||||
# Display aggregated data as a table
|
||||
console = Console()
|
||||
table = Table(title="Aggregated Team Data")
|
||||
table.add_column("Team", style="bold")
|
||||
table.add_column("GP", style="bold")
|
||||
table.add_column("Wins", style="bold")
|
||||
table.add_column("Losses", style="bold")
|
||||
table.add_column("Ties", style="bold")
|
||||
table.add_column("Runs For", style="bold")
|
||||
table.add_column("Runs Against", style="bold")
|
||||
|
||||
for team_stats in aggregate_team_data:
|
||||
table.add_row(
|
||||
team_stats["team"],
|
||||
str(team_stats["gp"]),
|
||||
str(team_stats["win"]),
|
||||
str(team_stats["loss"]),
|
||||
str(team_stats["tie"]),
|
||||
str(team_stats["runs_for"]),
|
||||
str(team_stats["runs_against"]),
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
|
||||
@app.command()
|
||||
def seasons(
|
||||
input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")],
|
||||
):
|
||||
|
||||
# Read CSV data
|
||||
data=[]
|
||||
for f in input_file:
|
||||
data.extend(read_and_normalize_csv_or_xlsx(f))
|
||||
|
||||
aggregate_team_data = aggregate_teams_by_season(data)
|
||||
|
||||
# Display aggregated data as a table
|
||||
console = Console()
|
||||
table = Table(title="Aggregated Team Data")
|
||||
table.add_column("Team", style="bold")
|
||||
|
||||
for team_stats in aggregate_team_data:
|
||||
table.add_row(
|
||||
team_stats["team"],
|
||||
str(", ".join(sorted(team_stats["seasons"])))
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
207
src/cli.py
Normal file
207
src/cli.py
Normal file
@@ -0,0 +1,207 @@
|
||||
import typer
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
from .utils.common import import_gamebygame
|
||||
from .utils.db import get_session, init_db
|
||||
from .models import Game, Team, Venue, GameResult
|
||||
# from alembic import command
|
||||
# from alembic.config import Config
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
app = typer.Typer()
|
||||
console = Console()
|
||||
|
||||
@app.command()
|
||||
def init():
|
||||
"""Initialize the database"""
|
||||
init_db()
|
||||
console.print("[green]Database initialized successfully[/green]")
|
||||
|
||||
@app.command()
|
||||
def ingest(
|
||||
files: List[Path] = typer.Argument(..., help="CSV/XLSX files to ingest"),
|
||||
season: Optional[int] = typer.Option(None, help="Season year"),
|
||||
source: str = typer.Option("unknown", help="Data source (sportsengine, teamsnap, etc)"),
|
||||
dry_run: bool = typer.Option(False, "--dry-run", help="Preview without saving")
|
||||
):
|
||||
"""Ingest CSV/XLSX files into the database with automatic normalization"""
|
||||
|
||||
with get_session() as session:
|
||||
for file_path in files:
|
||||
console.print(f"\n[bold]Processing {file_path}[/bold]")
|
||||
|
||||
# Use your existing parsing logic
|
||||
data = import_gamebygame(file_path)
|
||||
|
||||
games_added = 0
|
||||
games_skipped = 0
|
||||
|
||||
for row in data:
|
||||
if any(row.get(key) is None for key in ['home', 'visitor','field']):
|
||||
continue #skip
|
||||
# Get or create teams
|
||||
home_team = get_or_create_team(session, row['home'])
|
||||
visitor_team = get_or_create_team(session, row['visitor'])
|
||||
|
||||
# Get or create venue
|
||||
venue = None
|
||||
if row.get('field'):
|
||||
venue = get_or_create_venue(session, row['field'])
|
||||
|
||||
# Create dedupe key
|
||||
dedupe_key = f"{row['datetime']}_{row.get('field', 'unknown')}_{home_team.id}_{visitor_team.id}"
|
||||
|
||||
# Check if game exists
|
||||
existing = session.query(Game).filter_by(dedupe_key=dedupe_key).first()
|
||||
if existing:
|
||||
games_skipped += 1
|
||||
continue
|
||||
|
||||
# Create game
|
||||
game = Game(
|
||||
date=row['datetime'],
|
||||
home_team_id=home_team.id,
|
||||
visitor_team_id=visitor_team.id,
|
||||
venue_id=venue.id if venue else None,
|
||||
field=row.get('field'),
|
||||
season=season or row.get('season'),
|
||||
source=source,
|
||||
dedupe_key=dedupe_key
|
||||
)
|
||||
|
||||
# Add result if available
|
||||
if row.get('has_result'):
|
||||
result = GameResult(
|
||||
game_id=game.id,
|
||||
home_runs_for=row.get('home_runs_for'),
|
||||
visitor_runs_for=row.get('visitor_runs_for'),
|
||||
home_outcome=row.get('home_outcome'),
|
||||
visitor_outcome=row.get('visitor_outcome')
|
||||
)
|
||||
game.result = result
|
||||
|
||||
if not dry_run:
|
||||
session.add(game)
|
||||
games_added += 1
|
||||
|
||||
if not dry_run:
|
||||
session.commit()
|
||||
|
||||
# Summary
|
||||
console.print(f"[green]Added: {games_added} games[/green]")
|
||||
console.print(f"[yellow]Skipped (duplicates): {games_skipped} games[/yellow]")
|
||||
|
||||
@app.command()
def query(
    team: Optional[str] = typer.Option(None, help="Filter by team name"),
    season: Optional[int] = typer.Option(None, help="Filter by season"),
    venue: Optional[str] = typer.Option(None, help="Filter by venue"),
    format: str = typer.Option("table", help="Output format: table, csv, json")
):
    """Query games from the database"""
    with get_session() as session:
        games_query = session.query(Game)

        # A team may appear on either side of a game, so join on both FKs.
        if team:
            either_side = or_(
                Game.home_team_id == Team.id,
                Game.visitor_team_id == Team.id,
            )
            games_query = games_query.join(Team, either_side).filter(Team.name == team)

        if season:
            games_query = games_query.filter(Game.season == season)

        if venue:
            games_query = games_query.join(Venue).filter(Venue.name == venue)

        games = games_query.all()

        # Dispatch to the renderer for the requested output format;
        # unknown formats fall through silently (same as the if/elif chain).
        renderers = {
            "table": display_games_table,
            "csv": export_games_csv,
            "json": export_games_json,
        }
        renderer = renderers.get(format)
        if renderer:
            renderer(games)
|
||||
|
||||
@app.command()
def standings(
    season: Optional[int] = typer.Option(None, help="Filter by season")
):
    """Show standings from the database"""
    # Aggregation is done by the shared DB helper rather than in Python here.
    with get_session() as session:
        results = calculate_standings_from_db(session, season)
        display_standings_table(results)
|
||||
|
||||
# Database management commands
|
||||
from sqlalchemy import or_
|
||||
@app.command()
def upgrade():
    """Apply database migrations.

    NOTE(review): this is currently a stub — the Alembic calls below are
    commented out, so no schema change is actually performed yet. The
    original also opened and immediately closed a no-op session, which
    did nothing except emit engine echo noise; that has been removed.
    """
    # TODO: wire up Alembic once alembic.ini is configured:
    # from alembic import command
    # from alembic.config import Config
    # alembic_cfg = Config("alembic.ini")
    # command.upgrade(alembic_cfg, "head")
    console.print("[green]Database upgraded[/green]")
|
||||
|
||||
@app.command()
def downgrade(revision: str = typer.Argument("", help="Revision to downgrade to")):
    """Downgrade database migrations.

    NOTE(review): stub — the Alembic integration is commented out, so
    nothing is downgraded yet. The pointless `with get_session(): pass`
    block from the original has been removed (it only opened and closed
    a session without doing any work).
    """
    # TODO: wire up Alembic once alembic.ini is configured:
    # alembic_cfg = Config("alembic.ini")
    # command.downgrade(alembic_cfg, revision or "-1")
    console.print("[yellow]Database downgraded[/yellow]")
|
||||
|
||||
@app.command()
def clean_duplicates():
    """Remove duplicate games from database"""
    # TODO: implement deduplication — e.g. group Game rows by dedupe_key
    # and delete all but one per group. Currently a no-op stub.
    pass
|
||||
|
||||
@app.command()
def export(
    output: Path,
    format: str = typer.Option("sportspress", help="Export format: sportspress, csv, json"),
    season: Optional[int] = typer.Option(None, help="Filter by season")
):
    """Export data from database.

    Currently only the 'sportspress' format is implemented; csv/json are
    advertised in --help but not yet supported, so they now fail loudly
    instead of silently writing nothing.
    """
    with get_session() as session:
        games = session.query(Game)
        if season:
            games = games.filter(Game.season == season)

        if format == "sportspress":
            # Reuse the CSV writer shared with the file-based pipeline.
            from .utils.sportspress import write_sportspress_csv
            data = games_to_dict_format(games.all())
            write_sportspress_csv(data, output)
        else:
            console.print(f"[red]Unsupported export format: {format}[/red]")
            raise typer.Exit(code=1)
|
||||
|
||||
# Helper functions
|
||||
def get_or_create_team(session, team_name: str) -> Team:
    """Fetch the Team named *team_name*, creating and flushing it if absent."""
    existing = session.query(Team).filter_by(name=team_name).first()
    if existing:
        return existing
    created = Team(name=team_name)
    session.add(created)
    session.flush()  # ensure created.id is populated before callers read it
    return created
|
||||
|
||||
def get_or_create_venue(session, venue_name: str) -> Venue:
    """Fetch the Venue named *venue_name*, creating and flushing it if absent."""
    existing = session.query(Venue).filter_by(name=venue_name).first()
    if existing:
        return existing
    created = Venue(name=venue_name)
    session.add(created)
    session.flush()  # ensure created.id is populated before callers read it
    return created
|
||||
|
||||
if __name__ == "__main__":
    # Script entry point: run the Typer CLI application.
    app()
|
||||
@@ -1,81 +0,0 @@
|
||||
import csv
|
||||
import re
|
||||
from typing import List, Dict
|
||||
from dateutil import parser
|
||||
from pathlib import Path
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table, Column
|
||||
from rich.columns import Columns
|
||||
import typer
|
||||
# from .utils.common import normalize_header_key, read_csv, is_visitor_home_order_reversed, process_data, aggregate_teams, write_sportspress_csv
|
||||
|
||||
# validate_csv_header
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
|
||||
@app.command()
def standings(file_path: Path = typer.Argument(..., help="Path to the CSV file")):
    """Compute and display aggregated per-team standings from a game CSV file.

    NOTE(review): normalize_header_key, validate_csv_header, read_csv,
    is_visitor_home_order_reversed, process_data and aggregate_teams come
    from .utils.common, but that import is commented out at the top of this
    file — confirm it is restored before running.
    """
    # Validate CSV header. Use a context manager so the file handle is
    # closed (the original leaked the object returned by open()).
    with open(file_path, "r") as f:
        header = next(csv.reader(f))
    normalized_header = [normalize_header_key(key) for key in header]
    if not validate_csv_header(header):
        typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
        return

    # Read CSV data and aggregate per-team win/loss/run totals.
    data = read_csv(file_path)
    visitor_home_order_reversed = is_visitor_home_order_reversed(normalized_header)
    processed_data = process_data(data, visitor_home_order_reversed)
    aggregate_team_data = aggregate_teams(processed_data)

    # Display aggregated data as a table.
    console = Console()
    table = Table(title="Aggregated Team Data")
    for column_title in ("Team", "Wins", "Losses", "Ties", "Runs For", "Runs Against"):
        table.add_column(column_title, style="bold")

    for team_stats in aggregate_team_data:
        table.add_row(
            team_stats["team"],
            str(team_stats["win"]),
            str(team_stats["loss"]),
            str(team_stats["tie"]),
            str(team_stats["runs_for"]),
            str(team_stats["runs_against"]),
        )

    console.print(table)
|
||||
|
||||
# Write processed CSV data back to a new file
|
||||
# output_file_path = file_path.with_suffix(".processed.csv")
|
||||
# write_csv(output_file_path, data)
|
||||
# typer.echo(f"Processed data written to: {output_file_path}")
|
||||
|
||||
# @app.command()
|
||||
def replace_key_values(data: List[Dict], key, match: str, replace: str, is_regex: bool = False):
    """Rewrite the value of *key* in every row of *data*.

    When is_regex is False, *match* is treated as a literal string that must
    equal the entire value. The original compiled the "literal" pattern
    unescaped, so regex metacharacters (e.g. '.') were accidentally active;
    re.escape fixes that. When is_regex is True, *match* is used as a raw
    regular expression and every occurrence is substituted.

    Returns the same list, mutated in place.
    """
    if not is_regex:
        # Anchor and escape: exact whole-string literal match only.
        regex = re.compile(fr"^{re.escape(match)}$")
    else:
        regex = re.compile(fr"{match}")

    for row in data:
        row[key] = regex.sub(replace, row[key])

    return data
|
||||
|
||||
def add_key_values(data: List[Dict], key, value: str):
    """Set *key* to *value* on every row and return the mutated list."""
    for record in data:
        record[key] = value
    return data
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Script entry point: run the Typer CLI application.
    app()
|
||||
1
src/models/__init__.py
Normal file
1
src/models/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .models import *
|
||||
94
src/models/models.py
Normal file
94
src/models/models.py
Normal file
@@ -0,0 +1,94 @@
|
||||
from sqlmodel import SQLModel, Field, Relationship
|
||||
from uuid import UUID, uuid4
|
||||
from datetime import datetime
|
||||
from typing import Optional, List
|
||||
from enum import Enum
|
||||
|
||||
# Team model
class Team(SQLModel, table=True):
    """A team; referenced by Game.home_team_id and Game.visitor_team_id."""
    # Surrogate primary key; random UUIDs avoid collisions across import sources.
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    # Canonical team name; unique so get-or-create lookups are unambiguous.
    name: str = Field(index=True, unique=True)

    # Reverse relationships are commented out for now. Game has TWO foreign
    # keys into team (home/visitor), so each side needs explicit foreign_keys
    # to disambiguate the join path before these can be enabled:
    # home_games: List["Game"] = Relationship(
    #     back_populates="home_team",
    #     sa_relationship_kwargs={"foreign_keys": "Game.home_team_id"}
    # )
    # visitor_games: List["Game"] = Relationship(
    #     back_populates="visitor_team",
    #     sa_relationship_kwargs={"foreign_keys": "Game.visitor_team_id"}
    # )
|
||||
|
||||
# Venue model
class Venue(SQLModel, table=True):
    """A playing field/venue; games reference it via Game.venue_id."""
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    # Normalized venue name; unique so get-or-create lookups are unambiguous.
    name: str = Field(index=True, unique=True)

    # Relationships
    games: List["Game"] = Relationship(back_populates="venue")
|
||||
|
||||
# Game model
class Game(SQLModel, table=True):
    """A scheduled game between two teams, optionally linked to a result."""
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    # Game start as a full datetime (the importer combines date + time).
    date: datetime = Field(index=True)

    # Foreign keys
    home_team_id: UUID = Field(foreign_key="team.id")
    visitor_team_id: UUID = Field(foreign_key="team.id")
    venue_id: Optional[UUID] = Field(default=None, foreign_key="venue.id")

    # Explicitly specify relationships with foreign keys: two FKs point at
    # team, so SQLAlchemy needs foreign_keys to pick the right join path.
    home_team: "Team" = Relationship(
        # back_populates="home_games",
        sa_relationship_kwargs={"foreign_keys": "Game.home_team_id"}
    )
    visitor_team: "Team" = Relationship(
        # back_populates="visitor_games",
        sa_relationship_kwargs={"foreign_keys": "Game.visitor_team_id"}
    )
    venue: Optional["Venue"] = Relationship(back_populates="games")
    result: Optional["GameResult"] = Relationship(back_populates="game")

    # Metadata
    # NOTE(review): season has no default, so despite the Optional type it is
    # a required field at construction time — confirm that is intended.
    season: Optional[int] = Field(index=True)
    # Which importer/feed produced this row.
    source: str = Field(default="unknown")
    # Uniqueness key built by the importer (datetime + field + team ids);
    # used to skip duplicate rows on re-import.
    dedupe_key: str = Field(unique=True, index=True)
|
||||
|
||||
class GameOutcome(str, Enum):
    """Per-team outcome of a game; str-valued so it stores/serializes cleanly."""
    WIN = "win"
    LOSS = "loss"
    TIE = "tie"
    FORFEIT = "forfeit"
    TECHNICAL_WIN = "technical_win"
    TECHNICAL_LOSS = "technical_loss"
    FORFEIT_WIN = "forfeit_win"
    FORFEIT_LOSS = "forfeit_loss"
|
||||
|
||||
# GameResult model
class GameResult(SQLModel, table=True):
    """Final score and per-team outcome for a Game (one-to-one via Game.result)."""
    id: UUID = Field(default_factory=uuid4, primary_key=True)

    # Foreign key to Game
    game_id: UUID = Field(foreign_key="game.id")
    game: "Game" = Relationship(back_populates="result")

    # Game results: runs scored by each side.
    # NOTE(review): these fields have no defaults, so despite the Optional
    # types they are required at construction time — confirm intended.
    home_runs_for: Optional[int]
    visitor_runs_for: Optional[int]

    home_outcome: Optional[GameOutcome]  # win/loss/tie/forfeit
    visitor_outcome: Optional[GameOutcome]
|
||||
|
||||
# External ID model if needed
class ExternalId(SQLModel, table=True):
    """Maps a record in an external system to a local team, game, or venue."""
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    external_type: str  # e.g., 'SportsEngine', 'TeamSnap'
    external_id: str  # identifier within that external system
    # Exactly one of these is expected to be set per row — presumably;
    # nothing enforces it at the schema level. TODO confirm.
    team_id: Optional[UUID] = Field(default=None, foreign_key="team.id")
    game_id: Optional[UUID] = Field(default=None, foreign_key="game.id")
    venue_id: Optional[UUID] = Field(default=None, foreign_key="venue.id")

    # Relationships
    team: Optional["Team"] = Relationship()
    game: Optional["Game"] = Relationship()
    venue: Optional["Venue"] = Relationship()
|
||||
14
src/scripts/clear_database.py
Normal file
14
src/scripts/clear_database.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from sqlmodel import create_engine, SQLModel  # fixed: trailing comma was a SyntaxError

# Replace with your actual database URL
DATABASE_URL = "sqlite:///sportsdb.db"

engine = create_engine(DATABASE_URL)


def clear_database():
    """Drop every table registered on SQLModel.metadata. Destructive!"""
    # Drop all tables
    SQLModel.metadata.drop_all(engine)
    print("All tables dropped successfully.")


if __name__ == "__main__":
    clear_database()
|
||||
18
src/scripts/truncate_tables.py
Normal file
18
src/scripts/truncate_tables.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from sqlmodel import Session, create_engine, SQLModel, text

DATABASE_URL = "sqlite:///sportsdb.db"  # Adjust your database URL accordingly

engine = create_engine(DATABASE_URL)


def truncate_tables():
    """Delete every row from all tables, children before parents (FK order)."""
    with Session(engine) as session:
        # gameresult references game; game references venue/team —
        # delete in dependency order so foreign keys stay satisfied.
        for table_name in ("gameresult", "game", "venue", "team"):
            session.exec(text(f"DELETE FROM {table_name};"))
        session.commit()
    print("All tables truncated successfully.")


if __name__ == "__main__":
    SQLModel.metadata.create_all(engine)  # Ensure the tables exist
    truncate_tables()
|
||||
@@ -169,7 +169,6 @@ def parse_datetime(data: List[Dict]):
|
||||
def import_gamebygame(data: Union[List[Dict], TextIO, Path]) -> List[Dict]:
|
||||
if isinstance(data, TextIOBase) or isinstance(data, Path) :
|
||||
data = read_and_normalize_csv_or_xlsx(data)
|
||||
|
||||
header = data[0].keys()
|
||||
visitor_home_order_reversed = is_visitor_home_order_reversed(list(header))
|
||||
for row in data:
|
||||
@@ -178,7 +177,8 @@ def import_gamebygame(data: Union[List[Dict], TextIO, Path]) -> List[Dict]:
|
||||
try:
|
||||
row['datetime'] = parser.parse(f"{row['date']} {row['time']}")
|
||||
except parser.ParserError as e:
|
||||
raise e
|
||||
pass
|
||||
# raise e
|
||||
|
||||
return data
|
||||
|
||||
|
||||
34
src/utils/db.py
Normal file
34
src/utils/db.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
import os
from dotenv import load_dotenv

# Load environment variables from a .env file if present
load_dotenv()

# Get the database URL from environment variables (defaults to a local SQLite file)
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./sportsdb.db")

# Create an engine for the database.
# NOTE(review): echo=True logs every SQL statement — handy in development,
# very noisy in production; consider making it configurable.
engine = create_engine(DATABASE_URL, echo=True)

# Create a configured "Session" class (sessions are created per-use via SessionContext)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
|
||||
def init_db():
    """Initialize the database by creating all tables registered on SQLModel.metadata."""
    SQLModel.metadata.create_all(bind=engine)
|
||||
|
||||
class SessionContext:
    """Context manager for database sessions.

    Yields a new SessionLocal session on entry and always closes it on exit.
    If the managed block raised, any pending transaction is rolled back first
    (the original closed without rollback, leaving a failed transaction
    pending on the pooled connection).
    """

    def __enter__(self):
        self.db = SessionLocal()
        return self.db

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            if exc_type is not None:
                self.db.rollback()
        finally:
            self.db.close()
|
||||
|
||||
def get_session():
    """Return a SessionContext, for use as `with get_session() as session:`."""
    return SessionContext()
|
||||
@@ -1,8 +1,9 @@
|
||||
import toml
|
||||
from typing import List, Dict
|
||||
from typing import List, Dict, Literal
|
||||
DEFAULT_NORMALIZATION_PATH="normalization.toml"
|
||||
import re
|
||||
from dateutil import parser
|
||||
from dateutil.parser import ParserError
|
||||
import datetime
|
||||
|
||||
def load_config(normalization_config_path=DEFAULT_NORMALIZATION_PATH):
|
||||
@@ -17,29 +18,33 @@ def normalize_header_key(original_key: str, normalization_config) -> str:
|
||||
normalized_key = key
|
||||
return normalized_key
|
||||
return original_key
|
||||
|
||||
return key_mapping.get(key.lower().strip(), key.lower().strip())
|
||||
|
||||
def normalize_value(value, key, normalization_config):
|
||||
|
||||
def normalize_value(value:str, key: str, normalization_config:dict):
|
||||
if value.lower() == "xx":
|
||||
return None
|
||||
value = value.strip()
|
||||
for normalization_pair in normalization_config.get(key if not key == "home" or key == "away" else "team",{}).get('values',[]):
|
||||
if key in ["home", "visitor", "away", "field"]:
|
||||
value = value.title()
|
||||
for normalization_pair in normalization_config.get(key if not (key == "home" or key == "visitor" or key=="away") else "team",{}).get('values',[]):
|
||||
if value in normalization_pair["original"]:
|
||||
value = normalization_pair["normalized"]
|
||||
match key.lower():
|
||||
case "date":
|
||||
if value:
|
||||
value = parser.parse(value).date()
|
||||
else:
|
||||
pass
|
||||
case "home":
|
||||
value = value.title()
|
||||
case "visitor":
|
||||
value = value.title()
|
||||
case "time":
|
||||
if value:
|
||||
value = parser.parse(value).time()
|
||||
else:
|
||||
pass
|
||||
value = value.replace("550", "5:50")
|
||||
pattern = r"(\b\d{1,2}:\d{2}\s*(?:AM|PM)\b).*"
|
||||
# Use re.sub to replace the entire string matched by the pattern with just the time part
|
||||
value = re.sub(pattern, r"\1", value)
|
||||
value = value.replace("Time Change","")
|
||||
try:
|
||||
value = parser.parse(value).time()
|
||||
except ParserError as e:
|
||||
pass
|
||||
case _:
|
||||
# Handle other cases
|
||||
pass
|
||||
|
||||
@@ -1,105 +0,0 @@
|
||||
from typing import List, Dict
|
||||
from pathlib import Path
|
||||
import csv
|
||||
|
||||
REQUIRED_KEYS = ["date", "time", "field", "visitor", "home"]


def validate_keys(header: List[str]) -> bool:
    """Return True when *header* contains every key in REQUIRED_KEYS."""
    return all(key in header for key in REQUIRED_KEYS)


def write_sportspress_csv(data: List[Dict], file, only_with_outcome: bool = False):
    """
    Writes sports event data to a CSV file in SportsPress import format.

    Each game becomes two CSV rows: the home side first (carrying date, time
    and venue), then the visitor side with those columns left blank.

    Parameters:
    - data (List[Dict]): List of dictionaries, one per game. Rows must contain
      REQUIRED_KEYS plus a 'datetime' value; stat keys are optional.
    - file: A writable text file object (e.g. open(path, "w", newline="")).
    - only_with_outcome (bool, optional): If True, only games whose row has
      'has_result' set are written. Default is False.

    Returns:
    None

    Raises:
    KeyError: if the data rows are missing any of REQUIRED_KEYS.
    """
    # Nothing to validate or write for an empty dataset
    # (the original raised IndexError on data[0]).
    if not data:
        return

    header = data[0].keys()
    if not validate_keys(header):
        raise KeyError(f"Missing Keys. Requires: {REQUIRED_KEYS}, provided: {list(header)}")

    writer = csv.writer(file)

    fieldnames = [
        "Format",  # Competitive or Friendly
        # "Competition",
        "Season",
        # "Date Format",
        "Date",
        "Time",
        "Venue",
        "Team",
        "Results",
        "Outcome",
        # "Players",
        # "Performance",
    ]

    # Write the header
    writer.writerow(fieldnames)

    # Stat keys folded into the Results column, pipe-separated.
    # (Visitor inning-by-inning keys were commented out in the original,
    # so only totals are emitted for the visitor side.)
    home_result_keys = [f"home_runs_for_inning_{i}" for i in range(1, 11)] + [
        "home_runs_for",
        "home_errors",
        "home_hits",
    ]
    visitor_result_keys = ["visitor_runs_for", "visitor_errors", "visitor_hits"]

    for row in data:
        if only_with_outcome and not row.get('has_result'):
            continue
        # Home row. Pad with blanks for Format/Season so values line up with
        # the header: the original wrote 6 values under an 8-column header,
        # shifting every field two columns to the left.
        writer.writerow(
            [
                "",  # Format
                "",  # Season
                row["datetime"].strftime("%Y/%m/%d"),
                row["datetime"].strftime("%H:%M"),
                row.get("field", ""),
                row["home"],
                "|".join([str(row.get(k, "")) for k in home_result_keys]),
                row.get("home_outcome"),
            ]
        )
        # Visitor row: Format/Season/Date/Time/Venue intentionally blank.
        writer.writerow(
            [
                "",
                "",
                "",
                "",
                "",
                row["visitor"],
                "|".join([str(row.get(k, "")) for k in visitor_result_keys]),
                row.get("visitor_outcome"),
            ]
        )
|
||||
Reference in New Issue
Block a user