
Moved more logic to personal_data

This commit is contained in:
Jon Michael Aanes 2024-10-14 19:43:34 +02:00
parent 72be664d82
commit c07c371939
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
5 changed files with 174 additions and 169 deletions

View File

@@ -25,170 +25,6 @@ And the ([Hamster](https://github.com/projecthamster/hamster)) manual time track
![](docs/obligatory-hamster.png)
"""
import argparse
import datetime
import logging
import sys
from collections.abc import Iterator
from pathlib import Path

from personal_data.activity import (
    ActivitySample,
    RealizedActivitySample,
)

from .format import cli, icalendar
from .source import csv_file, git_repo

logger = logging.getLogger(__name__)

__all__ = []

DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
def filter_samples(
    samples: list[ActivitySample],
    sample_filter: set[str],
) -> list[ActivitySample]:
    assert len(sample_filter) > 0
    return [s for s in samples if set(s.labels).intersection(sample_filter)]
def heuristically_realize_samples(
    samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
    """Secret sauce.

    Guarantees that:
    * No samples overlap.
    """
    previous_sample_end = None
    for sample in samples:
        end_at = sample.end_at

        if previous_sample_end is None:
            if end_at.tzinfo:
                previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
            else:
                previous_sample_end = datetime.datetime.fromtimestamp(0)

        assert previous_sample_end <= end_at, 'Iterating in incorrect order'

        # TODO: Allow end_at to be None
        start_at = sample.start_at
        if start_at is None:
            estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
            start_at = max(previous_sample_end, end_at - estimated_duration)
            del estimated_duration

        yield RealizedActivitySample(
            labels=sample.labels, end_at=end_at, start_at=start_at,
        )

        previous_sample_end = sample.end_at
        del sample
def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--git-repo',
        action='extend',
        nargs='+',
        type=Path,
        dest='repositories',
        default=[],
    )
    parser.add_argument(
        '--csv-file',
        action='extend',
        nargs='+',
        type=Path,
        dest='csv_files',
        default=[],
    )
    parser.add_argument(
        '--filter',
        action='extend',
        nargs='+',
        type=str,
        dest='sample_filter',
        default=[],
    )
    parser.add_argument(
        '--format',
        action='store',
        type=str,
        dest='format_mode',
        default='cli_report',
        choices=['cli_report', 'icalendar'],
    )
    parser.add_argument(
        '--out',
        action='store',
        type=Path,
        dest='output_file',
        default='output/samples.ics',
    )
    return parser.parse_args()
def load_samples(args) -> set[ActivitySample]:
    shared_time_stamps_set: set[ActivitySample] = set()

    # Git repositories
    for repo_path in args.repositories:
        logger.warning('Determine commits from %s', repo_path)
        shared_time_stamps_set |= set(
            git_repo.iterate_samples_from_git_repository(repo_path),
        )
        del repo_path

    # CSV Files
    for csv_path in args.csv_files:
        logger.warning('Load samples from %s', csv_path)
        shared_time_stamps_set |= set(
            csv_file.iterate_samples_from_csv_file(csv_path),
        )
        del csv_path

    return shared_time_stamps_set
def main():
    logging.basicConfig()
    args = parse_arguments()

    # Determine samples
    shared_time_stamps_set = load_samples(args)

    # Sort samples
    shared_time_stamps = sorted(shared_time_stamps_set, key=lambda s: s.end_at)
    del shared_time_stamps_set

    # Filter samples
    sample_filter = args.sample_filter
    if len(sample_filter) != 0:
        logger.warning('Filtering %s samples', len(shared_time_stamps))
        shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
        logger.warning('Filtered down to %s samples', len(shared_time_stamps))

    # Heuristic samples
    logger.warning('Realizing %s samples', len(shared_time_stamps))
    shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))

    # Output format
    if args.format_mode == 'cli_report':
        for t in cli.generate_report(shared_time_stamps):
            sys.stdout.write(t)
    elif args.format_mode == 'icalendar':
        icalendar.generate_icalendar_file(
            shared_time_stamps,
            file=args.output_file,
        )
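
filter_samples, which is kept in the new entry point below, retains only the samples whose labels intersect the requested filter set. A minimal sketch of that check, using hypothetical label strings:

# Illustrative only; the label values here are made up.
labels = ['project:personal_data', 'git']   # hypothetical labels on one sample
sample_filter = {'git'}
keep = bool(set(labels).intersection(sample_filter))
assert keep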

View File

@@ -1,4 +1,126 @@
from git_time_tracker import main
import argparse
import logging
import sys
from pathlib import Path

from personal_data.activity import (
    ActivitySample,
    heuristically_realize_samples,
)

from .format import cli, icalendar
from .source import csv_file, git_repo

logger = logging.getLogger(__name__)
def filter_samples(
    samples: list[ActivitySample],
    sample_filter: set[str],
) -> list[ActivitySample]:
    assert len(sample_filter) > 0
    return [s for s in samples if set(s.labels).intersection(sample_filter)]
def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--git-repo',
        action='extend',
        nargs='+',
        type=Path,
        dest='repositories',
        default=[],
    )
    parser.add_argument(
        '--csv-file',
        action='extend',
        nargs='+',
        type=Path,
        dest='csv_files',
        default=[],
    )
    parser.add_argument(
        '--filter',
        action='extend',
        nargs='+',
        type=str,
        dest='sample_filter',
        default=[],
    )
    parser.add_argument(
        '--format',
        action='store',
        type=str,
        dest='format_mode',
        default='cli_report',
        choices=['cli_report', 'icalendar'],
    )
    parser.add_argument(
        '--out',
        action='store',
        type=Path,
        dest='output_file',
        default='output/samples.ics',
    )
    return parser.parse_args()
def load_samples(args) -> set[ActivitySample]:
    shared_time_stamps_set: set[ActivitySample] = set()

    # Git repositories
    for repo_path in args.repositories:
        logger.warning('Determine commits from %s', repo_path)
        shared_time_stamps_set |= set(
            git_repo.iterate_samples_from_git_repository(repo_path),
        )
        del repo_path

    # CSV Files
    for csv_path in args.csv_files:
        logger.warning('Load samples from %s', csv_path)
        shared_time_stamps_set |= set(
            csv_file.iterate_samples_from_csv_file(csv_path),
        )
        del csv_path

    return shared_time_stamps_set
def main():
    logging.basicConfig()
    args = parse_arguments()

    # Determine samples
    shared_time_stamps_set = load_samples(args)

    # Sort samples
    shared_time_stamps = sorted(shared_time_stamps_set, key=lambda s: s.end_at)
    del shared_time_stamps_set

    # Filter samples
    sample_filter = args.sample_filter
    if len(sample_filter) != 0:
        logger.warning('Filtering %s samples', len(shared_time_stamps))
        shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
        logger.warning('Filtered down to %s samples', len(shared_time_stamps))

    # Heuristic samples
    logger.warning('Realizing %s samples', len(shared_time_stamps))
    shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))

    # Output format
    if args.format_mode == 'cli_report':
        for t in cli.generate_report(shared_time_stamps):
            sys.stdout.write(t)
    elif args.format_mode == 'icalendar':
        icalendar.generate_icalendar_file(
            shared_time_stamps,
            file=args.output_file,
        )


if __name__ == '__main__':
    main()
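
The repeated --git-repo, --csv-file, and --filter flags rely on argparse's extend action together with nargs='+', so each flag may be passed several times and with several values per use. A minimal sketch (argument definition as above; the repository paths are hypothetical):

import argparse
from pathlib import Path

parser = argparse.ArgumentParser()
parser.add_argument('--git-repo', action='extend', nargs='+', type=Path,
                    dest='repositories', default=[])

# Both uses of the flag accumulate into one list of Paths.
args = parser.parse_args(['--git-repo', 'repo-a', 'repo-b', '--git-repo', 'repo-c'])
assert args.repositories == [Path('repo-a'), Path('repo-b'), Path('repo-c')]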

View File

@@ -1,8 +1,9 @@
import dataclasses
import datetime
from collections.abc import Iterator, Sequence

HIDDEN_LABEL_CATEGORY = '__'

DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)


@dataclasses.dataclass(frozen=True, order=True)
@@ -27,3 +28,42 @@ class ActivitySample:
class RealizedActivitySample(ActivitySample):
    start_at: datetime.datetime
    end_at: datetime.datetime
def heuristically_realize_samples(
    samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
    """Secret sauce.

    Guarantees that:
    * No samples overlap.
    """
    previous_sample_end = None
    for sample in samples:
        end_at = sample.end_at

        if previous_sample_end is None:
            if end_at.tzinfo:
                previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
            else:
                previous_sample_end = datetime.datetime.fromtimestamp(0)

        assert previous_sample_end <= end_at, 'Iterating in incorrect order'

        # TODO: Allow end_at to be None
        start_at = sample.start_at
        if start_at is None:
            estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
            start_at = max(previous_sample_end, end_at - estimated_duration)
            del estimated_duration

        yield RealizedActivitySample(
            labels=sample.labels,
            end_at=end_at,
            start_at=start_at,
        )

        previous_sample_end = sample.end_at
        del sample
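
When a sample has no recorded start, the start is estimated as at most DEFAULT_ESTIMATED_DURATION before its end, clamped so it never precedes the previous sample's end. A small numeric illustration (the timestamps are chosen arbitrarily):

import datetime

DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)

previous_sample_end = datetime.datetime(2024, 10, 14, 9, 30)
end_at = datetime.datetime(2024, 10, 14, 10, 0)

# Estimated start: one hour before the end, but never before the previous sample ended.
start_at = max(previous_sample_end, end_at - DEFAULT_ESTIMATED_DURATION)
assert start_at == datetime.datetime(2024, 10, 14, 9, 30)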

View File

@@ -132,7 +132,8 @@ def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
def start_end(
    sample: dict[str, Any],
    keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
    if keys.time_start and keys.time_end:
        return (sample[keys.time_start[0]], sample[keys.time_end[0]])

View File

@@ -10,7 +10,13 @@ def test_determine_possible_keys():
{
    'game.name': 'Halo',
    'me.last_played_time': datetime.datetime(
        2021,
        6,
        13,
        19,
        12,
        21,
        tzinfo=datetime.timezone.utc,
    ),
    'trophy.name': 'Test',
    'trophy.desc': 'Description',