diff --git a/src/apps/__init__.py b/.env.example similarity index 100% rename from src/apps/__init__.py rename to .env.example diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..e39590a --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,31 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "Truncate Database Tables", + "type": "shell", + "command": "${workspaceFolder}/.venv/bin/python", // Use this for macOS/Linux + "args": [ + "${workspaceFolder}/src/scripts/truncate_tables.py" + ], + "group": { + "kind": "build", + "isDefault": true + }, + "problemMatcher": [] + }, + { + "label": "Clear Database", + "type": "shell", + "command": "${workspaceFolder}/.venv/bin/python", // Use this for macOS/Linux + "args": [ + "${workspaceFolder}/src/scripts/clear_database.py" + ], + "group": { + "kind": "build", + "isDefault": true + }, + "problemMatcher": [] + } + ] +} \ No newline at end of file diff --git a/src/utils/csv.py b/README.md similarity index 100% rename from src/utils/csv.py rename to README.md diff --git a/alembic/__init__.py b/alembic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alembic/alembic.ini b/alembic/alembic.ini new file mode 100644 index 0000000..e69de29 diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..e69de29 diff --git a/normalization.toml b/normalization.toml index aec7b14..628a7f5 100644 --- a/normalization.toml +++ b/normalization.toml @@ -35,10 +35,16 @@ potential_keys = ["Field", "Location", "Venue"] original = ["Winnemac"] normalized = "Winnemac Park" [[field.values]] -original = ["Taft HS"] +original = ["Maywood", "MAYWOOD"] +normalized = "Maywood Park" +[[field.values]] +original = ["Taft HS", "Taft Hs"] normalized = "Taft High School" [[field.values]] -original = ["Southwest"] +original = ["Ridgewood Hs"] +normalized = "Ridgewood High School" +[[field.values]] +original = ["Southwest", "SW Park", "SOUTHWEST"] normalized = "Southwest Park" [[field.values]] original = ["Comed", "COMED", "ComEd"] @@ -61,13 +67,19 @@ potential_keys = ["Final Score", "Score", "Result", "Outcome"] original = ["Hounds", "Chicago Hounds", "Winnemac Hounds", "Hound"] normalized = "Hounds" [[team.values]] -original = ["Chicago Red Sox"] +original = ["Ramirez Bb", "Ramirez"] +normalized = "Ramirez Baseball" +[[team.values]] +original = ["Degeneratex"] +normalized = "DegenerateX" +[[team.values]] +original = ["Chicago Red Sox", "Redsox"] normalized = "Red Sox" [[team.values]] original = ["NorthSide White Sox"] normalized = "North Side White Sox" [[team.values]] -original = ["Chicago Rebels", "CHICAGO REBELS"] +original = ["Chicago Rebels"] normalized = "Rebels" [[team.values]] original = ["Lombard Expors", "LOMBARD EXPORS"] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt index d530bcc..7acfe1c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,20 @@ -typer[all]==0.9.0 -python-dateutil==2.8.2 -toml==0.10.2 +# CLI and display +typer[all] +rich + +# Database +sqlmodel +alembic +python-dotenv + +# Data processing +python-dateutil +pandas # Optional but useful for data analysis +openpyxl # For Excel file support + +# File parsing +toml +xlsx2csv + +# Image generation (for calendar features) pillow -xlsx2csv \ No newline at end of file diff --git a/sql_scripts/get_game_details.sql b/sql_scripts/get_game_details.sql new file mode 100644 index 0000000..f14b6e8 --- /dev/null +++ 
b/sql_scripts/get_game_details.sql @@ -0,0 +1,22 @@ +SELECT + game.id, + game.date, + team_home.name AS home_team, + team_visitor.name AS visitor_team, + venue.name AS venue_name, + gameresult.home_runs_for, + gameresult.visitor_runs_for, + gameresult.home_outcome, + gameresult.visitor_outcome +FROM + game +JOIN + team AS team_home ON game.home_team_id = team_home.id +JOIN + team AS team_visitor ON game.visitor_team_id = team_visitor.id +JOIN + venue ON game.venue_id = venue.id +LEFT JOIN + gameresult ON game.id = gameresult.game_id +ORDER BY + game.date; \ No newline at end of file diff --git a/sql_scripts/get_standings.sql b/sql_scripts/get_standings.sql new file mode 100644 index 0000000..b5ac8b3 --- /dev/null +++ b/sql_scripts/get_standings.sql @@ -0,0 +1,26 @@ +SELECT + strftime('%Y', game.date) AS year, -- Extracts year from date + team.id AS team_id, + team.name AS team_name, + SUM(CASE + WHEN gameresult.home_outcome = 'WIN' AND game.home_team_id = team.id + OR gameresult.visitor_outcome = 'WIN' AND game.visitor_team_id = team.id + THEN 1 ELSE 0 END) AS wins, + SUM(CASE + WHEN gameresult.home_outcome = 'LOSS' AND game.home_team_id = team.id + OR gameresult.visitor_outcome = 'LOSS' AND game.visitor_team_id = team.id + THEN 1 ELSE 0 END) AS losses, + SUM(CASE + WHEN gameresult.home_outcome = 'TIE' AND game.home_team_id = team.id + OR gameresult.visitor_outcome = 'TIE' AND game.visitor_team_id = team.id + THEN 1 ELSE 0 END) AS ties +FROM + team +LEFT JOIN + game ON (game.home_team_id = team.id OR game.visitor_team_id = team.id) +LEFT JOIN + gameresult ON game.id = gameresult.game_id +GROUP BY + year, team.id +ORDER BY + year, wins DESC, losses ASC, ties DESC; \ No newline at end of file diff --git a/src/__main__.py b/src/__main__.py deleted file mode 100644 index 30d4cc6..0000000 --- a/src/__main__.py +++ /dev/null @@ -1,15 +0,0 @@ -from .apps.convert import app as convert_app -from .apps.clean import app as clean_app -from .apps.read import app as read_app -from .apps.generate import app as generate_app -import typer - -app = typer.Typer() - -app.add_typer(convert_app, name="convert") -app.add_typer(clean_app, name="clean") -app.add_typer(read_app, name="read") -app.add_typer(generate_app, name="generate") - -if __name__ == "__main__": - app() \ No newline at end of file diff --git a/src/apps/clean/__init__.py b/src/apps/clean/__init__.py deleted file mode 100644 index 661a5dd..0000000 --- a/src/apps/clean/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .clean import app \ No newline at end of file diff --git a/src/apps/clean/clean.py b/src/apps/clean/clean.py deleted file mode 100644 index 39416c0..0000000 --- a/src/apps/clean/clean.py +++ /dev/null @@ -1,109 +0,0 @@ -import typer -from rich.table import Table, Column -from rich.console import Console -from rich.columns import Columns -from rich.panel import Panel -from pathlib import Path -import csv -from ...utils.common import list_key_values, read_and_normalize_csv_or_xlsx -from ...utils.normalize import normalize_header_key, replace_key_values, DEFAULT_NORMALIZATION_PATH -from typing import Annotated, List -import re - -app = typer.Typer() - -@app.command("replace") -def replace_values_for_key( - input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")], - output_file: Annotated[List[typer.FileText], typer.Option(..., "--output-file", "-o", help="Specify output file.")], - key: str = typer.Argument(..., help=""), - match: str = typer.Argument(..., help=""), - replace: str = typer.Argument(..., 
help=""), - in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."), - match_is_regex: bool = typer.Option(False, "--regex", "-p", help="Match is a regex pattern.") - ): - - # normalized_key = normalize_header_key(key) - normalized_key = key - - console = Console() - - # Read CSV data - for f in input_file: - data = read_and_normalize_csv_or_xlsx(f) - - before_table = Table(Column(), show_header=False, title="Before") - for value in sorted(list_key_values(data, key)): - before_table.add_row(value) - - - after_table = Table( Column(), show_header=False, title="After") - - if normalized_key != "team" or "team" in data[0].keys(): - data = replace_key_values(data, normalized_key, match, replace, match_is_regex) - else: - data=replace_key_values(data, "home", match, replace, match_is_regex) - data=replace_key_values(data, "visitor", match, replace, match_is_regex) - - for value in sorted(list_key_values(data, key)): - after_table.add_row(value) - - panel = Panel( - Columns([before_table, after_table]), - title="Replace" - ) - - console.print(panel) - - if in_place and typer.confirm("Perform Replacement in-place?"): - fieldnames = data[0].keys() - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(data) - - elif output_file: - if output_file.is_dir(): - output_file = output_file.joinpath(f.name) - if typer.confirm(f"Write to {output_file}?"): - with output_file.open('w') as f: - fieldnames = data[0].keys() - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(data) - -@app.command("add-key") -def add_values_for_key( - file_path: Path = typer.Argument(..., help="Path to the CSV or XLSX file"), - key: str = typer.Argument(..., help=""), - value: str = typer.Argument("", help=""), - in_place: bool = typer.Option(False, "--in-place", "-p", help="Modify file in place."), - output_file: Path = typer.Option(None, "--output-file", "-o", help="Specify output file."), - ): - - if in_place and output_file: - typer.echo("Error: Only one of --in-place or --output-file should be provided, not both.") - raise typer.Abort() - - console = Console() - - # Read CSV data - data = read_and_normalize_csv_or_xlsx(file_path) - - # data = add_key_values(data, key, value) - - if in_place and typer.confirm("Perform Replacement in-place?"): - with file_path.open('w') as f: - fieldnames = data[0].keys() - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(data) - - elif output_file: - if output_file.is_dir(): - output_file = output_file.joinpath(file_path.name) - if typer.confirm(f"Write to {output_file}?"): - with output_file.open('w') as f: - fieldnames = data[0].keys() - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(data) \ No newline at end of file diff --git a/src/apps/convert/__init__.py b/src/apps/convert/__init__.py deleted file mode 100644 index e3e12fe..0000000 --- a/src/apps/convert/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .convert import app \ No newline at end of file diff --git a/src/apps/convert/convert.py b/src/apps/convert/convert.py deleted file mode 100644 index bbd4c08..0000000 --- a/src/apps/convert/convert.py +++ /dev/null @@ -1,44 +0,0 @@ -import typer -from typing import Annotated -from pathlib import Path -from ...utils.sportspress import validate_keys -from ...utils.normalize import normalize_header_key, load_config -from ...utils.common import read_and_normalize_csv_or_xlsx, 
is_visitor_home_order_reversed, import_gamebygame -from ...utils.sportspress import write_sportspress_csv -import csv - -app = typer.Typer() - -@app.command(name="sportspress") -def sportspress_csv( - input_file: Annotated[Path, typer.Argument(..., help="Path to the or XLSX file")], - file_output_path: Annotated[typer.FileTextWrite, typer.Argument(..., help="Path to the output CSV file")], - only_with_outcome: bool = typer.Option(default=False, is_flag=True, help="") - ): - - # Read CSV data - data = import_gamebygame(input_file) - - try: - write_sportspress_csv(data, file_output_path, only_with_outcome) - except KeyError as e: - typer.echo(f"Error: {e}") - - typer.echo(f"Output to {file_output_path.name}") - -@app.command(name="teamsnap") -def sportspress_csv( - input_file: Annotated[Path, typer.Argument(..., help="Path to the CSV or XLSX file")], - file_output_path: Annotated[typer.FileTextWrite, typer.Argument(..., help="Path to the output CSV file")], - only_with_outcome: bool = typer.Option(default=False, is_flag=True, help="") - ): - - # Read CSV data - data = import_gamebygame(input_file) - - try: - write_sportspress_csv(data, file_output_path, only_with_outcome) - except KeyError as e: - typer.echo(f"Error: {e}") - - typer.echo(f"Output to {file_output_path.name}") \ No newline at end of file diff --git a/src/apps/generate/__init__.py b/src/apps/generate/__init__.py deleted file mode 100644 index ce993e2..0000000 --- a/src/apps/generate/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .calendar import app \ No newline at end of file diff --git a/src/apps/generate/calendar.py b/src/apps/generate/calendar.py deleted file mode 100644 index a4fdd84..0000000 --- a/src/apps/generate/calendar.py +++ /dev/null @@ -1,55 +0,0 @@ -import typer -from rich.console import Console -from typing import Annotated, List, Optional -from pathlib import Path -from ...utils.sportspress import validate_keys -from ...utils.normalize import normalize_header_key, load_config -from ...utils.common import read_and_normalize_csv_or_xlsx, is_visitor_home_order_reversed, import_gamebygame, parse_datetime, personalize_data_for_team -from ...utils.sportspress import write_sportspress_csv -from .calendar_utils import generate_calendar -from collections import defaultdict -import toml - -app = typer.Typer() - -@app.command(name="calendar") -def generate_calendar_app( - input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV file")], - config_file: Annotated[Optional[typer.FileText], typer.Option(..., "--config", "-c", help="Path to a config file")]=None - ): - # Read CSV data - data = read_and_normalize_csv_or_xlsx(input_file) - data = personalize_data_for_team(data, "Hounds") - # data = parse_datetime(data) - - generate_calendar(data, config_file) - pass - -@app.command(name="calendar-config") -def generate_calendar_configs( - input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV file")], - output_file: Annotated[Path, typer.Argument(..., help="Path(s) to the output config file")] -): - data = read_and_normalize_csv_or_xlsx(input_file) - teams = {row.get('visitor') for row in data} - teams.update({row.get('home') for row in data}) - fields = {row.get('field') for row in data} - config = defaultdict(dict) - config['fields']['default'] = { - 'bg_color': (0, 0, 0, 256) - } - config['teams']['default'] = { - 'logo': '' - } - for field in fields: - config['fields'][field] = config['fields']['default'] - for team in teams: - config['teams'][team] = config['teams']['default'] - 
- if output_file.is_dir: - output_file = output_file.joinpath('calendar_config.toml') - - with output_file.open('w') as f: - toml.dump(config, f) - - pass \ No newline at end of file diff --git a/src/apps/generate/calendar_utils.py b/src/apps/generate/calendar_utils.py deleted file mode 100644 index 2c1b530..0000000 --- a/src/apps/generate/calendar_utils.py +++ /dev/null @@ -1,204 +0,0 @@ - -from calendar import Calendar -from PIL import Image, ImageDraw, ImageFont -from typing import Tuple -from pathlib import Path -import toml - -calendar_cell_size = (400, 500) -calendar_cell_width, calendar_cell_height = calendar_cell_size - -def textsize(text, font): - im = Image.new(mode="P", size=(0, 0)) - draw = ImageDraw.Draw(im) - _, _, width, height = draw.textbbox((0, 0), text=text, font=font) - return width, height - -def corner_image(): - return Image.new() - -def text_rectangle(text:str, font: str, font_size: int, foreground_color: Tuple[int, int, int, int]=(0,0,0,255), background_color: Tuple[int, int, int, int]=(0,0,0,0), height: int=400, width: int=500) -> Image: - font_obj = ImageFont.truetype(font,font_size) - img = Image.new('RGBA', (int(width),int(height)), background_color) - draw = ImageDraw.Draw(img) - text = str(text) - text_width, text_height = textsize(text, font=font_obj) - x = (width - text_width) / 2 - y = (height - text_height) / 2 - text_position = (x,y) - draw.text(text_position, text, font=font_obj, fill=foreground_color) - return img - -def calendar_cell( - height: int=calendar_cell_height, width: int=calendar_cell_width, - background_color: Tuple[int, int, int, int]=(0,0,0,0), - left_top_corner = None, - right_top_corner = None, - top_center = None, - right_bottom_corner = None, - bottom_center = None, - left_bottom_corner = None, - center = None -): - # Create a blank rectangle image - cell_img = Image.new('RGBA', (width, height), background_color) - - # Left top corner - if left_top_corner: - paste_position = (0, 0) - cell_img.paste(left_top_corner, paste_position, left_top_corner) - - # Right top corner - if right_top_corner: - paste_position = (width - right_top_corner.width, 0) - cell_img.paste(right_top_corner, paste_position, right_top_corner) - - if top_center: - raise NotImplementedError - - if right_bottom_corner: - paste_position = (width - right_bottom_corner.width, height - right_bottom_corner.height) - cell_img.paste(right_bottom_corner, paste_position, right_bottom_corner) - - if bottom_center: - paste_position = ((width - bottom_center.width)//2, (height - bottom_center.height)) - cell_img.paste(bottom_center, paste_position, bottom_center) - - if left_bottom_corner: - raise NotImplementedError - - if center: - paste_position = ((width - center.width)//2, (height - center.height)//2) - cell_img.paste(center, paste_position, center) - - return cell_img - -def generate_calendar(data, config_file=None): - result_calendar = Calendar() - result_calendar.setfirstweekday(6) - baseball_bat = Image.open(f"data/logos/baseball_bat_2.png") - if config_file: - config = toml.load(config_file) - - baseball_bat = baseball_bat.resize((90, 90)) - for year, month in {(row['datetime'].year, row['datetime'].month) for row in data}: - month_days=list(result_calendar.monthdayscalendar(year, month)) - month_image = Image.new('RGBA', (calendar_cell_width*7, calendar_cell_height*len(month_days)), (0, 0, 0, 0)) - first_thursday=(month, [w[4] for w in month_days if w[4] != 0][0]) - - colors = { - 'default': (128, 128, 128, 256), - } - team_logos={} - - if config: - for field, 
field_options in config['fields'].items(): - colors[field] = tuple(field_options.get('bg_color', colors.get('default'))) - for team, team_options in config['teams'].items(): - team_logos[team] = team_options.get('logo') - - for week_num, week in enumerate(month_days): - for day_num, date in enumerate(week): - date_text_image = text_rectangle(date, - "data/fonts/refrigerator-deluxe-bold.otf", - 100, - foreground_color='white', - height=calendar_cell_height*.25, - width=calendar_cell_width*.25) - if filtered_data := [row for row in data if row['datetime'].month == month and row['datetime'].day == date]: - # Gen square that has one game - if len (filtered_data) == 1: - game = filtered_data[0] - opponent_logo_path = team_logos.get(game['opponent']) - if opponent_logo_path and (opponent_logo_path := Path(opponent_logo_path)) and opponent_logo_path.exists(): - opponent_logo = Image.open(opponent_logo_path) - else: - opponent_logo = text_rectangle(text=game['opponent'][0].upper(),width=500, height=500, font_size=400, font="data/fonts/college.ttf") - is_home_game = game['homevisitor'] == "home" - if game.get('wood','').lower() == 'yes': - right_bottom_corner = baseball_bat - else: - right_bottom_corner = None - img = calendar_cell( - height=calendar_cell_height, - width=calendar_cell_width, - background_color=colors.get(game['field'], colors['default']), - left_top_corner = text_rectangle('H' if is_home_game else "A", - "data/fonts/refrigerator-deluxe-bold.otf", - 80, - foreground_color='black' if is_home_game else 'white', - background_color='white' if is_home_game else 'black', - height=calendar_cell_height*.2, - width=calendar_cell_width*.2), - right_top_corner = date_text_image, - center = opponent_logo.resize((int(opponent_logo.width*.5), int(opponent_logo.height*.5))), - bottom_center = text_rectangle(f"{game['time']:%-I:%M}" if game.get('time') else "", - "data/fonts/refrigerator-deluxe-bold.otf", - 120, - foreground_color='white', - height=calendar_cell_height*.25, - width=calendar_cell_width), - right_bottom_corner=right_bottom_corner - ) - # img.show() - elif len(filtered_data) == 2: - game1, game2 = filtered_data[:2] - opponent_logo_path = team_logos.get(game1['opponent']) - if opponent_logo_path and (opponent_logo_path := Path(opponent_logo_path)) and opponent_logo_path.exists(): - opponent_logo = Image.open(opponent_logo_path) - else: - opponent_logo = text_rectangle(text=game1['opponent'][0].upper(),width=500, height=500, font_size=400, font="data/fonts/college.ttf") - - img = calendar_cell( - height=calendar_cell_height, - width=calendar_cell_width, - background_color=colors.get(game1['field'], colors['default']), - left_top_corner = text_rectangle('DH', - "data/fonts/refrigerator-deluxe-bold.otf", - 80, - foreground_color='black', - background_color='white', - height=calendar_cell_height*.2, - width=calendar_cell_width*.2), - right_top_corner = date_text_image, - center = opponent_logo.resize((int(opponent_logo.width*.5), int(opponent_logo.height*.5))), - bottom_center = text_rectangle(f"{game1['time']:%-I:%M} & {game2['time']:%-I:%M}", - "data/fonts/refrigerator-deluxe-bold.otf", - 80, - foreground_color='white', - height=calendar_cell_height*.2, - width=calendar_cell_width), - ) - pass - else: - img=calendar_cell( - height=calendar_cell_height, - width=calendar_cell_width, - background_color=(204,204,204,int(256*.85)), - right_top_corner = text_rectangle(date, - "data/fonts/refrigerator-deluxe-bold.otf", - 100, - foreground_color='black', - height=calendar_cell_height*.25, - 
width=calendar_cell_width*.25) - ) - pass - - if date: month_image.paste(img, (img.size[0]*day_num, img.size[1]*week_num), img) - month_image.save(f'data/output/{year}-{month}.png') - - # if (month, date) in games_lookup.keys() and not (month, date) == first_thursday: - # background_image = game_square([g for g in games if g['dtstart'].month==month and g['dtstart'].day==date]) - # elif (month, date) in games_lookup.keys() and (month, date) == first_thursday: - # background_image = game_square( - # [g for g in games if g['dtstart'].month == month and g['dtstart'].day == date], special='open-mic') - # elif (month, date) == first_thursday: - # background_image = openmic_square(date) - # else: - # background_image = blank_square(date) - - # if date: month_image.paste(background_image, (background_image.size[0]*day_num, background_image.size[1]*week_num), background_image) - - # month_image.thumbnail((1000,1000)) - # month_image.save(f'output/{year}-{month}.png') - # month_image.show() diff --git a/src/apps/read/__init__.py b/src/apps/read/__init__.py deleted file mode 100644 index 056f4a0..0000000 --- a/src/apps/read/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .read import app \ No newline at end of file diff --git a/src/apps/read/read.py b/src/apps/read/read.py deleted file mode 100644 index 61aa4f6..0000000 --- a/src/apps/read/read.py +++ /dev/null @@ -1,158 +0,0 @@ -import typer -from rich.table import Table, Column -from rich.console import Console -from rich.columns import Columns -from pathlib import Path -import csv -from ...utils.common import list_key_values, read_and_normalize_csv_or_xlsx, import_gamebygame, aggregate_teams, aggregate_teams_by_season -from ...utils.normalize import normalize_header_key -from typing import Annotated, List - -app = typer.Typer() - -@app.command("list-values") -def print_values_for_key( - input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")], - key: str = typer.Argument(..., help="The key to retrieve to generate list.") - ): - # Read CSV data - data = [] - for f in input_file: - data.extend(read_and_normalize_csv_or_xlsx(f)) - values = list_key_values(data, key) - - console = Console() - table = Table(show_header=False, title=f'Values for "{key.title()}" ({len(values)})') - table.add_column("Values") - - # breakpoint() - for value in sorted(values): - table.add_row(value) - - console.print(table) - -@app.command("print") -def print_table( - input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")] - ): - # Read CSV data - data = [] - for f in input_file: - data.extend(read_and_normalize_csv_or_xlsx(f)) - - console = Console() - table = Table() - - keys = data[0].keys() - - for key in keys: - table.add_column(key) - - # breakpoint() - for row in data: - table.add_row(*[str(row[key]) for key in keys]) - - console.print(table) - -@app.command() -def check( - input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")] - ): - # Read CSV data - data = [] - for f in input_file: - data.extend(read_and_normalize_csv_or_xlsx(f)) - teams = set([row['visitor'] for row in data] + [row['home'] for row in data]) - fields = set([row['field'] for row in data]) - console = Console() - - table = Table("Team", "Number of Games") - for team in teams: - rows = [row for row in data if row['visitor']==team or row['home']==team] - table.add_row(team, str(len(rows))) - console.print(table) - - table = Table("Field", "Number of Games") - for field in fields: - rows = 
[row for row in data if row['field']==field] - table.add_row(field, str(len(rows))) - console.print(table) - - table = Table("Field", "Datetime", "Games") - field_times = [(row['field'], row['datetime']) for row in data] - for field, datetime in field_times: - rows = [row for row in data if row['field'] == field and row['datetime'] == datetime] - if len(rows) != 1: - table.add_row(str(field), str(datetime), str(len(rows))) - console.print(table) - - matchups = set([tuple([*sorted((row['home'], row['visitor']))]) for row in data]) - table =Table("Team 1", "Team 2", "Games") - for team1, team2 in matchups: - rows = [row for row in data if (row['visitor']==team1 or row['home']==team1) and (row['visitor']==team2 or row['home']==team2)] - table.add_row(str(team1), str(team2), str(len(rows))) - console.print(table) - - pass - - -@app.command() -def standings( - input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")], - ): - - # Read CSV data - data=[] - for f in input_file: - data.extend(import_gamebygame(f)) - - aggregate_team_data = aggregate_teams(data) - - # Display aggregated data as a table - console = Console() - table = Table(title="Aggregated Team Data") - table.add_column("Team", style="bold") - table.add_column("GP", style="bold") - table.add_column("Wins", style="bold") - table.add_column("Losses", style="bold") - table.add_column("Ties", style="bold") - table.add_column("Runs For", style="bold") - table.add_column("Runs Against", style="bold") - - for team_stats in aggregate_team_data: - table.add_row( - team_stats["team"], - str(team_stats["gp"]), - str(team_stats["win"]), - str(team_stats["loss"]), - str(team_stats["tie"]), - str(team_stats["runs_for"]), - str(team_stats["runs_against"]), - ) - - console.print(table) - -@app.command() -def seasons( - input_file: Annotated[List[Path], typer.Argument(..., help="Path(s) to the CSV or XLSX file")], - ): - - # Read CSV data - data=[] - for f in input_file: - data.extend(read_and_normalize_csv_or_xlsx(f)) - - aggregate_team_data = aggregate_teams_by_season(data) - - # Display aggregated data as a table - console = Console() - table = Table(title="Aggregated Team Data") - table.add_column("Team", style="bold") - - for team_stats in aggregate_team_data: - table.add_row( - team_stats["team"], - str(", ".join(sorted(team_stats["seasons"]))) - ) - - console.print(table) \ No newline at end of file diff --git a/src/cli.py b/src/cli.py new file mode 100644 index 0000000..779c1ac --- /dev/null +++ b/src/cli.py @@ -0,0 +1,207 @@ +import typer +from pathlib import Path +from typing import List, Optional +from .utils.common import import_gamebygame +from .utils.db import get_session, init_db +from .models import Game, Team, Venue, GameResult +# from alembic import command +# from alembic.config import Config +from rich.console import Console +from rich.table import Table + +app = typer.Typer() +console = Console() + +@app.command() +def init(): + """Initialize the database""" + init_db() + console.print("[green]Database initialized successfully[/green]") + +@app.command() +def ingest( + files: List[Path] = typer.Argument(..., help="CSV/XLSX files to ingest"), + season: Optional[int] = typer.Option(None, help="Season year"), + source: str = typer.Option("unknown", help="Data source (sportsengine, teamsnap, etc)"), + dry_run: bool = typer.Option(False, "--dry-run", help="Preview without saving") +): + """Ingest CSV/XLSX files into the database with automatic normalization""" + + with get_session() as session: + 
for file_path in files: + console.print(f"\n[bold]Processing {file_path}[/bold]") + + # Use your existing parsing logic + data = import_gamebygame(file_path) + + games_added = 0 + games_skipped = 0 + + for row in data: + if any(row.get(key) is None for key in ['home', 'visitor','field']): + continue #skip + # Get or create teams + home_team = get_or_create_team(session, row['home']) + visitor_team = get_or_create_team(session, row['visitor']) + + # Get or create venue + venue = None + if row.get('field'): + venue = get_or_create_venue(session, row['field']) + + # Create dedupe key + dedupe_key = f"{row['datetime']}_{row.get('field', 'unknown')}_{home_team.id}_{visitor_team.id}" + + # Check if game exists + existing = session.query(Game).filter_by(dedupe_key=dedupe_key).first() + if existing: + games_skipped += 1 + continue + + # Create game + game = Game( + date=row['datetime'], + home_team_id=home_team.id, + visitor_team_id=visitor_team.id, + venue_id=venue.id if venue else None, + field=row.get('field'), + season=season or row.get('season'), + source=source, + dedupe_key=dedupe_key + ) + + # Add result if available + if row.get('has_result'): + result = GameResult( + game_id=game.id, + home_runs_for=row.get('home_runs_for'), + visitor_runs_for=row.get('visitor_runs_for'), + home_outcome=row.get('home_outcome'), + visitor_outcome=row.get('visitor_outcome') + ) + game.result = result + + if not dry_run: + session.add(game) + games_added += 1 + + if not dry_run: + session.commit() + + # Summary + console.print(f"[green]Added: {games_added} games[/green]") + console.print(f"[yellow]Skipped (duplicates): {games_skipped} games[/yellow]") + +@app.command() +def query( + team: Optional[str] = typer.Option(None, help="Filter by team name"), + season: Optional[int] = typer.Option(None, help="Filter by season"), + venue: Optional[str] = typer.Option(None, help="Filter by venue"), + format: str = typer.Option("table", help="Output format: table, csv, json") +): + """Query games from the database""" + with get_session() as session: + query = session.query(Game) + + if team: + query = query.join(Team, or_( + Game.home_team_id == Team.id, + Game.visitor_team_id == Team.id + )).filter(Team.name == team) + + if season: + query = query.filter(Game.season == season) + + if venue: + query = query.join(Venue).filter(Venue.name == venue) + + games = query.all() + + if format == "table": + display_games_table(games) + elif format == "csv": + export_games_csv(games) + elif format == "json": + export_games_json(games) + +@app.command() +def standings( + season: Optional[int] = typer.Option(None, help="Filter by season") +): + """Show standings from the database""" + # Reuse your aggregate_teams logic but with database queries + with get_session() as session: + # Query and aggregate + standings_data = calculate_standings_from_db(session, season) + display_standings_table(standings_data) + +# Database management commands +from sqlalchemy import or_ +@app.command() +def upgrade(): + """Apply database migrations""" + # Example usage in a command function + # Ensure you import or_ from sqlalchemy + # from alembic import command + # from alembic.config import Config + # Uncomment and implement Alembic usage as desired + with get_session() as session: + # Your session-based operations here + pass + # alembic_cfg = Config("alembic.ini") + # command.upgrade(alembic_cfg, "head") + console.print("[green]Database upgraded[/green]") + +@app.command() +def downgrade(revision: str = typer.Argument("", help="Revision to 
downgrade to")): + """Downgrade database migrations""" + with get_session() as session: + # Your session-based operations here + pass + # alembic_cfg = Config("alembic.ini") + # command.downgrade(alembic_cfg, revision or "-1") + console.print("[yellow]Database downgraded[/yellow]") + +@app.command() +def clean_duplicates(): + """Remove duplicate games from database""" + # Implement deduplication logic + pass + +@app.command() +def export( + output: Path, + format: str = typer.Option("sportspress", help="Export format: sportspress, csv, json"), + season: Optional[int] = typer.Option(None, help="Filter by season") +): + """Export data from database""" + with get_session() as session: + games = session.query(Game) + if season: + games = games.filter(Game.season == season) + + if format == "sportspress": + # Use your existing write_sportspress_csv + from .utils.sportspress import write_sportspress_csv + data = games_to_dict_format(games.all()) + write_sportspress_csv(data, output) + +# Helper functions +def get_or_create_team(session, team_name: str) -> Team: + team = session.query(Team).filter_by(name=team_name).first() + if not team: + team = Team(name=team_name) + session.add(team) + session.flush() + return team + +def get_or_create_venue(session, venue_name: str) -> Venue: + venue = session.query(Venue).filter_by(name=venue_name).first() + if not venue: + venue = Venue(name=venue_name) + session.add(venue) + session.flush() + return venue + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/src/convert_to_sportspress.py b/src/convert_to_sportspress.py deleted file mode 100644 index 89d5c06..0000000 --- a/src/convert_to_sportspress.py +++ /dev/null @@ -1,81 +0,0 @@ -import csv -import re -from typing import List, Dict -from dateutil import parser -from pathlib import Path -from rich.console import Console -from rich.panel import Panel -from rich.table import Table, Column -from rich.columns import Columns -import typer -# from .utils.common import normalize_header_key, read_csv, is_visitor_home_order_reversed, process_data, aggregate_teams, write_sportspress_csv - -# validate_csv_header - -app = typer.Typer() - - -@app.command() -def standings(file_path: Path = typer.Argument(..., help="Path to the CSV file")): - # Validate CSV header - header = next(csv.reader(open(file_path, "r"))) - normalized_header = [normalize_header_key(key) for key in header] - if not validate_csv_header(header): - typer.echo("Error: Invalid CSV header. 
Make sure the CSV file contains the correct headers.") - return - - # Read CSV data - data = read_csv(file_path) - visitor_home_order_reversed = is_visitor_home_order_reversed(normalized_header) - processed_data = process_data(data, visitor_home_order_reversed) - aggregate_team_data = aggregate_teams(processed_data) - - # Display aggregated data as a table - console = Console() - table = Table(title="Aggregated Team Data") - table.add_column("Team", style="bold") - table.add_column("Wins", style="bold") - table.add_column("Losses", style="bold") - table.add_column("Ties", style="bold") - table.add_column("Runs For", style="bold") - table.add_column("Runs Against", style="bold") - - for team_stats in aggregate_team_data: - table.add_row( - team_stats["team"], - str(team_stats["win"]), - str(team_stats["loss"]), - str(team_stats["tie"]), - str(team_stats["runs_for"]), - str(team_stats["runs_against"]), - ) - - console.print(table) - - # Write processed CSV data back to a new file - # output_file_path = file_path.with_suffix(".processed.csv") - # write_csv(output_file_path, data) - # typer.echo(f"Processed data written to: {output_file_path}") - -# @app.command() -def replace_key_values(data: List[Dict], key, match:str, replace:str, is_regex:bool =False): - if not is_regex: - regex = re.compile(fr"^{match}$") - else: - regex = re.compile(fr"{match}") - - for row in data: - row[key] = regex.sub(replace, row[key]) - - return data - -def add_key_values(data: List[Dict], key, value:str): - for row in data: - row[key] = value - - return data - - - -if __name__ == "__main__": - app() diff --git a/src/models/__init__.py b/src/models/__init__.py new file mode 100644 index 0000000..cf4f59d --- /dev/null +++ b/src/models/__init__.py @@ -0,0 +1 @@ +from .models import * \ No newline at end of file diff --git a/src/models/models.py b/src/models/models.py new file mode 100644 index 0000000..c7b28ad --- /dev/null +++ b/src/models/models.py @@ -0,0 +1,94 @@ +from sqlmodel import SQLModel, Field, Relationship +from uuid import UUID, uuid4 +from datetime import datetime +from typing import Optional, List +from enum import Enum + +# Team model +class Team(SQLModel, table=True): + id: UUID = Field(default_factory=uuid4, primary_key=True) + name: str = Field(index=True, unique=True) + + # Specify foreign keys explicitly for relationships + # home_games: List["Game"] = Relationship( + # back_populates="home_team", + # sa_relationship_kwargs={"foreign_keys": "Game.home_team_id"} + # ) + # visitor_games: List["Game"] = Relationship( + # back_populates="visitor_team", + # sa_relationship_kwargs={"foreign_keys": "Game.visitor_team_id"} + # ) + +# Venue model +class Venue(SQLModel, table=True): + id: UUID = Field(default_factory=uuid4, primary_key=True) + name: str = Field(index=True, unique=True) + + # Relationships + games: List["Game"] = Relationship(back_populates="venue") + +# Game model +class Game(SQLModel, table=True): + id: UUID = Field(default_factory=uuid4, primary_key=True) + date: datetime = Field(index=True) + + # Foreign keys + home_team_id: UUID = Field(foreign_key="team.id") + visitor_team_id: UUID = Field(foreign_key="team.id") + venue_id: Optional[UUID] = Field(default=None, foreign_key="venue.id") + + # Explicitly specify relationships with foreign keys + home_team: "Team" = Relationship( + # back_populates="home_games", + sa_relationship_kwargs={"foreign_keys": "Game.home_team_id"} + ) + visitor_team: "Team" = Relationship( + # back_populates="visitor_games", + 
sa_relationship_kwargs={"foreign_keys": "Game.visitor_team_id"} + ) + venue: Optional["Venue"] = Relationship(back_populates="games") + result: Optional["GameResult"] = Relationship(back_populates="game") + + # Metadata + season: Optional[int] = Field(index=True) + source: str = Field(default="unknown") + dedupe_key: str = Field(unique=True, index=True) + +class GameOutcome(str, Enum): + WIN = "win" + LOSS = "loss" + TIE = "tie" + FORFEIT = "forfeit" + TECHNICAL_WIN = "technical_win" + TECHNICAL_LOSS = "technical_loss" + FORFEIT_WIN = "forfeit_win" + FORFEIT_LOSS = "forfeit_loss" + +# GameResult model +class GameResult(SQLModel, table=True): + id: UUID = Field(default_factory=uuid4, primary_key=True) + + # Foreign key to Game + game_id: UUID = Field(foreign_key="game.id") + game: "Game" = Relationship(back_populates="result") + + # Game results + home_runs_for: Optional["int"] + visitor_runs_for: Optional["int"] + + home_outcome: Optional[GameOutcome] # win/loss/tie/forfeit + visitor_outcome: Optional[GameOutcome] + +# External ID model if needed +class ExternalId(SQLModel, table=True): + id: UUID = Field(default_factory=uuid4, primary_key=True) + external_type: str # e.g., 'SportsEngine', 'TeamSnap' + external_id: str + team_id: Optional[UUID] = Field(default=None, foreign_key="team.id") + game_id: Optional[UUID] = Field(default=None, foreign_key="game.id") + venue_id: Optional[UUID] = Field(default=None, foreign_key="venue.id") + + # Relationships + team: Optional["Team"] = Relationship() + game: Optional["Game"] = Relationship() + venue: Optional["Venue"] = Relationship() \ No newline at end of file diff --git a/src/scripts/clear_database.py b/src/scripts/clear_database.py new file mode 100644 index 0000000..0e7e2ac --- /dev/null +++ b/src/scripts/clear_database.py @@ -0,0 +1,14 @@ +from sqlmodel import create_engine, SQLModel, + +# Replace with your actual database URL +DATABASE_URL = "sqlite:///sportsdb.db" + +engine = create_engine(DATABASE_URL) + +def clear_database(): + # Drop all tables + SQLModel.metadata.drop_all(engine) + print("All tables dropped successfully.") + +if __name__ == "__main__": + clear_database() \ No newline at end of file diff --git a/src/scripts/truncate_tables.py b/src/scripts/truncate_tables.py new file mode 100644 index 0000000..2ccaea7 --- /dev/null +++ b/src/scripts/truncate_tables.py @@ -0,0 +1,18 @@ +from sqlmodel import Session, create_engine, SQLModel, text + +DATABASE_URL = "sqlite:///sportsdb.db" # Adjust your database URL accordingly + +engine = create_engine(DATABASE_URL) + +def truncate_tables(): + with Session(engine) as session: + session.exec(text("DELETE FROM gameresult;")) + session.exec(text("DELETE FROM game;")) + session.exec(text("DELETE FROM venue;")) + session.exec(text("DELETE FROM team;")) + session.commit() + print("All tables truncated successfully.") + +if __name__ == "__main__": + SQLModel.metadata.create_all(engine) # Ensure the tables exist + truncate_tables() \ No newline at end of file diff --git a/src/utils/common.py b/src/utils/common.py index e0f493b..0a41ed1 100644 --- a/src/utils/common.py +++ b/src/utils/common.py @@ -169,7 +169,6 @@ def parse_datetime(data: List[Dict]): def import_gamebygame(data: Union[List[Dict], TextIO, Path]) -> List[Dict]: if isinstance(data, TextIOBase) or isinstance(data, Path) : data = read_and_normalize_csv_or_xlsx(data) - header = data[0].keys() visitor_home_order_reversed = is_visitor_home_order_reversed(list(header)) for row in data: @@ -178,7 +177,8 @@ def import_gamebygame(data: 
Union[List[Dict], TextIO, Path]) -> List[Dict]: try: row['datetime'] = parser.parse(f"{row['date']} {row['time']}") except parser.ParserError as e: - raise e + pass + # raise e return data diff --git a/src/utils/db.py b/src/utils/db.py new file mode 100644 index 0000000..a22a52d --- /dev/null +++ b/src/utils/db.py @@ -0,0 +1,34 @@ +from sqlalchemy.engine import create_engine +from sqlalchemy.orm import sessionmaker +from sqlmodel import SQLModel +import os +from dotenv import load_dotenv + +# Load environment variables from a .env file if present +load_dotenv() + +# Get the database URL from environment variables +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./sportsdb.db") + +# Create an engine for the database +engine = create_engine(DATABASE_URL, echo=True) + +# Create a configured "Session" class +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +def init_db(): + """Initialize the database by creating all tables.""" + SQLModel.metadata.create_all(bind=engine) + +class SessionContext: + """Context manager for database sessions.""" + def __enter__(self): + self.db = SessionLocal() + return self.db + + def __exit__(self, exc_type, exc_value, traceback): + self.db.close() + +def get_session(): + """Return an instance of SessionContext for context management.""" + return SessionContext() diff --git a/src/utils/normalize.py b/src/utils/normalize.py index e4f951a..c069ade 100644 --- a/src/utils/normalize.py +++ b/src/utils/normalize.py @@ -1,8 +1,9 @@ import toml -from typing import List, Dict +from typing import List, Dict, Literal DEFAULT_NORMALIZATION_PATH="normalization.toml" import re from dateutil import parser +from dateutil.parser import ParserError import datetime def load_config(normalization_config_path=DEFAULT_NORMALIZATION_PATH): @@ -17,29 +18,33 @@ def normalize_header_key(original_key: str, normalization_config) -> str: normalized_key = key return normalized_key return original_key - return key_mapping.get(key.lower().strip(), key.lower().strip()) -def normalize_value(value, key, normalization_config): + +def normalize_value(value:str, key: str, normalization_config:dict): + if value.lower() == "xx": + return None value = value.strip() - for normalization_pair in normalization_config.get(key if not key == "home" or key == "away" else "team",{}).get('values',[]): + if key in ["home", "visitor", "away", "field"]: + value = value.title() + for normalization_pair in normalization_config.get(key if not (key == "home" or key == "visitor" or key=="away") else "team",{}).get('values',[]): if value in normalization_pair["original"]: value = normalization_pair["normalized"] match key.lower(): case "date": if value: value = parser.parse(value).date() - else: - pass - case "home": - value = value.title() - case "visitor": - value = value.title() case "time": if value: - value = parser.parse(value).time() - else: - pass + value = value.replace("550", "5:50") + pattern = r"(\b\d{1,2}:\d{2}\s*(?:AM|PM)\b).*" + # Use re.sub to replace the entire string matched by the pattern with just the time part + value = re.sub(pattern, r"\1", value) + value = value.replace("Time Change","") + try: + value = parser.parse(value).time() + except ParserError as e: + pass case _: # Handle other cases pass diff --git a/src/utils/sportspress.py b/src/utils/sportspress.py deleted file mode 100644 index 515a693..0000000 --- a/src/utils/sportspress.py +++ /dev/null @@ -1,105 +0,0 @@ -from typing import List, Dict -from pathlib import Path -import csv - -REQUIRED_KEYS=["date", 
"time", "field", "visitor", "home"] - -def validate_keys(header: List[str]) -> bool: - required_keys = REQUIRED_KEYS - return all(key in header for key in required_keys) - -def write_sportspress_csv(data: List[Dict], file: Path, only_with_outcome:bool = False): - """ - Writes sports event data to a CSV file in a specific format. - - Parameters: - - data (List[Dict]): List of dictionaries where each dictionary represents a sports event. - - file_path (Path): The Path object representing the file path where the CSV file will be created. - - only_with_outcome (bool, optional): If True, only events with outcomes will be included in the CSV. Default is False. - - Returns: - None - - Example: - >>> data = [...] # List of dictionaries representing sports events - >>> file_path = Path("output.csv") - >>> write_sportspress_csv(data, file_path) - """ - - header = data[0].keys() - - if not validate_keys(header): - raise KeyError(f"Missing Keys. Requires: {REQUIRED_KEYS}, provided: {list(header)}") - - writer = csv.writer(file) - - fieldnames = [ - "Format", #Competitive or Friendly - # "Competition", - "Season", - # "Date Format", - "Date", - "Time", - "Venue", - "Team", - "Results", - "Outcome", - # "Players", - # "Performance", - ] - - # Write the header - writer.writerow(fieldnames) - - # Write the data - for row in data: - if only_with_outcome and not row.get('has_result'): - continue - writer.writerow( - [ - row["datetime"].strftime("%Y/%m/%d"), - row["datetime"].strftime("%H:%M"), - row.get("field", ""), - row["home"], - "|".join([str(row.get(k,"")) for k in [ - "home_runs_for_inning_1", - "home_runs_for_inning_2", - "home_runs_for_inning_3", - "home_runs_for_inning_4", - "home_runs_for_inning_5", - "home_runs_for_inning_6", - "home_runs_for_inning_7", - "home_runs_for_inning_8", - "home_runs_for_inning_9", - "home_runs_for_inning_10", - "home_runs_for", - "home_errors", - "home_hits" - ]]), - row.get("home_outcome") - ] - ) - writer.writerow( - [ - "", - "", - "", - row["visitor"], - "|".join([str(row.get(k,"")) for k in [ - # "visitor_runs_for_inning_1", - # "visitor_runs_for_inning_2", - # "visitor_runs_for_inning_3", - # "visitor_runs_for_inning_4", - # "visitor_runs_for_inning_5", - # "visitor_runs_for_inning_6", - # "visitor_runs_for_inning_7", - # "visitor_runs_for_inning_8", - # "visitor_runs_for_inning_9", - # "visitor_runs_for_inning_10", - "visitor_runs_for", - "visitor_errors", - "visitor_hits" - ]]), - row.get("visitor_outcome") - ] - ) \ No newline at end of file