1
0

Activity merging

This commit is contained in:
Jon Michael Aanes 2024-10-14 20:17:41 +02:00
parent c07c371939
commit c226ac623c
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
7 changed files with 70 additions and 5 deletions

View File

@ -25,6 +25,4 @@ And the ([Hamster](https://github.com/projecthamster/hamster)) manual time track
![](docs/obligatory-hamster.png) ![](docs/obligatory-hamster.png)
""" """
logger = logging.getLogger(__name__) __all__: list[str] = []
__all__ = []

View File

@ -6,6 +6,7 @@ from pathlib import Path
from personal_data.activity import ( from personal_data.activity import (
ActivitySample, ActivitySample,
heuristically_realize_samples, heuristically_realize_samples,
merge_adjacent_samples,
) )
from .format import cli, icalendar from .format import cli, icalendar
@ -56,6 +57,13 @@ def parse_arguments():
default='cli_report', default='cli_report',
choices=['cli_report', 'icalendar'], choices=['cli_report', 'icalendar'],
) )
parser.add_argument(
'--merge',
action='store',
type=str,
dest='merge',
default=None,
)
parser.add_argument( parser.add_argument(
'--out', '--out',
action='store', action='store',
@ -90,6 +98,7 @@ def load_samples(args) -> set[ActivitySample]:
def main(): def main():
logging.basicConfig() logging.basicConfig()
logging.getLogger('git_time_tracker').setLevel('INFO')
args = parse_arguments() args = parse_arguments()
@ -111,6 +120,11 @@ def main():
logger.warning('Realizing %s samples', len(shared_time_stamps)) logger.warning('Realizing %s samples', len(shared_time_stamps))
shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps)) shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))
# Merge adjacent
if args.merge:
shared_time_stamps = merge_adjacent_samples(shared_time_stamps, args.merge)
logger.warning('Merged to %s samples', len(shared_time_stamps))
# Output format # Output format
if args.format_mode == 'cli_report': if args.format_mode == 'cli_report':
for t in cli.generate_report(shared_time_stamps): for t in cli.generate_report(shared_time_stamps):

View File

@ -16,9 +16,9 @@ def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}: if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
continue continue
if len(ls) == 0: if len(ls) == 0:
ls.append(label.label) ls.append(str(label.label))
else: else:
desc.append(label.label) desc.append(str(label.label))
return ' '.join(ls), '\n'.join(desc) return ' '.join(ls), '\n'.join(desc)

View File

@ -1,10 +1,14 @@
from collections.abc import Iterator from collections.abc import Iterator
from logging import getLogger
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from personal_data.activity import ActivitySample, Label from personal_data.activity import ActivitySample, Label
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
# Module-level logger, named after this module. The stray debug
# `print(__name__)` was dropped: it wrote to stdout on every import and so
# leaked into the program's regular output.
logger = getLogger(__name__)
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]: def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
assert len(rows) > 0 assert len(rows) > 0
@ -13,6 +17,7 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivityS
if True: if True:
event_data = rows[len(rows) // 2] # Hopefully select a useful representative. event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
possible_keys = determine_possible_keys(event_data) possible_keys = determine_possible_keys(event_data)
logger.info('Found possible keys: %s', possible_keys)
del event_data del event_data
assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1 assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1

View File

@ -39,6 +39,7 @@ def parse_arguments():
def main(): def main():
logging.basicConfig() logging.basicConfig()
logging.getLogger('personal_data').setLevel('INFO') logging.getLogger('personal_data').setLevel('INFO')
args = parse_arguments() args = parse_arguments()
scraper_filter = frozenset(args.fetchers) scraper_filter = frozenset(args.fetchers)

View File

@ -23,6 +23,12 @@ class ActivitySample:
start_at: datetime.datetime | None start_at: datetime.datetime | None
end_at: datetime.datetime | None end_at: datetime.datetime | None
def single_label_with_category(self, category: str) -> str | None:
    """Return the value of the first label belonging to ``category``.

    ``self.labels`` is scanned in order; the ``label`` attribute of the
    first entry whose ``category`` equals ``category`` is returned.

    Returns:
        The matching label value, or ``None`` when no label has that
        category. The annotation previously promised ``str`` even though
        this fall-through case returns ``None``.
    """
    return next(
        (label.label for label in self.labels if label.category == category),
        None,
    )
@dataclasses.dataclass(frozen=True, order=True) @dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample): class RealizedActivitySample(ActivitySample):
@ -67,3 +73,39 @@ def heuristically_realize_samples(
previous_sample_end = sample.end_at previous_sample_end = sample.end_at
del sample del sample
def mergable_labels(a: Sequence[Label], b: Sequence[Label]) -> Sequence[Label]:
    """Return the labels that occur in both ``a`` and ``b``.

    The result holds each shared label once, in the order it first appears
    in ``a``. Building the result by iterating a ``set`` (as before) gave an
    arbitrary order that could differ between runs, which matters to
    consumers that treat the first label specially.

    Labels must be hashable, as they already had to be for the set
    intersection.
    """
    shared = set(a).intersection(b)
    merged: list[Label] = []
    for label in a:
        if label in shared:
            # Discard so a label repeated in `a` is only emitted once.
            shared.discard(label)
            merged.append(label)
    return merged
def merge_adjacent_samples(
    samples: list[RealizedActivitySample],
    group_category: str,
    max_interval_between_samples: datetime.timedelta = datetime.timedelta(minutes=5),
) -> list[RealizedActivitySample]:
    """Merge consecutive samples that share a label in ``group_category``.

    Samples are processed in order of ``start_at``. A sample is folded into
    the previously emitted one when both report the same value from
    ``single_label_with_category(group_category)`` and the gap between the
    previous ``end_at`` and the new ``start_at`` is strictly smaller than
    ``max_interval_between_samples``.

    Args:
        samples: Samples to merge. The list is not modified.
        group_category: Label category whose value must match for a merge.
        max_interval_between_samples: Exclusive upper bound on the gap
            between two samples that may be merged. Defaults to 5 minutes.

    Returns:
        A new list ordered by ``start_at``. Merged entries keep only the
        labels common to their parts (see ``mergable_labels``).
    """

    def can_merge(
        before: RealizedActivitySample,
        after: RealizedActivitySample,
    ) -> bool:
        # NOTE(review): two samples that both lack the category compare as
        # None == None and are therefore considered mergeable; confirm this
        # is intended.
        if before.single_label_with_category(
            group_category,
        ) != after.single_label_with_category(group_category):
            return False
        return (after.start_at - before.end_at) < max_interval_between_samples

    merged: list[RealizedActivitySample] = []
    # sorted() instead of list.sort(): do not reorder the caller's list.
    for sample in sorted(samples, key=lambda s: s.start_at):
        if merged and can_merge(merged[-1], sample):
            previous = merged[-1]
            # TODO: Merge/strip attributes?
            merged[-1] = RealizedActivitySample(
                labels=mergable_labels(previous.labels, sample.labels),
                start_at=previous.start_at,
                # A sample that starts later may still end earlier (overlap
                # or containment); never let the merged interval shrink.
                end_at=max(previous.end_at, sample.end_at),
            )
        else:
            merged.append(sample)
    return merged

View File

@ -143,6 +143,11 @@ def start_end(
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]])) duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (start, start + duration) return (start, start + duration)
if keys.time_end and keys.duration:
end = sample[keys.time_end[0]]
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (end - duration, end)
if keys.time_start: if keys.time_start:
start = sample[keys.time_start[0]] start = sample[keys.time_start[0]]
return (start, None) return (start, None)