Files
baseball-db/src/utils/common.py

344 lines
13 KiB
Python

import csv
import re
from typing import List, Dict, Union, TextIO
from io import TextIOBase, StringIO
from xlsx2csv import Xlsx2csv
from dateutil import parser
from pathlib import Path
from rich.console import Console
from rich.table import Table
from .normalize import normalize_header_key, load_config, normalize_value, normalize_keyvalue, normalize_row
import datetime
def list_key_values(data: List[Dict], key):
    """
    Collect the distinct non-None values stored under `key` across `data`.

    The key "team" (matched case-insensitively) is special: when the first
    row has no "team" column, values are gathered from the "team", "home"
    and "visitor" columns of every row instead.

    Parameters:
        data (List[Dict]): Rows to scan.
        key: Column name whose values should be collected.

    Returns:
        set: The distinct values found, with None removed. An empty set is
        returned when `data` is empty.
    """
    if key.lower() == "team":
        key = "team"
    if not data:
        # Nothing to scan; this also avoids indexing data[0] below.
        return set()
    if key == "team" and "team" not in data[0]:
        # No dedicated team column: fall back to the per-game columns.
        columns = ("team", "home", "visitor")
    else:
        columns = (key,)
    output = {row.get(column) for row in data for column in columns}
    # Rows missing a column contribute None via .get(); drop it.
    output.discard(None)
    return output
def read_and_normalize_csv_or_xlsx(input_file: Union[List[TextIO], List[Path], TextIO, Path]) -> List[dict]:
    """
    Reads CSV or XLSX file(s) from the provided path(s) or file object(s)
    and returns one list of rows, each passed through `normalize_row` with
    the configuration returned by `load_config()`.

    Parameters:
        input_file (Union[List[TextIO], List[Path], TextIO, Path]):
            Either a single file path (as a string or Path object), a single
            file object opened in text mode, or a list mixing those.
            Paths must end in ".csv" or ".xlsx"; file objects are read as CSV.

    Returns:
        List[dict]:
            The normalized rows of every input, concatenated in input order.

    Raises:
        ValueError: If a path has a suffix other than ".csv" or ".xlsx".
    """
    normalization_config = load_config()
    file_list = input_file if isinstance(input_file, list) else [input_file]

    result_data = []
    for source in file_list:
        if isinstance(source, str):
            # The docstring promises string paths; treat them like Path objects.
            source = Path(source)

        if isinstance(source, Path):
            suffix = source.suffix.lower()
            if suffix == ".csv":
                # newline="" is what the csv module documents for file objects.
                # Rows are materialized while the file is still open.
                with source.open("r", encoding="utf-8", newline="") as handle:
                    rows = list(csv.DictReader(handle))
            elif suffix == ".xlsx":
                # Convert the workbook to CSV text in memory, then parse that.
                buffer = StringIO()
                Xlsx2csv(source, outputencoding="utf-8").convert(buffer)
                buffer.seek(0)
                rows = list(csv.DictReader(buffer))
            else:
                raise ValueError("File must be a .csv or .xlsx")
        else:
            # Anything else is treated as an already-open text stream of CSV.
            rows = list(csv.DictReader(source))

        result_data.extend(normalize_row(row, normalization_config) for row in rows)
    return result_data
def personalize_data_for_team(data:List[dict], target_team:str):
    """
    Tag each game row from the point of view of `target_team`.

    For every row in which `target_team` is the "home" or the "visitor"
    value, two keys are written onto the row in place:
    "homevisitor" ("home" or "visitor") and "opponent" (the other side's
    value). The "home" column is checked first. Rows that do not involve
    the team are left untouched.

    Parameters:
        data (List[dict]): Game rows; modified in place.
        target_team (str): Team to personalize the rows for.

    Returns:
        List[dict]: The same list object that was passed in.
    """
    sides = (("home", "visitor"), ("visitor", "home"))
    for game in data:
        for own_side, other_side in sides:
            if game.get(own_side) == target_team:
                game['homevisitor'] = own_side
                game['opponent'] = game.get(other_side)
                # First matching side wins, mirroring an if/elif chain.
                break
    return data
def write_csv(file_path: Path, data: List[dict]) -> None:
    """
    Write a list of row dictionaries to a CSV file with a header row.

    The header is the union of the keys of all rows, in first-seen order,
    so rows that carry extra keys no longer make the writer fail; cells for
    keys a row lacks are left empty. When every row shares the first row's
    keys the output columns are the same as the first row's keys.

    Parameters:
        file_path (Path): Destination file; created or overwritten.
        data (List[dict]): Rows to write. An empty list produces an empty file.

    Returns:
        None
    """
    # utf-8 matches the encoding this module uses when reading CSV files.
    with open(file_path, "w", newline="", encoding="utf-8") as csvfile:
        if not data:
            # No rows means no header can be derived; leave the file empty.
            return
        # dict.fromkeys de-duplicates while keeping first-seen order.
        fieldnames = list(dict.fromkeys(key for row in data for key in row))
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
# First "<digits>-<digits>" pair anywhere in a score string.
_SCORE_PATTERN = re.compile(r"(?P<runs_first>\d+)-(?P<runs_second>\d+)")


def parse_score(score_str: str, reverse_order: bool = False) -> Dict[str, int]:
    """
    Parse a score string and extract home and visitor scores.

    The first "<number>-<number>" pair found anywhere in the string is taken
    as the score. Text before it is returned under "pre" and text after it
    under "post" (each only when non-empty). When no score is present the
    whole string, if non-empty, is returned under "post".

    Args:
        score_str (str): The score string containing somewhere "visitor-home".
            None is treated like an empty string.
        reverse_order (bool, optional): If True, the order of the scores is
            reversed (home first). Defaults to False.

    Returns:
        Dict[str, int]: Always contains "has_result" (bool). When a score was
        found it also contains "home_runs_for", "visitor_runs_for",
        "home_runs_against", "visitor_runs_against" (ints) and
        "home_outcome" / "visitor_outcome" ("win", "loss", "tie", or
        "forfeit" for the losing side when "forfeit" appears after the score).
    """
    if score_str is None:
        score_str = ""

    score = {}
    match = _SCORE_PATTERN.search(score_str)
    if match is None:
        # No numeric result: keep the raw text (e.g. a status note) in "post".
        if score_str:
            score["post"] = score_str
        score["has_result"] = False
        return score

    pre = score_str[:match.start()]
    post = score_str[match.end():]
    if pre:
        score["pre"] = pre
    if post:
        score["post"] = post
    score["has_result"] = True

    runs_first = int(match.group("runs_first"))
    runs_second = int(match.group("runs_second"))
    # Convention is "visitor-home"; reverse_order flips it to "home-visitor".
    if reverse_order:
        home_runs, visitor_runs = runs_first, runs_second
    else:
        home_runs, visitor_runs = runs_second, runs_first
    score.update({
        "home_runs_for": home_runs, "visitor_runs_for": visitor_runs,
        "home_runs_against": visitor_runs, "visitor_runs_against": home_runs,
    })

    # A forfeit is charged to whichever side has fewer runs.
    loser_outcome = "forfeit" if "forfeit" in post else "loss"
    if home_runs > visitor_runs:
        score["home_outcome"] = "win"
        score["visitor_outcome"] = loser_outcome
    elif home_runs < visitor_runs:
        score["home_outcome"] = loser_outcome
        score["visitor_outcome"] = "win"
    else:
        score["home_outcome"] = "tie"
        score["visitor_outcome"] = "tie"
    return score
def is_visitor_home_order_reversed(header: List[str]) -> bool:
    """
    Determine if the order of 'visitor' and 'home' in the header suggests reversed order.
    Convention is that home is second.

    Args:
        header (List[str]): The list of header keys.

    Returns:
        bool: True if the 'home' key comes before the 'visitor' key, indicating reversed order.
        False if 'visitor' comes first and nothing needs to be done to the data.

    Raises:
        KeyError: If the header does not contain both 'visitor' and 'home'.
    """
    if 'visitor' not in header or 'home' not in header:
        # Raise rather than return the exception class: the class object is
        # truthy and would be read by callers as "order is reversed".
        raise KeyError("header must contain both 'visitor' and 'home'")
    return header.index('visitor') > header.index('home')
def parse_datetime(data: List[Dict]):
    """
    Add a combined 'datetime' value to every row, built from 'date' and 'time'.

    When 'date' is already a date/datetime object and 'time' is a
    datetime.time, the two are combined directly. Otherwise both values are
    formatted into one string and handed to dateutil's `parser.parse`.

    Args:
        data (List[Dict]): Rows to update; modified in place.

    Returns:
        List[Dict]: The same list that was passed in.

    Raises:
        Whatever `parser.parse` raises for text it cannot interpret
        (propagated unchanged).
    """
    for row in data:
        date_value = row.get('date')
        time_value = row.get('time')
        if isinstance(date_value, datetime.date) and isinstance(time_value, datetime.time):
            # Already typed values: combine() uses only the date part of date_value.
            row['datetime'] = datetime.datetime.combine(date_value, time_value)
        else:
            row['datetime'] = parser.parse(f"{date_value} {time_value}")
    return data
def import_gamebygame(data: Union[List[Dict], TextIO, Path]) -> List[Dict]:
    """
    Load game-by-game rows and enrich each one with score and datetime fields.

    A file object or Path is first read through
    `read_and_normalize_csv_or_xlsx`; a list of rows is used as given. The
    column order of 'visitor' and 'home' in the first row decides how the
    "results" text is interpreted by `parse_score`. Each row is then updated
    in place with the parsed score fields and a 'datetime' value parsed from
    its 'date' and 'time' entries.

    Parameters:
        data (Union[List[Dict], TextIO, Path]): Rows, or a source to read them from.

    Returns:
        List[Dict]: The enriched rows. An empty input yields an empty list.

    Raises:
        KeyError: If a row lacks 'date' or 'time'.
        Errors from `parser.parse` propagate unchanged.
    """
    if isinstance(data, (TextIOBase, Path)):
        data = read_and_normalize_csv_or_xlsx(data)
    if not data:
        # No rows: there is no header to inspect and nothing to enrich.
        return data

    header = list(data[0])
    visitor_home_order_reversed = is_visitor_home_order_reversed(header)
    for row in data:
        parsed_score = parse_score(row.get("results", ''), visitor_home_order_reversed)
        row.update(parsed_score)
        row['datetime'] = parser.parse(f"{row['date']} {row['time']}")
    return data
def aggregate_teams(data: List[Dict[str, str]]) -> List[Dict[str, int]]:
    """
    Build a per-team standings table from game rows.

    Rows whose "has_result" value is falsy are skipped. For every remaining
    row both the home and the visitor team get a game played ("gp"), their
    runs for/against added, and, when their outcome is "win", "loss" or
    "tie", that counter incremented. Other outcome values still count as a
    game played but are not tallied.

    Args:
        data (List[Dict[str, str]]): Game rows carrying the fields produced
            by score parsing ("home", "visitor", "has_result", outcomes, runs).

    Returns:
        List[Dict[str, int]]: One dictionary per team with "team", "win",
        "loss", "tie", "gp", "runs_for" and "runs_against", ordered by wins,
        highest first.
    """
    tallied_outcomes = ("win", "loss", "tie")
    standings = {}

    for game in data:
        if not game["has_result"]:
            continue

        home_club, visiting_club = game["home"], game["visitor"]
        # Register both clubs first so table order follows first appearance.
        for club in (home_club, visiting_club):
            if club not in standings:
                standings[club] = {"win": 0, "loss": 0, "tie": 0, "gp": 0, "runs_for": 0, "runs_against": 0}

        for club, side in ((home_club, "home"), (visiting_club, "visitor")):
            record = standings[club]
            record["gp"] += 1
            result = game[f"{side}_outcome"]
            if result in tallied_outcomes:
                record[result] += 1
            record["runs_for"] += game[f"{side}_runs_for"]
            record["runs_against"] += game[f"{side}_runs_against"]

    # Flatten to rows, then rank by number of wins (most wins first).
    table = [{"team": club, **record} for club, record in standings.items()]
    table.sort(key=lambda entry: entry["win"], reverse=True)
    return table
def aggregate_teams_by_season(data: List[Dict[str, str]]) -> List[Dict[str, int]]:
    """
    Collect, for every team, the set of seasons it appears in.

    A team is any truthy value found under a row's "home", "visitor" or
    "team" key. Rows without a truthy "season" value contribute nothing.

    Args:
        data (List[Dict[str, str]]): Rows carrying team and season columns.

    Returns:
        List[Dict[str, int]]: One dictionary per team with "team" and
        "seasons" (a set), sorted by team name.
    """
    seasons_by_team = {}
    for record in data:
        season = record.get('season')
        if not season:
            continue
        for column in ("home", "visitor", "team"):
            team = record.get(column)
            if team:
                seasons_by_team.setdefault(team, set()).add(season)

    # Emit one entry per team in ascending team-name order.
    return [
        {"team": team, "seasons": seasons_by_team[team]}
        for team in sorted(seasons_by_team)
    ]
def _sportspress_results(row: Dict, side: str) -> str:
    """Build the pipe-delimited Results cell for one side ("home" or "visitor")."""
    keys = [f"{side}_runs_for_inning_{inning}" for inning in range(1, 11)]
    keys += [f"{side}_runs_for", f"{side}_errors", f"{side}_hits"]
    # Missing values become empty segments so positions stay fixed.
    return "|".join(str(row.get(key, "")) for key in keys)


def write_sportspress_csv(data: List[Dict], file_path: Path, only_with_outcome:bool = False):
    """
    Writes sports event data to a CSV file in a specific format.

    Every event produces two lines: one for the home team, carrying the
    event-level cells (format, season, date, time, venue), and one for the
    visitor with those cells left blank. Each line has one cell per header
    column, so values line up with their headings.

    Parameters:
    - data (List[Dict]): List of dictionaries where each dictionary represents a sports event.
      Each written event needs "datetime" (a datetime object), "home" and "visitor".
    - file_path (Path): The Path object representing the file path where the CSV file will be created.
    - only_with_outcome (bool, optional): If True, only events with outcomes will be included in the CSV. Default is False.

    Returns:
    None

    Example:
    >>> data = [...] # List of dictionaries representing sports events
    >>> file_path = Path("output.csv")
    >>> write_sportspress_csv(data, file_path)
    """
    fieldnames = [
        "Format",  # Competitive or Friendly
        "Season",
        "Date",
        "Time",
        "Venue",
        "Team",
        "Results",
        "Outcome",
    ]
    # newline="" is what the csv module documents for files given to csv.writer.
    with file_path.open('w', newline="", encoding="utf-8") as output_csv_file:
        writer = csv.writer(output_csv_file)
        writer.writerow(fieldnames)
        for row in data:
            if only_with_outcome and not row['has_result']:
                continue
            writer.writerow(
                [
                    # NOTE(review): a "format" key on rows is an assumption;
                    # the cell is blank when rows do not carry one. Confirm.
                    row.get("format", ""),
                    row.get("season", ""),
                    row["datetime"].strftime("%Y/%m/%d"),
                    row["datetime"].strftime("%H:%M"),
                    row.get("field", ""),
                    row["home"],
                    _sportspress_results(row, "home"),
                    row.get("home_outcome"),
                ]
            )
            # Second line of the same event: event-level cells stay blank.
            writer.writerow(
                [
                    "",
                    "",
                    "",
                    "",
                    "",
                    row["visitor"],
                    _sportspress_results(row, "visitor"),
                    row.get("visitor_outcome"),
                ]
            )