Compare commits
3 commits: 72be664d82 ... a64cbc6186

Commits:
- a64cbc6186
- c226ac623c
- c07c371939
@@ -25,170 +25,4 @@ And the ([Hamster](https://github.com/projecthamster/hamster)) manual time tracker
 ![](docs/obligatory-hamster.png)
 """
 
-import argparse
-import datetime
-import logging
-import sys
-from collections.abc import Iterator
-from pathlib import Path
-
-from personal_data.activity import (
-    ActivitySample,
-    RealizedActivitySample,
-)
-
-from .format import cli, icalendar
-from .source import csv_file, git_repo
-
-logger = logging.getLogger(__name__)
-
-
-DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
-ZERO_DURATION = datetime.timedelta(seconds=0)
-HOUR = datetime.timedelta(hours=1)
-MINUTE = datetime.timedelta(minutes=1)
-
-
-def filter_samples(
-    samples: list[ActivitySample],
-    sample_filter: set[str],
-) -> list[ActivitySample]:
-    assert len(sample_filter) > 0
-    return [s for s in samples if set(s.labels).intersection(sample_filter)]
-
-
-def heuristically_realize_samples(
-    samples: list[ActivitySample],
-) -> Iterator[RealizedActivitySample]:
-    """Secret sauce.
-
-    Guarantees that:
-    * No samples overlap.
-    """
-    previous_sample_end = None
-    for sample in samples:
-        end_at = sample.end_at
-
-        if previous_sample_end is None:
-            if end_at.tzinfo:
-                previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
-            else:
-                previous_sample_end = datetime.datetime.fromtimestamp(0)
-
-        assert previous_sample_end <= end_at, 'Iterating in incorrect order'
-
-        # TODO: Allow end_at is None
-
-        start_at = sample.start_at
-        if start_at is None:
-            estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
-            start_at = max(previous_sample_end, end_at - estimated_duration)
-            del estimated_duration
-
-        yield RealizedActivitySample(
-            labels=sample.labels, end_at=end_at, start_at=start_at,
-        )
-
-        previous_sample_end = sample.end_at
-        del sample
-
-
-def parse_arguments():
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        '--git-repo',
-        action='extend',
-        nargs='+',
-        type=Path,
-        dest='repositories',
-        default=[],
-    )
-    parser.add_argument(
-        '--csv-file',
-        action='extend',
-        nargs='+',
-        type=Path,
-        dest='csv_files',
-        default=[],
-    )
-    parser.add_argument(
-        '--filter',
-        action='extend',
-        nargs='+',
-        type=str,
-        dest='sample_filter',
-        default=[],
-    )
-    parser.add_argument(
-        '--format',
-        action='store',
-        type=str,
-        dest='format_mode',
-        default='cli_report',
-        choices=['cli_report', 'icalendar'],
-    )
-    parser.add_argument(
-        '--out',
-        action='store',
-        type=Path,
-        dest='output_file',
-        default='output/samples.ics',
-    )
-    return parser.parse_args()
-
-
-def load_samples(args) -> set[ActivitySample]:
-    shared_time_stamps_set: set[ActivitySample] = set()
-
-    # Git repositories
-    for repo_path in args.repositories:
-        logger.warning('Determine commits from %s', repo_path)
-        shared_time_stamps_set |= set(
-            git_repo.iterate_samples_from_git_repository(repo_path),
-        )
-        del repo_path
-
-    # CSV Files
-    for csv_path in args.csv_files:
-        logger.warning('Load samples from %s', csv_path)
-        shared_time_stamps_set |= set(
-            csv_file.iterate_samples_from_csv_file(csv_path),
-        )
-        del csv_path
-
-    return shared_time_stamps_set
-
-
-def main():
-    logging.basicConfig()
-
-    args = parse_arguments()
-
-    # Determine samples
-    shared_time_stamps_set = load_samples(args)
-
-    # Sort samples
-    shared_time_stamps = sorted(shared_time_stamps_set, key=lambda s: s.end_at)
-    del shared_time_stamps_set
-
-    # Filter samples
-    sample_filter = args.sample_filter
-    if len(sample_filter) != 0:
-        logger.warning('Filtering %s samples', len(shared_time_stamps))
-        shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
-        logger.warning('Filtered down to %s samples', len(shared_time_stamps))
-
-    # Heuristic samples
-    logger.warning('Realizing %s samples', len(shared_time_stamps))
-    shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))
-
-    # Output format
-    if args.format_mode == 'cli_report':
-        for t in cli.generate_report(shared_time_stamps):
-            sys.stdout.write(t)
-    elif args.format_mode == 'icalendar':
-        icalendar.generate_icalendar_file(
-            shared_time_stamps,
-            file=args.output_file,
-        )
+__all__: list[str] = []
@@ -1,4 +1,140 @@
-from git_time_tracker import main
+import argparse
+import logging
+import sys
+from pathlib import Path
+
+from personal_data.activity import (
+    ActivitySample,
+    heuristically_realize_samples,
+    merge_adjacent_samples,
+)
+
+from .format import cli, icalendar
+from .source import csv_file, git_repo
+
+logger = logging.getLogger(__name__)
+
+
+def filter_samples(
+    samples: list[ActivitySample],
+    sample_filter: set[str],
+) -> list[ActivitySample]:
+    assert len(sample_filter) > 0
+    return [s for s in samples if set(s.labels).intersection(sample_filter)]
+
+
+def parse_arguments():
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--git-repo',
+        action='extend',
+        nargs='+',
+        type=Path,
+        dest='repositories',
+        default=[],
+    )
+    parser.add_argument(
+        '--csv-file',
+        action='extend',
+        nargs='+',
+        type=Path,
+        dest='csv_files',
+        default=[],
+    )
+    parser.add_argument(
+        '--filter',
+        action='extend',
+        nargs='+',
+        type=str,
+        dest='sample_filter',
+        default=[],
+    )
+    parser.add_argument(
+        '--format',
+        action='store',
+        type=str,
+        dest='format_mode',
+        default='cli_report',
+        choices=['cli_report', 'icalendar'],
+    )
+    parser.add_argument(
+        '--merge',
+        action='store',
+        type=str,
+        dest='merge',
+        default=None,
+    )
+    parser.add_argument(
+        '--out',
+        action='store',
+        type=Path,
+        dest='output_file',
+        default='output/samples.ics',
+    )
+    return parser.parse_args()
+
+
+def load_samples(args) -> set[ActivitySample]:
+    shared_time_stamps_set: set[ActivitySample] = set()
+
+    # Git repositories
+    for repo_path in args.repositories:
+        logger.warning('Determine commits from %s', repo_path)
+        shared_time_stamps_set |= set(
+            git_repo.iterate_samples_from_git_repository(repo_path),
+        )
+        del repo_path
+
+    # CSV Files
+    for csv_path in args.csv_files:
+        logger.warning('Load samples from %s', csv_path)
+        shared_time_stamps_set |= set(
+            csv_file.iterate_samples_from_csv_file(csv_path),
+        )
+        del csv_path
+
+    return shared_time_stamps_set
+
+
+def main():
+    logging.basicConfig()
+    logging.getLogger('git_time_tracker').setLevel('INFO')
+
+    args = parse_arguments()
+
+    # Determine samples
+    shared_time_stamps_set = load_samples(args)
+
+    # Sort samples
+    shared_time_stamps = sorted(shared_time_stamps_set, key=lambda s: s.end_at)
+    del shared_time_stamps_set
+
+    # Filter samples
+    sample_filter = args.sample_filter
+    if len(sample_filter) != 0:
+        logger.warning('Filtering %s samples', len(shared_time_stamps))
+        shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
+        logger.warning('Filtered down to %s samples', len(shared_time_stamps))
+
+    # Heuristic samples
+    logger.warning('Realizing %s samples', len(shared_time_stamps))
+    shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))
+
+    # Merge adjacent
+    if args.merge:
+        shared_time_stamps = merge_adjacent_samples(shared_time_stamps, args.merge)
+        logger.warning('Merged to %s samples', len(shared_time_stamps))
+
+    # Output format
+    if args.format_mode == 'cli_report':
+        for t in cli.generate_report(shared_time_stamps):
+            sys.stdout.write(t)
+    elif args.format_mode == 'icalendar':
+        icalendar.generate_icalendar_file(
+            shared_time_stamps,
+            file=args.output_file,
+        )
+
 
 if __name__ == '__main__':
     main()
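For context, here is a minimal sketch (not part of the diff) of the realization step this entry point drives. The label data is hypothetical, and `ActivitySample`/`Label` are assumed to accept the keyword fields used elsewhere in this compare:

    import datetime

    from personal_data.activity import ActivitySample, Label, heuristically_realize_samples

    # A hypothetical sample with a known end but no start, like a git commit.
    end = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.UTC)
    samples = [
        ActivitySample(labels=(Label('project', 'tracker'),), start_at=None, end_at=end),
    ]

    samples.sort(key=lambda s: s.end_at)  # realization asserts end_at ordering
    for realized in heuristically_realize_samples(samples):
        # start_at falls back to end_at - DEFAULT_ESTIMATED_DURATION (one hour).
        print(realized.start_at, realized.end_at)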
@@ -16,9 +16,9 @@ def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
         if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
             continue
         if len(ls) == 0:
-            ls.append(label.label)
+            ls.append(str(label.label))
         else:
-            desc.append(label.label)
+            desc.append(str(label.label))
     return ' '.join(ls), '\n'.join(desc)
 
 
@ -1,10 +1,14 @@
|
||||||
from collections.abc import Iterator
|
from collections.abc import Iterator
|
||||||
|
from logging import getLogger
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from personal_data.activity import ActivitySample, Label
|
from personal_data.activity import ActivitySample, Label
|
||||||
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
|
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
|
||||||
|
|
||||||
|
print(__name__)
|
||||||
|
logger = getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
|
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
|
||||||
assert len(rows) > 0
|
assert len(rows) > 0
|
||||||
|
@ -13,6 +17,7 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivityS
|
||||||
if True:
|
if True:
|
||||||
event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
|
event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
|
||||||
possible_keys = determine_possible_keys(event_data)
|
possible_keys = determine_possible_keys(event_data)
|
||||||
|
logger.info('Found possible keys: %s', possible_keys)
|
||||||
del event_data
|
del event_data
|
||||||
|
|
||||||
assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
|
assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
|
||||||
|
|
|
@@ -7,8 +7,10 @@ import datetime
 from logging import getLogger
 from pathlib import Path
 from typing import Any
+from collections.abc import Iterator
 
-from personal_data.util import load_csv_file
+from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file
+from personal_data.activity import ActivitySample, Label, RealizedActivitySample, heuristically_realize_samples
 
 from .obsidian import Event, ObsidianVault
 
@@ -17,6 +19,32 @@ logger = getLogger(__name__)
 Row = dict[str, Any]
 Rows = list[Row]
 
 
+def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
+    assert len(rows) > 0
+
+    if True:
+        event_data = rows[len(rows) // 2]  # Hopefully select a useful representative.
+        possible_keys = determine_possible_keys(event_data)
+        logger.info('Found possible keys: %s', possible_keys)
+        del event_data
+
+    assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
+    assert len(possible_keys.image) >= 0
+
+    for event_data in rows:
+        (start_at, end_at) = start_end(event_data, possible_keys)
+        labels = [Label(k, event_data[k]) for k in possible_keys.misc]
+
+        # Create event
+        yield ActivitySample(
+            labels=tuple(labels),
+            start_at=start_at,
+            end_at=end_at,
+        )
+
+        del event_data
+
+
 def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
     num_updated = 0
@@ -73,33 +101,35 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
 
 
 def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
-    # TODO: Update to using git_time_tracker event parsing system
     verb = 'Watched'
 
+    samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
+
+    samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
+    for sample in samples:
+        date: datetime.date = sample.start_at.date()
+        samples_per_date.setdefault(date, [])
+        samples_per_date[date].append(sample)
+        del date, sample
+    del rows
+
+    def map_to_event(sample: RealizedActivitySample) -> Event:
+        comment = '{} Episode {}: *{}*'.format(
+            sample.single_label_with_category('season.name'),
+            sample.single_label_with_category('episode.index'),
+            sample.single_label_with_category('episode.name'),
+        )
+        return Event(sample.start_at.time(),
+            sample.end_at.time(),
+            verb,
+            sample.single_label_with_category('series.name'),
+            comment,
+        )
+
     num_updated = 0
 
-    rows_per_date = {}
-    for row in rows:
-        date = row['me.last_played_time'].date()
-        rows_per_date.setdefault(date, [])
-        rows_per_date[date].append(row)
-        del date, row
-    del rows
-
-    def map_to_event(row: Row) -> Event:
-        start = (
-            row['me.last_played_time'].time().replace(second=0, microsecond=0, fold=0)
-        )
-        end = start
-        comment = '{} Episode {}: *{}*'.format(
-            row['season.name'],
-            row['episode.index'],
-            row['episode.name'],
-        )
-        return Event(start, end, verb, row['series.name'], comment)
-
-    for date, rows in rows_per_date.items():
-        events = [map_to_event(row) for row in rows]
+    for date, samples in samples_per_date.items():
+        events = [map_to_event(sample) for sample in samples]
         was_updated = vault.add_events(date, events)
 
         if was_updated:
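The rewrite above leans on `single_label_with_category`, which is added to `ActivitySample` further down in this compare. A small sketch (not part of the diff, sample values hypothetical) of what it returns:

    import datetime

    from personal_data.activity import Label, RealizedActivitySample

    sample = RealizedActivitySample(
        labels=(Label('series.name', 'Example Show'), Label('episode.index', '3')),
        start_at=datetime.datetime(2024, 1, 1, 20, 0),
        end_at=datetime.datetime(2024, 1, 1, 20, 25),
    )
    assert sample.single_label_with_category('series.name') == 'Example Show'
    assert sample.single_label_with_category('episode.name') is None  # no such category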
@@ -39,6 +39,7 @@ def parse_arguments():
 def main():
     logging.basicConfig()
     logging.getLogger('personal_data').setLevel('INFO')
+
     args = parse_arguments()
     scraper_filter = frozenset(args.fetchers)
@ -1,8 +1,9 @@
|
||||||
import dataclasses
|
import dataclasses
|
||||||
import datetime
|
import datetime
|
||||||
from collections.abc import Sequence
|
from collections.abc import Iterator, Sequence
|
||||||
|
|
||||||
HIDDEN_LABEL_CATEGORY = '__'
|
HIDDEN_LABEL_CATEGORY = '__'
|
||||||
|
DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(frozen=True, order=True)
|
@dataclasses.dataclass(frozen=True, order=True)
|
||||||
|
@@ -22,8 +23,89 @@ class ActivitySample:
     start_at: datetime.datetime | None
     end_at: datetime.datetime | None
 
+    def single_label_with_category(self, category: str) -> str:
+        for label in self.labels:
+            if label.category == category:
+                return label.label
+        return None
+
 
 @dataclasses.dataclass(frozen=True, order=True)
 class RealizedActivitySample(ActivitySample):
     start_at: datetime.datetime
     end_at: datetime.datetime
+
+
+def heuristically_realize_samples(
+    samples: list[ActivitySample],
+) -> Iterator[RealizedActivitySample]:
+    """Secret sauce.
+
+    Guarantees that:
+    * No samples overlap.
+    """
+    previous_sample_end = None
+    for sample in samples:
+        end_at = sample.end_at
+
+        if previous_sample_end is None:
+            if end_at.tzinfo:
+                previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
+            else:
+                previous_sample_end = datetime.datetime.fromtimestamp(0)
+
+        assert previous_sample_end <= end_at, 'Iterating in incorrect order'
+
+        # TODO: Allow end_at is None
+
+        start_at = sample.start_at
+        if start_at is None:
+            estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
+            start_at = max(previous_sample_end, end_at - estimated_duration)
+            del estimated_duration
+
+        yield RealizedActivitySample(
+            labels=sample.labels,
+            end_at=end_at,
+            start_at=start_at,
+        )
+
+        previous_sample_end = sample.end_at
+        del sample
+
+
+def mergable_labels(a: Sequence[Label], b: Sequence[Label]) -> Sequence[Label]:
+    return list(set(a).intersection(set(b)))
+
+
+def merge_adjacent_samples(
+    samples: list[RealizedActivitySample], group_category: str,
+) -> list[RealizedActivitySample]:
+    max_interval_between_samples = datetime.timedelta(minutes=5)
+
+    def can_merge(
+        before: RealizedActivitySample, after: RealizedActivitySample,
+    ) -> bool:
+        if before.single_label_with_category(
+            group_category,
+        ) != after.single_label_with_category(group_category):
+            return False
+        return (after.start_at - before.end_at) < max_interval_between_samples
+
+    samples.sort(key=lambda s: s.start_at)
+
+    new: list[RealizedActivitySample] = []
+
+    for s in samples:
+        if len(new) > 0 and can_merge(new[-1], s):
+            # TODO: Merge/strip attributes?
+            new[-1] = RealizedActivitySample(
+                labels=mergable_labels(new[-1].labels, s.labels),
+                start_at=new[-1].start_at,
+                end_at=s.end_at,
+            )
+        else:
+            new.append(s)
+
+    return new
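A sketch (not part of the diff) of how `merge_adjacent_samples` groups by a label category; the show data is hypothetical:

    import datetime

    from personal_data.activity import Label, RealizedActivitySample, merge_adjacent_samples

    t0 = datetime.datetime(2024, 1, 1, 20, 0, tzinfo=datetime.UTC)
    show = Label('series.name', 'Example Show')  # hypothetical label data

    a = RealizedActivitySample(
        labels=(show,), start_at=t0, end_at=t0 + datetime.timedelta(minutes=25),
    )
    b = RealizedActivitySample(
        labels=(show,),
        start_at=t0 + datetime.timedelta(minutes=27),
        end_at=t0 + datetime.timedelta(minutes=50),
    )

    merged = merge_adjacent_samples([a, b], 'series.name')
    assert len(merged) == 1  # same series, 2-minute gap < 5-minute threshold
    assert merged[0].start_at == t0 and merged[0].end_at == b.end_at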
@@ -132,7 +132,8 @@ def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
 
 
 def start_end(
-    sample: dict[str, Any], keys: PossibleKeys,
+    sample: dict[str, Any],
+    keys: PossibleKeys,
 ) -> tuple[datetime.datetime | None, datetime.datetime | None]:
     if keys.time_start and keys.time_end:
         return (sample[keys.time_start[0]], sample[keys.time_end[0]])
@@ -142,6 +143,11 @@ def start_end(
         duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
         return (start, start + duration)
 
+    if keys.time_end and keys.duration:
+        end = sample[keys.time_end[0]]
+        duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
+        return (end - duration, end)
+
     if keys.time_start:
         start = sample[keys.time_start[0]]
         return (start, None)
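The new middle branch back-computes a start time when a row only carries an end timestamp and a duration column. A worked example (values hypothetical):

    import datetime

    end = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.UTC)
    duration = datetime.timedelta(seconds=float('5400'))  # a 90-minute duration cell
    start = end - duration
    assert start == datetime.datetime(2024, 1, 1, 10, 30, tzinfo=datetime.UTC)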
@@ -10,7 +10,13 @@ def test_determine_possible_keys():
         {
             'game.name': 'Halo',
             'me.last_played_time': datetime.datetime(
-                2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
+                2021,
+                6,
+                13,
+                19,
+                12,
+                21,
+                tzinfo=datetime.timezone.utc,
             ),
             'trophy.name': 'Test',
             'trophy.desc': 'Description',