1
0

Compare commits

..

4 Commits

Author SHA1 Message Date
72be664d82
Ruff
All checks were successful
Test Python / Test (push) Successful in 31s
2024-10-13 15:20:30 +02:00
477fce869d
Moved ActivitySample into personal_data 2024-10-13 15:20:18 +02:00
eb3518ba88
Renamed WorkSample to ActivitySample 2024-10-13 15:05:55 +02:00
f47daa3256
Merging git_time_tracker's advanced CSV parsing into personal_data 2024-10-13 15:04:18 +02:00
12 changed files with 292 additions and 219 deletions

View File

@ -32,12 +32,11 @@ import sys
from collections.abc import Iterator from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from .data import ( from personal_data.activity import (
HIDDEN_LABEL_PREFIX, ActivitySample,
HIDDEN_LABEL_TOTAL, RealizedActivitySample,
RealizedWorkSample,
WorkSample,
) )
from .format import cli, icalendar from .format import cli, icalendar
from .source import csv_file, git_repo from .source import csv_file, git_repo
@ -51,16 +50,16 @@ MINUTE = datetime.timedelta(minutes=1)
def filter_samples( def filter_samples(
samples: list[WorkSample], samples: list[ActivitySample],
sample_filter: set[str], sample_filter: set[str],
) -> list[WorkSample]: ) -> list[ActivitySample]:
assert len(sample_filter) > 0 assert len(sample_filter) > 0
return [s for s in samples if set(s.labels).intersection(sample_filter)] return [s for s in samples if set(s.labels).intersection(sample_filter)]
def heuristically_realize_samples( def heuristically_realize_samples(
samples: list[WorkSample], samples: list[ActivitySample],
) -> Iterator[RealizedWorkSample]: ) -> Iterator[RealizedActivitySample]:
"""Secret sauce. """Secret sauce.
Guarentees that: Guarentees that:
@ -87,7 +86,9 @@ def heuristically_realize_samples(
start_at = max(previous_sample_end, end_at - estimated_duration) start_at = max(previous_sample_end, end_at - estimated_duration)
del estimated_duration del estimated_duration
yield RealizedWorkSample(labels=sample.labels, end_at=end_at, start_at=start_at) yield RealizedActivitySample(
labels=sample.labels, end_at=end_at, start_at=start_at,
)
previous_sample_end = sample.end_at previous_sample_end = sample.end_at
del sample del sample
@ -137,8 +138,8 @@ def parse_arguments():
return parser.parse_args() return parser.parse_args()
def load_samples(args) -> set[WorkSample]: def load_samples(args) -> set[ActivitySample]:
shared_time_stamps_set: set[WorkSample] = set() shared_time_stamps_set: set[ActivitySample] = set()
# Git repositories # Git repositories
for repo_path in args.repositories: for repo_path in args.repositories:

View File

@ -1,19 +0,0 @@
import dataclasses
import datetime
from collections.abc import Sequence
# Prefix that marks a label as hidden.
HIDDEN_LABEL_PREFIX = '__'
# Hidden label built from the prefix above; evaluates to '__TOTAL'.
HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL'
@dataclasses.dataclass(frozen=True, order=True)
class WorkSample:
    """Labelled sample with optional start and end times.

    Instances are frozen (attributes cannot be reassigned) and get generated
    ordering methods that compare fields in declaration order.
    """

    # Labels attached to the sample.
    labels: Sequence[str]
    # Start of the sample; the annotation allows None.
    start_at: datetime.datetime | None
    # End of the sample; the annotation allows None.
    end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedWorkSample(WorkSample):
    """A ``WorkSample`` whose start and end are both annotated as non-optional.

    The annotations are narrowed only; nothing here checks the values at
    runtime.
    """

    # Re-declared from the base class without the ``| None`` alternative.
    start_at: datetime.datetime
    end_at: datetime.datetime

View File

@ -1,7 +1,7 @@
import datetime import datetime
from collections.abc import Iterator from collections.abc import Iterator
from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample from personal_data.activity import HIDDEN_LABEL_CATEGORY, Label, RealizedActivitySample
ZERO_DURATION = datetime.timedelta(seconds=0) ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1) HOUR = datetime.timedelta(hours=1)
@ -29,18 +29,18 @@ def fmt_year_ranges(years: list[int]) -> str:
return ''.join(list(fmt_year_ranges_internal(years))) return ''.join(list(fmt_year_ranges_internal(years)))
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str: def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
hours = int(total_time / HOUR) hours = int(total_time / HOUR)
minutes = int((total_time - hours * HOUR) / MINUTE) minutes = int((total_time - hours * HOUR) / MINUTE)
return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m' return f' {label.category:10} {label.label:40} {hours:-4d}h {minutes:-2d}m'
def generate_report( def generate_report(
samples: list[RealizedWorkSample], samples: list[RealizedActivitySample],
) -> Iterator[str]: ) -> Iterator[str]:
# Time spent per label # Time spent per label
time_per_label: dict[str, datetime.timedelta] = {} time_per_label: dict[Label, datetime.timedelta] = {}
years_per_label: dict[str, set[int]] = {} years_per_label: dict[Label, set[int]] = {}
for sample in samples: for sample in samples:
duration = sample.end_at - sample.start_at duration = sample.end_at - sample.start_at
@ -57,15 +57,13 @@ def generate_report(
# #
yield '-' * 66 yield '-' * 66
yield '\n' yield '\n'
for total_time, label_and_type in time_and_label: for total_time, label in time_and_label:
if label_and_type.startswith(HIDDEN_LABEL_PREFIX): if label.category == HIDDEN_LABEL_CATEGORY:
continue continue
label_type, label = label_and_type.split(':', 1) yield fmt_line(label, total_time)
yield fmt_line(label_type, label, total_time)
yield ' (' yield ' ('
yield fmt_year_ranges(years_per_label.get(label_and_type, [])) yield fmt_year_ranges(years_per_label.get(label, []))
yield ')' yield ')'
yield '\n' yield '\n'
del label, total_time del label, total_time
@ -73,5 +71,6 @@ def generate_report(
yield '-' * 66 yield '-' * 66
yield '\n' yield '\n'
yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION)) label_total = Label(HIDDEN_LABEL_CATEGORY, 'total')
yield fmt_line(label_total, time_per_label.get(label_total, ZERO_DURATION))
yield '\n' yield '\n'

View File

@ -2,30 +2,28 @@ import datetime
import icalendar import icalendar
from ..data import HIDDEN_LABEL_PREFIX, RealizedWorkSample from personal_data.activity import HIDDEN_LABEL_CATEGORY, RealizedActivitySample
ZERO_DURATION = datetime.timedelta(seconds=0) ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1) HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1) MINUTE = datetime.timedelta(minutes=1)
def create_title(sample: RealizedWorkSample) -> tuple[str, str]: def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
ls = [] ls = []
desc = [] desc = []
for label_and_type in sample.labels: for label in sample.labels:
if label_and_type.startswith(HIDDEN_LABEL_PREFIX): if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
continue
if label_and_type.startswith('author:'):
continue continue
if len(ls) == 0: if len(ls) == 0:
ls.append(label_and_type.split(':')[1]) ls.append(label.label)
else: else:
desc.append(label_and_type) desc.append(label.label)
return ' '.join(ls), '\n'.join(desc) return ' '.join(ls), '\n'.join(desc)
def generate_calendar( def generate_calendar(
samples: list[RealizedWorkSample], samples: list[RealizedActivitySample],
) -> icalendar.Calendar: ) -> icalendar.Calendar:
max_title_parts = 2 max_title_parts = 2
@ -44,11 +42,11 @@ def generate_calendar(
event.add('dtstart', sample.start_at) event.add('dtstart', sample.start_at)
event.add('dtend', sample.end_at) event.add('dtend', sample.end_at)
for label_and_type in sample.labels: for label in sample.labels:
if label_and_type.startswith('author:'): if label.category == 'author':
event.add( event.add(
'organizer', 'organizer',
'mailto:' + label_and_type.removeprefix('author:'), 'mailto:' + label.label,
) )
cal.add_component(event) cal.add_component(event)
@ -58,7 +56,7 @@ def generate_calendar(
def generate_icalendar_file( def generate_icalendar_file(
samples: list[RealizedWorkSample], samples: list[RealizedActivitySample],
file: str, file: str,
) -> None: ) -> None:
calendar = generate_calendar(samples) calendar = generate_calendar(samples)

View File

@ -1,84 +1,17 @@
import datetime
import urllib.parse
from typing import Any
from collections.abc import Iterator from collections.abc import Iterator
from decimal import Decimal
from pathlib import Path from pathlib import Path
import dataclasses from typing import Any
from personal_data.util import load_csv_file from personal_data.activity import ActivitySample, Label
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
from ..data import WorkSample
@dataclasses.dataclass def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
class PossibleKeys:
time_start: list[str]
time_end: list[str]
duration: list[str]
name: list[str]
image: list[str]
misc: list[str]
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
# Select data
time_keys = [
k for k, v in event_data.items() if isinstance(v, datetime.date)
]
duration_keys = [
k
for k, v in event_data.items()
if isinstance(v, Decimal) and 'duration_seconds' in k
]
name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
image_keys = [
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
]
misc_keys = list(event_data.keys())
for k in image_keys:
if k in misc_keys:
misc_keys.remove(k)
del k
for k in time_keys:
if k in misc_keys:
misc_keys.remove(k)
del k
time_start_keys = [k for k in time_keys if 'start' in k.lower() ]
time_end_keys = [k for k in time_keys if 'end' in k.lower() or 'stop' in k.lower() ]
return PossibleKeys(
time_start = time_start_keys,
time_end = time_end_keys,
duration = duration_keys,
name = name_keys,
image = image_keys,
misc = misc_keys,
)
def start_end(sample: dict[str,Any], keys: PossibleKeys) -> tuple[datetime.datetime | None, datetime.datetime | None]:
if keys.time_start and keys.time_end:
return (sample[keys.time_start[0]], sample[keys.time_end[0]])
if keys.time_start and keys.duration:
start = sample[keys.time_start[0]]
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (start, start + duration)
if keys.time_start:
start = sample[keys.time_start[0]]
return (start, None)
if keys.time_end:
return (None, sample[keys.time_end[0]])
return (None, None)
def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample]:
assert len(rows) > 0 assert len(rows) > 0
max_title_parts = 2 max_title_parts = 2
if True: if True:
event_data = rows[len(rows)//2] # Hopefully select a useful representative. event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
possible_keys = determine_possible_keys(event_data) possible_keys = determine_possible_keys(event_data)
del event_data del event_data
@ -86,20 +19,19 @@ def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample
assert len(possible_keys.image) >= 0 assert len(possible_keys.image) >= 0
for event_data in rows: for event_data in rows:
''' """
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts]) title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
description = '\n\n'.join( description = '\n\n'.join(
event_data[k] for k in possible_name_keys[max_title_parts:] event_data[k] for k in possible_name_keys[max_title_parts:]
) )
image = event_data[possible_keys.image[0]] if possible_keys.image else None image = event_data[possible_keys.image[0]] if possible_keys.image else None
''' """
(start_at, end_at) = start_end(event_data, possible_keys) (start_at, end_at) = start_end(event_data, possible_keys)
labels = [f'{k}:{event_data[k]}' for k in possible_keys.misc] labels = [Label(k, event_data[k]) for k in possible_keys.misc]
# Create event # Create event
yield WorkSample( yield ActivitySample(
labels=tuple(labels), labels=tuple(labels),
start_at=start_at, start_at=start_at,
end_at=end_at, end_at=end_at,
@ -108,7 +40,7 @@ def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample
del event_data del event_data
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[WorkSample]: def iterate_samples_from_csv_file(file_path: Path) -> Iterator[ActivitySample]:
dicts = load_csv_file(file_path) dicts = load_csv_file(file_path)
samples = list(iterate_samples_from_dicts(dicts)) samples = list(iterate_samples_from_dicts(dicts))
assert len(samples) > 0, 'Did not found any samples' assert len(samples) > 0, 'Did not found any samples'

View File

@ -5,7 +5,7 @@ from pathlib import Path
import git import git
from ..data import HIDDEN_LABEL_TOTAL, WorkSample from personal_data.activity import HIDDEN_LABEL_CATEGORY, ActivitySample, Label
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -25,7 +25,7 @@ def determine_project_name(repo: git.Repo) -> str:
return Path(repo.working_tree_dir).name return Path(repo.working_tree_dir).name
def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]: def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
project_name = determine_project_name(repo) project_name = determine_project_name(repo)
assert project_name is not None assert project_name is not None
@ -34,9 +34,9 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
repo.commit() repo.commit()
for commit in repo.iter_commits(determine_default_branch(repo)): for commit in repo.iter_commits(determine_default_branch(repo)):
labels = [HIDDEN_LABEL_TOTAL] labels = [Label(HIDDEN_LABEL_CATEGORY, 'total')]
labels.append('project:' + project_name) labels.append(Label('project', project_name))
labels.append('author:' + commit.author.email) labels.append(Label('author', commit.author.email))
authored_date = datetime.datetime.fromtimestamp( authored_date = datetime.datetime.fromtimestamp(
commit.authored_date, commit.authored_date,
@ -47,13 +47,13 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
tz=datetime.UTC, tz=datetime.UTC,
) )
yield WorkSample( yield ActivitySample(
labels=tuple(labels), labels=tuple(labels),
start_at=None, start_at=None,
end_at=authored_date, end_at=authored_date,
) )
if authored_date != committed_date: if authored_date != committed_date:
yield WorkSample( yield ActivitySample(
labels=tuple(labels), labels=tuple(labels),
start_at=None, start_at=None,
end_at=committed_date, end_at=committed_date,
@ -61,7 +61,7 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
del labels del labels
def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[WorkSample]: def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[ActivitySample]:
try: try:
yield from get_samples_from_project(git.Repo(repo_path)) yield from get_samples_from_project(git.Repo(repo_path))
except git.exc.InvalidGitRepositoryError: except git.exc.InvalidGitRepositoryError:

29
personal_data/activity.py Normal file
View File

@ -0,0 +1,29 @@
import dataclasses
import datetime
from collections.abc import Sequence
# Category value used to mark a label as hidden.
HIDDEN_LABEL_CATEGORY = '__'
@dataclasses.dataclass(frozen=True, order=True)
class Label:
    """A ``(category, label)`` pair.

    Instances are frozen and orderable; the generated comparisons look at
    ``category`` first and ``label`` second.
    """

    category: str
    label: str

    def __post_init__(self):
        """Sanity-check the freshly initialised fields.

        The category must be present and must not contain ``':'``; the label
        must be present.
        """
        assert self.category is not None and ':' not in self.category
        assert self.label is not None
@dataclasses.dataclass(frozen=True, order=True)
class ActivitySample:
    """Labelled activity sample with optional start and end times.

    Instances are frozen (attributes cannot be reassigned) and get generated
    ordering methods that compare fields in declaration order.
    """

    # Labels attached to the sample.
    labels: Sequence[Label]
    # Start of the activity; the annotation allows None.
    start_at: datetime.datetime | None
    # End of the activity; the annotation allows None.
    end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample):
    """An ``ActivitySample`` whose start and end are both annotated as non-optional.

    The annotations are narrowed only; nothing here checks the values at
    runtime.
    """

    # Re-declared from the base class without the ``| None`` alternative.
    start_at: datetime.datetime
    end_at: datetime.datetime

150
personal_data/csv_import.py Normal file
View File

@ -0,0 +1,150 @@
import csv
import dataclasses
import datetime
import decimal
import typing
import urllib.parse
from collections.abc import Callable
from decimal import Decimal
from pathlib import Path
from typing import Any
from frozendict import frozendict
# Name under which the dialect below is registered with the csv module.
CSV_DIALECT = 'one_true_dialect'
# Registered at import time: '\n' as line terminator, and spaces directly
# after a delimiter are skipped.
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
T = typing.TypeVar('T')
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def csv_str_to_value(
    s: str,
) -> (
    str
    | Decimal
    | datetime.date
    | datetime.datetime
    | urllib.parse.ParseResult
    | bool
    | None
):
    """Convert one raw CSV cell into the most specific value it can represent.

    The cell is stripped of surrounding whitespace and then tried, in order,
    as: a ``Decimal``, an ISO date, an ISO datetime, an ``http(s)://`` URL
    (parsed with ``urllib.parse.urlparse``), and the case-insensitive
    keywords ``false`` / ``true`` / ``none``. Anything else is returned as
    the stripped string.

    Returns:
        The converted value, or ``None`` when ``s`` is ``None``, blank, or the
        keyword ``none``.
    """
    assert not isinstance(s, list)  # TODO?

    if s is None:
        return None
    s = s.strip()
    if not s:
        return None

    # Typed parsers, highest priority first; the first one that succeeds wins.
    for parser in (
        Decimal,
        datetime.date.fromisoformat,
        datetime.datetime.fromisoformat,
    ):
        parsed = try_value(parser, s)
        if parsed is not None:
            return parsed

    if s.startswith(('http://', 'https://')):
        return urllib.parse.urlparse(s)

    keywords = {'false': False, 'true': True, 'none': None}
    lowered = s.lower()
    if lowered in keywords:
        return keywords[lowered]
    return s
def load_csv_file(csv_file: Path, sniff=False) -> list[frozendict[str, typing.Any]]:
    """Load a CSV file into a list of immutable, typed row mappings.

    Args:
        csv_file: Path of the CSV file to read. The first row supplies the
            column names.
        sniff: When true, the dialect is guessed by ``csv.Sniffer`` from the
            first 1024 characters of the file; otherwise ``CSV_DIALECT`` is
            used.

    Returns:
        One ``frozendict`` per data row. Every cell is converted with
        ``csv_str_to_value``; cells that convert to ``None`` are left out of
        the row entirely.
    """
    dicts: list[frozendict] = []
    # newline='' hands newline handling to the csv module, which its
    # documentation requires so that line breaks inside quoted fields are
    # read back unchanged.
    with open(csv_file, newline='') as csvfile:
        if sniff:
            dialect = csv.Sniffer().sniff(csvfile.read(1024))
            csvfile.seek(0)
        else:
            dialect = CSV_DIALECT
        reader = csv.DictReader(csvfile, dialect=dialect)
        for row in reader:
            converted = {k: csv_str_to_value(v) for k, v in row.items()}
            # Drop empty cells instead of storing explicit None values.
            dicts.append(
                frozendict({k: v for k, v in converted.items() if v is not None}),
            )
    return dicts
@dataclasses.dataclass
class PossibleKeys:
    """Column names of a row, grouped by the role each column could play."""

    # Candidate columns for the start time.
    time_start: list[str]
    # Candidate columns for the end time.
    time_end: list[str]
    # Candidate columns for a duration.
    duration: list[str]
    # Candidate columns for a name.
    name: list[str]
    # Candidate columns for an image.
    image: list[str]
    # Remaining columns.
    misc: list[str]
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
    """Classify the keys of one row by the type of their values and their names.

    * time: values that are ``datetime.date`` instances (which includes
      datetimes); split into start keys (name contains ``start``) and end
      keys (name contains ``end``, ``stop`` or ``last``), case-insensitively.
    * duration: ``Decimal`` values whose key contains ``duration_seconds``.
    * name: ``str`` values.
    * image: ``urllib.parse.ParseResult`` values.
    * misc: every key that is neither a time key nor an image key.
    """

    def keys_with_type(value_type) -> list[str]:
        return [k for k, v in event_data.items() if isinstance(v, value_type)]

    time_keys = keys_with_type(datetime.date)
    name_keys = keys_with_type(str)
    image_keys = keys_with_type(urllib.parse.ParseResult)
    duration_keys = [k for k in keys_with_type(Decimal) if 'duration_seconds' in k]

    # Everything not already claimed as a time or image column is "misc".
    claimed = {*image_keys, *time_keys}
    misc_keys = [k for k in event_data.keys() if k not in claimed]

    time_start_keys = []
    time_end_keys = []
    for key in time_keys:
        lowered = key.lower()
        if 'start' in lowered:
            time_start_keys.append(key)
        if any(marker in lowered for marker in ('end', 'stop', 'last')):
            time_end_keys.append(key)

    return PossibleKeys(
        time_start=time_start_keys,
        time_end=time_end_keys,
        duration=duration_keys,
        name=name_keys,
        image=image_keys,
        misc=misc_keys,
    )
def start_end(
    sample: dict[str, Any], keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
    """Derive the ``(start, end)`` pair of one row.

    Only the first candidate key of each kind is consulted. Precedence:

    1. start and end columns -> both read from the row.
    2. start and duration columns -> end is ``start + duration`` seconds.
    3. start column only -> ``(start, None)``.
    4. end column only -> ``(None, end)``.
    5. otherwise -> ``(None, None)``.

    Args:
        sample: The row to read from.
        keys: Candidate keys for the time-related columns.

    Returns:
        The start and end values; either may be ``None``. A column that is
        expected by ``keys`` but absent from this particular row yields
        ``None`` for the affected endpoint instead of raising ``KeyError``.
    """
    # ``.get`` rather than indexing: ``keys`` may have been derived from a
    # different row, and rows loaded by ``load_csv_file`` omit empty cells.
    start = sample.get(keys.time_start[0]) if keys.time_start else None

    if keys.time_start and keys.time_end:
        return (start, sample.get(keys.time_end[0]))

    if keys.time_start and keys.duration:
        seconds = sample.get(keys.duration[0])
        if start is None or seconds is None:
            # Without both operands the end cannot be computed.
            return (start, None)
        return (start, start + datetime.timedelta(seconds=float(seconds)))

    if keys.time_start:
        return (start, None)

    if keys.time_end:
        return (None, sample.get(keys.time_end[0]))

    return (None, None)

View File

@ -57,12 +57,12 @@ def parse_time(text: str) -> datetime.datetime:
time = try_parse(text, '%d %b %Y %I:%M:%S %p') time = try_parse(text, '%d %b %Y %I:%M:%S %p')
time = time or try_parse(text, '%d %b, %Y @ %I:%M%p') time = time or try_parse(text, '%d %b, %Y @ %I:%M%p')
if time is None and (m := try_parse(text, '%d %b @ %I:%M%p')): if time is None and (m := try_parse(text, '%d %b @ %I:%M%p')):
time = m.replace(year = NOW.year) time = m.replace(year=NOW.year)
assert time is not None, 'Could not parse format' assert time is not None, 'Could not parse format'
if time.tzinfo is None: if time.tzinfo is None:
time = time.replace(tzinfo=LOCAL_TIMEZONE ) time = time.replace(tzinfo=LOCAL_TIMEZONE)
assert time.tzinfo is not None, time assert time.tzinfo is not None, time
return time return time

View File

@ -1,68 +1,22 @@
import _csv import _csv
import csv import csv
import datetime import datetime
import decimal
import io import io
import logging import logging
import typing import typing
import urllib.parse import urllib.parse
from collections.abc import Callable, Iterable, Mapping, Sequence from collections.abc import Iterable, Mapping, Sequence
from decimal import Decimal
from pathlib import Path from pathlib import Path
from typing import Any
from frozendict import frozendict from frozendict import frozendict
from . import data from . import csv_import, data
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
T = typing.TypeVar('T') def csv_safe_value(v: Any) -> str:
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def csv_str_to_value(
s: str,
) -> (
str
| Decimal
| datetime.date
| datetime.datetime
| urllib.parse.ParseResult
| bool
| None
):
if s is None:
return None
s = s.strip()
if len(s) == 0:
return None
if (v_decimal := try_value(Decimal, s)) is not None:
return v_decimal
if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
return v_date
if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
return v_datetime
if s.startswith(('http://', 'https://')):
return urllib.parse.urlparse(s)
if s.lower() == 'false':
return False
if s.lower() == 'true':
return True
if s.lower() == 'none':
return None
return s
def csv_safe_value(v: object) -> str:
if isinstance(v, urllib.parse.ParseResult): if isinstance(v, urllib.parse.ParseResult):
return v.geturl() return v.geturl()
if isinstance(v, datetime.datetime): if isinstance(v, datetime.datetime):
@ -145,32 +99,13 @@ def deduplicate_dicts(
def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]: def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
return frozendict( return frozendict(
{ {
k: csv_str_to_value(str(v)) k: csv_import.csv_str_to_value(str(v))
for k, v in d.items() for k, v in d.items()
if csv_str_to_value(str(v)) is not None if csv_import.csv_str_to_value(str(v)) is not None
}, },
) )
def load_csv_file(csv_file: Path) -> list[frozendict[str, typing.Any]]:
dicts: list[frozendict] = []
with open(csv_file) as csvfile:
dialect = csv.Sniffer().sniff(csvfile.read(1024))
csvfile.seek(0)
reader = csv.DictReader(csvfile, dialect=dialect)
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = csv_str_to_value(orig)
if row[k] is None:
del row[k]
del k, orig
dicts.append(frozendict(row))
del row
del csvfile
return dicts
def extend_csv_file( def extend_csv_file(
csv_file: Path, csv_file: Path,
new_dicts: list[dict[str, typing.Any]], new_dicts: list[dict[str, typing.Any]],
@ -180,7 +115,7 @@ def extend_csv_file(
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
try: try:
dicts = load_csv_file(csv_file) dicts = csv_import.load_csv_file(csv_file)
except (FileNotFoundError, _csv.Error) as e: except (FileNotFoundError, _csv.Error) as e:
logger.info('Creating file: %s', csv_file) logger.info('Creating file: %s', csv_file)
dicts = [] dicts = []
@ -199,7 +134,7 @@ def extend_csv_file(
writer = csv.DictWriter( writer = csv.DictWriter(
csvfile_in_memory, csvfile_in_memory,
fieldnames=fieldnames, fieldnames=fieldnames,
dialect=CSV_DIALECT, dialect=csv_import.CSV_DIALECT,
) )
writer.writeheader() writer.writeheader()
for d in dicts: for d in dicts:

24
test/test_csv_import.py Normal file
View File

@ -0,0 +1,24 @@
import datetime
import frozendict
from personal_data.csv_import import determine_possible_keys
def test_determine_possible_keys():
    """A ``last_played_time`` datetime is an end key; the three strings are names."""
    last_played = datetime.datetime(
        2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
    )
    row = frozendict.frozendict(
        {
            'game.name': 'Halo',
            'me.last_played_time': last_played,
            'trophy.name': 'Test',
            'trophy.desc': 'Description',
        },
    )

    result = determine_possible_keys(row)

    assert result.time_start == []
    assert result.time_end == ['me.last_played_time']
    assert result.duration == []
    assert len(result.name) == 3

View File

@ -3,13 +3,37 @@ from decimal import Decimal
import pytest import pytest
from personal_data.util import csv_str_to_value from personal_data.csv_import import csv_str_to_value
PARSE_MAPPINGS = [ PARSE_MAPPINGS = [
( (
'2024-04-28 21:35:40+00:00', '2024-04-28 21:35:40+00:00',
datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC), datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC),
), ),
(
'2024-07-06 19:30:11+02:00',
datetime.datetime(
2024,
7,
6,
19,
30,
11,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
),
(
'2023-10-21 11:43:27+02:00',
datetime.datetime(
2023,
10,
21,
11,
43,
27,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
),
( (
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38', '0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38', '0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',