Compare commits
No commits in common. "7ab46bc48edc905d35c17c3960733e931d87123f" and "41e574c971f07ff942d9b53ff201678813bff624" have entirely different histories.
7ab46bc48e
...
41e574c971
|
@ -32,57 +32,105 @@ import sys
|
|||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
|
||||
from .data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, WorkSample, RealizedWorkSample
|
||||
from .format import cli, icalendar
|
||||
from .data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, WorkSample
|
||||
from .source import git_repo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
|
||||
DEFAULT_EST_TIME = datetime.timedelta(hours=1)
|
||||
|
||||
ZERO_DURATION = datetime.timedelta(seconds=0)
|
||||
HOUR = datetime.timedelta(hours=1)
|
||||
MINUTE = datetime.timedelta(minutes=1)
|
||||
|
||||
|
||||
def filter_samples(samples: list[WorkSample], sample_filter: set[str]) -> list[WorkSample]:
|
||||
assert len(sample_filter) > 0
|
||||
return [s for s in samples if set(s.labels).intersection(sample_filter)]
|
||||
def fmt_year_ranges_internal(years: list[int]) -> Iterator[str]:
|
||||
years = sorted(years)
|
||||
for idx, year in enumerate(years):
|
||||
at_end = idx == len(years) - 1
|
||||
range_before = idx > 0 and years[idx - 1] == year - 1
|
||||
range_after = not at_end and years[idx + 1] == year + 1
|
||||
|
||||
def heuristically_realize_samples(samples: list[WorkSample]) -> Iterator[RealizedWorkSample]:
|
||||
"""Secret sauce.
|
||||
if not range_before or not range_after:
|
||||
yield str(year)
|
||||
|
||||
Guarentees that:
|
||||
* No samples overlap.
|
||||
"""
|
||||
if not at_end:
|
||||
if not range_before and range_after:
|
||||
yield '-'
|
||||
elif not range_after:
|
||||
yield ','
|
||||
|
||||
previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
|
||||
|
||||
def fmt_year_ranges(years: list[int]) -> str:
|
||||
return ''.join(list(fmt_year_ranges_internal(years)))
|
||||
|
||||
|
||||
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
|
||||
hours = int(total_time / HOUR)
|
||||
minutes = int((total_time - hours * HOUR) / MINUTE)
|
||||
return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m'
|
||||
|
||||
|
||||
def generate_report(
|
||||
samples: list[WorkSample], sample_filter=frozenset(),
|
||||
) -> Iterator[str]:
|
||||
LABEL_FILTER = {}
|
||||
|
||||
# Time spent per label
|
||||
time_per_label: dict[str, datetime.timedelta] = {}
|
||||
years_per_label: dict[str, set[int]] = {}
|
||||
prev_time = datetime.datetime.fromtimestamp(0, datetime.UTC)
|
||||
for sample in samples:
|
||||
end_at = sample.end_at
|
||||
est_time: datetime.timedelta = DEFAULT_EST_TIME
|
||||
est_time = min(sample.registered_at - prev_time, est_time)
|
||||
|
||||
assert previous_sample_end <= end_at, 'Iterating in incorrect order'
|
||||
if len(sample_filter) == 0:
|
||||
pass
|
||||
elif not set(sample.labels).intersection(sample_filter):
|
||||
continue
|
||||
|
||||
# TODO: Allow end_at is None
|
||||
for label in sample.labels:
|
||||
time_per_label.setdefault(label, ZERO_DURATION)
|
||||
time_per_label[label] += est_time
|
||||
years_per_label.setdefault(label, set()).add(sample.registered_at.year)
|
||||
|
||||
start_at = sample.start_at
|
||||
if start_at is None:
|
||||
estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
|
||||
start_at = max(previous_sample_end, end_at - estimated_duration)
|
||||
del estimated_duration
|
||||
prev_time = sample.registered_at
|
||||
del sample, est_time
|
||||
|
||||
yield RealizedWorkSample(labels=sample.labels, end_at=end_at, start_at=start_at)
|
||||
time_and_label = [(duration, label) for label, duration in time_per_label.items()]
|
||||
time_and_label.sort(reverse=True)
|
||||
|
||||
#
|
||||
yield '-' * 66
|
||||
yield '\n'
|
||||
for total_time, label_and_type in time_and_label:
|
||||
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
|
||||
continue
|
||||
|
||||
label_type, label = label_and_type.split(':', 1)
|
||||
|
||||
if len(LABEL_FILTER) > 0 and label_type not in LABEL_FILTER:
|
||||
continue
|
||||
|
||||
yield fmt_line(label_type, label, total_time)
|
||||
yield ' ('
|
||||
yield fmt_year_ranges(years_per_label.get(label_and_type, []))
|
||||
yield ')'
|
||||
yield '\n'
|
||||
del label, total_time
|
||||
|
||||
yield '-' * 66
|
||||
yield '\n'
|
||||
|
||||
yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
|
||||
yield '\n'
|
||||
|
||||
previous_sample_end = sample.end_at
|
||||
del sample
|
||||
|
||||
def parse_arguments():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
'--git-repo',
|
||||
action='extend',
|
||||
nargs='+',
|
||||
type=Path,
|
||||
dest='repositories',
|
||||
'--git-repo', action='extend', nargs='+', type=Path, dest='repositories',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--filter',
|
||||
|
@ -92,14 +140,6 @@ def parse_arguments():
|
|||
dest='sample_filter',
|
||||
default=[],
|
||||
)
|
||||
parser.add_argument(
|
||||
'--format',
|
||||
action='store',
|
||||
type=str,
|
||||
dest='format_mode',
|
||||
default='cli_report',
|
||||
choices=['cli_report', 'icalendar'],
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
|
@ -108,27 +148,14 @@ def main():
|
|||
|
||||
args = parse_arguments()
|
||||
|
||||
shared_time_stamps_set: set[WorkSample] = set()
|
||||
shared_time_stamps: set[WorkSample] = set()
|
||||
for repo_path in args.repositories:
|
||||
logger.warning('Visit %s', repo_path)
|
||||
shared_time_stamps_set |= set(
|
||||
shared_time_stamps |= set(
|
||||
git_repo.iterate_samples_from_git_repository(repo_path),
|
||||
)
|
||||
|
||||
shared_time_stamps = sorted(shared_time_stamps_set, key = lambda s: s.end_at)
|
||||
del shared_time_stamps_set
|
||||
shared_time_stamps = sorted(shared_time_stamps)
|
||||
|
||||
sample_filter = args.sample_filter
|
||||
if len(sample_filter) != 0:
|
||||
logger.warning('Filtering %s samples', len(shared_time_stamps))
|
||||
shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
|
||||
logger.warning('Filtered down to %s samples', len(shared_time_stamps))
|
||||
|
||||
logger.warning('Realizing %s samples', len(shared_time_stamps))
|
||||
shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))
|
||||
|
||||
if args.format_mode == 'cli_report':
|
||||
for t in cli.generate_report(shared_time_stamps):
|
||||
sys.stdout.write(t)
|
||||
elif args.format_mode == 'icalendar':
|
||||
icalendar.generate_icalendar_file(shared_time_stamps, file='./output/samples.ics')
|
||||
for t in generate_report(shared_time_stamps, sample_filter=args.sample_filter):
|
||||
sys.stdout.write(t)
|
||||
|
|
|
@ -8,11 +8,5 @@ HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL'
|
|||
|
||||
@dataclasses.dataclass(frozen=True, order=True)
|
||||
class WorkSample:
|
||||
registered_at: datetime.datetime
|
||||
labels: Sequence[str]
|
||||
start_at: datetime.datetime | None
|
||||
end_at: datetime.datetime | None
|
||||
|
||||
@dataclasses.dataclass(frozen=True, order=True)
|
||||
class RealizedWorkSample(WorkSample):
|
||||
start_at: datetime.datetime
|
||||
end_at: datetime.datetime
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
"""Submodule containing output formats."""
|
|
@ -1,77 +0,0 @@
|
|||
import datetime
|
||||
from collections.abc import Iterator
|
||||
|
||||
from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample
|
||||
|
||||
ZERO_DURATION = datetime.timedelta(seconds=0)
|
||||
HOUR = datetime.timedelta(hours=1)
|
||||
MINUTE = datetime.timedelta(minutes=1)
|
||||
|
||||
|
||||
def fmt_year_ranges_internal(years: list[int]) -> Iterator[str]:
|
||||
years = sorted(years)
|
||||
for idx, year in enumerate(years):
|
||||
at_end = idx == len(years) - 1
|
||||
range_before = idx > 0 and years[idx - 1] == year - 1
|
||||
range_after = not at_end and years[idx + 1] == year + 1
|
||||
|
||||
if not range_before or not range_after:
|
||||
yield str(year)
|
||||
|
||||
if not at_end:
|
||||
if not range_before and range_after:
|
||||
yield '-'
|
||||
elif not range_after:
|
||||
yield ','
|
||||
|
||||
|
||||
def fmt_year_ranges(years: list[int]) -> str:
|
||||
return ''.join(list(fmt_year_ranges_internal(years)))
|
||||
|
||||
|
||||
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
|
||||
hours = int(total_time / HOUR)
|
||||
minutes = int((total_time - hours * HOUR) / MINUTE)
|
||||
return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m'
|
||||
|
||||
|
||||
def generate_report(
|
||||
samples: list[RealizedWorkSample],
|
||||
) -> Iterator[str]:
|
||||
# Time spent per label
|
||||
time_per_label: dict[str, datetime.timedelta] = {}
|
||||
years_per_label: dict[str, set[int]] = {}
|
||||
for sample in samples:
|
||||
duration = sample.end_at - sample.start_at
|
||||
|
||||
for label in sample.labels:
|
||||
time_per_label.setdefault(label, ZERO_DURATION)
|
||||
time_per_label[label] += duration
|
||||
years_per_label.setdefault(label, set()).add(sample.end_at.year)
|
||||
|
||||
del sample, duration
|
||||
|
||||
time_and_label = [(duration, label) for label, duration in time_per_label.items()]
|
||||
time_and_label.sort(reverse=True)
|
||||
|
||||
#
|
||||
yield '-' * 66
|
||||
yield '\n'
|
||||
for total_time, label_and_type in time_and_label:
|
||||
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
|
||||
continue
|
||||
|
||||
label_type, label = label_and_type.split(':', 1)
|
||||
|
||||
yield fmt_line(label_type, label, total_time)
|
||||
yield ' ('
|
||||
yield fmt_year_ranges(years_per_label.get(label_and_type, []))
|
||||
yield ')'
|
||||
yield '\n'
|
||||
del label, total_time
|
||||
|
||||
yield '-' * 66
|
||||
yield '\n'
|
||||
|
||||
yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
|
||||
yield '\n'
|
|
@ -1,71 +0,0 @@
|
|||
import datetime
|
||||
from collections.abc import Iterator
|
||||
import argparse
|
||||
import datetime
|
||||
import urllib.parse
|
||||
|
||||
import icalendar
|
||||
|
||||
from personal_data.util import load_csv_file
|
||||
|
||||
|
||||
from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample
|
||||
|
||||
ZERO_DURATION = datetime.timedelta(seconds=0)
|
||||
HOUR = datetime.timedelta(hours=1)
|
||||
MINUTE = datetime.timedelta(minutes=1)
|
||||
|
||||
|
||||
def create_title(sample: RealizedWorkSample) -> str:
|
||||
ls = []
|
||||
for label_and_type in sample.labels:
|
||||
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
|
||||
continue
|
||||
if label_and_type.startswith('author:'):
|
||||
continue
|
||||
ls.append(label_and_type)
|
||||
return ' '.join(ls)
|
||||
|
||||
|
||||
def generate_calendar(
|
||||
samples: list[RealizedWorkSample],
|
||||
) -> icalendar.Calendar:
|
||||
max_title_parts = 2
|
||||
|
||||
cal = icalendar.Calendar()
|
||||
cal.add('prodid', '-//personal_data_calendar//example.org//')
|
||||
cal.add('version', '2.0')
|
||||
|
||||
for sample in samples:
|
||||
|
||||
title = create_title(sample)
|
||||
|
||||
description = ''
|
||||
|
||||
# Create event
|
||||
event = icalendar.Event()
|
||||
|
||||
event.add('summary', title)
|
||||
event.add('description', description)
|
||||
event.add('dtstart', sample.start_at)
|
||||
event.add('dtend', sample.end_at)
|
||||
|
||||
for label_and_type in sample.labels:
|
||||
if label_and_type.startswith('author:'):
|
||||
event.add('organizer', 'mailto:'+label_and_type.removeprefix('author:'))
|
||||
|
||||
cal.add_component(event)
|
||||
del event
|
||||
|
||||
return cal
|
||||
|
||||
|
||||
def generate_icalendar_file(
|
||||
samples: list[RealizedWorkSample],
|
||||
file: str,
|
||||
) -> None:
|
||||
|
||||
calendar = generate_calendar(samples)
|
||||
|
||||
with open(file, 'wb') as f:
|
||||
f.write(calendar.to_ical())
|
|
@ -1 +0,0 @@
|
|||
"""Submodule containing input formats."""
|
|
@ -37,21 +37,14 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
|
|||
labels = [HIDDEN_LABEL_TOTAL]
|
||||
labels.append('project:' + project_name)
|
||||
labels.append('author:' + commit.author.email)
|
||||
|
||||
authored_date = datetime.datetime.fromtimestamp(commit.authored_date, tz=datetime.UTC)
|
||||
committed_date = datetime.datetime.fromtimestamp(commit.committed_date, tz=datetime.UTC)
|
||||
|
||||
yield WorkSample(
|
||||
labels = tuple(labels),
|
||||
start_at = None,
|
||||
end_at = authored_date,
|
||||
datetime.datetime.fromtimestamp(commit.authored_date, tz=datetime.UTC),
|
||||
tuple(labels),
|
||||
)
|
||||
yield WorkSample(
|
||||
datetime.datetime.fromtimestamp(commit.committed_date, tz=datetime.UTC),
|
||||
tuple(labels),
|
||||
)
|
||||
if authored_date != committed_date:
|
||||
yield WorkSample(
|
||||
labels = tuple(labels),
|
||||
start_at = None,
|
||||
end_at = committed_date,
|
||||
)
|
||||
del labels
|
||||
|
||||
|
||||
|
|
Reference in New Issue
Block a user