Compare commits
No commits in common. "a64cbc61869130ac40e79a82c44f770b5708a181" and "72be664d82a1eeac9d2e59aa2f8196e26524a803" have entirely different histories.
a64cbc6186 ... 72be664d82
@@ -25,4 +25,170 @@ And the ([Hamster](https://github.com/projecthamster/hamster)) manual time tracker.
![](docs/obligatory-hamster.png)
"""

__all__: list[str] = []

import argparse
import datetime
import logging
import sys
from collections.abc import Iterator
from pathlib import Path

from personal_data.activity import (
    ActivitySample,
    RealizedActivitySample,
)

from .format import cli, icalendar
from .source import csv_file, git_repo

logger = logging.getLogger(__name__)


DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)


def filter_samples(
    samples: list[ActivitySample],
    sample_filter: set[str],
) -> list[ActivitySample]:
    assert len(sample_filter) > 0
    return [s for s in samples if set(s.labels).intersection(sample_filter)]


def heuristically_realize_samples(
    samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
    """Secret sauce.

    Guarantees that:
    * No samples overlap.
    """
    previous_sample_end = None
    for sample in samples:
        end_at = sample.end_at

        if previous_sample_end is None:
            if end_at.tzinfo:
                previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
            else:
                previous_sample_end = datetime.datetime.fromtimestamp(0)

        assert previous_sample_end <= end_at, 'Iterating in incorrect order'

        # TODO: Allow end_at is None

        start_at = sample.start_at
        if start_at is None:
            estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
            start_at = max(previous_sample_end, end_at - estimated_duration)
            del estimated_duration

        yield RealizedActivitySample(
            labels=sample.labels, end_at=end_at, start_at=start_at,
        )

        previous_sample_end = sample.end_at
        del sample


def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--git-repo',
        action='extend',
        nargs='+',
        type=Path,
        dest='repositories',
        default=[],
    )
    parser.add_argument(
        '--csv-file',
        action='extend',
        nargs='+',
        type=Path,
        dest='csv_files',
        default=[],
    )
    parser.add_argument(
        '--filter',
        action='extend',
        nargs='+',
        type=str,
        dest='sample_filter',
        default=[],
    )
    parser.add_argument(
        '--format',
        action='store',
        type=str,
        dest='format_mode',
        default='cli_report',
        choices=['cli_report', 'icalendar'],
    )
    parser.add_argument(
        '--out',
        action='store',
        type=Path,
        dest='output_file',
        default='output/samples.ics',
    )
    return parser.parse_args()


def load_samples(args) -> set[ActivitySample]:
    shared_time_stamps_set: set[ActivitySample] = set()

    # Git repositories
    for repo_path in args.repositories:
        logger.warning('Determine commits from %s', repo_path)
        shared_time_stamps_set |= set(
            git_repo.iterate_samples_from_git_repository(repo_path),
        )
        del repo_path

    # CSV Files
    for csv_path in args.csv_files:
        logger.warning('Load samples from %s', csv_path)
        shared_time_stamps_set |= set(
            csv_file.iterate_samples_from_csv_file(csv_path),
        )
        del csv_path

    return shared_time_stamps_set


def main():
    logging.basicConfig()

    args = parse_arguments()

    # Determine samples
    shared_time_stamps_set = load_samples(args)

    # Sort samples
    shared_time_stamps = sorted(shared_time_stamps_set, key=lambda s: s.end_at)
    del shared_time_stamps_set

    # Filter samples
    sample_filter = args.sample_filter
    if len(sample_filter) != 0:
        logger.warning('Filtering %s samples', len(shared_time_stamps))
        shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
        logger.warning('Filtered down to %s samples', len(shared_time_stamps))

    # Heuristic samples
    logger.warning('Realizing %s samples', len(shared_time_stamps))
    shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))

    # Output format
    if args.format_mode == 'cli_report':
        for t in cli.generate_report(shared_time_stamps):
            sys.stdout.write(t)
    elif args.format_mode == 'icalendar':
        icalendar.generate_icalendar_file(
            shared_time_stamps,
            file=args.output_file,
        )
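For illustration, a minimal sketch (not part of the diff) of the realization heuristic above. It assumes this hunk is git_time_tracker/__init__.py (hunk below imports main from that package) and that Label takes (category, label) positionally, as the csv_file source elsewhere in this diff suggests.

# Sketch (assumed API): realize a sample that has an end time but no start.
import datetime

from git_time_tracker import heuristically_realize_samples
from personal_data.activity import ActivitySample, Label

samples = [
    ActivitySample(
        labels=(Label('project', 'demo'),),  # hypothetical label
        start_at=None,  # unknown; the heuristic estimates it
        end_at=datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.UTC),
    ),
]
for realized in heuristically_realize_samples(samples):
    # start_at becomes max(previous end, end_at - DEFAULT_ESTIMATED_DURATION),
    # i.e. 11:00 here, since no earlier sample constrains it.
    print(realized.start_at, realized.end_at)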
@@ -1,140 +1,4 @@
import argparse
import logging
import sys
from pathlib import Path

from personal_data.activity import (
    ActivitySample,
    heuristically_realize_samples,
    merge_adjacent_samples,
)

from .format import cli, icalendar
from .source import csv_file, git_repo

logger = logging.getLogger(__name__)


def filter_samples(
    samples: list[ActivitySample],
    sample_filter: set[str],
) -> list[ActivitySample]:
    assert len(sample_filter) > 0
    return [s for s in samples if set(s.labels).intersection(sample_filter)]


def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--git-repo',
        action='extend',
        nargs='+',
        type=Path,
        dest='repositories',
        default=[],
    )
    parser.add_argument(
        '--csv-file',
        action='extend',
        nargs='+',
        type=Path,
        dest='csv_files',
        default=[],
    )
    parser.add_argument(
        '--filter',
        action='extend',
        nargs='+',
        type=str,
        dest='sample_filter',
        default=[],
    )
    parser.add_argument(
        '--format',
        action='store',
        type=str,
        dest='format_mode',
        default='cli_report',
        choices=['cli_report', 'icalendar'],
    )
    parser.add_argument(
        '--merge',
        action='store',
        type=str,
        dest='merge',
        default=None,
    )
    parser.add_argument(
        '--out',
        action='store',
        type=Path,
        dest='output_file',
        default='output/samples.ics',
    )
    return parser.parse_args()


def load_samples(args) -> set[ActivitySample]:
    shared_time_stamps_set: set[ActivitySample] = set()

    # Git repositories
    for repo_path in args.repositories:
        logger.warning('Determine commits from %s', repo_path)
        shared_time_stamps_set |= set(
            git_repo.iterate_samples_from_git_repository(repo_path),
        )
        del repo_path

    # CSV Files
    for csv_path in args.csv_files:
        logger.warning('Load samples from %s', csv_path)
        shared_time_stamps_set |= set(
            csv_file.iterate_samples_from_csv_file(csv_path),
        )
        del csv_path

    return shared_time_stamps_set


def main():
    logging.basicConfig()
    logging.getLogger('git_time_tracker').setLevel('INFO')

    args = parse_arguments()

    # Determine samples
    shared_time_stamps_set = load_samples(args)

    # Sort samples
    shared_time_stamps = sorted(shared_time_stamps_set, key=lambda s: s.end_at)
    del shared_time_stamps_set

    # Filter samples
    sample_filter = args.sample_filter
    if len(sample_filter) != 0:
        logger.warning('Filtering %s samples', len(shared_time_stamps))
        shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
        logger.warning('Filtered down to %s samples', len(shared_time_stamps))

    # Heuristic samples
    logger.warning('Realizing %s samples', len(shared_time_stamps))
    shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))

    # Merge adjacent
    if args.merge:
        shared_time_stamps = merge_adjacent_samples(shared_time_stamps, args.merge)
        logger.warning('Merged to %s samples', len(shared_time_stamps))

    # Output format
    if args.format_mode == 'cli_report':
        for t in cli.generate_report(shared_time_stamps):
            sys.stdout.write(t)
    elif args.format_mode == 'icalendar':
        icalendar.generate_icalendar_file(
            shared_time_stamps,
            file=args.output_file,
        )

from git_time_tracker import main

if __name__ == '__main__':
    main()
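For illustration, a sketch exercising the argument surface defined above; paths and filter values are placeholders.

# Sketch: drive parse_arguments() directly with placeholder arguments.
import sys
from pathlib import Path

sys.argv = [
    'git_time_tracker',
    '--git-repo', '/tmp/example-repo',  # hypothetical path
    '--filter', 'myproject',            # hypothetical label filter
    '--format', 'icalendar',
    '--out', 'output/samples.ics',
]
args = parse_arguments()
assert args.repositories == [Path('/tmp/example-repo')]
assert args.sample_filter == ['myproject']
assert args.format_mode == 'icalendar'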
@@ -16,9 +16,9 @@ def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
        if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
            continue
        if len(ls) == 0:
-            ls.append(str(label.label))
+            ls.append(label.label)
        else:
-            desc.append(str(label.label))
+            desc.append(label.label)
    return ' '.join(ls), '\n'.join(desc)
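For illustration, the effect of the loop above on a hypothetical label list: the first visible label becomes the title, the rest join the description. HIDDEN_LABEL_CATEGORY is '__' per personal_data.activity; the label values are invented.

# Sketch of the loop above with hypothetical labels.
from personal_data.activity import HIDDEN_LABEL_CATEGORY, Label

labels = [
    Label('series.name', 'Halo'),
    Label('episode.name', 'Pilot'),
    Label(HIDDEN_LABEL_CATEGORY, 'internal'),  # hidden category, skipped
]
ls: list[str] = []
desc: list[str] = []
for label in labels:
    if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
        continue
    if len(ls) == 0:
        ls.append(label.label)
    else:
        desc.append(label.label)
assert (' '.join(ls), '\n'.join(desc)) == ('Halo', 'Pilot')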
@@ -1,14 +1,10 @@
from collections.abc import Iterator
from logging import getLogger
from pathlib import Path
from typing import Any

from personal_data.activity import ActivitySample, Label
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end

print(__name__)
logger = getLogger(__name__)


def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
    assert len(rows) > 0
@@ -17,7 +13,6 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
    if True:
        event_data = rows[len(rows) // 2]  # Hopefully select a useful representative.
        possible_keys = determine_possible_keys(event_data)
        logger.info('Found possible keys: %s', possible_keys)
        del event_data

    assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
@@ -7,10 +7,8 @@ import datetime
from logging import getLogger
from pathlib import Path
from typing import Any
from collections.abc import Iterator

from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file
from personal_data.activity import ActivitySample, Label, RealizedActivitySample, heuristically_realize_samples
from personal_data.util import load_csv_file

from .obsidian import Event, ObsidianVault
@@ -19,32 +17,6 @@ logger = getLogger(__name__)
Row = dict[str, Any]
Rows = list[Row]


def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
    assert len(rows) > 0

    if True:
        event_data = rows[len(rows) // 2]  # Hopefully select a useful representative.
        possible_keys = determine_possible_keys(event_data)
        logger.info('Found possible keys: %s', possible_keys)
        del event_data

    assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
    assert len(possible_keys.image) >= 0

    for event_data in rows:
        (start_at, end_at) = start_end(event_data, possible_keys)
        labels = [Label(k, event_data[k]) for k in possible_keys.misc]

        # Create event
        yield ActivitySample(
            labels=tuple(labels),
            start_at=start_at,
            end_at=end_at,
        )

        del event_data


def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
    num_updated = 0
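For illustration, a sketch feeding iterate_samples_from_rows (defined above) a row shaped like the test data at the bottom of this diff. It assumes determine_possible_keys recognizes 'me.last_played_time' as a time key and treats the remaining columns as misc labels.

# Sketch with an assumed key-detection outcome.
import datetime

rows = [
    {
        'game.name': 'Halo',
        'me.last_played_time': datetime.datetime(
            2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
        ),
    },
]
for sample in iterate_samples_from_rows(rows):
    print(sample.labels, sample.start_at, sample.end_at)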
@@ -101,35 +73,33 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
    # TODO: Update to using git_time_tracker event parsing system
    verb = 'Watched'

-    samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
-
-    samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
-    for sample in samples:
-        date: datetime.date = sample.start_at.date()
-        samples_per_date.setdefault(date, [])
-        samples_per_date[date].append(sample)
-    del date, sample
-    del rows
-
-    def map_to_event(sample: RealizedActivitySample) -> Event:
-        comment = '{} Episode {}: *{}*'.format(
-            sample.single_label_with_category('season.name'),
-            sample.single_label_with_category('episode.index'),
-            sample.single_label_with_category('episode.name'),
-        )
-        return Event(sample.start_at.time(),
-            sample.end_at.time(),
-            verb,
-            sample.single_label_with_category('series.name'),
-            comment,
-        )

    num_updated = 0

-    for date, samples in samples_per_date.items():
-        events = [map_to_event(sample) for sample in samples]
+    rows_per_date = {}
+    for row in rows:
+        date = row['me.last_played_time'].date()
+        rows_per_date.setdefault(date, [])
+        rows_per_date[date].append(row)
+    del date, row
+    del rows
+
+    def map_to_event(row: Row) -> Event:
+        start = (
+            row['me.last_played_time'].time().replace(second=0, microsecond=0, fold=0)
+        )
+        end = start
+        comment = '{} Episode {}: *{}*'.format(
+            row['season.name'],
+            row['episode.index'],
+            row['episode.name'],
+        )
+        return Event(start, end, verb, row['series.name'], comment)
+
+    for date, rows in rows_per_date.items():
+        events = [map_to_event(row) for row in rows]
        was_updated = vault.add_events(date, events)

        if was_updated:
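The row-based map_to_event above truncates the played timestamp to the whole minute; a quick stdlib check of that rounding:

# Sketch: the .replace(...) call above drops seconds and microseconds.
import datetime

t = datetime.datetime(2021, 6, 13, 19, 12, 21).time()
assert t.replace(second=0, microsecond=0, fold=0) == datetime.time(19, 12)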
@@ -39,7 +39,6 @@ def parse_arguments():
def main():
    logging.basicConfig()
    logging.getLogger('personal_data').setLevel('INFO')

    args = parse_arguments()
    scraper_filter = frozenset(args.fetchers)
@@ -1,9 +1,8 @@
import dataclasses
import datetime
-from collections.abc import Iterator, Sequence
+from collections.abc import Sequence

HIDDEN_LABEL_CATEGORY = '__'
DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)


@dataclasses.dataclass(frozen=True, order=True)
@@ -23,89 +22,8 @@ class ActivitySample:
    start_at: datetime.datetime | None
    end_at: datetime.datetime | None

    def single_label_with_category(self, category: str) -> str | None:
        for label in self.labels:
            if label.category == category:
                return label.label
        return None


@dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample):
    start_at: datetime.datetime
    end_at: datetime.datetime


def heuristically_realize_samples(
    samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
    """Secret sauce.

    Guarantees that:
    * No samples overlap.
    """
    previous_sample_end = None
    for sample in samples:
        end_at = sample.end_at

        if previous_sample_end is None:
            if end_at.tzinfo:
                previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
            else:
                previous_sample_end = datetime.datetime.fromtimestamp(0)

        assert previous_sample_end <= end_at, 'Iterating in incorrect order'

        # TODO: Allow end_at is None

        start_at = sample.start_at
        if start_at is None:
            estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
            start_at = max(previous_sample_end, end_at - estimated_duration)
            del estimated_duration

        yield RealizedActivitySample(
            labels=sample.labels,
            end_at=end_at,
            start_at=start_at,
        )

        previous_sample_end = sample.end_at
        del sample


def mergable_labels(a: Sequence[Label], b: Sequence[Label]) -> Sequence[Label]:
    return list(set(a).intersection(set(b)))


def merge_adjacent_samples(
    samples: list[RealizedActivitySample], group_category: str,
) -> list[RealizedActivitySample]:
    max_interval_between_samples = datetime.timedelta(minutes=5)

    def can_merge(
        before: RealizedActivitySample, after: RealizedActivitySample,
    ) -> bool:
        if before.single_label_with_category(
            group_category,
        ) != after.single_label_with_category(group_category):
            return False
        return (after.start_at - before.end_at) < max_interval_between_samples

    samples.sort(key=lambda s: s.start_at)

    new: list[RealizedActivitySample] = []

    for s in samples:
        if len(new) > 0 and can_merge(new[-1], s):
            # TODO: Merge/strip attributes?
            new[-1] = RealizedActivitySample(
                labels=mergable_labels(new[-1].labels, s.labels),
                start_at=new[-1].start_at,
                end_at=s.end_at,
            )
        else:
            new.append(s)

    return new
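For illustration, a sketch of merge_adjacent_samples on two samples sharing a hypothetical 'project' label, two minutes apart (under the five-minute threshold), assuming Label takes (category, label) positionally.

# Sketch: back-to-back samples with the same group label merge into one.
import datetime

from personal_data.activity import Label, RealizedActivitySample, merge_adjacent_samples

a = RealizedActivitySample(
    labels=(Label('project', 'demo'),),
    start_at=datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.UTC),
    end_at=datetime.datetime(2024, 1, 1, 12, 30, tzinfo=datetime.UTC),
)
b = RealizedActivitySample(
    labels=(Label('project', 'demo'),),
    start_at=datetime.datetime(2024, 1, 1, 12, 32, tzinfo=datetime.UTC),
    end_at=datetime.datetime(2024, 1, 1, 13, 0, tzinfo=datetime.UTC),
)
merged = merge_adjacent_samples([a, b], 'project')
assert len(merged) == 1
# The merged sample spans from the first start to the last end.
assert (merged[0].start_at, merged[0].end_at) == (a.start_at, b.end_at)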
@@ -132,8 +132,7 @@ def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
def start_end(
-    sample: dict[str, Any],
-    keys: PossibleKeys,
+    sample: dict[str, Any], keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
    if keys.time_start and keys.time_end:
        return (sample[keys.time_start[0]], sample[keys.time_end[0]])
@@ -143,11 +142,6 @@ def start_end(
        duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
        return (start, start + duration)

    if keys.time_end and keys.duration:
        end = sample[keys.time_end[0]]
        duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
        return (end - duration, end)

    if keys.time_start:
        start = sample[keys.time_start[0]]
        return (start, None)
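For illustration, the arithmetic of the time_end + duration branch above, with hypothetical key names:

# Sketch: when only an end time and a duration are known, the start is
# derived by subtraction. Key names here are invented for the example.
import datetime

end = datetime.datetime(2024, 1, 1, 13, 0, tzinfo=datetime.UTC)
sample = {'played.last_time': end, 'played.duration': '1800'}
duration = datetime.timedelta(seconds=float(sample['played.duration']))
start = sample['played.last_time'] - duration
assert start == datetime.datetime(2024, 1, 1, 12, 30, tzinfo=datetime.UTC)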
@@ -10,13 +10,7 @@ def test_determine_possible_keys():
        {
            'game.name': 'Halo',
            'me.last_played_time': datetime.datetime(
-                2021,
-                6,
-                13,
-                19,
-                12,
-                21,
-                tzinfo=datetime.timezone.utc,
+                2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
            ),
            'trophy.name': 'Test',
            'trophy.desc': 'Description',