|
|
|
@ -1,16 +1,20 @@
|
|
|
|
|
import argparse
|
|
|
|
|
import dataclasses
|
|
|
|
|
import datetime
|
|
|
|
|
import logging
|
|
|
|
|
import sys
|
|
|
|
|
import time
|
|
|
|
|
import dataclasses
|
|
|
|
|
import git
|
|
|
|
|
import datetime
|
|
|
|
|
from collections.abc import Iterator, Sequence
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
import git
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
def parse_arguments():
|
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
|
parser.add_argument('repositories', action='extend', nargs='+', type=Path)
|
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True, order=True)
|
|
|
|
|
class WorkSample:
|
|
|
|
@ -25,18 +29,15 @@ def determine_default(repo: git.Repo):
|
|
|
|
|
except:
|
|
|
|
|
return 'master'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
HIDDEN_LABEL_PREFIX = '__'
|
|
|
|
|
HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def determine_project_name(repo: git.Repo) -> str:
|
|
|
|
|
remotes = repo.remotes
|
|
|
|
|
if len(remotes) > 0:
|
|
|
|
|
return remotes.origin.url.removeprefix('git@gitfub.space:')
|
|
|
|
|
return Path(repo.working_tree_dir).name
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
|
|
|
|
|
project_name = determine_project_name(repo)
|
|
|
|
|
assert project_name is not None
|
|
|
|
@ -48,80 +49,46 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
|
|
|
|
|
for commit in repo.iter_commits(determine_default(repo)):
|
|
|
|
|
labels = [HIDDEN_LABEL_TOTAL]
|
|
|
|
|
labels.append('project:' + project_name)
|
|
|
|
|
labels.append('author:' + commit.author.email)
|
|
|
|
|
yield WorkSample(
|
|
|
|
|
datetime.datetime.fromtimestamp(commit.authored_date, tz=datetime.UTC),
|
|
|
|
|
tuple(labels),
|
|
|
|
|
)
|
|
|
|
|
yield WorkSample(
|
|
|
|
|
datetime.datetime.fromtimestamp(commit.committed_date, tz=datetime.UTC),
|
|
|
|
|
tuple(labels),
|
|
|
|
|
)
|
|
|
|
|
labels.append('author:' + commit.author.email)
|
|
|
|
|
yield WorkSample(datetime.datetime.fromtimestamp(commit.authored_date, tz=datetime.UTC), tuple(labels))
|
|
|
|
|
yield WorkSample(datetime.datetime.fromtimestamp(commit.committed_date, tz=datetime.UTC), tuple(labels))
|
|
|
|
|
del labels
|
|
|
|
|
|
|
|
|
|
DEFAULT_EST_TIME=datetime.timedelta(hours=1)
|
|
|
|
|
|
|
|
|
|
DEFAULT_EST_TIME = datetime.timedelta(hours=1)
|
|
|
|
|
ZERO_DURATION = datetime.timedelta(seconds = 0)
|
|
|
|
|
HOUR = datetime.timedelta(hours = 1)
|
|
|
|
|
|
|
|
|
|
ZERO_DURATION = datetime.timedelta(seconds=0)
|
|
|
|
|
HOUR = datetime.timedelta(hours=1)
|
|
|
|
|
MINUTE = datetime.timedelta(minutes=1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fmt_year_ranges_internal(years: list[int]) -> Iterator[str]:
|
|
|
|
|
years = sorted(years)
|
|
|
|
|
for idx, year in enumerate(years):
|
|
|
|
|
at_end = idx == len(years) - 1
|
|
|
|
|
range_before = idx > 0 and years[idx-1] == year - 1
|
|
|
|
|
range_after = not at_end and years[idx+1] == year + 1
|
|
|
|
|
|
|
|
|
|
if not range_before or not range_after:
|
|
|
|
|
yield str(year)
|
|
|
|
|
|
|
|
|
|
if not at_end:
|
|
|
|
|
if not range_before and range_after:
|
|
|
|
|
yield '-'
|
|
|
|
|
elif not range_after:
|
|
|
|
|
yield ','
|
|
|
|
|
|
|
|
|
|
def fmt_year_ranges(years: list[int]) -> str:
|
|
|
|
|
return ''.join(list(fmt_year_ranges_internal(years)))
|
|
|
|
|
|
|
|
|
|
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
|
|
|
|
|
hours = int(total_time / HOUR)
|
|
|
|
|
minutes = int((total_time - hours*HOUR)/MINUTE)
|
|
|
|
|
return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m'
|
|
|
|
|
|
|
|
|
|
def generate_report(samples: list[WorkSample], sample_filter = frozenset()) -> Iterator[str]:
|
|
|
|
|
def generate_report(samples: list[WorkSample]) -> Iterator[str]:
|
|
|
|
|
SAMPLE_FILTER = {}
|
|
|
|
|
LABEL_FILTER = {}
|
|
|
|
|
|
|
|
|
|
# Time spent per label
|
|
|
|
|
time_per_label: dict[str, datetime.timedelta] = {}
|
|
|
|
|
years_per_label: dict[str, set[int]] = {}
|
|
|
|
|
prev_time = datetime.datetime.fromtimestamp(0, datetime.UTC)
|
|
|
|
|
for sample in samples:
|
|
|
|
|
est_time: datetime.timedelta = DEFAULT_EST_TIME
|
|
|
|
|
est_time = min(sample.registered_at - prev_time, est_time)
|
|
|
|
|
|
|
|
|
|
if len(sample_filter) == 0:
|
|
|
|
|
if len(SAMPLE_FILTER) == 0:
|
|
|
|
|
pass
|
|
|
|
|
elif not set(sample.labels).intersection(sample_filter):
|
|
|
|
|
elif not set(sample.labels).intersection(SAMPLE_FILTER):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
for label in sample.labels:
|
|
|
|
|
time_per_label.setdefault(label, ZERO_DURATION)
|
|
|
|
|
time_per_label.setdefault(label,ZERO_DURATION)
|
|
|
|
|
time_per_label[label] += est_time
|
|
|
|
|
years_per_label.setdefault(label,set()).add(sample.registered_at.year)
|
|
|
|
|
|
|
|
|
|
prev_time = sample.registered_at
|
|
|
|
|
del sample, est_time
|
|
|
|
|
|
|
|
|
|
time_and_label = [(duration, label) for label, duration in time_per_label.items()]
|
|
|
|
|
time_and_label = [(duration, label) for label,duration in time_per_label.items()]
|
|
|
|
|
time_and_label.sort(reverse=True)
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
|
yield '-' * 66
|
|
|
|
|
yield '\n'
|
|
|
|
|
for total_time, label_and_type in time_and_label:
|
|
|
|
|
for (total_time, label_and_type) in time_and_label:
|
|
|
|
|
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
@ -130,26 +97,15 @@ def generate_report(samples: list[WorkSample], sample_filter = frozenset()) -> I
|
|
|
|
|
if len(LABEL_FILTER) > 0 and label_type not in LABEL_FILTER:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
yield fmt_line(label_type, label, total_time)
|
|
|
|
|
yield ' ('
|
|
|
|
|
yield fmt_year_ranges(years_per_label.get(label_and_type,[]))
|
|
|
|
|
yield ')'
|
|
|
|
|
yield '\n'
|
|
|
|
|
label_type = '' # TODO
|
|
|
|
|
|
|
|
|
|
yield f' {label_type:8} {label:40} {total_time / HOUR:-4.2f} hours\n'
|
|
|
|
|
del label, total_time
|
|
|
|
|
|
|
|
|
|
yield '-' * 66
|
|
|
|
|
yield '\n'
|
|
|
|
|
|
|
|
|
|
yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
|
|
|
|
|
yield '\n'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_arguments():
|
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
|
parser.add_argument('repositories', action='extend', nargs='+', type=Path)
|
|
|
|
|
parser.add_argument('--filter', action='extend', nargs='+', type=str, dest='sample_filter', default=[])
|
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
|
|
|
|
yield ' {label_type:8} {label:40} {hours:-4.0f} hours\n'.format(label_type='', label='TOTAL', hours = time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION) / HOUR)
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
logging.basicConfig()
|
|
|
|
@ -168,5 +124,5 @@ def main():
|
|
|
|
|
|
|
|
|
|
shared_time_stamps = sorted(shared_time_stamps)
|
|
|
|
|
|
|
|
|
|
for t in generate_report(shared_time_stamps, sample_filter=args.sample_filter):
|
|
|
|
|
for t in generate_report(shared_time_stamps):
|
|
|
|
|
sys.stdout.write(t)
|
|
|
|
|