import argparse import dataclasses import datetime import logging import sys import time from collections.abc import Iterator, Sequence from pathlib import Path import git logger = logging.getLogger(__name__) @dataclasses.dataclass(frozen=True, order=True) class WorkSample: registered_at: datetime.datetime labels: Sequence[str] def determine_default(repo: git.Repo): try: repo.commit('main') return 'main' except: return 'master' HIDDEN_LABEL_PREFIX = '__' HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL' def determine_project_name(repo: git.Repo) -> str: remotes = repo.remotes if len(remotes) > 0: return remotes.origin.url.removeprefix('git@gitfub.space:') return Path(repo.working_tree_dir).name def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]: project_name = determine_project_name(repo) assert project_name is not None # TODO: Branch on main or master or default repo.commit() for commit in repo.iter_commits(determine_default(repo)): labels = [HIDDEN_LABEL_TOTAL] labels.append('project:' + project_name) labels.append('author:' + commit.author.email) yield WorkSample( datetime.datetime.fromtimestamp(commit.authored_date, tz=datetime.UTC), tuple(labels), ) yield WorkSample( datetime.datetime.fromtimestamp(commit.committed_date, tz=datetime.UTC), tuple(labels), ) del labels DEFAULT_EST_TIME = datetime.timedelta(hours=1) ZERO_DURATION = datetime.timedelta(seconds=0) HOUR = datetime.timedelta(hours=1) MINUTE = datetime.timedelta(minutes=1) def fmt_year_ranges_internal(years: list[int]) -> Iterator[str]: years = sorted(years) for idx, year in enumerate(years): at_end = idx == len(years) - 1 range_before = idx > 0 and years[idx-1] == year - 1 range_after = not at_end and years[idx+1] == year + 1 if not range_before or not range_after: yield str(year) if not at_end: if not range_before and range_after: yield '-' elif not range_after: yield ',' def fmt_year_ranges(years: list[int]) -> str: return ''.join(list(fmt_year_ranges_internal(years))) def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str: hours = int(total_time / HOUR) minutes = int((total_time - hours*HOUR)/MINUTE) return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m' def generate_report(samples: list[WorkSample], sample_filter = frozenset()) -> Iterator[str]: LABEL_FILTER = {} # Time spent per label time_per_label: dict[str, datetime.timedelta] = {} years_per_label: dict[str, set[int]] = {} prev_time = datetime.datetime.fromtimestamp(0, datetime.UTC) for sample in samples: est_time: datetime.timedelta = DEFAULT_EST_TIME est_time = min(sample.registered_at - prev_time, est_time) if len(sample_filter) == 0: pass elif not set(sample.labels).intersection(sample_filter): continue for label in sample.labels: time_per_label.setdefault(label, ZERO_DURATION) time_per_label[label] += est_time years_per_label.setdefault(label,set()).add(sample.registered_at.year) prev_time = sample.registered_at del sample, est_time time_and_label = [(duration, label) for label, duration in time_per_label.items()] time_and_label.sort(reverse=True) # yield '-' * 66 yield '\n' for total_time, label_and_type in time_and_label: if label_and_type.startswith(HIDDEN_LABEL_PREFIX): continue label_type, label = label_and_type.split(':', 1) if len(LABEL_FILTER) > 0 and label_type not in LABEL_FILTER: continue yield fmt_line(label_type, label, total_time) yield ' (' yield fmt_year_ranges(years_per_label.get(label_and_type,[])) yield ')' yield '\n' del label, total_time yield '-' * 66 yield '\n' yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION)) yield '\n' def parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument('repositories', action='extend', nargs='+', type=Path) parser.add_argument('--filter', action='extend', nargs='+', type=str, dest='sample_filter', default=[]) return parser.parse_args() def main(): logging.basicConfig() args = parse_arguments() shared_time_stamps: set[WorkSample] = set() for repo_path in args.repositories: try: repo = git.Repo(repo_path) except git.exc.InvalidGitRepositoryError: logger.warning('Ignoring non-repo %s', repo_path) continue logger.warning('Visit %s', repo_path) shared_time_stamps |= set(get_samples_from_project(repo)) shared_time_stamps = sorted(shared_time_stamps) for t in generate_report(shared_time_stamps, sample_filter=args.sample_filter): sys.stdout.write(t)