1
0

Compare commits

..

3 Commits

Author SHA1 Message Date
a64cbc6186
obsidian_import use imported functionality
All checks were successful
Test Python / Test (push) Successful in 34s
2024-10-14 20:38:35 +02:00
c226ac623c
Activity merging 2024-10-14 20:18:34 +02:00
c07c371939
Moved more logic to personal_data 2024-10-14 19:43:34 +02:00
9 changed files with 297 additions and 197 deletions

View File

@ -25,170 +25,4 @@ And the ([Hamster](https://github.com/projecthamster/hamster)) manual time track
![](docs/obligatory-hamster.png)
"""
import argparse
import datetime
import logging
import sys
from collections.abc import Iterator
from pathlib import Path
from personal_data.activity import (
ActivitySample,
RealizedActivitySample,
)
from .format import cli, icalendar
from .source import csv_file, git_repo
logger = logging.getLogger(__name__)
DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
def filter_samples(
samples: list[ActivitySample],
sample_filter: set[str],
) -> list[ActivitySample]:
assert len(sample_filter) > 0
return [s for s in samples if set(s.labels).intersection(sample_filter)]
def heuristically_realize_samples(
samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
"""Secret sauce.
Guarentees that:
* No samples overlap.
"""
previous_sample_end = None
for sample in samples:
end_at = sample.end_at
if previous_sample_end is None:
if end_at.tzinfo:
previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
else:
previous_sample_end = datetime.datetime.fromtimestamp(0)
assert previous_sample_end <= end_at, 'Iterating in incorrect order'
# TODO: Allow end_at is None
start_at = sample.start_at
if start_at is None:
estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
start_at = max(previous_sample_end, end_at - estimated_duration)
del estimated_duration
yield RealizedActivitySample(
labels=sample.labels, end_at=end_at, start_at=start_at,
)
previous_sample_end = sample.end_at
del sample
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument(
'--git-repo',
action='extend',
nargs='+',
type=Path,
dest='repositories',
default=[],
)
parser.add_argument(
'--csv-file',
action='extend',
nargs='+',
type=Path,
dest='csv_files',
default=[],
)
parser.add_argument(
'--filter',
action='extend',
nargs='+',
type=str,
dest='sample_filter',
default=[],
)
parser.add_argument(
'--format',
action='store',
type=str,
dest='format_mode',
default='cli_report',
choices=['cli_report', 'icalendar'],
)
parser.add_argument(
'--out',
action='store',
type=Path,
dest='output_file',
default='output/samples.ics',
)
return parser.parse_args()
def load_samples(args) -> set[ActivitySample]:
shared_time_stamps_set: set[ActivitySample] = set()
# Git repositories
for repo_path in args.repositories:
logger.warning('Determine commits from %s', repo_path)
shared_time_stamps_set |= set(
git_repo.iterate_samples_from_git_repository(repo_path),
)
del repo_path
# CSV Files
for csv_path in args.csv_files:
logger.warning('Load samples from %s', csv_path)
shared_time_stamps_set |= set(
csv_file.iterate_samples_from_csv_file(csv_path),
)
del csv_path
return shared_time_stamps_set
def main():
logging.basicConfig()
args = parse_arguments()
# Determine samples
shared_time_stamps_set = load_samples(args)
# Sort samples
shared_time_stamps = sorted(shared_time_stamps_set, key=lambda s: s.end_at)
del shared_time_stamps_set
# Filter samples
sample_filter = args.sample_filter
if len(sample_filter) != 0:
logger.warning('Filtering %s samples', len(shared_time_stamps))
shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
logger.warning('Filtered down to %s samples', len(shared_time_stamps))
# Heuristic samples
logger.warning('Realizing %s samples', len(shared_time_stamps))
shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))
# Output format
if args.format_mode == 'cli_report':
for t in cli.generate_report(shared_time_stamps):
sys.stdout.write(t)
elif args.format_mode == 'icalendar':
icalendar.generate_icalendar_file(
shared_time_stamps,
file=args.output_file,
)
__all__: list[str] = []

View File

@ -1,4 +1,140 @@
from git_time_tracker import main
import argparse
import logging
import sys
from pathlib import Path
from personal_data.activity import (
ActivitySample,
heuristically_realize_samples,
merge_adjacent_samples,
)
from .format import cli, icalendar
from .source import csv_file, git_repo
logger = logging.getLogger(__name__)
def filter_samples(
samples: list[ActivitySample],
sample_filter: set[str],
) -> list[ActivitySample]:
assert len(sample_filter) > 0
return [s for s in samples if set(s.labels).intersection(sample_filter)]
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument(
'--git-repo',
action='extend',
nargs='+',
type=Path,
dest='repositories',
default=[],
)
parser.add_argument(
'--csv-file',
action='extend',
nargs='+',
type=Path,
dest='csv_files',
default=[],
)
parser.add_argument(
'--filter',
action='extend',
nargs='+',
type=str,
dest='sample_filter',
default=[],
)
parser.add_argument(
'--format',
action='store',
type=str,
dest='format_mode',
default='cli_report',
choices=['cli_report', 'icalendar'],
)
parser.add_argument(
'--merge',
action='store',
type=str,
dest='merge',
default=None,
)
parser.add_argument(
'--out',
action='store',
type=Path,
dest='output_file',
default='output/samples.ics',
)
return parser.parse_args()
def load_samples(args) -> set[ActivitySample]:
shared_time_stamps_set: set[ActivitySample] = set()
# Git repositories
for repo_path in args.repositories:
logger.warning('Determine commits from %s', repo_path)
shared_time_stamps_set |= set(
git_repo.iterate_samples_from_git_repository(repo_path),
)
del repo_path
# CSV Files
for csv_path in args.csv_files:
logger.warning('Load samples from %s', csv_path)
shared_time_stamps_set |= set(
csv_file.iterate_samples_from_csv_file(csv_path),
)
del csv_path
return shared_time_stamps_set
def main():
logging.basicConfig()
logging.getLogger('git_time_tracker').setLevel('INFO')
args = parse_arguments()
# Determine samples
shared_time_stamps_set = load_samples(args)
# Sort samples
shared_time_stamps = sorted(shared_time_stamps_set, key=lambda s: s.end_at)
del shared_time_stamps_set
# Filter samples
sample_filter = args.sample_filter
if len(sample_filter) != 0:
logger.warning('Filtering %s samples', len(shared_time_stamps))
shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
logger.warning('Filtered down to %s samples', len(shared_time_stamps))
# Heuristic samples
logger.warning('Realizing %s samples', len(shared_time_stamps))
shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))
# Merge adjacent
if args.merge:
shared_time_stamps = merge_adjacent_samples(shared_time_stamps, args.merge)
logger.warning('Merged to %s samples', len(shared_time_stamps))
# Output format
if args.format_mode == 'cli_report':
for t in cli.generate_report(shared_time_stamps):
sys.stdout.write(t)
elif args.format_mode == 'icalendar':
icalendar.generate_icalendar_file(
shared_time_stamps,
file=args.output_file,
)
if __name__ == '__main__':
main()

View File

@ -16,9 +16,9 @@ def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
continue
if len(ls) == 0:
ls.append(label.label)
ls.append(str(label.label))
else:
desc.append(label.label)
desc.append(str(label.label))
return ' '.join(ls), '\n'.join(desc)

View File

@ -1,10 +1,14 @@
from collections.abc import Iterator
from logging import getLogger
from pathlib import Path
from typing import Any
from personal_data.activity import ActivitySample, Label
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
print(__name__)
logger = getLogger(__name__)
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
assert len(rows) > 0
@ -13,6 +17,7 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivityS
if True:
event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
possible_keys = determine_possible_keys(event_data)
logger.info('Found possible keys: %s', possible_keys)
del event_data
assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1

View File

@ -7,8 +7,10 @@ import datetime
from logging import getLogger
from pathlib import Path
from typing import Any
from collections.abc import Iterator
from personal_data.util import load_csv_file
from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file
from personal_data.activity import ActivitySample, Label, RealizedActivitySample, heuristically_realize_samples
from .obsidian import Event, ObsidianVault
@ -17,6 +19,32 @@ logger = getLogger(__name__)
Row = dict[str, Any]
Rows = list[Row]
def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
assert len(rows) > 0
if True:
event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
possible_keys = determine_possible_keys(event_data)
logger.info('Found possible keys: %s', possible_keys)
del event_data
assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
assert len(possible_keys.image) >= 0
for event_data in rows:
(start_at, end_at) = start_end(event_data, possible_keys)
labels = [Label(k, event_data[k]) for k in possible_keys.misc]
# Create event
yield ActivitySample(
labels=tuple(labels),
start_at=start_at,
end_at=end_at,
)
del event_data
def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
num_updated = 0
@ -73,33 +101,35 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
# TODO: Update to using git_time_tracker event parsing system
verb = 'Watched'
samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
for sample in samples:
date: datetime.date = sample.start_at.date()
samples_per_date.setdefault(date, [])
samples_per_date[date].append(sample)
del date, sample
del rows
def map_to_event(sample: RealizedActivitySample) -> Event:
comment = '{} Episode {}: *{}*'.format(
sample.single_label_with_category('season.name'),
sample.single_label_with_category('episode.index'),
sample.single_label_with_category('episode.name'),
)
return Event(sample.start_at.time(),
sample.end_at.time(),
verb,
sample.single_label_with_category('series.name'),
comment,
)
num_updated = 0
rows_per_date = {}
for row in rows:
date = row['me.last_played_time'].date()
rows_per_date.setdefault(date, [])
rows_per_date[date].append(row)
del date, row
del rows
def map_to_event(row: Row) -> Event:
start = (
row['me.last_played_time'].time().replace(second=0, microsecond=0, fold=0)
)
end = start
comment = '{} Episode {}: *{}*'.format(
row['season.name'],
row['episode.index'],
row['episode.name'],
)
return Event(start, end, verb, row['series.name'], comment)
for date, rows in rows_per_date.items():
events = [map_to_event(row) for row in rows]
for date, samples in samples_per_date.items():
events = [map_to_event(sample) for sample in samples]
was_updated = vault.add_events(date, events)
if was_updated:

View File

@ -39,6 +39,7 @@ def parse_arguments():
def main():
logging.basicConfig()
logging.getLogger('personal_data').setLevel('INFO')
args = parse_arguments()
scraper_filter = frozenset(args.fetchers)

View File

@ -1,8 +1,9 @@
import dataclasses
import datetime
from collections.abc import Sequence
from collections.abc import Iterator, Sequence
HIDDEN_LABEL_CATEGORY = '__'
DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
@dataclasses.dataclass(frozen=True, order=True)
@ -22,8 +23,89 @@ class ActivitySample:
start_at: datetime.datetime | None
end_at: datetime.datetime | None
def single_label_with_category(self, category: str) -> str:
for label in self.labels:
if label.category == category:
return label.label
return None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample):
start_at: datetime.datetime
end_at: datetime.datetime
def heuristically_realize_samples(
samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
"""Secret sauce.
Guarentees that:
* No samples overlap.
"""
previous_sample_end = None
for sample in samples:
end_at = sample.end_at
if previous_sample_end is None:
if end_at.tzinfo:
previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
else:
previous_sample_end = datetime.datetime.fromtimestamp(0)
assert previous_sample_end <= end_at, 'Iterating in incorrect order'
# TODO: Allow end_at is None
start_at = sample.start_at
if start_at is None:
estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
start_at = max(previous_sample_end, end_at - estimated_duration)
del estimated_duration
yield RealizedActivitySample(
labels=sample.labels,
end_at=end_at,
start_at=start_at,
)
previous_sample_end = sample.end_at
del sample
def mergable_labels(a: Sequence[Label], b: Sequence[Label]) -> Sequence[Label]:
return list(set(a).intersection(set(b)))
def merge_adjacent_samples(
samples: list[RealizedActivitySample], group_category: str,
) -> list[RealizedActivitySample]:
max_interval_between_samples = datetime.timedelta(minutes=5)
def can_merge(
before: RealizedActivitySample, after: RealizedActivitySample,
) -> bool:
if before.single_label_with_category(
group_category,
) != after.single_label_with_category(group_category):
return False
return (after.start_at - before.end_at) < max_interval_between_samples
samples.sort(key=lambda s: s.start_at)
new: list[RealizedActivitySample] = []
for s in samples:
if len(new) > 0 and can_merge(new[-1], s):
# TODO: Merge/strip attributes?
new[-1] = RealizedActivitySample(
labels=mergable_labels(new[-1].labels, s.labels),
start_at=new[-1].start_at,
end_at=s.end_at,
)
else:
new.append(s)
return new

View File

@ -132,7 +132,8 @@ def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
def start_end(
sample: dict[str, Any], keys: PossibleKeys,
sample: dict[str, Any],
keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
if keys.time_start and keys.time_end:
return (sample[keys.time_start[0]], sample[keys.time_end[0]])
@ -142,6 +143,11 @@ def start_end(
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (start, start + duration)
if keys.time_end and keys.duration:
end = sample[keys.time_end[0]]
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (end - duration, end)
if keys.time_start:
start = sample[keys.time_start[0]]
return (start, None)

View File

@ -10,7 +10,13 @@ def test_determine_possible_keys():
{
'game.name': 'Halo',
'me.last_played_time': datetime.datetime(
2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
2021,
6,
13,
19,
12,
21,
tzinfo=datetime.timezone.utc,
),
'trophy.name': 'Test',
'trophy.desc': 'Description',