2024-10-13 13:04:18 +00:00
|
|
|
import csv
|
2024-10-13 13:20:30 +00:00
|
|
|
import dataclasses
|
2024-10-13 13:04:18 +00:00
|
|
|
import datetime
|
|
|
|
import decimal
|
|
|
|
import typing
|
|
|
|
import urllib.parse
|
2024-10-13 13:20:30 +00:00
|
|
|
from collections.abc import Callable
|
2024-10-13 13:04:18 +00:00
|
|
|
from decimal import Decimal
|
|
|
|
from pathlib import Path
|
2024-10-13 13:20:30 +00:00
|
|
|
from typing import Any
|
2024-10-13 13:04:18 +00:00
|
|
|
|
|
|
|
from frozendict import frozendict
|
|
|
|
|
|
|
|
# Name under which this module's CSV dialect is registered with the csv module.
CSV_DIALECT = 'one_true_dialect'

# Registered at import time so the dialect can be referred to by name:
# rows are terminated by a bare '\n' when writing, and whitespace directly
# after a delimiter is ignored when reading.
csv.register_dialect(CSV_DIALECT, skipinitialspace=True, lineterminator='\n')

# Generic result type of the parser callable handed to try_value().
T = typing.TypeVar('T')
|
|
|
|
|
|
|
|
|
|
|
|
def try_value(fn: Callable[[str], T], s: str) -> T | None:
    """Apply the parser ``fn`` to ``s`` and return ``None`` when parsing fails.

    Only ``ValueError`` and ``decimal.InvalidOperation`` count as a parse
    failure; any other exception raised by ``fn`` propagates to the caller.
    """
    try:
        parsed = fn(s)
    except (decimal.InvalidOperation, ValueError):
        parsed = None
    return parsed
|
|
|
|
|
|
|
|
|
|
|
|
def csv_str_to_value(
    s: str | None,
) -> (
    str
    | Decimal
    | datetime.date
    | datetime.datetime
    | urllib.parse.ParseResult
    | bool
    | None
):
    """Convert one raw CSV cell into the most specific value it can represent.

    Conversions are attempted in this order and the first match wins:

    1. ``None``, empty or whitespace-only text -> ``None``
    2. text accepted by ``Decimal`` -> ``Decimal``
    3. text accepted by ``datetime.date.fromisoformat`` -> ``datetime.date``
    4. text accepted by ``datetime.datetime.fromisoformat``
       -> ``datetime.datetime``
    5. text starting with ``http://`` or ``https://``
       -> ``urllib.parse.ParseResult``
    6. ``false`` / ``true`` / ``none`` in any letter case
       -> ``False`` / ``True`` / ``None``
    7. anything else -> the stripped text itself

    Args:
        s: Raw cell text, or ``None`` for a missing cell.

    Returns:
        The converted value, or ``None`` when the cell carries no value.

    Raises:
        TypeError: If ``s`` is a list (for example the surplus-field list
            that ``csv.DictReader`` stores under its ``restkey``).
    """
    # Explicit raise instead of `assert`: assertions are stripped under -O,
    # after which a list would fail later with an unrelated AttributeError.
    if isinstance(s, list):  # TODO?
        raise TypeError('csv_str_to_value() does not support list values')

    if s is None:
        return None

    s = s.strip()
    if not s:
        return None

    # NOTE(review): Decimal also accepts spellings such as 'NaN' and
    # 'Infinity', so those cells become non-finite Decimals - confirm that
    # this is intended.
    if (v_decimal := try_value(Decimal, s)) is not None:
        return v_decimal
    if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
        return v_date
    if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
        return v_datetime

    if s.startswith(('http://', 'https://')):
        return urllib.parse.urlparse(s)

    # Keyword literals are matched case-insensitively; lower-case only once.
    lowered = s.lower()
    if lowered == 'false':
        return False
    if lowered == 'true':
        return True
    if lowered == 'none':
        return None

    return s
|
|
|
|
|
|
|
|
|
|
|
|
def load_csv_file(
    csv_file: Path,
    sniff: bool = False,
) -> list[frozendict[str, typing.Any]]:
    """Read a CSV file with a header row into a list of immutable row mappings.

    Every cell is converted with ``csv_str_to_value``; cells that convert to
    ``None`` are left out of the row's mapping entirely.

    Args:
        csv_file: Path of the CSV file to read.
        sniff: When true, the dialect is guessed by ``csv.Sniffer`` from the
            first 1024 characters of the file; otherwise the registered
            ``CSV_DIALECT`` is used.

    Returns:
        One ``frozendict`` per data row, in file order.
    """
    dicts: list[frozendict] = []
    # newline='' is what the csv module asks for, so that line breaks inside
    # quoted fields reach the reader untranslated.
    # NOTE(review): the text encoding is left at the platform default, as
    # before - confirm whether an explicit encoding is wanted.
    with open(csv_file, newline='') as csvfile:
        if sniff:
            dialect = csv.Sniffer().sniff(csvfile.read(1024))
            # Rewind so the reader sees the header row again.
            csvfile.seek(0)
        else:
            dialect = CSV_DIALECT

        # NOTE(review): for rows with more fields than the header,
        # csv.DictReader stores the surplus as a list under the key None;
        # csv_str_to_value rejects list values, so such files fail to load.
        reader = csv.DictReader(csvfile, dialect=dialect)
        for row in reader:
            converted = {}
            for key, raw in row.items():
                value = csv_str_to_value(raw)
                # Cells without a value are dropped rather than kept as None.
                if value is not None:
                    converted[key] = value
            dicts.append(frozendict(converted))
    return dicts
|
|
|
|
|
|
|
|
|
|
|
|
@dataclasses.dataclass
class PossibleKeys:
    """Candidate keys of an event mapping, grouped by the role they may play.

    The per-field notes describe how determine_possible_keys() fills the
    fields; every field is a list of keys.
    """

    # Keys of date/datetime values whose name contains 'start'.
    time_start: list[str]
    # Keys of date/datetime values whose name contains 'end', 'stop' or 'last'.
    time_end: list[str]
    # Keys of Decimal values whose name contains 'duration_seconds'.
    duration: list[str]
    # Keys of plain string values.
    name: list[str]
    # Keys of parsed-URL values.
    image: list[str]
    # Every key that holds neither a date/datetime nor a parsed URL.
    misc: list[str]
|
|
|
|
|
2024-10-13 13:20:30 +00:00
|
|
|
|
2024-10-13 13:04:18 +00:00
|
|
|
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
    """Group the keys of ``event_data`` by the kind of value they hold.

    Classification looks only at value types and at substrings of the key:

    * date/datetime values are time keys; they become start keys when the
      lower-cased key contains 'start', and end keys when it contains 'end',
      'stop' or 'last' (a key may land in both lists, or in neither)
    * ``Decimal`` values whose key contains 'duration_seconds' are durations
    * ``str`` values are name candidates
    * ``urllib.parse.ParseResult`` values are image candidates
    * every key that is neither a time key nor an image key is 'misc'

    All lists keep the iteration order of ``event_data``.
    """
    temporal: list[str] = []
    durations: list[str] = []
    names: list[str] = []
    images: list[str] = []

    # One pass over the data; the checks are independent of each other.
    for key, value in event_data.items():
        if isinstance(value, datetime.date):
            temporal.append(key)
        if isinstance(value, Decimal) and 'duration_seconds' in key:
            durations.append(key)
        if isinstance(value, str):
            names.append(key)
        if isinstance(value, urllib.parse.ParseResult):
            images.append(key)

    # Whatever is neither an image nor a timestamp counts as miscellaneous.
    claimed = {*images, *temporal}
    leftovers = [key for key in event_data if key not in claimed]

    starts: list[str] = []
    ends: list[str] = []
    for key in temporal:
        lowered = key.lower()
        if 'start' in lowered:
            starts.append(key)
        if any(marker in lowered for marker in ('end', 'stop', 'last')):
            ends.append(key)

    return PossibleKeys(
        time_start=starts,
        time_end=ends,
        duration=durations,
        name=names,
        image=images,
        misc=leftovers,
    )
|
|
|
|
|
2024-10-13 13:20:30 +00:00
|
|
|
|
|
|
|
def start_end(
    sample: dict[str, Any],
    keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
    """Derive the ``(start, end)`` pair of ``sample`` from the candidate keys.

    Only the first entry of each key list is consulted. In order of
    preference:

    1. start and end keys both exist -> both values are returned as stored
    2. start and duration keys exist -> end is start plus the duration
    3. end and duration keys exist -> start is end minus the duration
    4. only one of start/end exists -> the other element is ``None``
    5. neither exists -> ``(None, None)``

    The duration value is interpreted as a number of seconds.

    Raises:
        KeyError: If a consulted key is absent from ``sample``.
    """
    # NOTE(review): the values are returned as stored; they may be
    # datetime.date rather than datetime.datetime - confirm against callers.
    has_start = bool(keys.time_start)
    has_end = bool(keys.time_end)
    has_duration = bool(keys.duration)

    if has_start and has_end:
        return sample[keys.time_start[0]], sample[keys.time_end[0]]

    if has_start and has_duration:
        begin = sample[keys.time_start[0]]
        length = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
        return begin, begin + length

    if has_end and has_duration:
        finish = sample[keys.time_end[0]]
        length = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
        return finish - length, finish

    # At most one endpoint is available here; report it and leave the other
    # one open.
    begin = sample[keys.time_start[0]] if has_start else None
    finish = sample[keys.time_end[0]] if has_end else None
    return begin, finish
|