Support non-timezone datetimes
This commit is contained in:
parent
d8f6c6bf65
commit
d775b4ad0e
|
@ -67,10 +67,16 @@ def heuristically_realize_samples(
|
|||
* No samples overlap.
|
||||
"""
|
||||
|
||||
previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
|
||||
previous_sample_end = None
|
||||
for sample in samples:
|
||||
end_at = sample.end_at
|
||||
|
||||
if previous_sample_end is None:
|
||||
if end_at.tzinfo:
|
||||
previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
|
||||
else:
|
||||
previous_sample_end = datetime.datetime.fromtimestamp(0)
|
||||
|
||||
assert previous_sample_end <= end_at, 'Iterating in incorrect order'
|
||||
|
||||
# TODO: Allow end_at is None
|
||||
|
@ -121,10 +127,17 @@ def parse_arguments():
|
|||
default='cli_report',
|
||||
choices=['cli_report', 'icalendar'],
|
||||
)
|
||||
parser.add_argument(
|
||||
'--out',
|
||||
action='store',
|
||||
type=Path,
|
||||
dest='output_file',
|
||||
default='output/samples.ics',
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def load_samples(args):
|
||||
def load_samples(args) -> set[WorkSample]:
|
||||
shared_time_stamps_set: set[WorkSample] = set()
|
||||
|
||||
# Git repositories
|
||||
|
@ -176,5 +189,5 @@ def main():
|
|||
elif args.format_mode == 'icalendar':
|
||||
icalendar.generate_icalendar_file(
|
||||
shared_time_stamps,
|
||||
file='./output/samples.ics',
|
||||
file=args.output_file,
|
||||
)
|
||||
|
|
|
@ -1,63 +1,102 @@
|
|||
import datetime
|
||||
import urllib.parse
|
||||
from typing import Any
|
||||
from collections.abc import Iterator
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
import dataclasses
|
||||
|
||||
from personal_data.util import load_csv_file
|
||||
|
||||
from ..data import WorkSample
|
||||
|
||||
@dataclasses.dataclass
|
||||
class PossibleKeys:
|
||||
time_start: list[str]
|
||||
time_end: list[str]
|
||||
duration: list[str]
|
||||
name: list[str]
|
||||
image: list[str]
|
||||
misc: list[str]
|
||||
|
||||
def iterate_samples_from_dicts(rows: list[dict]) -> Iterator[WorkSample]:
|
||||
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
|
||||
# Select data
|
||||
time_keys = [
|
||||
k for k, v in event_data.items() if isinstance(v, datetime.date)
|
||||
]
|
||||
duration_keys = [
|
||||
k
|
||||
for k, v in event_data.items()
|
||||
if isinstance(v, Decimal) and 'duration_seconds' in k
|
||||
]
|
||||
name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
|
||||
image_keys = [
|
||||
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
|
||||
]
|
||||
|
||||
misc_keys = list(event_data.keys())
|
||||
for k in image_keys:
|
||||
if k in misc_keys:
|
||||
misc_keys.remove(k)
|
||||
del k
|
||||
for k in time_keys:
|
||||
if k in misc_keys:
|
||||
misc_keys.remove(k)
|
||||
del k
|
||||
|
||||
time_start_keys = [k for k in time_keys if 'start' in k.lower() ]
|
||||
time_end_keys = [k for k in time_keys if 'end' in k.lower() or 'stop' in k.lower() ]
|
||||
|
||||
return PossibleKeys(
|
||||
time_start = time_start_keys,
|
||||
time_end = time_end_keys,
|
||||
duration = duration_keys,
|
||||
name = name_keys,
|
||||
image = image_keys,
|
||||
misc = misc_keys,
|
||||
)
|
||||
|
||||
def start_end(sample: dict[str,Any], keys: PossibleKeys) -> tuple[datetime.datetime | None, datetime.datetime | None]:
|
||||
if keys.time_start and keys.time_end:
|
||||
return (sample[keys.time_start[0]], sample[keys.time_end[0]])
|
||||
|
||||
if keys.time_start and keys.duration:
|
||||
start = sample[keys.time_start[0]]
|
||||
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
|
||||
return (start, start + duration)
|
||||
|
||||
if keys.time_start:
|
||||
start = sample[keys.time_start[0]]
|
||||
return (start, None)
|
||||
if keys.time_end:
|
||||
return (None, sample[keys.time_end[0]])
|
||||
return (None, None)
|
||||
|
||||
def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample]:
|
||||
assert len(rows) > 0
|
||||
max_title_parts = 2
|
||||
|
||||
|
||||
if True:
|
||||
event_data = rows[len(rows)//2] # Hopefully select a useful representative.
|
||||
possible_keys = determine_possible_keys(event_data)
|
||||
del event_data
|
||||
|
||||
assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
|
||||
assert len(possible_keys.image) >= 0
|
||||
|
||||
for event_data in rows:
|
||||
# Select data
|
||||
possible_time_keys = [
|
||||
k for k, v in event_data.items() if isinstance(v, datetime.date)
|
||||
]
|
||||
possible_duration_keys = [
|
||||
k
|
||||
for k, v in event_data.items()
|
||||
if isinstance(v, Decimal) and 'duration_seconds' in k
|
||||
]
|
||||
possible_name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
|
||||
possible_image_keys = [
|
||||
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
|
||||
]
|
||||
|
||||
possible_misc_keys = list(event_data.keys())
|
||||
for k in possible_image_keys:
|
||||
if k in possible_misc_keys:
|
||||
possible_misc_keys.remove(k)
|
||||
del k
|
||||
for k in possible_time_keys:
|
||||
if k in possible_misc_keys:
|
||||
possible_misc_keys.remove(k)
|
||||
del k
|
||||
|
||||
date = event_data[possible_time_keys[0]] if possible_time_keys else None
|
||||
image = event_data[possible_image_keys[0]] if possible_image_keys else None
|
||||
|
||||
if date is None:
|
||||
continue
|
||||
|
||||
if len(possible_duration_keys) > 0:
|
||||
start_at = date
|
||||
seconds = event_data[possible_duration_keys[0]]
|
||||
end_at = date + datetime.timedelta(seconds=float(seconds))
|
||||
del seconds
|
||||
else:
|
||||
start_at = None
|
||||
end_at = date
|
||||
|
||||
'''
|
||||
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
|
||||
description = '\n\n'.join(
|
||||
event_data[k] for k in possible_name_keys[max_title_parts:]
|
||||
)
|
||||
image = event_data[possible_keys.image[0]] if possible_keys.image else None
|
||||
'''
|
||||
|
||||
labels = [f'{k}:{event_data[k]}' for k in possible_misc_keys]
|
||||
|
||||
(start_at, end_at) = start_end(event_data, possible_keys)
|
||||
labels = [f'{k}:{event_data[k]}' for k in possible_keys.misc]
|
||||
|
||||
# Create event
|
||||
yield WorkSample(
|
||||
|
@ -71,4 +110,6 @@ def iterate_samples_from_dicts(rows: list[dict]) -> Iterator[WorkSample]:
|
|||
|
||||
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[WorkSample]:
|
||||
dicts = load_csv_file(file_path)
|
||||
yield from iterate_samples_from_dicts(dicts)
|
||||
samples = list(iterate_samples_from_dicts(dicts))
|
||||
assert len(samples) > 0, 'Did not found any samples'
|
||||
yield from samples
|
||||
|
|
Reference in New Issue
Block a user