Compare commits

1 Commits

Author SHA1 Message Date
7ea5fd15df Refactor project structure and update configurations
- Renamed and deleted several Python modules
- Added new SQL and database scripts
- Updated `.vscode` and `requirements.txt` configurations
2025-08-27 08:33:51 -05:00
31 changed files with 502 additions and 798 deletions

31
.vscode/tasks.json vendored Normal file
View File

@@ -0,0 +1,31 @@
{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "Truncate Database Tables",
            "type": "shell",
            "command": "${workspaceFolder}/.venv/bin/python", // Use this for macOS/Linux
            "args": [
                "${workspaceFolder}/src/scripts/truncate_tables.py"
            ],
            "group": {
                "kind": "build",
                "isDefault": true
            },
            "problemMatcher": []
        },
        {
            "label": "Clear Database",
            "type": "shell",
            "command": "${workspaceFolder}/.venv/bin/python", // Use this for macOS/Linux
            "args": [
                "${workspaceFolder}/src/scripts/clear_database.py"
            ],
            "group": {
                "kind": "build",
                // BUG FIX: only one build task should be the default;
                // "Truncate Database Tables" keeps that role.
                "isDefault": false
            },
            "problemMatcher": []
        }
    ]
}

0
alembic/__init__.py Normal file
View File

0
alembic/alembic.ini Normal file
View File

0
alembic/env.py Normal file
View File

View File

@@ -35,10 +35,16 @@ potential_keys = ["Field", "Location", "Venue"]
original = ["Winnemac"] original = ["Winnemac"]
normalized = "Winnemac Park" normalized = "Winnemac Park"
[[field.values]] [[field.values]]
original = ["Taft HS"] original = ["Maywood", "MAYWOOD"]
normalized = "Maywood Park"
[[field.values]]
original = ["Taft HS", "Taft Hs"]
normalized = "Taft High School" normalized = "Taft High School"
[[field.values]] [[field.values]]
original = ["Southwest"] original = ["Ridgewood Hs"]
normalized = "Ridgewood High School"
[[field.values]]
original = ["Southwest", "SW Park", "SOUTHWEST"]
normalized = "Southwest Park" normalized = "Southwest Park"
[[field.values]] [[field.values]]
original = ["Comed", "COMED", "ComEd"] original = ["Comed", "COMED", "ComEd"]
@@ -61,13 +67,19 @@ potential_keys = ["Final Score", "Score", "Result", "Outcome"]
original = ["Hounds", "Chicago Hounds", "Winnemac Hounds", "Hound"] original = ["Hounds", "Chicago Hounds", "Winnemac Hounds", "Hound"]
normalized = "Hounds" normalized = "Hounds"
[[team.values]] [[team.values]]
original = ["Chicago Red Sox"] original = ["Ramirez Bb", "Ramirez"]
normalized = "Ramirez Baseball"
[[team.values]]
original = ["Degeneratex"]
normalized = "DegenerateX"
[[team.values]]
original = ["Chicago Red Sox", "Redsox"]
normalized = "Red Sox" normalized = "Red Sox"
[[team.values]] [[team.values]]
original = ["NorthSide White Sox"] original = ["NorthSide White Sox"]
normalized = "North Side White Sox" normalized = "North Side White Sox"
[[team.values]] [[team.values]]
original = ["Chicago Rebels", "CHICAGO REBELS"] original = ["Chicago Rebels"]
normalized = "Rebels" normalized = "Rebels"
[[team.values]] [[team.values]]
original = ["Lombard Expors", "LOMBARD EXPORS"] original = ["Lombard Expors", "LOMBARD EXPORS"]

0
pyproject.toml Normal file
View File

View File

@@ -1,5 +1,20 @@
requirements.txt — removed (pinned) entries:
typer[all]==0.9.0
python-dateutil==2.8.2
toml==0.10.2
pillow
xlsx2csv

requirements.txt — new contents:
# CLI and display
typer[all]
rich
# Database
sqlmodel
alembic
python-dotenv
# Data processing
python-dateutil
pandas # Optional but useful for data analysis
openpyxl # For Excel file support
# File parsing
toml
xlsx2csv
# Image generation (for calendar features)
pillow

View File

@@ -0,0 +1,22 @@
-- One row per game: team names, venue, and (when recorded) the final result.
SELECT
    game.id,
    game.date,
    team_home.name AS home_team,
    team_visitor.name AS visitor_team,
    venue.name AS venue_name,
    gameresult.home_runs_for,
    gameresult.visitor_runs_for,
    gameresult.home_outcome,
    gameresult.visitor_outcome
FROM
    game
JOIN
    team AS team_home ON game.home_team_id = team_home.id
JOIN
    team AS team_visitor ON game.visitor_team_id = team_visitor.id
-- BUG FIX: game.venue_id is nullable (the Game model declares it Optional),
-- so an inner join silently dropped every game without a venue.
LEFT JOIN
    venue ON game.venue_id = venue.id
-- Unplayed/unscored games still appear, with NULL result columns.
LEFT JOIN
    gameresult ON game.id = gameresult.game_id
ORDER BY
    game.date;

View File

@@ -0,0 +1,26 @@
-- Per-season win/loss/tie totals for every team, best records first.
SELECT
    strftime('%Y', game.date) AS year, -- Extracts year from date
    team.id AS team_id,
    team.name AS team_name,
    -- Parentheses added: make the grouping explicit instead of relying on
    -- AND binding tighter than OR.
    SUM(CASE
        WHEN (gameresult.home_outcome = 'WIN' AND game.home_team_id = team.id)
          OR (gameresult.visitor_outcome = 'WIN' AND game.visitor_team_id = team.id)
        THEN 1 ELSE 0 END) AS wins,
    SUM(CASE
        WHEN (gameresult.home_outcome = 'LOSS' AND game.home_team_id = team.id)
          OR (gameresult.visitor_outcome = 'LOSS' AND game.visitor_team_id = team.id)
        THEN 1 ELSE 0 END) AS losses,
    SUM(CASE
        WHEN (gameresult.home_outcome = 'TIE' AND game.home_team_id = team.id)
          OR (gameresult.visitor_outcome = 'TIE' AND game.visitor_team_id = team.id)
        THEN 1 ELSE 0 END) AS ties
FROM
    team
LEFT JOIN
    game ON (game.home_team_id = team.id OR game.visitor_team_id = team.id)
LEFT JOIN
    gameresult ON game.id = gameresult.game_id
GROUP BY
    year, team.id
ORDER BY
    year, wins DESC, losses ASC, ties DESC;
-- NOTE(review): outcomes are compared to uppercase 'WIN'/'LOSS'/'TIE', but
-- the GameOutcome enum stores lowercase values ("win", ...) — confirm the
-- casing actually stored in gameresult.

View File

@@ -1,15 +0,0 @@
# Top-level Typer application: wires the convert/clean/read/generate
# sub-applications into one CLI with matching sub-command names.
from .apps.convert import app as convert_app
from .apps.clean import app as clean_app
from .apps.read import app as read_app
from .apps.generate import app as generate_app
import typer

app = typer.Typer()
app.add_typer(convert_app, name="convert")
app.add_typer(clean_app, name="clean")
app.add_typer(read_app, name="read")
app.add_typer(generate_app, name="generate")

if __name__ == "__main__":
    app()

View File

@@ -1 +0,0 @@
from .clean import app

View File

@@ -1,109 +0,0 @@
import typer
from rich.table import Table, Column
from rich.console import Console
from rich.columns import Columns
from rich.panel import Panel
from pathlib import Path
import csv
from ...utils.common import list_key_values, read_and_normalize_csv_or_xlsx
from ...utils.normalize import normalize_header_key, replace_key_values, DEFAULT_NORMALIZATION_PATH
from typing import Annotated, List
import re
app = typer.Typer()
@app.command("replace")
def replace_values_for_key(
    input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")],
    output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file or directory."),
    key: str = typer.Argument(..., help="Column whose values are replaced."),
    match: str = typer.Argument(..., help="Value (or regex with --regex) to match."),
    replace: str = typer.Argument(..., help="Replacement value."),
    in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
    # BUG FIX: this option previously reused "-p", colliding with --in-place.
    match_is_regex: bool = typer.Option(False, "--regex", "-r", help="Match is a regex pattern.")
):
    """Replace values of *key* in one or more CSV/XLSX files.

    Shows a before/after preview of the distinct values, then (after
    confirmation) writes either in place or to --output-file. "team" fans
    out to the separate home/visitor columns when no literal "team" column
    exists.
    """
    # normalized_key = normalize_header_key(key)  # normalization intentionally disabled
    normalized_key = key
    console = Console()
    for source_path in input_file:
        data = read_and_normalize_csv_or_xlsx(source_path)
        before_table = Table(Column(), show_header=False, title="Before")
        for value in sorted(list_key_values(data, key)):
            before_table.add_row(value)
        after_table = Table(Column(), show_header=False, title="After")
        if normalized_key != "team" or "team" in data[0].keys():
            data = replace_key_values(data, normalized_key, match, replace, match_is_regex)
        else:
            data = replace_key_values(data, "home", match, replace, match_is_regex)
            data = replace_key_values(data, "visitor", match, replace, match_is_regex)
        for value in sorted(list_key_values(data, key)):
            after_table.add_row(value)
        panel = Panel(
            Columns([before_table, after_table]),
            title="Replace"
        )
        console.print(panel)

        def _write_rows(handle):
            # Shared CSV writer for both in-place and --output-file modes.
            fieldnames = data[0].keys()
            writer = csv.DictWriter(handle, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

        if in_place and typer.confirm("Perform Replacement in-place?"):
            # BUG FIX: the original passed the Path object straight to
            # csv.DictWriter without ever opening the file for writing.
            with source_path.open('w', newline='') as handle:
                _write_rows(handle)
        elif output_file:
            # BUG FIX: output_file was declared List[typer.FileText] yet used
            # as a Path (is_dir/joinpath/open); it is now a Path option.
            target = output_file
            if target.is_dir():
                target = target.joinpath(source_path.name)
            if typer.confirm(f"Write to {target}?"):
                with target.open('w', newline='') as handle:
                    _write_rows(handle)
@app.command("add-key")
def add_values_for_key(
    file_path: Path = typer.Argument(..., help="Path to the CSV or XLSX file"),
    key: str = typer.Argument(..., help=""),
    value: str = typer.Argument("", help=""),
    in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."),
    output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."),
):
    """Intended to set column *key* to *value* on every row of the file.

    NOTE(review): the actual mutation (add_key_values) is commented out
    below, so this command currently rewrites the input data unchanged —
    confirm whether that line should be restored.
    """
    if in_place and output_file:
        typer.echo("Error: Only one of --in-place or --output-file should be provided, not both.")
        raise typer.Abort()
    console = Console()  # NOTE(review): unused
    # Read CSV data
    data = read_and_normalize_csv_or_xlsx(file_path)
    # data = add_key_values(data, key, value)
    if in_place and typer.confirm("Perform Replacement in-place?"):
        with file_path.open('w') as f:
            fieldnames = data[0].keys()
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
    elif output_file:
        # A directory target gets the input's filename appended.
        if output_file.is_dir():
            output_file = output_file.joinpath(file_path.name)
        if typer.confirm(f"Write to {output_file}?"):
            with output_file.open('w') as f:
                fieldnames = data[0].keys()
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(data)

View File

@@ -1 +0,0 @@
from .convert import app

View File

@@ -1,44 +0,0 @@
import typer
from typing import Annotated
from pathlib import Path
from ...utils.sportspress import validate_keys
from ...utils.normalize import normalize_header_key, load_config
from ...utils.common import read_and_normalize_csv_or_xlsx, is_visitor_home_order_reversed, import_gamebygame
from ...utils.sportspress import write_sportspress_csv
import csv
app = typer.Typer()
@app.command(name="sportspress")
def sportspress_csv(
    input_file: Annotated[Path, typer.Argument(..., help="Path to the CSV or XLSX file")],
    file_output_path: Annotated[typer.FileTextWrite, typer.Argument(..., help="Path to the output CSV file")],
    only_with_outcome: bool = typer.Option(default=False, is_flag=True, help="Only export games with a recorded outcome.")
):
    """Convert a game-by-game schedule file into SportsPress CSV format."""
    # Read CSV data
    data = import_gamebygame(input_file)
    try:
        write_sportspress_csv(data, file_output_path, only_with_outcome)
    except KeyError as e:
        typer.echo(f"Error: {e}")
    typer.echo(f"Output to {file_output_path.name}")


# BUG FIX: this function was also named sportspress_csv, redefining the one
# above at import time.
@app.command(name="teamsnap")
def teamsnap_csv(
    input_file: Annotated[Path, typer.Argument(..., help="Path to the CSV or XLSX file")],
    file_output_path: Annotated[typer.FileTextWrite, typer.Argument(..., help="Path to the output CSV file")],
    only_with_outcome: bool = typer.Option(default=False, is_flag=True, help="Only export games with a recorded outcome.")
):
    """Convert a TeamSnap export into SportsPress CSV format.

    NOTE(review): body is currently identical to the sportspress command —
    confirm whether TeamSnap-specific parsing is still TODO.
    """
    # Read CSV data
    data = import_gamebygame(input_file)
    try:
        write_sportspress_csv(data, file_output_path, only_with_outcome)
    except KeyError as e:
        typer.echo(f"Error: {e}")
    typer.echo(f"Output to {file_output_path.name}")

View File

@@ -1 +0,0 @@
from .calendar import app

View File

@@ -1,55 +0,0 @@
import typer
from rich.console import Console
from typing import Annotated, List, Optional
from pathlib import Path
from ...utils.sportspress import validate_keys
from ...utils.normalize import normalize_header_key, load_config
from ...utils.common import read_and_normalize_csv_or_xlsx, is_visitor_home_order_reversed, import_gamebygame, parse_datetime, personalize_data_for_team
from ...utils.sportspress import write_sportspress_csv
from .calendar_utils import generate_calendar
from collections import defaultdict
import toml
app = typer.Typer()
@app.command(name="calendar")
def generate_calendar_app(
    input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV file")],
    config_file: Annotated[Optional[typer.FileText], typer.Option(..., "--config", "-c", help="Path to a config file")]=None
):
    """Render month-calendar PNGs for the Hounds' schedule.

    NOTE(review): input_file is a List[Path] but is passed whole to
    read_and_normalize_csv_or_xlsx, which other commands call once per
    file — confirm it accepts a list.
    """
    # Read CSV data
    data = read_and_normalize_csv_or_xlsx(input_file)
    # Collapse home/visitor into opponent/home-or-away relative to one team.
    # NOTE(review): team name is hard-coded here.
    data = personalize_data_for_team(data, "Hounds")
    # data = parse_datetime(data)
    generate_calendar(data, config_file)
    pass
@app.command(name="calendar-config")
def generate_calendar_configs(
    input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV file")],
    output_file: Annotated[Path, typer.Argument(..., help="Path(s) to the output config file")]
):
    """Write a starter calendar config TOML listing every team and field in
    the input data, each seeded with the default options."""
    data = read_and_normalize_csv_or_xlsx(input_file)
    teams = {row.get('visitor') for row in data}
    teams.update({row.get('home') for row in data})
    fields = {row.get('field') for row in data}
    # Rows may be missing a column entirely; None cannot be a TOML key.
    teams.discard(None)
    fields.discard(None)
    config = defaultdict(dict)
    config['fields']['default'] = {
        'bg_color': (0, 0, 0, 256)
    }
    config['teams']['default'] = {
        'logo': ''
    }
    for field in fields:
        # Copy the defaults so editing one entry later cannot silently edit
        # every entry (the original aliased one shared dict).
        config['fields'][field] = dict(config['fields']['default'])
    for team in teams:
        config['teams'][team] = dict(config['teams']['default'])
    # BUG FIX: `output_file.is_dir` (missing call parentheses) was always
    # truthy, so plain file paths were also treated as directories.
    if output_file.is_dir():
        output_file = output_file.joinpath('calendar_config.toml')
    with output_file.open('w') as f:
        toml.dump(config, f)

View File

@@ -1,204 +0,0 @@
from calendar import Calendar
from PIL import Image, ImageDraw, ImageFont
from typing import Tuple
from pathlib import Path
import toml
# Pixel size of one day cell in the rendered calendar: (width, height).
calendar_cell_size = (400, 500)
calendar_cell_width, calendar_cell_height = calendar_cell_size
def textsize(text, font):
    """Return the (width, height) of *text* rendered with *font*.

    Measures via ImageDraw.textbbox on a throwaway zero-size image, taking
    the bbox's right/bottom coordinates as the extent.
    """
    scratch = ImageDraw.Draw(Image.new(mode="P", size=(0, 0)))
    _left, _top, right, bottom = scratch.textbbox((0, 0), text=text, font=font)
    return right, bottom
def corner_image():
    """Placeholder for a future decorative corner image.

    BUG FIX: the original called Image.new() with no arguments, which always
    raised TypeError (mode and size are required). Raise NotImplementedError
    explicitly instead, matching the unimplemented branches in calendar_cell.
    """
    raise NotImplementedError("corner_image is not implemented")
def text_rectangle(text:str, font: str, font_size: int, foreground_color: Tuple[int, int, int, int]=(0,0,0,255), background_color: Tuple[int, int, int, int]=(0,0,0,0), height: int=400, width: int=500) -> Image:
    """Render *text* centered on a new width x height RGBA image.

    *font* is a path to a .ttf/.otf file. height/width may arrive as floats
    (callers pass e.g. calendar_cell_height * .25); both are truncated to int.
    """
    font_obj = ImageFont.truetype(font,font_size)
    img = Image.new('RGBA', (int(width),int(height)), background_color)
    draw = ImageDraw.Draw(img)
    text = str(text)  # callers also pass ints (day numbers)
    text_width, text_height = textsize(text, font=font_obj)
    # Center the text block in the image.
    x = (width - text_width) / 2
    y = (height - text_height) / 2
    text_position = (x,y)
    draw.text(text_position, text, font=font_obj, fill=foreground_color)
    return img
def calendar_cell(
    height: int=calendar_cell_height, width: int=calendar_cell_width,
    background_color: Tuple[int, int, int, int]=(0,0,0,0),
    left_top_corner = None,
    right_top_corner = None,
    top_center = None,
    right_bottom_corner = None,
    bottom_center = None,
    left_bottom_corner = None,
    center = None
):
    """Compose a single calendar-day cell image.

    Each positional keyword takes a pre-rendered RGBA image and pastes it at
    the named position, using the image as its own alpha mask. top_center
    and left_bottom_corner are not implemented and raise if supplied.
    """
    # Create a blank rectangle image
    cell_img = Image.new('RGBA', (width, height), background_color)
    # Left top corner
    if left_top_corner:
        paste_position = (0, 0)
        cell_img.paste(left_top_corner, paste_position, left_top_corner)
    # Right top corner
    if right_top_corner:
        paste_position = (width - right_top_corner.width, 0)
        cell_img.paste(right_top_corner, paste_position, right_top_corner)
    if top_center:
        raise NotImplementedError
    if right_bottom_corner:
        paste_position = (width - right_bottom_corner.width, height - right_bottom_corner.height)
        cell_img.paste(right_bottom_corner, paste_position, right_bottom_corner)
    if bottom_center:
        # Horizontally centered, flush with the bottom edge.
        paste_position = ((width - bottom_center.width)//2, (height - bottom_center.height))
        cell_img.paste(bottom_center, paste_position, bottom_center)
    if left_bottom_corner:
        raise NotImplementedError
    if center:
        # Centered both ways.
        paste_position = ((width - center.width)//2, (height - center.height)//2)
        cell_img.paste(center, paste_position, center)
    return cell_img
def generate_calendar(data, config_file=None):
    """Render one PNG per (year, month) found in *data* into data/output/.

    Each day cell shows the opponent's logo (or first initial), an H/A (or
    DH) badge, game start time(s), and a bat icon for wood-bat games. Field
    colors and team logos come from the TOML *config_file*.

    NOTE(review): `config` is only bound when config_file is truthy, but it
    is read unconditionally below (`if config:`), so calling this without a
    config file raises NameError.
    """
    result_calendar = Calendar()
    result_calendar.setfirstweekday(6)  # weeks run Sunday..Saturday
    baseball_bat = Image.open(f"data/logos/baseball_bat_2.png")
    if config_file:
        config = toml.load(config_file)
    baseball_bat = baseball_bat.resize((90, 90))
    # One image per distinct month that has at least one game.
    for year, month in {(row['datetime'].year, row['datetime'].month) for row in data}:
        month_days=list(result_calendar.monthdayscalendar(year, month))
        month_image = Image.new('RGBA', (calendar_cell_width*7, calendar_cell_height*len(month_days)), (0, 0, 0, 0))
        first_thursday=(month, [w[4] for w in month_days if w[4] != 0][0])  # NOTE(review): unused
        colors = {
            'default': (128, 128, 128, 256),
        }
        team_logos={}
        if config:
            # Per-field background colors and per-team logo paths.
            for field, field_options in config['fields'].items():
                colors[field] = tuple(field_options.get('bg_color', colors.get('default')))
            for team, team_options in config['teams'].items():
                team_logos[team] = team_options.get('logo')
        for week_num, week in enumerate(month_days):
            for day_num, date in enumerate(week):  # date is 0 for padding days
                # Day-number badge for the cell's top-right corner.
                date_text_image = text_rectangle(date,
                    "data/fonts/refrigerator-deluxe-bold.otf",
                    100,
                    foreground_color='white',
                    height=calendar_cell_height*.25,
                    width=calendar_cell_width*.25)
                if filtered_data := [row for row in data if row['datetime'].month == month and row['datetime'].day == date]:
                    # Gen square that has one game
                    if len (filtered_data) == 1:
                        game = filtered_data[0]
                        # Prefer the configured logo file; fall back to the
                        # opponent's first initial rendered as text.
                        opponent_logo_path = team_logos.get(game['opponent'])
                        if opponent_logo_path and (opponent_logo_path := Path(opponent_logo_path)) and opponent_logo_path.exists():
                            opponent_logo = Image.open(opponent_logo_path)
                        else:
                            opponent_logo = text_rectangle(text=game['opponent'][0].upper(),width=500, height=500, font_size=400, font="data/fonts/college.ttf")
                        is_home_game = game['homevisitor'] == "home"
                        if game.get('wood','').lower() == 'yes':
                            right_bottom_corner = baseball_bat
                        else:
                            right_bottom_corner = None
                        img = calendar_cell(
                            height=calendar_cell_height,
                            width=calendar_cell_width,
                            background_color=colors.get(game['field'], colors['default']),
                            left_top_corner = text_rectangle('H' if is_home_game else "A",
                                "data/fonts/refrigerator-deluxe-bold.otf",
                                80,
                                foreground_color='black' if is_home_game else 'white',
                                background_color='white' if is_home_game else 'black',
                                height=calendar_cell_height*.2,
                                width=calendar_cell_width*.2),
                            right_top_corner = date_text_image,
                            center = opponent_logo.resize((int(opponent_logo.width*.5), int(opponent_logo.height*.5))),
                            bottom_center = text_rectangle(f"{game['time']:%-I:%M}" if game.get('time') else "",
                                "data/fonts/refrigerator-deluxe-bold.otf",
                                120,
                                foreground_color='white',
                                height=calendar_cell_height*.25,
                                width=calendar_cell_width),
                            right_bottom_corner=right_bottom_corner
                        )
                        # img.show()
                    elif len(filtered_data) == 2:
                        # Doubleheader: "DH" badge and both start times.
                        # NOTE(review): uses game1's field color/logo only and
                        # omits the bat icon — confirm intended.
                        game1, game2 = filtered_data[:2]
                        opponent_logo_path = team_logos.get(game1['opponent'])
                        if opponent_logo_path and (opponent_logo_path := Path(opponent_logo_path)) and opponent_logo_path.exists():
                            opponent_logo = Image.open(opponent_logo_path)
                        else:
                            opponent_logo = text_rectangle(text=game1['opponent'][0].upper(),width=500, height=500, font_size=400, font="data/fonts/college.ttf")
                        img = calendar_cell(
                            height=calendar_cell_height,
                            width=calendar_cell_width,
                            background_color=colors.get(game1['field'], colors['default']),
                            left_top_corner = text_rectangle('DH',
                                "data/fonts/refrigerator-deluxe-bold.otf",
                                80,
                                foreground_color='black',
                                background_color='white',
                                height=calendar_cell_height*.2,
                                width=calendar_cell_width*.2),
                            right_top_corner = date_text_image,
                            center = opponent_logo.resize((int(opponent_logo.width*.5), int(opponent_logo.height*.5))),
                            bottom_center = text_rectangle(f"{game1['time']:%-I:%M} & {game2['time']:%-I:%M}",
                                "data/fonts/refrigerator-deluxe-bold.otf",
                                80,
                                foreground_color='white',
                                height=calendar_cell_height*.2,
                                width=calendar_cell_width),
                        )
                        pass
                else:
                    # No games this day: plain grey cell with just the date.
                    img=calendar_cell(
                        height=calendar_cell_height,
                        width=calendar_cell_width,
                        background_color=(204,204,204,int(256*.85)),
                        right_top_corner = text_rectangle(date,
                            "data/fonts/refrigerator-deluxe-bold.otf",
                            100,
                            foreground_color='black',
                            height=calendar_cell_height*.25,
                            width=calendar_cell_width*.25)
                    )
                    pass
                # Padding days (date == 0) are left blank.
                # NOTE(review): for a day with 3+ games no branch assigns img,
                # so a stale cell from a previous day is pasted — confirm 3+
                # games per day cannot occur.
                if date: month_image.paste(img, (img.size[0]*day_num, img.size[1]*week_num), img)
        month_image.save(f'data/output/{year}-{month}.png')
        # (legacy prototype kept for reference)
        # if (month, date) in games_lookup.keys() and not (month, date) == first_thursday:
        #     background_image = game_square([g for g in games if g['dtstart'].month==month and g['dtstart'].day==date])
        # elif (month, date) in games_lookup.keys() and (month, date) == first_thursday:
        #     background_image = game_square(
        #         [g for g in games if g['dtstart'].month == month and g['dtstart'].day == date], special='open-mic')
        # elif (month, date) == first_thursday:
        #     background_image = openmic_square(date)
        # else:
        #     background_image = blank_square(date)
        # if date: month_image.paste(background_image, (background_image.size[0]*day_num, background_image.size[1]*week_num), background_image)
        # month_image.thumbnail((1000,1000))
        # month_image.save(f'output/{year}-{month}.png')
        # month_image.show()

View File

@@ -1 +0,0 @@
from .read import app

View File

@@ -1,158 +0,0 @@
import typer
from rich.table import Table, Column
from rich.console import Console
from rich.columns import Columns
from pathlib import Path
import csv
from ...utils.common import list_key_values, read_and_normalize_csv_or_xlsx, import_gamebygame, aggregate_teams, aggregate_teams_by_season
from ...utils.normalize import normalize_header_key
from typing import Annotated, List
app = typer.Typer()
@app.command("list-values")
def print_values_for_key(
    input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")],
    key: str = typer.Argument(..., help="The key to retrieve to generate list.")
):
    """Print the distinct values found under *key* across all input files,
    sorted, as a single-column table."""
    rows = []
    for path in input_file:
        rows.extend(read_and_normalize_csv_or_xlsx(path))
    values = list_key_values(rows, key)
    table = Table(show_header=False, title=f'Values for "{key.title()}" ({len(values)})')
    table.add_column("Values")
    for value in sorted(values):
        table.add_row(value)
    Console().print(table)
@app.command("print")
def print_table(
    input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")]
):
    """Render every row of the input file(s) as one combined rich table,
    with columns taken from the first row's keys."""
    rows = []
    for path in input_file:
        rows.extend(read_and_normalize_csv_or_xlsx(path))
    headers = rows[0].keys()
    table = Table()
    for header in headers:
        table.add_column(header)
    for row in rows:
        table.add_row(*(str(row[header]) for header in headers))
    Console().print(table)
@app.command()
def check(
    input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")]
):
    """Sanity-check a schedule: games per team, games per field,
    double-booked field/time slots, and games per matchup."""
    # Read CSV data
    data = []
    for f in input_file:
        data.extend(read_and_normalize_csv_or_xlsx(f))
    teams = set([row['visitor'] for row in data] + [row['home'] for row in data])
    fields = set([row['field'] for row in data])
    console = Console()
    table = Table("Team", "Number of Games")
    for team in teams:
        rows = [row for row in data if row['visitor']==team or row['home']==team]
        table.add_row(team, str(len(rows)))
    console.print(table)
    table = Table("Field", "Number of Games")
    for field in fields:
        rows = [row for row in data if row['field']==field]
        table.add_row(field, str(len(rows)))
    console.print(table)
    # Any (field, datetime) slot with more than one game is a conflict.
    # BUG FIX: field_times was a list, so a slot booked k times was reported
    # k times over; deduplicate with a set.
    table = Table("Field", "Datetime", "Games")
    field_times = {(row['field'], row['datetime']) for row in data}
    for field, datetime in field_times:
        rows = [row for row in data if row['field'] == field and row['datetime'] == datetime]
        if len(rows) != 1:
            table.add_row(str(field), str(datetime), str(len(rows)))
    console.print(table)
    # Matchups are unordered pairs, so sort (home, visitor) before hashing.
    matchups = set([tuple([*sorted((row['home'], row['visitor']))]) for row in data])
    table = Table("Team 1", "Team 2", "Games")
    for team1, team2 in matchups:
        rows = [row for row in data if (row['visitor']==team1 or row['home']==team1) and (row['visitor']==team2 or row['home']==team2)]
        table.add_row(str(team1), str(team2), str(len(rows)))
    console.print(table)
@app.command()
def standings(
    input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")],
):
    """Aggregate game-by-game files into a win/loss standings table."""
    games = []
    for path in input_file:
        games.extend(import_gamebygame(path))
    team_rows = aggregate_teams(games)
    # Display aggregated data as a table
    table = Table(title="Aggregated Team Data")
    for heading in ("Team", "GP", "Wins", "Losses", "Ties", "Runs For", "Runs Against"):
        table.add_column(heading, style="bold")
    for stats in team_rows:
        table.add_row(
            stats["team"],
            str(stats["gp"]),
            str(stats["win"]),
            str(stats["loss"]),
            str(stats["tie"]),
            str(stats["runs_for"]),
            str(stats["runs_against"]),
        )
    Console().print(table)
@app.command()
def seasons(
    input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")],
):
    """Show, for each team, the sorted list of seasons it appears in."""
    rows = []
    for path in input_file:
        rows.extend(read_and_normalize_csv_or_xlsx(path))
    per_team = aggregate_teams_by_season(rows)
    # Display aggregated data as a table
    table = Table(title="Aggregated Team Data")
    table.add_column("Team", style="bold")
    for stats in per_team:
        table.add_row(
            stats["team"],
            ", ".join(sorted(stats["seasons"]))
        )
    Console().print(table)

207
src/cli.py Normal file
View File

@@ -0,0 +1,207 @@
import typer
from pathlib import Path
from typing import List, Optional
from .utils.common import import_gamebygame
from .utils.db import get_session, init_db
from .models import Game, Team, Venue, GameResult
# from alembic import command
# from alembic.config import Config
from rich.console import Console
from rich.table import Table
app = typer.Typer()
console = Console()
@app.command()
def init():
    """Initialize the database"""
    # Delegates to utils.db.init_db (implementation not visible here).
    init_db()
    console.print("[green]Database initialized successfully[/green]")
@app.command()
def ingest(
    files: List[Path] = typer.Argument(..., help="CSV/XLSX files to ingest"),
    season: Optional[int] = typer.Option(None, help="Season year"),
    source: str = typer.Option("unknown", help="Data source (sportsengine, teamsnap, etc)"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Preview without saving")
):
    """Ingest CSV/XLSX files into the database with automatic normalization"""
    with get_session() as session:
        for file_path in files:
            console.print(f"\n[bold]Processing {file_path}[/bold]")
            # Use your existing parsing logic
            data = import_gamebygame(file_path)
            games_added = 0
            games_skipped = 0
            for row in data:
                # Rows missing a team or field cannot be keyed; skip them.
                if any(row.get(key) is None for key in ['home', 'visitor','field']):
                    continue #skip
                # Get or create teams
                home_team = get_or_create_team(session, row['home'])
                visitor_team = get_or_create_team(session, row['visitor'])
                # Get or create venue
                venue = None
                if row.get('field'):
                    venue = get_or_create_venue(session, row['field'])
                # Create dedupe key: datetime + field + both team ids.
                dedupe_key = f"{row['datetime']}_{row.get('field', 'unknown')}_{home_team.id}_{visitor_team.id}"
                # Check if game exists
                existing = session.query(Game).filter_by(dedupe_key=dedupe_key).first()
                if existing:
                    games_skipped += 1
                    continue
                # Create game
                # NOTE(review): Game is constructed with field=..., but the
                # visible Game model declares no `field` attribute — confirm.
                game = Game(
                    date=row['datetime'],
                    home_team_id=home_team.id,
                    visitor_team_id=visitor_team.id,
                    venue_id=venue.id if venue else None,
                    field=row.get('field'),
                    season=season or row.get('season'),
                    source=source,
                    dedupe_key=dedupe_key
                )
                # Add result if available
                if row.get('has_result'):
                    result = GameResult(
                        game_id=game.id,
                        home_runs_for=row.get('home_runs_for'),
                        visitor_runs_for=row.get('visitor_runs_for'),
                        home_outcome=row.get('home_outcome'),
                        visitor_outcome=row.get('visitor_outcome')
                    )
                    game.result = result
                # NOTE(review): games_added only increments when saving, so
                # --dry-run always reports "Added: 0" — confirm whether the
                # preview should count would-be additions instead.
                if not dry_run:
                    session.add(game)
                    games_added += 1
            if not dry_run:
                session.commit()
            # Summary
            console.print(f"[green]Added: {games_added} games[/green]")
            console.print(f"[yellow]Skipped (duplicates): {games_skipped} games[/yellow]")
@app.command()
def query(
    team: Optional[str] = typer.Option(None, help="Filter by team name"),
    season: Optional[int] = typer.Option(None, help="Filter by season"),
    venue: Optional[str] = typer.Option(None, help="Filter by venue"),
    format: str = typer.Option("table", help="Output format: table, csv, json")
):
    """Query games from the database"""
    # NOTE(review): or_ is imported further down this module (still module
    # level, so it is bound by call time); display_games_table /
    # export_games_csv / export_games_json are not defined in this file's
    # visible portion — confirm they exist elsewhere.
    with get_session() as session:
        query = session.query(Game)
        if team:
            # A team matches whether it played at home or as the visitor.
            query = query.join(Team, or_(
                Game.home_team_id == Team.id,
                Game.visitor_team_id == Team.id
            )).filter(Team.name == team)
        if season:
            query = query.filter(Game.season == season)
        if venue:
            query = query.join(Venue).filter(Venue.name == venue)
        games = query.all()
        if format == "table":
            display_games_table(games)
        elif format == "csv":
            export_games_csv(games)
        elif format == "json":
            export_games_json(games)
@app.command()
def standings(
    season: Optional[int] = typer.Option(None, help="Filter by season")
):
    """Show standings from the database"""
    # Reuse your aggregate_teams logic but with database queries
    # NOTE(review): calculate_standings_from_db and display_standings_table
    # are not defined in this file's visible portion — confirm they exist.
    with get_session() as session:
        # Query and aggregate
        standings_data = calculate_standings_from_db(session, season)
        display_standings_table(standings_data)
# Database management commands
# NOTE(review): mid-file import — it executes at module import time, so or_
# is available to query() above, but it belongs with the top-of-file imports.
from sqlalchemy import or_
@app.command()
def upgrade():
    """Apply database migrations"""
    # Stub: the Alembic wiring below is commented out, so today this only
    # opens (and discards) a session, then prints success.
    # Example usage in a command function
    # Ensure you import or_ from sqlalchemy
    # from alembic import command
    # from alembic.config import Config
    # Uncomment and implement Alembic usage as desired
    with get_session() as session:
        # Your session-based operations here
        pass
    # alembic_cfg = Config("alembic.ini")
    # command.upgrade(alembic_cfg, "head")
    console.print("[green]Database upgraded[/green]")
@app.command()
def downgrade(revision: str = typer.Argument("", help="Revision to downgrade to")):
    """Downgrade database migrations"""
    # Stub: prints success without migrating (the Alembic call below is
    # commented out).
    with get_session() as session:
        # Your session-based operations here
        pass
    # alembic_cfg = Config("alembic.ini")
    # command.downgrade(alembic_cfg, revision or "-1")
    console.print("[yellow]Database downgraded[/yellow]")
@app.command()
def clean_duplicates():
    """Remove duplicate games from database"""
    # TODO: not implemented. dedupe_key uniqueness already blocks new
    # duplicates on ingest; this would clean up pre-existing rows.
    # Implement deduplication logic
    pass
@app.command()
def export(
    output: Path,
    format: str = typer.Option("sportspress", help="Export format: sportspress, csv, json"),
    season: Optional[int] = typer.Option(None, help="Filter by season")
):
    """Export data from database"""
    # NOTE(review): only the "sportspress" branch is implemented; csv/json
    # silently do nothing. games_to_dict_format is not defined in this
    # file's visible portion — confirm it exists.
    with get_session() as session:
        games = session.query(Game)
        if season:
            games = games.filter(Game.season == season)
        if format == "sportspress":
            # Use your existing write_sportspress_csv
            from .utils.sportspress import write_sportspress_csv
            data = games_to_dict_format(games.all())
            write_sportspress_csv(data, output)
# Helper functions
def get_or_create_team(session, team_name: str) -> Team:
    """Return the Team named *team_name*, creating it if absent."""
    existing = session.query(Team).filter_by(name=team_name).first()
    if existing:
        return existing
    created = Team(name=team_name)
    session.add(created)
    # Flush so the new row is visible to later queries in this session.
    session.flush()
    return created
def get_or_create_venue(session, venue_name: str) -> Venue:
    """Return the Venue named *venue_name*, creating it if absent."""
    existing = session.query(Venue).filter_by(name=venue_name).first()
    if existing:
        return existing
    created = Venue(name=venue_name)
    session.add(created)
    # Flush so the new row is visible to later queries in this session.
    session.flush()
    return created
if __name__ == "__main__":
app()

View File

@@ -1,81 +0,0 @@
import csv
import re
from typing import List, Dict
from dateutil import parser
from pathlib import Path
from rich.console import Console
from rich.panel import Panel
from rich.table import Table, Column
from rich.columns import Columns
import typer
# from .utils.common import normalize_header_key, read_csv, is_visitor_home_order_reversed, process_data, aggregate_teams, write_sportspress_csv
# validate_csv_header
app = typer.Typer()
@app.command()
def standings(file_path: Path = typer.Argument(..., help="Path to the CSV file")):
    """Aggregate a schedule CSV into a win/loss standings table.

    NOTE(review): dead code as written — every helper it calls
    (normalize_header_key, validate_csv_header, read_csv,
    is_visitor_home_order_reversed, process_data, aggregate_teams) comes
    only from the commented-out import at the top of this module, so
    invoking the command raises NameError. The open() below is also never
    closed.
    """
    # Validate CSV header
    header = next(csv.reader(open(file_path, "r")))
    normalized_header = [normalize_header_key(key) for key in header]
    if not validate_csv_header(header):
        typer.echo("Error: Invalid CSV header. Make sure the CSV file contains the correct headers.")
        return
    # Read CSV data
    data = read_csv(file_path)
    visitor_home_order_reversed = is_visitor_home_order_reversed(normalized_header)
    processed_data = process_data(data, visitor_home_order_reversed)
    aggregate_team_data = aggregate_teams(processed_data)
    # Display aggregated data as a table
    console = Console()
    table = Table(title="Aggregated Team Data")
    table.add_column("Team", style="bold")
    table.add_column("Wins", style="bold")
    table.add_column("Losses", style="bold")
    table.add_column("Ties", style="bold")
    table.add_column("Runs For", style="bold")
    table.add_column("Runs Against", style="bold")
    for team_stats in aggregate_team_data:
        table.add_row(
            team_stats["team"],
            str(team_stats["win"]),
            str(team_stats["loss"]),
            str(team_stats["tie"]),
            str(team_stats["runs_for"]),
            str(team_stats["runs_against"]),
        )
    console.print(table)
    # Write processed CSV data back to a new file
    # output_file_path = file_path.with_suffix(".processed.csv")
    # write_csv(output_file_path, data)
    # typer.echo(f"Processed data written to: {output_file_path}")
# @app.command()
def replace_key_values(data: List[Dict], key, match: str, replace: str, is_regex: bool = False):
    """Replace values stored under *key* in every row of *data* (in place).

    With is_regex=False, *match* is compared literally against the whole
    value and replaced wholesale when equal. With is_regex=True, *match* is
    a regex and every occurrence within the value is substituted.

    Returns the same (mutated) list for chaining.

    BUG FIX: the literal branch previously interpolated *match* unescaped
    into an anchored regex, so metacharacters ('.', '(', '+', ...) in
    ordinary values caused wrong matches or re.error.
    """
    if is_regex:
        pattern = re.compile(match)
        for row in data:
            row[key] = pattern.sub(replace, row[key])
    else:
        for row in data:
            if row[key] == match:
                row[key] = replace
    return data
def add_key_values(data: List[Dict], key, value: str):
    """Set column *key* to *value* on every row; returns the mutated list."""
    for record in data:
        record[key] = value
    return data
if __name__ == "__main__":
app()

1
src/models/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .models import *

94
src/models/models.py Normal file
View File

@@ -0,0 +1,94 @@
from sqlmodel import SQLModel, Field, Relationship
from uuid import UUID, uuid4
from datetime import datetime
from typing import Optional, List
from enum import Enum
# Team model
class Team(SQLModel, table=True):
    """A team that plays games; ``name`` is unique and indexed for lookups."""
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    name: str = Field(index=True, unique=True)
    # Specify foreign keys explicitly for relationships
    # NOTE(review): the reverse relationships below are intentionally disabled;
    # Game declares one-way links with explicit foreign_keys instead. Confirm
    # before re-enabling both sides (back_populates must match on Game).
    # home_games: List["Game"] = Relationship(
    #     back_populates="home_team",
    #     sa_relationship_kwargs={"foreign_keys": "Game.home_team_id"}
    # )
    # visitor_games: List["Game"] = Relationship(
    #     back_populates="visitor_team",
    #     sa_relationship_kwargs={"foreign_keys": "Game.visitor_team_id"}
    # )
# Venue model
class Venue(SQLModel, table=True):
    """A playing location; ``name`` is unique and indexed."""
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    name: str = Field(index=True, unique=True)
    # Relationships: all games scheduled at this venue (Game.venue back-reference).
    games: List["Game"] = Relationship(back_populates="venue")
# Game model
class Game(SQLModel, table=True):
    """A single game between a home and a visitor team, optionally at a venue."""
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    date: datetime = Field(index=True)
    # Foreign keys: two links into team.id, one optional link into venue.id.
    home_team_id: UUID = Field(foreign_key="team.id")
    visitor_team_id: UUID = Field(foreign_key="team.id")
    venue_id: Optional[UUID] = Field(default=None, foreign_key="venue.id")
    # Explicitly specify relationships with foreign keys: both relationships
    # target Team, so SQLAlchemy needs foreign_keys to disambiguate the joins.
    home_team: "Team" = Relationship(
        # back_populates="home_games",
        sa_relationship_kwargs={"foreign_keys": "Game.home_team_id"}
    )
    visitor_team: "Team" = Relationship(
        # back_populates="visitor_games",
        sa_relationship_kwargs={"foreign_keys": "Game.visitor_team_id"}
    )
    venue: Optional["Venue"] = Relationship(back_populates="games")
    result: Optional["GameResult"] = Relationship(back_populates="game")
    # Metadata
    season: Optional[int] = Field(index=True)  # presumably the season year — confirm
    source: str = Field(default="unknown")  # provenance of the imported record
    dedupe_key: str = Field(unique=True, index=True)  # uniqueness guard for re-imports
class GameOutcome(str, Enum):
    """Per-team result of a game.

    Inherits from ``str`` so members compare equal to, and persist as, their
    plain string values.
    """
    WIN = "win"
    LOSS = "loss"
    TIE = "tie"
    FORFEIT = "forfeit"
    TECHNICAL_WIN = "technical_win"
    TECHNICAL_LOSS = "technical_loss"
    FORFEIT_WIN = "forfeit_win"
    FORFEIT_LOSS = "forfeit_loss"
# GameResult model
class GameResult(SQLModel, table=True):
    """Final score and per-team outcome for a Game (one result row per game)."""
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    # Foreign key to Game
    game_id: UUID = Field(foreign_key="game.id")
    game: "Game" = Relationship(back_populates="result")
    # Game results. Unquoted Optional[int] (the quoted "int" forward reference
    # was unnecessary) with explicit None defaults: a result row may exist
    # before scores/outcomes are recorded.
    home_runs_for: Optional[int] = None
    visitor_runs_for: Optional[int] = None
    home_outcome: Optional[GameOutcome] = None  # win/loss/tie/forfeit
    visitor_outcome: Optional[GameOutcome] = None
# External ID model if needed
class ExternalId(SQLModel, table=True):
    """Maps a record from an external system to a local Team, Game, or Venue.

    Exactly which of team_id/game_id/venue_id is set depends on what the
    external record refers to; all three are optional at the schema level.
    """
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    external_type: str  # e.g., 'SportsEngine', 'TeamSnap'
    external_id: str  # identifier within the external system
    team_id: Optional[UUID] = Field(default=None, foreign_key="team.id")
    game_id: Optional[UUID] = Field(default=None, foreign_key="game.id")
    venue_id: Optional[UUID] = Field(default=None, foreign_key="venue.id")
    # Relationships (one-way; no back_populates on the target models)
    team: Optional["Team"] = Relationship()
    game: Optional["Game"] = Relationship()
    venue: Optional["Venue"] = Relationship()

View File

@@ -0,0 +1,14 @@
# Fix: the original import line ended with a trailing comma
# ("... import create_engine, SQLModel,"), which is a SyntaxError.
from sqlmodel import create_engine, SQLModel

# Replace with your actual database URL
DATABASE_URL = "sqlite:///sportsdb.db"

engine = create_engine(DATABASE_URL)


def clear_database():
    """Drop every table registered on SQLModel.metadata.

    NOTE(review): only models imported before this call are registered on the
    metadata — confirm the models module is imported when running standalone.
    """
    # Drop all tables
    SQLModel.metadata.drop_all(engine)
    print("All tables dropped successfully.")


if __name__ == "__main__":
    clear_database()

View File

@@ -0,0 +1,18 @@
from sqlmodel import Session, create_engine, SQLModel, text

DATABASE_URL = "sqlite:///sportsdb.db"  # Adjust your database URL accordingly

engine = create_engine(DATABASE_URL)


def truncate_tables():
    """Delete every row from all tables, children before parents.

    Order matters for foreign keys: gameresult and game reference team and
    venue, so they are cleared first. One commit covers all deletes.
    """
    with Session(engine) as session:
        for table_name in ("gameresult", "game", "venue", "team"):
            session.exec(text(f"DELETE FROM {table_name};"))
        session.commit()
    print("All tables truncated successfully.")


if __name__ == "__main__":
    SQLModel.metadata.create_all(engine)  # Ensure the tables exist
    truncate_tables()

View File

@@ -169,7 +169,6 @@ def parse_datetime(data: List[Dict]):
def import_gamebygame(data: Union[List[Dict], TextIO, Path]) -> List[Dict]: def import_gamebygame(data: Union[List[Dict], TextIO, Path]) -> List[Dict]:
if isinstance(data, TextIOBase) or isinstance(data, Path) : if isinstance(data, TextIOBase) or isinstance(data, Path) :
data = read_and_normalize_csv_or_xlsx(data) data = read_and_normalize_csv_or_xlsx(data)
header = data[0].keys() header = data[0].keys()
visitor_home_order_reversed = is_visitor_home_order_reversed(list(header)) visitor_home_order_reversed = is_visitor_home_order_reversed(list(header))
for row in data: for row in data:
@@ -178,7 +177,8 @@ def import_gamebygame(data: Union[List[Dict], TextIO, Path]) -> List[Dict]:
try: try:
row['datetime'] = parser.parse(f"{row['date']} {row['time']}") row['datetime'] = parser.parse(f"{row['date']} {row['time']}")
except parser.ParserError as e: except parser.ParserError as e:
raise e pass
# raise e
return data return data

34
src/utils/db.py Normal file
View File

@@ -0,0 +1,34 @@
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
import os
from dotenv import load_dotenv

# Load environment variables from a .env file if present
load_dotenv()

# Get the database URL from environment variables; defaults to a local SQLite file.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./sportsdb.db")

# Create an engine for the database.
# echo=True logs every SQL statement — noisy; consider disabling in production.
engine = create_engine(DATABASE_URL, echo=True)

# Create a configured "Session" class (explicit commit, no autoflush)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def init_db():
    """Create every table registered on SQLModel.metadata against the engine.

    Safe to call repeatedly: create_all skips tables that already exist.
    """
    SQLModel.metadata.create_all(engine)
class SessionContext:
    """Context manager wrapping the lifetime of one database session.

    Opens a fresh SessionLocal on entry and guarantees it is closed on exit,
    even when the body raises (the exception still propagates).
    """

    def __enter__(self):
        self.db = SessionLocal()
        return self.db

    def __exit__(self, exc_type, exc_value, traceback):
        # Implicitly returning None lets any in-flight exception propagate.
        self.db.close()
        return None
def get_session():
    """Return a new SessionContext, for use as ``with get_session() as db:``."""
    return SessionContext()

View File

@@ -1,8 +1,9 @@
import toml import toml
from typing import List, Dict from typing import List, Dict, Literal
DEFAULT_NORMALIZATION_PATH="normalization.toml" DEFAULT_NORMALIZATION_PATH="normalization.toml"
import re import re
from dateutil import parser from dateutil import parser
from dateutil.parser import ParserError
import datetime import datetime
def load_config(normalization_config_path=DEFAULT_NORMALIZATION_PATH): def load_config(normalization_config_path=DEFAULT_NORMALIZATION_PATH):
@@ -17,29 +18,33 @@ def normalize_header_key(original_key: str, normalization_config) -> str:
normalized_key = key normalized_key = key
return normalized_key return normalized_key
return original_key return original_key
return key_mapping.get(key.lower().strip(), key.lower().strip()) return key_mapping.get(key.lower().strip(), key.lower().strip())
def normalize_value(value, key, normalization_config):
def normalize_value(value:str, key: str, normalization_config:dict):
if value.lower() == "xx":
return None
value = value.strip() value = value.strip()
for normalization_pair in normalization_config.get(key if not key == "home" or key == "away" else "team",{}).get('values',[]): if key in ["home", "visitor", "away", "field"]:
value = value.title()
for normalization_pair in normalization_config.get(key if not (key == "home" or key == "visitor" or key=="away") else "team",{}).get('values',[]):
if value in normalization_pair["original"]: if value in normalization_pair["original"]:
value = normalization_pair["normalized"] value = normalization_pair["normalized"]
match key.lower(): match key.lower():
case "date": case "date":
if value: if value:
value = parser.parse(value).date() value = parser.parse(value).date()
else:
pass
case "home":
value = value.title()
case "visitor":
value = value.title()
case "time": case "time":
if value: if value:
value = parser.parse(value).time() value = value.replace("550", "5:50")
else: pattern = r"(\b\d{1,2}:\d{2}\s*(?:AM|PM)\b).*"
pass # Use re.sub to replace the entire string matched by the pattern with just the time part
value = re.sub(pattern, r"\1", value)
value = value.replace("Time Change","")
try:
value = parser.parse(value).time()
except ParserError as e:
pass
case _: case _:
# Handle other cases # Handle other cases
pass pass

View File

@@ -1,105 +0,0 @@
from typing import List, Dict
from pathlib import Path
import csv
REQUIRED_KEYS = ["date", "time", "field", "visitor", "home"]


def validate_keys(header: List[str]) -> bool:
    """Return True when every column in REQUIRED_KEYS appears in *header*."""
    missing = [key for key in REQUIRED_KEYS if key not in header]
    return not missing
def write_sportspress_csv(data: List[Dict], file: Path, only_with_outcome: bool = False):
    """
    Writes sports event data to a CSV file in a SportsPress-style two-row-per-game format.

    Each game produces two rows: one for the home team (carrying date, time and
    venue) and one for the visitor team (those columns left blank).

    Parameters:
    - data (List[Dict]): List of dictionaries where each dictionary represents a sports event.
    - file: The destination to write CSV rows to.
      NOTE(review): annotated as Path, but csv.writer expects an open file
      object — confirm callers pass a handle, not a Path.
    - only_with_outcome (bool, optional): If True, only events with outcomes will be included in the CSV. Default is False.

    Raises:
    - KeyError: when the first row is missing any of REQUIRED_KEYS.

    Returns:
    None

    Example:
    >>> data = [...]  # List of dictionaries representing sports events
    >>> file_path = Path("output.csv")
    >>> write_sportspress_csv(data, file_path)
    """
    # Header keys are taken from the first row only; all rows are assumed to
    # share the same keys.
    header = data[0].keys()
    if not validate_keys(header):
        raise KeyError(f"Missing Keys. Requires: {REQUIRED_KEYS}, provided: {list(header)}")
    writer = csv.writer(file)
    fieldnames = [
        "Format", #Competitive or Friendly
        # "Competition",
        "Season",
        # "Date Format",
        "Date",
        "Time",
        "Venue",
        "Team",
        "Results",
        "Outcome",
        # "Players",
        # "Performance",
    ]
    # Write the header
    writer.writerow(fieldnames)
    # Write the data
    for row in data:
        # Skip games without a recorded result when the caller asked for
        # completed games only.
        if only_with_outcome and not row.get('has_result'):
            continue
        # Home-team row.
        # NOTE(review): this row starts with the date value, but the header
        # begins with Format/Season — the written columns appear shifted two
        # places left relative to fieldnames; confirm intended alignment.
        writer.writerow(
            [
                row["datetime"].strftime("%Y/%m/%d"),
                row["datetime"].strftime("%H:%M"),
                row.get("field", ""),
                row["home"],
                # Results column: pipe-joined inning scores plus totals,
                # missing keys rendered as empty strings.
                "|".join([str(row.get(k,"")) for k in [
                    "home_runs_for_inning_1",
                    "home_runs_for_inning_2",
                    "home_runs_for_inning_3",
                    "home_runs_for_inning_4",
                    "home_runs_for_inning_5",
                    "home_runs_for_inning_6",
                    "home_runs_for_inning_7",
                    "home_runs_for_inning_8",
                    "home_runs_for_inning_9",
                    "home_runs_for_inning_10",
                    "home_runs_for",
                    "home_errors",
                    "home_hits"
                ]]),
                row.get("home_outcome")
            ]
        )
        # Visitor-team row: date/time/venue left blank (carried by the home row).
        # NOTE(review): visitor inning keys are commented out, so the visitor
        # Results column has fewer components than the home one — confirm.
        writer.writerow(
            [
                "",
                "",
                "",
                row["visitor"],
                "|".join([str(row.get(k,"")) for k in [
                    # "visitor_runs_for_inning_1",
                    # "visitor_runs_for_inning_2",
                    # "visitor_runs_for_inning_3",
                    # "visitor_runs_for_inning_4",
                    # "visitor_runs_for_inning_5",
                    # "visitor_runs_for_inning_6",
                    # "visitor_runs_for_inning_7",
                    # "visitor_runs_for_inning_8",
                    # "visitor_runs_for_inning_9",
                    # "visitor_runs_for_inning_10",
                    "visitor_runs_for",
                    "visitor_errors",
                    "visitor_hits"
                ]]),
                row.get("visitor_outcome")
            ]
        )