1
0

Compare commits

...

3 Commits

Author SHA1 Message Date
a64cbc6186
obsidian_import use imported functionality
All checks were successful
Test Python / Test (push) Successful in 34s
2024-10-14 20:38:35 +02:00
c226ac623c
Activity merging 2024-10-14 20:18:34 +02:00
c07c371939
Moved more logic to personal_data 2024-10-14 19:43:34 +02:00
9 changed files with 297 additions and 197 deletions

View File

@ -25,170 +25,4 @@ And the ([Hamster](https://github.com/projecthamster/hamster)) manual time track
![](docs/obligatory-hamster.png) ![](docs/obligatory-hamster.png)
""" """
import argparse __all__: list[str] = []
import datetime
import logging
import sys
from collections.abc import Iterator
from pathlib import Path
from personal_data.activity import (
ActivitySample,
RealizedActivitySample,
)
from .format import cli, icalendar
from .source import csv_file, git_repo
logger = logging.getLogger(__name__)
DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
def filter_samples(
    samples: list[ActivitySample],
    sample_filter: set[str],
) -> list[ActivitySample]:
    """Keep only the samples tagged with at least one label from *sample_filter*."""
    assert len(sample_filter) > 0
    kept = []
    for sample in samples:
        # intersection() accepts any iterable, so sample_filter may also be a list.
        if set(sample.labels).intersection(sample_filter):
            kept.append(sample)
    return kept
def heuristically_realize_samples(
    samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
    """Secret sauce.

    Guarantees that:
    * No samples overlap.
    """
    previous_end: datetime.datetime | None = None
    for sample in samples:
        end_at = sample.end_at
        if previous_end is None:
            # Seed with the epoch, matching the first sample's tz-awareness.
            if end_at.tzinfo:
                previous_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
            else:
                previous_end = datetime.datetime.fromtimestamp(0)
        assert previous_end <= end_at, 'Iterating in incorrect order'

        # TODO: Allow end_at is None
        start_at = sample.start_at
        if start_at is None:
            # Missing start: assume a default duration, clamped so samples
            # never overlap the previous one.
            start_at = max(previous_end, end_at - DEFAULT_ESTIMATED_DURATION)

        yield RealizedActivitySample(
            labels=sample.labels, end_at=end_at, start_at=start_at,
        )
        previous_end = end_at
def parse_arguments():
    """Declare and parse the command-line interface.

    Returns the argparse namespace with repositories, csv_files,
    sample_filter, format_mode and output_file.
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument(
        '--git-repo',
        dest='repositories',
        action='extend',
        nargs='+',
        type=Path,
        default=[],
    )
    argument_parser.add_argument(
        '--csv-file',
        dest='csv_files',
        action='extend',
        nargs='+',
        type=Path,
        default=[],
    )
    argument_parser.add_argument(
        '--filter',
        dest='sample_filter',
        action='extend',
        nargs='+',
        type=str,
        default=[],
    )
    argument_parser.add_argument(
        '--format',
        dest='format_mode',
        action='store',
        type=str,
        choices=['cli_report', 'icalendar'],
        default='cli_report',
    )
    # String default: argparse applies the `type` conversion to it on parse.
    argument_parser.add_argument(
        '--out',
        dest='output_file',
        action='store',
        type=Path,
        default='output/samples.ics',
    )
    return argument_parser.parse_args()
def load_samples(args) -> set[ActivitySample]:
    """Collect activity samples from every source named on the command line."""
    collected: set[ActivitySample] = set()

    # Git repositories
    for repository in args.repositories:
        logger.warning('Determine commits from %s', repository)
        collected.update(git_repo.iterate_samples_from_git_repository(repository))

    # CSV Files
    for path in args.csv_files:
        logger.warning('Load samples from %s', path)
        collected.update(csv_file.iterate_samples_from_csv_file(path))

    return collected
def main():
    """Entry point: load, sort, filter, realize and emit samples."""
    logging.basicConfig()
    args = parse_arguments()

    # Determine samples, ordered by their end time.
    shared_time_stamps = sorted(load_samples(args), key=lambda s: s.end_at)

    # Filter samples
    if args.sample_filter:
        logger.warning('Filtering %s samples', len(shared_time_stamps))
        shared_time_stamps = filter_samples(shared_time_stamps, args.sample_filter)
        logger.warning('Filtered down to %s samples', len(shared_time_stamps))

    # Heuristic samples
    logger.warning('Realizing %s samples', len(shared_time_stamps))
    shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))

    # Output format
    if args.format_mode == 'cli_report':
        for chunk in cli.generate_report(shared_time_stamps):
            sys.stdout.write(chunk)
    elif args.format_mode == 'icalendar':
        icalendar.generate_icalendar_file(
            shared_time_stamps,
            file=args.output_file,
        )

View File

@ -1,4 +1,140 @@
from git_time_tracker import main import argparse
import logging
import sys
from pathlib import Path
from personal_data.activity import (
ActivitySample,
heuristically_realize_samples,
merge_adjacent_samples,
)
from .format import cli, icalendar
from .source import csv_file, git_repo
logger = logging.getLogger(__name__)
def filter_samples(
    samples: list[ActivitySample],
    sample_filter: set[str],
) -> list[ActivitySample]:
    """Return the samples carrying at least one label from *sample_filter*."""
    assert len(sample_filter) > 0
    return [
        sample
        for sample in samples
        # intersection() accepts any iterable, so a list filter also works.
        if set(sample.labels).intersection(sample_filter)
    ]
def parse_arguments():
    """Declare and parse the command-line interface.

    Returns the argparse namespace with repositories, csv_files,
    sample_filter, format_mode, merge and output_file.
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument(
        '--git-repo',
        dest='repositories',
        action='extend',
        nargs='+',
        type=Path,
        default=[],
    )
    argument_parser.add_argument(
        '--csv-file',
        dest='csv_files',
        action='extend',
        nargs='+',
        type=Path,
        default=[],
    )
    argument_parser.add_argument(
        '--filter',
        dest='sample_filter',
        action='extend',
        nargs='+',
        type=str,
        default=[],
    )
    argument_parser.add_argument(
        '--format',
        dest='format_mode',
        action='store',
        type=str,
        choices=['cli_report', 'icalendar'],
        default='cli_report',
    )
    # Label category used to merge adjacent samples; None disables merging.
    argument_parser.add_argument(
        '--merge',
        dest='merge',
        action='store',
        type=str,
        default=None,
    )
    # String default: argparse applies the `type` conversion to it on parse.
    argument_parser.add_argument(
        '--out',
        dest='output_file',
        action='store',
        type=Path,
        default='output/samples.ics',
    )
    return argument_parser.parse_args()
def load_samples(args) -> set[ActivitySample]:
    """Gather activity samples from all configured sources into one set."""
    shared_time_stamps_set: set[ActivitySample] = set()

    # Git repositories
    for repo_path in args.repositories:
        logger.warning('Determine commits from %s', repo_path)
        found = git_repo.iterate_samples_from_git_repository(repo_path)
        shared_time_stamps_set.update(found)

    # CSV Files
    for csv_path in args.csv_files:
        logger.warning('Load samples from %s', csv_path)
        found = csv_file.iterate_samples_from_csv_file(csv_path)
        shared_time_stamps_set.update(found)

    return shared_time_stamps_set
def main():
    """Entry point: load, sort, filter, realize, optionally merge, then emit."""
    logging.basicConfig()
    logging.getLogger('git_time_tracker').setLevel('INFO')

    args = parse_arguments()

    # Determine samples, ordered by their end time.
    samples = sorted(load_samples(args), key=lambda s: s.end_at)

    # Filter samples
    if args.sample_filter:
        logger.warning('Filtering %s samples', len(samples))
        samples = filter_samples(samples, args.sample_filter)
        logger.warning('Filtered down to %s samples', len(samples))

    # Heuristic samples
    logger.warning('Realizing %s samples', len(samples))
    samples = list(heuristically_realize_samples(samples))

    # Merge adjacent
    if args.merge:
        samples = merge_adjacent_samples(samples, args.merge)
        logger.warning('Merged to %s samples', len(samples))

    # Output format
    if args.format_mode == 'cli_report':
        for chunk in cli.generate_report(samples):
            sys.stdout.write(chunk)
    elif args.format_mode == 'icalendar':
        icalendar.generate_icalendar_file(
            samples,
            file=args.output_file,
        )
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -16,9 +16,9 @@ def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}: if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
continue continue
if len(ls) == 0: if len(ls) == 0:
ls.append(label.label) ls.append(str(label.label))
else: else:
desc.append(label.label) desc.append(str(label.label))
return ' '.join(ls), '\n'.join(desc) return ' '.join(ls), '\n'.join(desc)

View File

@ -1,10 +1,14 @@
from collections.abc import Iterator from collections.abc import Iterator
from logging import getLogger
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from personal_data.activity import ActivitySample, Label from personal_data.activity import ActivitySample, Label
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
print(__name__)
logger = getLogger(__name__)
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]: def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
assert len(rows) > 0 assert len(rows) > 0
@ -13,6 +17,7 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivityS
if True: if True:
event_data = rows[len(rows) // 2] # Hopefully select a useful representative. event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
possible_keys = determine_possible_keys(event_data) possible_keys = determine_possible_keys(event_data)
logger.info('Found possible keys: %s', possible_keys)
del event_data del event_data
assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1 assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1

View File

@ -7,8 +7,10 @@ import datetime
from logging import getLogger from logging import getLogger
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from collections.abc import Iterator
from personal_data.util import load_csv_file from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file
from personal_data.activity import ActivitySample, Label, RealizedActivitySample, heuristically_realize_samples
from .obsidian import Event, ObsidianVault from .obsidian import Event, ObsidianVault
@ -17,6 +19,32 @@ logger = getLogger(__name__)
Row = dict[str, Any] Row = dict[str, Any]
Rows = list[Row] Rows = list[Row]
def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
    """Yield an ActivitySample per CSV row.

    Time/label keys are detected once on a representative row, then applied
    uniformly to every row.

    Raises AssertionError when rows is empty or no time column is found.
    """
    assert len(rows) > 0

    # Hopefully select a useful representative.
    representative = rows[len(rows) // 2]
    possible_keys = determine_possible_keys(representative)
    logger.info('Found possible keys: %s', possible_keys)
    del representative

    # At least one time bound is required to place a sample on the timeline.
    assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1

    for event_data in rows:
        (start_at, end_at) = start_end(event_data, possible_keys)
        labels = [Label(k, event_data[k]) for k in possible_keys.misc]

        # Create event
        yield ActivitySample(
            labels=tuple(labels),
            start_at=start_at,
            end_at=end_at,
        )
        del event_data
def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int: def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
num_updated = 0 num_updated = 0
@ -73,33 +101,35 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int: def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
# TODO: Update to using git_time_tracker event parsing system
verb = 'Watched' verb = 'Watched'
samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
for sample in samples:
date: datetime.date = sample.start_at.date()
samples_per_date.setdefault(date, [])
samples_per_date[date].append(sample)
del date, sample
del rows
def map_to_event(sample: RealizedActivitySample) -> Event:
comment = '{} Episode {}: *{}*'.format(
sample.single_label_with_category('season.name'),
sample.single_label_with_category('episode.index'),
sample.single_label_with_category('episode.name'),
)
return Event(sample.start_at.time(),
sample.end_at.time(),
verb,
sample.single_label_with_category('series.name'),
comment,
)
num_updated = 0 num_updated = 0
rows_per_date = {} for date, samples in samples_per_date.items():
for row in rows: events = [map_to_event(sample) for sample in samples]
date = row['me.last_played_time'].date()
rows_per_date.setdefault(date, [])
rows_per_date[date].append(row)
del date, row
del rows
def map_to_event(row: Row) -> Event:
start = (
row['me.last_played_time'].time().replace(second=0, microsecond=0, fold=0)
)
end = start
comment = '{} Episode {}: *{}*'.format(
row['season.name'],
row['episode.index'],
row['episode.name'],
)
return Event(start, end, verb, row['series.name'], comment)
for date, rows in rows_per_date.items():
events = [map_to_event(row) for row in rows]
was_updated = vault.add_events(date, events) was_updated = vault.add_events(date, events)
if was_updated: if was_updated:

View File

@ -39,6 +39,7 @@ def parse_arguments():
def main(): def main():
logging.basicConfig() logging.basicConfig()
logging.getLogger('personal_data').setLevel('INFO') logging.getLogger('personal_data').setLevel('INFO')
args = parse_arguments() args = parse_arguments()
scraper_filter = frozenset(args.fetchers) scraper_filter = frozenset(args.fetchers)

View File

@ -1,8 +1,9 @@
import dataclasses import dataclasses
import datetime import datetime
from collections.abc import Sequence from collections.abc import Iterator, Sequence
HIDDEN_LABEL_CATEGORY = '__' HIDDEN_LABEL_CATEGORY = '__'
DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
@dataclasses.dataclass(frozen=True, order=True) @dataclasses.dataclass(frozen=True, order=True)
@ -22,8 +23,89 @@ class ActivitySample:
start_at: datetime.datetime | None start_at: datetime.datetime | None
end_at: datetime.datetime | None end_at: datetime.datetime | None
def single_label_with_category(self, category: str) -> str | None:
    """Return the label value of the first label in *category*.

    Returns None when no label matches (the previous ``-> str`` annotation
    was wrong on this path).

    NOTE(review): returns only the first match; assumes at most one label
    per category — confirm with label producers.
    """
    for label in self.labels:
        if label.category == category:
            return label.label
    return None
@dataclasses.dataclass(frozen=True, order=True) @dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample): class RealizedActivitySample(ActivitySample):
start_at: datetime.datetime start_at: datetime.datetime
end_at: datetime.datetime end_at: datetime.datetime
def heuristically_realize_samples(
    samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
    """Secret sauce.

    Guarantees that:
    * No samples overlap.
    """
    previous_end: datetime.datetime | None = None
    for sample in samples:
        end_at = sample.end_at
        if previous_end is None:
            # Seed with the epoch, matching the first sample's tz-awareness.
            if end_at.tzinfo:
                previous_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
            else:
                previous_end = datetime.datetime.fromtimestamp(0)
        assert previous_end <= end_at, 'Iterating in incorrect order'

        # TODO: Allow end_at is None
        start_at = sample.start_at
        if start_at is None:
            # Missing start: assume a default duration, clamped so samples
            # never overlap the previous one.
            start_at = max(previous_end, end_at - DEFAULT_ESTIMATED_DURATION)

        yield RealizedActivitySample(
            labels=sample.labels,
            end_at=end_at,
            start_at=start_at,
        )
        previous_end = end_at
def mergable_labels(a: Sequence[Label], b: Sequence[Label]) -> Sequence[Label]:
    """Labels present in both sequences (set-based; order is unspecified)."""
    common = set(a) & set(b)
    return list(common)
def merge_adjacent_samples(
    samples: list[RealizedActivitySample], group_category: str,
) -> list[RealizedActivitySample]:
    """Merge realized samples of the same group that nearly touch in time.

    Two consecutive samples merge when they share the same label in
    *group_category* and the gap between them is shorter than five minutes.
    A merged sample keeps only the labels common to both inputs.

    The input list is no longer sorted in place (previously a hidden side
    effect on the caller's list); a sorted copy is used instead.
    """
    max_interval_between_samples = datetime.timedelta(minutes=5)

    def can_merge(
        before: RealizedActivitySample, after: RealizedActivitySample,
    ) -> bool:
        # Only samples from the same group are mergeable.
        if before.single_label_with_category(
            group_category,
        ) != after.single_label_with_category(group_category):
            return False
        return (after.start_at - before.end_at) < max_interval_between_samples

    # Sorted copy: do not mutate the caller's list.
    ordered = sorted(samples, key=lambda s: s.start_at)

    new: list[RealizedActivitySample] = []
    for s in ordered:
        if len(new) > 0 and can_merge(new[-1], s):
            # TODO: Merge/strip attributes?
            new[-1] = RealizedActivitySample(
                labels=mergable_labels(new[-1].labels, s.labels),
                start_at=new[-1].start_at,
                end_at=s.end_at,
            )
        else:
            new.append(s)
    return new

View File

@ -132,7 +132,8 @@ def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
def start_end( def start_end(
sample: dict[str, Any], keys: PossibleKeys, sample: dict[str, Any],
keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]: ) -> tuple[datetime.datetime | None, datetime.datetime | None]:
if keys.time_start and keys.time_end: if keys.time_start and keys.time_end:
return (sample[keys.time_start[0]], sample[keys.time_end[0]]) return (sample[keys.time_start[0]], sample[keys.time_end[0]])
@ -142,6 +143,11 @@ def start_end(
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]])) duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (start, start + duration) return (start, start + duration)
if keys.time_end and keys.duration:
end = sample[keys.time_end[0]]
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (end - duration, end)
if keys.time_start: if keys.time_start:
start = sample[keys.time_start[0]] start = sample[keys.time_start[0]]
return (start, None) return (start, None)

View File

@ -10,7 +10,13 @@ def test_determine_possible_keys():
{ {
'game.name': 'Halo', 'game.name': 'Halo',
'me.last_played_time': datetime.datetime( 'me.last_played_time': datetime.datetime(
2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc, 2021,
6,
13,
19,
12,
21,
tzinfo=datetime.timezone.utc,
), ),
'trophy.name': 'Test', 'trophy.name': 'Test',
'trophy.desc': 'Description', 'trophy.desc': 'Description',