Compare commits
4 Commits
4f851b21b5
...
72be664d82
Author | SHA1 | Date | |
---|---|---|---|
72be664d82 | |||
477fce869d | |||
eb3518ba88 | |||
f47daa3256 |
|
@ -32,12 +32,11 @@ import sys
|
||||||
from collections.abc import Iterator
|
from collections.abc import Iterator
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from .data import (
|
from personal_data.activity import (
|
||||||
HIDDEN_LABEL_PREFIX,
|
ActivitySample,
|
||||||
HIDDEN_LABEL_TOTAL,
|
RealizedActivitySample,
|
||||||
RealizedWorkSample,
|
|
||||||
WorkSample,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
from .format import cli, icalendar
|
from .format import cli, icalendar
|
||||||
from .source import csv_file, git_repo
|
from .source import csv_file, git_repo
|
||||||
|
|
||||||
|
@ -51,16 +50,16 @@ MINUTE = datetime.timedelta(minutes=1)
|
||||||
|
|
||||||
|
|
||||||
def filter_samples(
|
def filter_samples(
|
||||||
samples: list[WorkSample],
|
samples: list[ActivitySample],
|
||||||
sample_filter: set[str],
|
sample_filter: set[str],
|
||||||
) -> list[WorkSample]:
|
) -> list[ActivitySample]:
|
||||||
assert len(sample_filter) > 0
|
assert len(sample_filter) > 0
|
||||||
return [s for s in samples if set(s.labels).intersection(sample_filter)]
|
return [s for s in samples if set(s.labels).intersection(sample_filter)]
|
||||||
|
|
||||||
|
|
||||||
def heuristically_realize_samples(
|
def heuristically_realize_samples(
|
||||||
samples: list[WorkSample],
|
samples: list[ActivitySample],
|
||||||
) -> Iterator[RealizedWorkSample]:
|
) -> Iterator[RealizedActivitySample]:
|
||||||
"""Secret sauce.
|
"""Secret sauce.
|
||||||
|
|
||||||
Guarentees that:
|
Guarentees that:
|
||||||
|
@ -87,7 +86,9 @@ def heuristically_realize_samples(
|
||||||
start_at = max(previous_sample_end, end_at - estimated_duration)
|
start_at = max(previous_sample_end, end_at - estimated_duration)
|
||||||
del estimated_duration
|
del estimated_duration
|
||||||
|
|
||||||
yield RealizedWorkSample(labels=sample.labels, end_at=end_at, start_at=start_at)
|
yield RealizedActivitySample(
|
||||||
|
labels=sample.labels, end_at=end_at, start_at=start_at,
|
||||||
|
)
|
||||||
|
|
||||||
previous_sample_end = sample.end_at
|
previous_sample_end = sample.end_at
|
||||||
del sample
|
del sample
|
||||||
|
@ -137,8 +138,8 @@ def parse_arguments():
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
def load_samples(args) -> set[WorkSample]:
|
def load_samples(args) -> set[ActivitySample]:
|
||||||
shared_time_stamps_set: set[WorkSample] = set()
|
shared_time_stamps_set: set[ActivitySample] = set()
|
||||||
|
|
||||||
# Git repositories
|
# Git repositories
|
||||||
for repo_path in args.repositories:
|
for repo_path in args.repositories:
|
||||||
|
|
|
@ -1,19 +0,0 @@
|
||||||
import dataclasses
|
|
||||||
import datetime
|
|
||||||
from collections.abc import Sequence
|
|
||||||
|
|
||||||
HIDDEN_LABEL_PREFIX = '__'
|
|
||||||
HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL'
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(frozen=True, order=True)
|
|
||||||
class WorkSample:
|
|
||||||
labels: Sequence[str]
|
|
||||||
start_at: datetime.datetime | None
|
|
||||||
end_at: datetime.datetime | None
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(frozen=True, order=True)
|
|
||||||
class RealizedWorkSample(WorkSample):
|
|
||||||
start_at: datetime.datetime
|
|
||||||
end_at: datetime.datetime
|
|
|
@ -1,7 +1,7 @@
|
||||||
import datetime
|
import datetime
|
||||||
from collections.abc import Iterator
|
from collections.abc import Iterator
|
||||||
|
|
||||||
from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample
|
from personal_data.activity import HIDDEN_LABEL_CATEGORY, Label, RealizedActivitySample
|
||||||
|
|
||||||
ZERO_DURATION = datetime.timedelta(seconds=0)
|
ZERO_DURATION = datetime.timedelta(seconds=0)
|
||||||
HOUR = datetime.timedelta(hours=1)
|
HOUR = datetime.timedelta(hours=1)
|
||||||
|
@ -29,18 +29,18 @@ def fmt_year_ranges(years: list[int]) -> str:
|
||||||
return ''.join(list(fmt_year_ranges_internal(years)))
|
return ''.join(list(fmt_year_ranges_internal(years)))
|
||||||
|
|
||||||
|
|
||||||
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
|
def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
|
||||||
hours = int(total_time / HOUR)
|
hours = int(total_time / HOUR)
|
||||||
minutes = int((total_time - hours * HOUR) / MINUTE)
|
minutes = int((total_time - hours * HOUR) / MINUTE)
|
||||||
return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m'
|
return f' {label.category:10} {label.label:40} {hours:-4d}h {minutes:-2d}m'
|
||||||
|
|
||||||
|
|
||||||
def generate_report(
|
def generate_report(
|
||||||
samples: list[RealizedWorkSample],
|
samples: list[RealizedActivitySample],
|
||||||
) -> Iterator[str]:
|
) -> Iterator[str]:
|
||||||
# Time spent per label
|
# Time spent per label
|
||||||
time_per_label: dict[str, datetime.timedelta] = {}
|
time_per_label: dict[Label, datetime.timedelta] = {}
|
||||||
years_per_label: dict[str, set[int]] = {}
|
years_per_label: dict[Label, set[int]] = {}
|
||||||
for sample in samples:
|
for sample in samples:
|
||||||
duration = sample.end_at - sample.start_at
|
duration = sample.end_at - sample.start_at
|
||||||
|
|
||||||
|
@ -57,15 +57,13 @@ def generate_report(
|
||||||
#
|
#
|
||||||
yield '-' * 66
|
yield '-' * 66
|
||||||
yield '\n'
|
yield '\n'
|
||||||
for total_time, label_and_type in time_and_label:
|
for total_time, label in time_and_label:
|
||||||
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
|
if label.category == HIDDEN_LABEL_CATEGORY:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
label_type, label = label_and_type.split(':', 1)
|
yield fmt_line(label, total_time)
|
||||||
|
|
||||||
yield fmt_line(label_type, label, total_time)
|
|
||||||
yield ' ('
|
yield ' ('
|
||||||
yield fmt_year_ranges(years_per_label.get(label_and_type, []))
|
yield fmt_year_ranges(years_per_label.get(label, []))
|
||||||
yield ')'
|
yield ')'
|
||||||
yield '\n'
|
yield '\n'
|
||||||
del label, total_time
|
del label, total_time
|
||||||
|
@ -73,5 +71,6 @@ def generate_report(
|
||||||
yield '-' * 66
|
yield '-' * 66
|
||||||
yield '\n'
|
yield '\n'
|
||||||
|
|
||||||
yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
|
label_total = Label(HIDDEN_LABEL_CATEGORY, 'total')
|
||||||
|
yield fmt_line(label_total, time_per_label.get(label_total, ZERO_DURATION))
|
||||||
yield '\n'
|
yield '\n'
|
||||||
|
|
|
@ -2,30 +2,28 @@ import datetime
|
||||||
|
|
||||||
import icalendar
|
import icalendar
|
||||||
|
|
||||||
from ..data import HIDDEN_LABEL_PREFIX, RealizedWorkSample
|
from personal_data.activity import HIDDEN_LABEL_CATEGORY, RealizedActivitySample
|
||||||
|
|
||||||
ZERO_DURATION = datetime.timedelta(seconds=0)
|
ZERO_DURATION = datetime.timedelta(seconds=0)
|
||||||
HOUR = datetime.timedelta(hours=1)
|
HOUR = datetime.timedelta(hours=1)
|
||||||
MINUTE = datetime.timedelta(minutes=1)
|
MINUTE = datetime.timedelta(minutes=1)
|
||||||
|
|
||||||
|
|
||||||
def create_title(sample: RealizedWorkSample) -> tuple[str, str]:
|
def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
|
||||||
ls = []
|
ls = []
|
||||||
desc = []
|
desc = []
|
||||||
for label_and_type in sample.labels:
|
for label in sample.labels:
|
||||||
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
|
if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
|
||||||
continue
|
|
||||||
if label_and_type.startswith('author:'):
|
|
||||||
continue
|
continue
|
||||||
if len(ls) == 0:
|
if len(ls) == 0:
|
||||||
ls.append(label_and_type.split(':')[1])
|
ls.append(label.label)
|
||||||
else:
|
else:
|
||||||
desc.append(label_and_type)
|
desc.append(label.label)
|
||||||
return ' '.join(ls), '\n'.join(desc)
|
return ' '.join(ls), '\n'.join(desc)
|
||||||
|
|
||||||
|
|
||||||
def generate_calendar(
|
def generate_calendar(
|
||||||
samples: list[RealizedWorkSample],
|
samples: list[RealizedActivitySample],
|
||||||
) -> icalendar.Calendar:
|
) -> icalendar.Calendar:
|
||||||
max_title_parts = 2
|
max_title_parts = 2
|
||||||
|
|
||||||
|
@ -44,11 +42,11 @@ def generate_calendar(
|
||||||
event.add('dtstart', sample.start_at)
|
event.add('dtstart', sample.start_at)
|
||||||
event.add('dtend', sample.end_at)
|
event.add('dtend', sample.end_at)
|
||||||
|
|
||||||
for label_and_type in sample.labels:
|
for label in sample.labels:
|
||||||
if label_and_type.startswith('author:'):
|
if label.category == 'author':
|
||||||
event.add(
|
event.add(
|
||||||
'organizer',
|
'organizer',
|
||||||
'mailto:' + label_and_type.removeprefix('author:'),
|
'mailto:' + label.label,
|
||||||
)
|
)
|
||||||
|
|
||||||
cal.add_component(event)
|
cal.add_component(event)
|
||||||
|
@ -58,7 +56,7 @@ def generate_calendar(
|
||||||
|
|
||||||
|
|
||||||
def generate_icalendar_file(
|
def generate_icalendar_file(
|
||||||
samples: list[RealizedWorkSample],
|
samples: list[RealizedActivitySample],
|
||||||
file: str,
|
file: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
calendar = generate_calendar(samples)
|
calendar = generate_calendar(samples)
|
||||||
|
|
|
@ -1,82 +1,15 @@
|
||||||
import datetime
|
|
||||||
import urllib.parse
|
|
||||||
from typing import Any
|
|
||||||
from collections.abc import Iterator
|
from collections.abc import Iterator
|
||||||
from decimal import Decimal
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import dataclasses
|
from typing import Any
|
||||||
|
|
||||||
from personal_data.util import load_csv_file
|
from personal_data.activity import ActivitySample, Label
|
||||||
|
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
|
||||||
|
|
||||||
from ..data import WorkSample
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
|
||||||
class PossibleKeys:
|
|
||||||
time_start: list[str]
|
|
||||||
time_end: list[str]
|
|
||||||
duration: list[str]
|
|
||||||
name: list[str]
|
|
||||||
image: list[str]
|
|
||||||
misc: list[str]
|
|
||||||
|
|
||||||
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
|
|
||||||
# Select data
|
|
||||||
time_keys = [
|
|
||||||
k for k, v in event_data.items() if isinstance(v, datetime.date)
|
|
||||||
]
|
|
||||||
duration_keys = [
|
|
||||||
k
|
|
||||||
for k, v in event_data.items()
|
|
||||||
if isinstance(v, Decimal) and 'duration_seconds' in k
|
|
||||||
]
|
|
||||||
name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
|
|
||||||
image_keys = [
|
|
||||||
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
|
|
||||||
]
|
|
||||||
|
|
||||||
misc_keys = list(event_data.keys())
|
|
||||||
for k in image_keys:
|
|
||||||
if k in misc_keys:
|
|
||||||
misc_keys.remove(k)
|
|
||||||
del k
|
|
||||||
for k in time_keys:
|
|
||||||
if k in misc_keys:
|
|
||||||
misc_keys.remove(k)
|
|
||||||
del k
|
|
||||||
|
|
||||||
time_start_keys = [k for k in time_keys if 'start' in k.lower() ]
|
|
||||||
time_end_keys = [k for k in time_keys if 'end' in k.lower() or 'stop' in k.lower() ]
|
|
||||||
|
|
||||||
return PossibleKeys(
|
|
||||||
time_start = time_start_keys,
|
|
||||||
time_end = time_end_keys,
|
|
||||||
duration = duration_keys,
|
|
||||||
name = name_keys,
|
|
||||||
image = image_keys,
|
|
||||||
misc = misc_keys,
|
|
||||||
)
|
|
||||||
|
|
||||||
def start_end(sample: dict[str,Any], keys: PossibleKeys) -> tuple[datetime.datetime | None, datetime.datetime | None]:
|
|
||||||
if keys.time_start and keys.time_end:
|
|
||||||
return (sample[keys.time_start[0]], sample[keys.time_end[0]])
|
|
||||||
|
|
||||||
if keys.time_start and keys.duration:
|
|
||||||
start = sample[keys.time_start[0]]
|
|
||||||
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
|
|
||||||
return (start, start + duration)
|
|
||||||
|
|
||||||
if keys.time_start:
|
|
||||||
start = sample[keys.time_start[0]]
|
|
||||||
return (start, None)
|
|
||||||
if keys.time_end:
|
|
||||||
return (None, sample[keys.time_end[0]])
|
|
||||||
return (None, None)
|
|
||||||
|
|
||||||
def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample]:
|
|
||||||
assert len(rows) > 0
|
assert len(rows) > 0
|
||||||
max_title_parts = 2
|
max_title_parts = 2
|
||||||
|
|
||||||
|
|
||||||
if True:
|
if True:
|
||||||
event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
|
event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
|
||||||
possible_keys = determine_possible_keys(event_data)
|
possible_keys = determine_possible_keys(event_data)
|
||||||
|
@ -86,20 +19,19 @@ def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample
|
||||||
assert len(possible_keys.image) >= 0
|
assert len(possible_keys.image) >= 0
|
||||||
|
|
||||||
for event_data in rows:
|
for event_data in rows:
|
||||||
'''
|
"""
|
||||||
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
|
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
|
||||||
description = '\n\n'.join(
|
description = '\n\n'.join(
|
||||||
event_data[k] for k in possible_name_keys[max_title_parts:]
|
event_data[k] for k in possible_name_keys[max_title_parts:]
|
||||||
)
|
)
|
||||||
image = event_data[possible_keys.image[0]] if possible_keys.image else None
|
image = event_data[possible_keys.image[0]] if possible_keys.image else None
|
||||||
'''
|
"""
|
||||||
|
|
||||||
|
|
||||||
(start_at, end_at) = start_end(event_data, possible_keys)
|
(start_at, end_at) = start_end(event_data, possible_keys)
|
||||||
labels = [f'{k}:{event_data[k]}' for k in possible_keys.misc]
|
labels = [Label(k, event_data[k]) for k in possible_keys.misc]
|
||||||
|
|
||||||
# Create event
|
# Create event
|
||||||
yield WorkSample(
|
yield ActivitySample(
|
||||||
labels=tuple(labels),
|
labels=tuple(labels),
|
||||||
start_at=start_at,
|
start_at=start_at,
|
||||||
end_at=end_at,
|
end_at=end_at,
|
||||||
|
@ -108,7 +40,7 @@ def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample
|
||||||
del event_data
|
del event_data
|
||||||
|
|
||||||
|
|
||||||
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[WorkSample]:
|
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[ActivitySample]:
|
||||||
dicts = load_csv_file(file_path)
|
dicts = load_csv_file(file_path)
|
||||||
samples = list(iterate_samples_from_dicts(dicts))
|
samples = list(iterate_samples_from_dicts(dicts))
|
||||||
assert len(samples) > 0, 'Did not found any samples'
|
assert len(samples) > 0, 'Did not found any samples'
|
||||||
|
|
|
@ -5,7 +5,7 @@ from pathlib import Path
|
||||||
|
|
||||||
import git
|
import git
|
||||||
|
|
||||||
from ..data import HIDDEN_LABEL_TOTAL, WorkSample
|
from personal_data.activity import HIDDEN_LABEL_CATEGORY, ActivitySample, Label
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ def determine_project_name(repo: git.Repo) -> str:
|
||||||
return Path(repo.working_tree_dir).name
|
return Path(repo.working_tree_dir).name
|
||||||
|
|
||||||
|
|
||||||
def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
|
def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
|
||||||
project_name = determine_project_name(repo)
|
project_name = determine_project_name(repo)
|
||||||
assert project_name is not None
|
assert project_name is not None
|
||||||
|
|
||||||
|
@ -34,9 +34,9 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
|
||||||
repo.commit()
|
repo.commit()
|
||||||
|
|
||||||
for commit in repo.iter_commits(determine_default_branch(repo)):
|
for commit in repo.iter_commits(determine_default_branch(repo)):
|
||||||
labels = [HIDDEN_LABEL_TOTAL]
|
labels = [Label(HIDDEN_LABEL_CATEGORY, 'total')]
|
||||||
labels.append('project:' + project_name)
|
labels.append(Label('project', project_name))
|
||||||
labels.append('author:' + commit.author.email)
|
labels.append(Label('author', commit.author.email))
|
||||||
|
|
||||||
authored_date = datetime.datetime.fromtimestamp(
|
authored_date = datetime.datetime.fromtimestamp(
|
||||||
commit.authored_date,
|
commit.authored_date,
|
||||||
|
@ -47,13 +47,13 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
|
||||||
tz=datetime.UTC,
|
tz=datetime.UTC,
|
||||||
)
|
)
|
||||||
|
|
||||||
yield WorkSample(
|
yield ActivitySample(
|
||||||
labels=tuple(labels),
|
labels=tuple(labels),
|
||||||
start_at=None,
|
start_at=None,
|
||||||
end_at=authored_date,
|
end_at=authored_date,
|
||||||
)
|
)
|
||||||
if authored_date != committed_date:
|
if authored_date != committed_date:
|
||||||
yield WorkSample(
|
yield ActivitySample(
|
||||||
labels=tuple(labels),
|
labels=tuple(labels),
|
||||||
start_at=None,
|
start_at=None,
|
||||||
end_at=committed_date,
|
end_at=committed_date,
|
||||||
|
@ -61,7 +61,7 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
|
||||||
del labels
|
del labels
|
||||||
|
|
||||||
|
|
||||||
def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[WorkSample]:
|
def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[ActivitySample]:
|
||||||
try:
|
try:
|
||||||
yield from get_samples_from_project(git.Repo(repo_path))
|
yield from get_samples_from_project(git.Repo(repo_path))
|
||||||
except git.exc.InvalidGitRepositoryError:
|
except git.exc.InvalidGitRepositoryError:
|
||||||
|
|
29
personal_data/activity.py
Normal file
29
personal_data/activity.py
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
import dataclasses
|
||||||
|
import datetime
|
||||||
|
from collections.abc import Sequence
|
||||||
|
|
||||||
|
HIDDEN_LABEL_CATEGORY = '__'
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True, order=True)
class Label:
    """A categorised tag attached to an activity sample.

    Instances are immutable, hashable and ordered by ``(category, label)``.

    Raises:
        TypeError: If ``category`` or ``label`` is ``None``.
        ValueError: If ``category`` contains a ``':'``.
    """

    # Kind of tag, e.g. 'project', 'author', or HIDDEN_LABEL_CATEGORY.
    category: str
    # The tag's value within that category.
    label: str

    def __post_init__(self):
        # Raise explicitly instead of using ``assert``: assertions are removed
        # when Python runs with ``-O``, which would silently disable validation.
        if self.category is None:
            raise TypeError('Label.category must not be None')
        # NOTE(review): presumably ':' is rejected because labels used to be
        # encoded as 'category:label' strings -- confirm before relaxing.
        if ':' in self.category:
            raise ValueError(
                f"Label.category must not contain ':': {self.category!r}",
            )
        if self.label is None:
            raise TypeError('Label.label must not be None')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True, order=True)
class ActivitySample:
    """An immutable record of activity: a set of labels and a time span.

    Either end of the span may be unknown (``None``).
    """

    # Tags describing the activity.
    labels: Sequence[Label]
    # When the activity began, if known.
    start_at: datetime.datetime | None
    # When the activity finished, if known.
    end_at: datetime.datetime | None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample):
    """An ``ActivitySample`` whose start and end are both declared non-optional."""

    # Redeclared without ``| None`` to narrow the inherited field types.
    start_at: datetime.datetime
    end_at: datetime.datetime
|
150
personal_data/csv_import.py
Normal file
150
personal_data/csv_import.py
Normal file
|
@ -0,0 +1,150 @@
|
||||||
|
import csv
|
||||||
|
import dataclasses
|
||||||
|
import datetime
|
||||||
|
import decimal
|
||||||
|
import typing
|
||||||
|
import urllib.parse
|
||||||
|
from collections.abc import Callable
|
||||||
|
from decimal import Decimal
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from frozendict import frozendict
|
||||||
|
|
||||||
|
CSV_DIALECT = 'one_true_dialect'
|
||||||
|
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
|
||||||
|
|
||||||
|
T = typing.TypeVar('T')
|
||||||
|
|
||||||
|
|
||||||
|
def try_value(fn: Callable[[str], T], s: str) -> T | None:
    """Apply *fn* to *s* and return the result, or ``None`` if *fn* rejects it.

    Only ``ValueError`` and ``decimal.InvalidOperation`` count as a rejection;
    any other exception raised by *fn* propagates to the caller.
    """
    try:
        converted = fn(s)
    except (ValueError, decimal.InvalidOperation):
        return None
    return converted
|
||||||
|
|
||||||
|
|
||||||
|
def csv_str_to_value(
    s: str,
) -> (
    str
    | Decimal
    | datetime.date
    | datetime.datetime
    | urllib.parse.ParseResult
    | bool
    | None
):
    """Convert one raw CSV cell into the most specific Python value it matches.

    Candidates are tried in a fixed order and the first match wins:
    ``Decimal``, ISO date, ISO datetime, ``http(s)`` URL, then the
    case-insensitive literals ``false`` / ``true`` / ``none``. Anything else is
    returned as the stripped string. ``None`` and blank input yield ``None``.
    """
    assert not isinstance(s, list)  # TODO?

    if s is None:
        return None

    text = s.strip()
    if not text:
        return None

    # Order matters: numbers first, and plain dates before datetimes.
    parsers = (
        Decimal,
        datetime.date.fromisoformat,
        datetime.datetime.fromisoformat,
    )
    for parser in parsers:
        parsed = try_value(parser, text)
        if parsed is not None:
            return parsed

    if text.startswith(('http://', 'https://')):
        return urllib.parse.urlparse(text)

    literals = {'false': False, 'true': True, 'none': None}
    lowered = text.lower()
    if lowered in literals:
        return literals[lowered]
    return text
|
||||||
|
|
||||||
|
|
||||||
|
def load_csv_file(csv_file: Path, sniff=False) -> list[frozendict[str, typing.Any]]:
    """Read a CSV file into a list of immutable, type-converted row mappings.

    Every cell is passed through ``csv_str_to_value``; cells that convert to
    ``None`` (blank, missing, or the literal ``none``) are dropped from their
    row, so rows may have different key sets.

    Args:
        csv_file: Path of the CSV file to read.
        sniff: When true, guess the dialect from the first 1024 characters of
            the file; otherwise read with ``CSV_DIALECT``.

    Returns:
        One ``frozendict`` per data row, in file order.
    """
    dicts: list[frozendict] = []
    # newline='' lets the csv module do its own newline handling; without it,
    # line breaks embedded in quoted fields are not interpreted correctly.
    with open(csv_file, newline='') as csvfile:
        if sniff:
            dialect = csv.Sniffer().sniff(csvfile.read(1024))
            csvfile.seek(0)  # rewind so the reader also sees the sniffed prefix
        else:
            dialect = CSV_DIALECT
        reader = csv.DictReader(csvfile, dialect=dialect)
        for row in reader:
            converted = {k: csv_str_to_value(v) for k, v in row.items()}
            dicts.append(
                frozendict({k: v for k, v in converted.items() if v is not None}),
            )
    return dicts
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
class PossibleKeys:
    """Column names of a row, grouped by the role they may play."""

    # Date/datetime columns whose name contains 'start'.
    time_start: list[str]
    # Date/datetime columns whose name suggests an end time.
    time_end: list[str]
    # Decimal columns whose name contains 'duration_seconds'.
    duration: list[str]
    # Columns holding plain strings.
    name: list[str]
    # Columns holding parsed URLs.
    image: list[str]
    # Every column that is neither an image nor a date/datetime column.
    misc: list[str]
|
||||||
|
|
||||||
|
|
||||||
|
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
    """Classify the keys of one row by the type of value they hold.

    Date/datetime values become time keys (split into start/end by name),
    ``Decimal`` values named ``*duration_seconds*`` become duration keys,
    strings become name keys and parsed URLs become image keys. ``misc`` is
    every key that is neither an image nor a time key, in row order.
    """
    temporal: list[str] = []
    durations: list[str] = []
    names: list[str] = []
    images: list[str] = []

    # Single pass; the checks are independent, mirroring one filter per role.
    for key, value in event_data.items():
        if isinstance(value, datetime.date):  # also matches datetime.datetime
            temporal.append(key)
        if isinstance(value, Decimal) and 'duration_seconds' in key:
            durations.append(key)
        if isinstance(value, str):
            names.append(key)
        if isinstance(value, urllib.parse.ParseResult):
            images.append(key)

    consumed = {*images, *temporal}
    misc = [key for key in event_data if key not in consumed]

    end_markers = ('end', 'stop', 'last')
    starts = [key for key in temporal if 'start' in key.lower()]
    ends = [
        key
        for key in temporal
        if any(marker in key.lower() for marker in end_markers)
    ]

    return PossibleKeys(
        time_start=starts,
        time_end=ends,
        duration=durations,
        name=names,
        image=images,
        misc=misc,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def start_end(
    sample: dict[str, Any], keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
    """Extract the ``(start, end)`` time span of one row.

    The first candidate key of each kind in *keys* is used. When there is no
    end-time key, the end is derived from the start plus a duration in seconds,
    if both are available.

    *keys* may have been derived from a different row than *sample*, so a key
    can be absent here; a missing value yields ``None`` for that part of the
    span instead of raising ``KeyError``.

    Args:
        sample: The row to read from.
        keys: Candidate column names, as produced by ``determine_possible_keys``.

    Returns:
        ``(start, end)``, where either element is ``None`` when unknown.
    """
    start = sample.get(keys.time_start[0]) if keys.time_start else None

    # An explicit end-time column takes precedence over a duration.
    if keys.time_end:
        return (start, sample.get(keys.time_end[0]))

    if start is not None and keys.duration:
        seconds = sample.get(keys.duration[0])
        if seconds is not None:
            return (start, start + datetime.timedelta(seconds=float(seconds)))

    return (start, None)
|
|
@ -1,68 +1,22 @@
|
||||||
import _csv
|
import _csv
|
||||||
import csv
|
import csv
|
||||||
import datetime
|
import datetime
|
||||||
import decimal
|
|
||||||
import io
|
import io
|
||||||
import logging
|
import logging
|
||||||
import typing
|
import typing
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from collections.abc import Callable, Iterable, Mapping, Sequence
|
from collections.abc import Iterable, Mapping, Sequence
|
||||||
from decimal import Decimal
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from frozendict import frozendict
|
from frozendict import frozendict
|
||||||
|
|
||||||
from . import data
|
from . import csv_import, data
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
CSV_DIALECT = 'one_true_dialect'
|
|
||||||
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
|
|
||||||
|
|
||||||
T = typing.TypeVar('T')
|
def csv_safe_value(v: Any) -> str:
|
||||||
|
|
||||||
|
|
||||||
def try_value(fn: Callable[[str], T], s: str) -> T | None:
|
|
||||||
try:
|
|
||||||
return fn(s)
|
|
||||||
except (ValueError, decimal.InvalidOperation):
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def csv_str_to_value(
|
|
||||||
s: str,
|
|
||||||
) -> (
|
|
||||||
str
|
|
||||||
| Decimal
|
|
||||||
| datetime.date
|
|
||||||
| datetime.datetime
|
|
||||||
| urllib.parse.ParseResult
|
|
||||||
| bool
|
|
||||||
| None
|
|
||||||
):
|
|
||||||
if s is None:
|
|
||||||
return None
|
|
||||||
s = s.strip()
|
|
||||||
if len(s) == 0:
|
|
||||||
return None
|
|
||||||
if (v_decimal := try_value(Decimal, s)) is not None:
|
|
||||||
return v_decimal
|
|
||||||
if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
|
|
||||||
return v_date
|
|
||||||
if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
|
|
||||||
return v_datetime
|
|
||||||
if s.startswith(('http://', 'https://')):
|
|
||||||
return urllib.parse.urlparse(s)
|
|
||||||
if s.lower() == 'false':
|
|
||||||
return False
|
|
||||||
if s.lower() == 'true':
|
|
||||||
return True
|
|
||||||
if s.lower() == 'none':
|
|
||||||
return None
|
|
||||||
return s
|
|
||||||
|
|
||||||
|
|
||||||
def csv_safe_value(v: object) -> str:
|
|
||||||
if isinstance(v, urllib.parse.ParseResult):
|
if isinstance(v, urllib.parse.ParseResult):
|
||||||
return v.geturl()
|
return v.geturl()
|
||||||
if isinstance(v, datetime.datetime):
|
if isinstance(v, datetime.datetime):
|
||||||
|
@ -145,32 +99,13 @@ def deduplicate_dicts(
|
||||||
def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
|
def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
|
||||||
return frozendict(
|
return frozendict(
|
||||||
{
|
{
|
||||||
k: csv_str_to_value(str(v))
|
k: csv_import.csv_str_to_value(str(v))
|
||||||
for k, v in d.items()
|
for k, v in d.items()
|
||||||
if csv_str_to_value(str(v)) is not None
|
if csv_import.csv_str_to_value(str(v)) is not None
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def load_csv_file(csv_file: Path) -> list[frozendict[str, typing.Any]]:
|
|
||||||
dicts: list[frozendict] = []
|
|
||||||
with open(csv_file) as csvfile:
|
|
||||||
dialect = csv.Sniffer().sniff(csvfile.read(1024))
|
|
||||||
csvfile.seek(0)
|
|
||||||
reader = csv.DictReader(csvfile, dialect=dialect)
|
|
||||||
for row in reader:
|
|
||||||
for k in list(row.keys()):
|
|
||||||
orig = row[k]
|
|
||||||
row[k] = csv_str_to_value(orig)
|
|
||||||
if row[k] is None:
|
|
||||||
del row[k]
|
|
||||||
del k, orig
|
|
||||||
dicts.append(frozendict(row))
|
|
||||||
del row
|
|
||||||
del csvfile
|
|
||||||
return dicts
|
|
||||||
|
|
||||||
|
|
||||||
def extend_csv_file(
|
def extend_csv_file(
|
||||||
csv_file: Path,
|
csv_file: Path,
|
||||||
new_dicts: list[dict[str, typing.Any]],
|
new_dicts: list[dict[str, typing.Any]],
|
||||||
|
@ -180,7 +115,7 @@ def extend_csv_file(
|
||||||
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
|
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
|
||||||
|
|
||||||
try:
|
try:
|
||||||
dicts = load_csv_file(csv_file)
|
dicts = csv_import.load_csv_file(csv_file)
|
||||||
except (FileNotFoundError, _csv.Error) as e:
|
except (FileNotFoundError, _csv.Error) as e:
|
||||||
logger.info('Creating file: %s', csv_file)
|
logger.info('Creating file: %s', csv_file)
|
||||||
dicts = []
|
dicts = []
|
||||||
|
@ -199,7 +134,7 @@ def extend_csv_file(
|
||||||
writer = csv.DictWriter(
|
writer = csv.DictWriter(
|
||||||
csvfile_in_memory,
|
csvfile_in_memory,
|
||||||
fieldnames=fieldnames,
|
fieldnames=fieldnames,
|
||||||
dialect=CSV_DIALECT,
|
dialect=csv_import.CSV_DIALECT,
|
||||||
)
|
)
|
||||||
writer.writeheader()
|
writer.writeheader()
|
||||||
for d in dicts:
|
for d in dicts:
|
||||||
|
|
24
test/test_csv_import.py
Normal file
24
test/test_csv_import.py
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
import frozendict
|
||||||
|
|
||||||
|
from personal_data.csv_import import determine_possible_keys
|
||||||
|
|
||||||
|
|
||||||
|
def test_determine_possible_keys():
    """A 'last played' timestamp is an end time; the text columns are names."""
    last_played = datetime.datetime(
        2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
    )
    row = frozendict.frozendict(
        {
            'game.name': 'Halo',
            'me.last_played_time': last_played,
            'trophy.name': 'Test',
            'trophy.desc': 'Description',
        },
    )

    keys = determine_possible_keys(row)

    assert keys.time_start == []
    assert keys.time_end == ['me.last_played_time']
    assert keys.duration == []
    assert len(keys.name) == 3
|
|
@ -3,13 +3,37 @@ from decimal import Decimal
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from personal_data.util import csv_str_to_value
|
from personal_data.csv_import import csv_str_to_value
|
||||||
|
|
||||||
PARSE_MAPPINGS = [
|
PARSE_MAPPINGS = [
|
||||||
(
|
(
|
||||||
'2024-04-28 21:35:40+00:00',
|
'2024-04-28 21:35:40+00:00',
|
||||||
datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC),
|
datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC),
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
'2024-07-06 19:30:11+02:00',
|
||||||
|
datetime.datetime(
|
||||||
|
2024,
|
||||||
|
7,
|
||||||
|
6,
|
||||||
|
19,
|
||||||
|
30,
|
||||||
|
11,
|
||||||
|
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
'2023-10-21 11:43:27+02:00',
|
||||||
|
datetime.datetime(
|
||||||
|
2023,
|
||||||
|
10,
|
||||||
|
21,
|
||||||
|
11,
|
||||||
|
43,
|
||||||
|
27,
|
||||||
|
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
|
||||||
|
),
|
||||||
|
),
|
||||||
(
|
(
|
||||||
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',
|
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',
|
||||||
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',
|
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',
|
||||||
|
|
Loading…
Reference in New Issue
Block a user