1
0

Compare commits

..

No commits in common. "72be664d82a1eeac9d2e59aa2f8196e26524a803" and "4f851b21b56e75d85090a31af3ac0aae1686d393" have entirely different histories.

12 changed files with 220 additions and 293 deletions

View File

@ -32,11 +32,12 @@ import sys
from collections.abc import Iterator from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from personal_data.activity import ( from .data import (
ActivitySample, HIDDEN_LABEL_PREFIX,
RealizedActivitySample, HIDDEN_LABEL_TOTAL,
RealizedWorkSample,
WorkSample,
) )
from .format import cli, icalendar from .format import cli, icalendar
from .source import csv_file, git_repo from .source import csv_file, git_repo
@ -50,16 +51,16 @@ MINUTE = datetime.timedelta(minutes=1)
def filter_samples( def filter_samples(
samples: list[ActivitySample], samples: list[WorkSample],
sample_filter: set[str], sample_filter: set[str],
) -> list[ActivitySample]: ) -> list[WorkSample]:
assert len(sample_filter) > 0 assert len(sample_filter) > 0
return [s for s in samples if set(s.labels).intersection(sample_filter)] return [s for s in samples if set(s.labels).intersection(sample_filter)]
def heuristically_realize_samples( def heuristically_realize_samples(
samples: list[ActivitySample], samples: list[WorkSample],
) -> Iterator[RealizedActivitySample]: ) -> Iterator[RealizedWorkSample]:
"""Secret sauce. """Secret sauce.
Guarentees that: Guarentees that:
@ -86,9 +87,7 @@ def heuristically_realize_samples(
start_at = max(previous_sample_end, end_at - estimated_duration) start_at = max(previous_sample_end, end_at - estimated_duration)
del estimated_duration del estimated_duration
yield RealizedActivitySample( yield RealizedWorkSample(labels=sample.labels, end_at=end_at, start_at=start_at)
labels=sample.labels, end_at=end_at, start_at=start_at,
)
previous_sample_end = sample.end_at previous_sample_end = sample.end_at
del sample del sample
@ -138,8 +137,8 @@ def parse_arguments():
return parser.parse_args() return parser.parse_args()
def load_samples(args) -> set[ActivitySample]: def load_samples(args) -> set[WorkSample]:
shared_time_stamps_set: set[ActivitySample] = set() shared_time_stamps_set: set[WorkSample] = set()
# Git repositories # Git repositories
for repo_path in args.repositories: for repo_path in args.repositories:

19
git_time_tracker/data.py Normal file
View File

@ -0,0 +1,19 @@
import dataclasses
import datetime
from collections.abc import Sequence
HIDDEN_LABEL_PREFIX = '__'
HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL'
@dataclasses.dataclass(frozen=True, order=True)
class WorkSample:
labels: Sequence[str]
start_at: datetime.datetime | None
end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedWorkSample(WorkSample):
start_at: datetime.datetime
end_at: datetime.datetime

View File

@ -1,7 +1,7 @@
import datetime import datetime
from collections.abc import Iterator from collections.abc import Iterator
from personal_data.activity import HIDDEN_LABEL_CATEGORY, Label, RealizedActivitySample from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample
ZERO_DURATION = datetime.timedelta(seconds=0) ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1) HOUR = datetime.timedelta(hours=1)
@ -29,18 +29,18 @@ def fmt_year_ranges(years: list[int]) -> str:
return ''.join(list(fmt_year_ranges_internal(years))) return ''.join(list(fmt_year_ranges_internal(years)))
def fmt_line(label: Label, total_time: datetime.timedelta) -> str: def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
hours = int(total_time / HOUR) hours = int(total_time / HOUR)
minutes = int((total_time - hours * HOUR) / MINUTE) minutes = int((total_time - hours * HOUR) / MINUTE)
return f' {label.category:10} {label.label:40} {hours:-4d}h {minutes:-2d}m' return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m'
def generate_report( def generate_report(
samples: list[RealizedActivitySample], samples: list[RealizedWorkSample],
) -> Iterator[str]: ) -> Iterator[str]:
# Time spent per label # Time spent per label
time_per_label: dict[Label, datetime.timedelta] = {} time_per_label: dict[str, datetime.timedelta] = {}
years_per_label: dict[Label, set[int]] = {} years_per_label: dict[str, set[int]] = {}
for sample in samples: for sample in samples:
duration = sample.end_at - sample.start_at duration = sample.end_at - sample.start_at
@ -57,13 +57,15 @@ def generate_report(
# #
yield '-' * 66 yield '-' * 66
yield '\n' yield '\n'
for total_time, label in time_and_label: for total_time, label_and_type in time_and_label:
if label.category == HIDDEN_LABEL_CATEGORY: if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
continue continue
yield fmt_line(label, total_time) label_type, label = label_and_type.split(':', 1)
yield fmt_line(label_type, label, total_time)
yield ' (' yield ' ('
yield fmt_year_ranges(years_per_label.get(label, [])) yield fmt_year_ranges(years_per_label.get(label_and_type, []))
yield ')' yield ')'
yield '\n' yield '\n'
del label, total_time del label, total_time
@ -71,6 +73,5 @@ def generate_report(
yield '-' * 66 yield '-' * 66
yield '\n' yield '\n'
label_total = Label(HIDDEN_LABEL_CATEGORY, 'total') yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
yield fmt_line(label_total, time_per_label.get(label_total, ZERO_DURATION))
yield '\n' yield '\n'

View File

@ -2,28 +2,30 @@ import datetime
import icalendar import icalendar
from personal_data.activity import HIDDEN_LABEL_CATEGORY, RealizedActivitySample from ..data import HIDDEN_LABEL_PREFIX, RealizedWorkSample
ZERO_DURATION = datetime.timedelta(seconds=0) ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1) HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1) MINUTE = datetime.timedelta(minutes=1)
def create_title(sample: RealizedActivitySample) -> tuple[str, str]: def create_title(sample: RealizedWorkSample) -> tuple[str, str]:
ls = [] ls = []
desc = [] desc = []
for label in sample.labels: for label_and_type in sample.labels:
if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}: if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
continue
if label_and_type.startswith('author:'):
continue continue
if len(ls) == 0: if len(ls) == 0:
ls.append(label.label) ls.append(label_and_type.split(':')[1])
else: else:
desc.append(label.label) desc.append(label_and_type)
return ' '.join(ls), '\n'.join(desc) return ' '.join(ls), '\n'.join(desc)
def generate_calendar( def generate_calendar(
samples: list[RealizedActivitySample], samples: list[RealizedWorkSample],
) -> icalendar.Calendar: ) -> icalendar.Calendar:
max_title_parts = 2 max_title_parts = 2
@ -42,11 +44,11 @@ def generate_calendar(
event.add('dtstart', sample.start_at) event.add('dtstart', sample.start_at)
event.add('dtend', sample.end_at) event.add('dtend', sample.end_at)
for label in sample.labels: for label_and_type in sample.labels:
if label.category == 'author': if label_and_type.startswith('author:'):
event.add( event.add(
'organizer', 'organizer',
'mailto:' + label.label, 'mailto:' + label_and_type.removeprefix('author:'),
) )
cal.add_component(event) cal.add_component(event)
@ -56,7 +58,7 @@ def generate_calendar(
def generate_icalendar_file( def generate_icalendar_file(
samples: list[RealizedActivitySample], samples: list[RealizedWorkSample],
file: str, file: str,
) -> None: ) -> None:
calendar = generate_calendar(samples) calendar = generate_calendar(samples)

View File

@ -1,17 +1,84 @@
from collections.abc import Iterator import datetime
from pathlib import Path import urllib.parse
from typing import Any from typing import Any
from collections.abc import Iterator
from decimal import Decimal
from pathlib import Path
import dataclasses
from personal_data.activity import ActivitySample, Label from personal_data.util import load_csv_file
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
from ..data import WorkSample
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]: @dataclasses.dataclass
class PossibleKeys:
time_start: list[str]
time_end: list[str]
duration: list[str]
name: list[str]
image: list[str]
misc: list[str]
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
# Select data
time_keys = [
k for k, v in event_data.items() if isinstance(v, datetime.date)
]
duration_keys = [
k
for k, v in event_data.items()
if isinstance(v, Decimal) and 'duration_seconds' in k
]
name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
image_keys = [
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
]
misc_keys = list(event_data.keys())
for k in image_keys:
if k in misc_keys:
misc_keys.remove(k)
del k
for k in time_keys:
if k in misc_keys:
misc_keys.remove(k)
del k
time_start_keys = [k for k in time_keys if 'start' in k.lower() ]
time_end_keys = [k for k in time_keys if 'end' in k.lower() or 'stop' in k.lower() ]
return PossibleKeys(
time_start = time_start_keys,
time_end = time_end_keys,
duration = duration_keys,
name = name_keys,
image = image_keys,
misc = misc_keys,
)
def start_end(sample: dict[str,Any], keys: PossibleKeys) -> tuple[datetime.datetime | None, datetime.datetime | None]:
if keys.time_start and keys.time_end:
return (sample[keys.time_start[0]], sample[keys.time_end[0]])
if keys.time_start and keys.duration:
start = sample[keys.time_start[0]]
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (start, start + duration)
if keys.time_start:
start = sample[keys.time_start[0]]
return (start, None)
if keys.time_end:
return (None, sample[keys.time_end[0]])
return (None, None)
def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample]:
assert len(rows) > 0 assert len(rows) > 0
max_title_parts = 2 max_title_parts = 2
if True: if True:
event_data = rows[len(rows) // 2] # Hopefully select a useful representative. event_data = rows[len(rows)//2] # Hopefully select a useful representative.
possible_keys = determine_possible_keys(event_data) possible_keys = determine_possible_keys(event_data)
del event_data del event_data
@ -19,19 +86,20 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivityS
assert len(possible_keys.image) >= 0 assert len(possible_keys.image) >= 0
for event_data in rows: for event_data in rows:
""" '''
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts]) title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
description = '\n\n'.join( description = '\n\n'.join(
event_data[k] for k in possible_name_keys[max_title_parts:] event_data[k] for k in possible_name_keys[max_title_parts:]
) )
image = event_data[possible_keys.image[0]] if possible_keys.image else None image = event_data[possible_keys.image[0]] if possible_keys.image else None
""" '''
(start_at, end_at) = start_end(event_data, possible_keys) (start_at, end_at) = start_end(event_data, possible_keys)
labels = [Label(k, event_data[k]) for k in possible_keys.misc] labels = [f'{k}:{event_data[k]}' for k in possible_keys.misc]
# Create event # Create event
yield ActivitySample( yield WorkSample(
labels=tuple(labels), labels=tuple(labels),
start_at=start_at, start_at=start_at,
end_at=end_at, end_at=end_at,
@ -40,7 +108,7 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivityS
del event_data del event_data
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[ActivitySample]: def iterate_samples_from_csv_file(file_path: Path) -> Iterator[WorkSample]:
dicts = load_csv_file(file_path) dicts = load_csv_file(file_path)
samples = list(iterate_samples_from_dicts(dicts)) samples = list(iterate_samples_from_dicts(dicts))
assert len(samples) > 0, 'Did not found any samples' assert len(samples) > 0, 'Did not found any samples'

View File

@ -5,7 +5,7 @@ from pathlib import Path
import git import git
from personal_data.activity import HIDDEN_LABEL_CATEGORY, ActivitySample, Label from ..data import HIDDEN_LABEL_TOTAL, WorkSample
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -25,7 +25,7 @@ def determine_project_name(repo: git.Repo) -> str:
return Path(repo.working_tree_dir).name return Path(repo.working_tree_dir).name
def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]: def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
project_name = determine_project_name(repo) project_name = determine_project_name(repo)
assert project_name is not None assert project_name is not None
@ -34,9 +34,9 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
repo.commit() repo.commit()
for commit in repo.iter_commits(determine_default_branch(repo)): for commit in repo.iter_commits(determine_default_branch(repo)):
labels = [Label(HIDDEN_LABEL_CATEGORY, 'total')] labels = [HIDDEN_LABEL_TOTAL]
labels.append(Label('project', project_name)) labels.append('project:' + project_name)
labels.append(Label('author', commit.author.email)) labels.append('author:' + commit.author.email)
authored_date = datetime.datetime.fromtimestamp( authored_date = datetime.datetime.fromtimestamp(
commit.authored_date, commit.authored_date,
@ -47,13 +47,13 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
tz=datetime.UTC, tz=datetime.UTC,
) )
yield ActivitySample( yield WorkSample(
labels=tuple(labels), labels=tuple(labels),
start_at=None, start_at=None,
end_at=authored_date, end_at=authored_date,
) )
if authored_date != committed_date: if authored_date != committed_date:
yield ActivitySample( yield WorkSample(
labels=tuple(labels), labels=tuple(labels),
start_at=None, start_at=None,
end_at=committed_date, end_at=committed_date,
@ -61,7 +61,7 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
del labels del labels
def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[ActivitySample]: def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[WorkSample]:
try: try:
yield from get_samples_from_project(git.Repo(repo_path)) yield from get_samples_from_project(git.Repo(repo_path))
except git.exc.InvalidGitRepositoryError: except git.exc.InvalidGitRepositoryError:

View File

@ -1,29 +0,0 @@
import dataclasses
import datetime
from collections.abc import Sequence
HIDDEN_LABEL_CATEGORY = '__'
@dataclasses.dataclass(frozen=True, order=True)
class Label:
category: str
label: str
def __post_init__(self):
assert self.category is not None
assert ':' not in self.category
assert self.label is not None
@dataclasses.dataclass(frozen=True, order=True)
class ActivitySample:
labels: Sequence[Label]
start_at: datetime.datetime | None
end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample):
start_at: datetime.datetime
end_at: datetime.datetime

View File

@ -1,150 +0,0 @@
import csv
import dataclasses
import datetime
import decimal
import typing
import urllib.parse
from collections.abc import Callable
from decimal import Decimal
from pathlib import Path
from typing import Any
from frozendict import frozendict
CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
T = typing.TypeVar('T')
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def csv_str_to_value(
s: str,
) -> (
str
| Decimal
| datetime.date
| datetime.datetime
| urllib.parse.ParseResult
| bool
| None
):
assert not isinstance(s, list) # TODO?
if s is None:
return None
s = s.strip()
if len(s) == 0:
return None
if (v_decimal := try_value(Decimal, s)) is not None:
return v_decimal
if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
return v_date
if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
return v_datetime
if s.startswith(('http://', 'https://')):
return urllib.parse.urlparse(s)
if s.lower() == 'false':
return False
if s.lower() == 'true':
return True
if s.lower() == 'none':
return None
return s
def load_csv_file(csv_file: Path, sniff=False) -> list[frozendict[str, typing.Any]]:
dicts: list[frozendict] = []
with open(csv_file) as csvfile:
if sniff:
dialect = csv.Sniffer().sniff(csvfile.read(1024))
csvfile.seek(0)
else:
dialect = CSV_DIALECT
reader = csv.DictReader(csvfile, dialect=dialect)
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = csv_str_to_value(orig)
if row[k] is None:
del row[k]
del k, orig
dicts.append(frozendict(row))
del row
del csvfile
return dicts
@dataclasses.dataclass
class PossibleKeys:
time_start: list[str]
time_end: list[str]
duration: list[str]
name: list[str]
image: list[str]
misc: list[str]
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
# Select data
time_keys = [k for k, v in event_data.items() if isinstance(v, datetime.date)]
duration_keys = [
k
for k, v in event_data.items()
if isinstance(v, Decimal) and 'duration_seconds' in k
]
name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
image_keys = [
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
]
misc_keys = list(event_data.keys())
for k in image_keys:
if k in misc_keys:
misc_keys.remove(k)
del k
for k in time_keys:
if k in misc_keys:
misc_keys.remove(k)
del k
time_start_keys = [k for k in time_keys if 'start' in k.lower()]
time_end_keys = [
k
for k in time_keys
if 'end' in k.lower() or 'stop' in k.lower() or 'last' in k.lower()
]
return PossibleKeys(
time_start=time_start_keys,
time_end=time_end_keys,
duration=duration_keys,
name=name_keys,
image=image_keys,
misc=misc_keys,
)
def start_end(
sample: dict[str, Any], keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
if keys.time_start and keys.time_end:
return (sample[keys.time_start[0]], sample[keys.time_end[0]])
if keys.time_start and keys.duration:
start = sample[keys.time_start[0]]
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (start, start + duration)
if keys.time_start:
start = sample[keys.time_start[0]]
return (start, None)
if keys.time_end:
return (None, sample[keys.time_end[0]])
return (None, None)

View File

@ -57,12 +57,12 @@ def parse_time(text: str) -> datetime.datetime:
time = try_parse(text, '%d %b %Y %I:%M:%S %p') time = try_parse(text, '%d %b %Y %I:%M:%S %p')
time = time or try_parse(text, '%d %b, %Y @ %I:%M%p') time = time or try_parse(text, '%d %b, %Y @ %I:%M%p')
if time is None and (m := try_parse(text, '%d %b @ %I:%M%p')): if time is None and (m := try_parse(text, '%d %b @ %I:%M%p')):
time = m.replace(year=NOW.year) time = m.replace(year = NOW.year)
assert time is not None, 'Could not parse format' assert time is not None, 'Could not parse format'
if time.tzinfo is None: if time.tzinfo is None:
time = time.replace(tzinfo=LOCAL_TIMEZONE) time = time.replace(tzinfo=LOCAL_TIMEZONE )
assert time.tzinfo is not None, time assert time.tzinfo is not None, time
return time return time

View File

@ -1,22 +1,68 @@
import _csv import _csv
import csv import csv
import datetime import datetime
import decimal
import io import io
import logging import logging
import typing import typing
import urllib.parse import urllib.parse
from collections.abc import Iterable, Mapping, Sequence from collections.abc import Callable, Iterable, Mapping, Sequence
from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any
from frozendict import frozendict from frozendict import frozendict
from . import csv_import, data from . import data
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
def csv_safe_value(v: Any) -> str: T = typing.TypeVar('T')
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def csv_str_to_value(
s: str,
) -> (
str
| Decimal
| datetime.date
| datetime.datetime
| urllib.parse.ParseResult
| bool
| None
):
if s is None:
return None
s = s.strip()
if len(s) == 0:
return None
if (v_decimal := try_value(Decimal, s)) is not None:
return v_decimal
if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
return v_date
if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
return v_datetime
if s.startswith(('http://', 'https://')):
return urllib.parse.urlparse(s)
if s.lower() == 'false':
return False
if s.lower() == 'true':
return True
if s.lower() == 'none':
return None
return s
def csv_safe_value(v: object) -> str:
if isinstance(v, urllib.parse.ParseResult): if isinstance(v, urllib.parse.ParseResult):
return v.geturl() return v.geturl()
if isinstance(v, datetime.datetime): if isinstance(v, datetime.datetime):
@ -99,13 +145,32 @@ def deduplicate_dicts(
def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]: def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
return frozendict( return frozendict(
{ {
k: csv_import.csv_str_to_value(str(v)) k: csv_str_to_value(str(v))
for k, v in d.items() for k, v in d.items()
if csv_import.csv_str_to_value(str(v)) is not None if csv_str_to_value(str(v)) is not None
}, },
) )
def load_csv_file(csv_file: Path) -> list[frozendict[str, typing.Any]]:
dicts: list[frozendict] = []
with open(csv_file) as csvfile:
dialect = csv.Sniffer().sniff(csvfile.read(1024))
csvfile.seek(0)
reader = csv.DictReader(csvfile, dialect=dialect)
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = csv_str_to_value(orig)
if row[k] is None:
del row[k]
del k, orig
dicts.append(frozendict(row))
del row
del csvfile
return dicts
def extend_csv_file( def extend_csv_file(
csv_file: Path, csv_file: Path,
new_dicts: list[dict[str, typing.Any]], new_dicts: list[dict[str, typing.Any]],
@ -115,7 +180,7 @@ def extend_csv_file(
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
try: try:
dicts = csv_import.load_csv_file(csv_file) dicts = load_csv_file(csv_file)
except (FileNotFoundError, _csv.Error) as e: except (FileNotFoundError, _csv.Error) as e:
logger.info('Creating file: %s', csv_file) logger.info('Creating file: %s', csv_file)
dicts = [] dicts = []
@ -134,7 +199,7 @@ def extend_csv_file(
writer = csv.DictWriter( writer = csv.DictWriter(
csvfile_in_memory, csvfile_in_memory,
fieldnames=fieldnames, fieldnames=fieldnames,
dialect=csv_import.CSV_DIALECT, dialect=CSV_DIALECT,
) )
writer.writeheader() writer.writeheader()
for d in dicts: for d in dicts:

View File

@ -1,24 +0,0 @@
import datetime
import frozendict
from personal_data.csv_import import determine_possible_keys
def test_determine_possible_keys():
data = frozendict.frozendict(
{
'game.name': 'Halo',
'me.last_played_time': datetime.datetime(
2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
),
'trophy.name': 'Test',
'trophy.desc': 'Description',
},
)
keys = determine_possible_keys(data)
assert keys.time_start == []
assert keys.time_end == ['me.last_played_time']
assert keys.duration == []
assert len(keys.name) == 3

View File

@ -3,37 +3,13 @@ from decimal import Decimal
import pytest import pytest
from personal_data.csv_import import csv_str_to_value from personal_data.util import csv_str_to_value
PARSE_MAPPINGS = [ PARSE_MAPPINGS = [
( (
'2024-04-28 21:35:40+00:00', '2024-04-28 21:35:40+00:00',
datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC), datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC),
), ),
(
'2024-07-06 19:30:11+02:00',
datetime.datetime(
2024,
7,
6,
19,
30,
11,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
),
(
'2023-10-21 11:43:27+02:00',
datetime.datetime(
2023,
10,
21,
11,
43,
27,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
),
( (
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38', '0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38', '0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',