Compare commits
2 Commits
0e98fe6225
...
c576687122
Author | SHA1 | Date | |
---|---|---|---|
c576687122 | |||
2e41125952 |
|
@ -27,6 +27,26 @@ logger = getLogger(__name__)
|
|||
# Type aliases for rows parsed from CSV files: one Row per CSV record,
# keyed by column name.
Row = dict[str, Any]
Rows = list[Row]

# Common duration units for timedelta arithmetic.
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
SECOND = datetime.timedelta(seconds=1)
||||
def to_text_duration(duration: datetime.timedelta) -> str:
    """Format a duration as text like ``'2 hours 3 minutes 4 seconds'``.

    Zero-valued components are omitted entirely, so a zero duration
    returns the empty string.  Assumes a non-negative duration;
    sub-second precision is truncated.
    """
    # divmod on whole seconds is the idiomatic way to split a duration
    # into hour/minute/second components.
    total_seconds = int(duration.total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    # PEP 8 (E741): the ambiguous single-letter name `l` is renamed `parts`.
    parts = []
    if hours > 0:
        parts.append(f'{hours} hours')
    if minutes > 0:
        parts.append(f'{minutes} minutes')
    if seconds > 0:
        parts.append(f'{seconds} seconds')
    return ' '.join(parts)
||||
def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
|
||||
assert len(rows) > 0
|
||||
|
@ -84,7 +104,7 @@ def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
|
|||
|
||||
|
||||
def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
|
||||
MINIMUM = 300
|
||||
MINIMUM_STEPS = 300
|
||||
|
||||
num_updated = 0
|
||||
|
||||
|
@ -100,7 +120,7 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
|
|||
}
|
||||
|
||||
for date, steps in steps_per_date.items():
|
||||
if steps < MINIMUM:
|
||||
if steps < MINIMUM_STEPS:
|
||||
continue
|
||||
was_updated = vault.add_statistic(date, 'Steps', steps)
|
||||
if was_updated:
|
||||
|
@ -109,6 +129,40 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
|
|||
|
||||
return num_updated
|
||||
|
||||
def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Import daily StepMania totals into the Obsidian vault.

    Groups rows by the calendar date of ``play.start``, sums the
    step-judgement columns and the play durations per day, and records
    them as the 'Stepmania (Steps)' and 'Stepmania (Duration)' statistics.

    Returns the number of dates whose note was updated.
    """
    num_updated = 0

    # Bucket rows by the date the play session started.
    rows_per_date: dict = {}
    for row in rows:
        date = row['play.start'].date()
        rows_per_date.setdefault(date, [])
        rows_per_date[date].append(row)
    # NOTE: the original `del date, row` here raised NameError when `rows`
    # was empty; the names are simply left to go out of scope instead.

    # Judgement columns that each count as one step hit.
    COLUMNS = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']

    def all_steps(row: dict[str, int]):
        return sum(row[column] for column in COLUMNS)

    steps_per_date = {
        date: sum(all_steps(row) for row in rows) for date, rows in rows_per_date.items()
    }

    duration_per_date = {
        date: sum((row['play.duration'] for row in rows), start=datetime.timedelta()) for date, rows in rows_per_date.items()
    }
    # Debug output goes through the module logger (file convention) instead
    # of print(), so it does not leak to stdout.
    logger.debug('Steps per date: %s', steps_per_date)
    logger.debug('Duration per date: %s', duration_per_date)

    for date in steps_per_date:
        was_updated_1 = vault.add_statistic(date, 'Stepmania (Steps)', int(steps_per_date[date]))
        was_updated_2 = vault.add_statistic(date, 'Stepmania (Duration)', to_text_duration(duration_per_date[date]))
        if was_updated_1 or was_updated_2:
            num_updated += 1

    return num_updated
||||
def escape_for_obsidian_link(link: str) -> str:
    """Return *link* with characters unsafe inside an Obsidian link blanked.

    Colons and slashes would break the ``[[...]]`` link syntax, so each
    occurrence is replaced with a space.
    """
    # NOTE(review): the final (' ', ' ') pair replaces a space with a space,
    # a no-op as written — possibly a garbled special character (e.g. a
    # non-breaking space) in the original; confirm against history.
    for unsafe, replacement in ((':', ' '), ('/', ' '), (' ', ' ')):
        link = link.replace(unsafe, replacement)
    return link
@ -167,18 +221,6 @@ def import_activity_sample_csv(
|
|||
return num_updated
|
||||
|
||||
|
||||
def import_activity_sample_csv_from_file(
    vault: ObsidianVault,
    data_path: Path,
    content_mapper,
    **kwargs,
) -> int:
    """Load activity samples from *data_path* and import them into the vault.

    Thin wrapper around import_activity_sample_csv that handles CSV
    loading and progress logging.

    Returns the number of vault files updated.
    """
    rows = load_csv_file(data_path)
    logger.info('Loaded CSV with %d lines (%s)', len(rows), data_path)
    num_updated = import_activity_sample_csv(vault, rows, content_mapper, **kwargs)
    logger.info('Updated %d files', num_updated)
    # BUG FIX: the count was computed but never returned, so callers of this
    # `-> int` function always received None.
    return num_updated
||||
def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
|
||||
subject = sample.single_label_with_category('series.name')
|
||||
comment = '{} Episode {}: *{}*'.format(
|
||||
|
@ -203,49 +245,36 @@ def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
|
|||
)
|
||||
|
||||
|
||||
def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
|
||||
data_path = Path('output/show_episodes_watched.csv')
|
||||
return import_activity_sample_csv_from_file(
|
||||
vault,
|
||||
data_path,
|
||||
map_watched_series_content,
|
||||
# Source CSV locations consumed by the importers below.  The 'output/'
# paths are produced by other exporters in this project; the absolute
# paths point at manually maintained data files.
PATH_WATCHED = Path('output/show_episodes_watched.csv')
PATH_PLAYED = Path('output/games_played.csv')
PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
PATH_STEP_COUNTS = Path(
    '/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
)
PATH_STEPMANIA = Path('output/stepmania.csv')
|
||||
|
||||
|
||||
def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
|
||||
data_path = Path('output/games_played.csv')
|
||||
if not data_path.exists():
|
||||
logger.warning('Skipping import of played games: %s is missing', data_path)
|
||||
return 0
|
||||
return import_activity_sample_csv_from_file(
|
||||
vault,
|
||||
data_path,
|
||||
map_games_played_content,
|
||||
group_category='game.name',
|
||||
)
|
||||
|
||||
# Importer registry: each entry pairs a source CSV path with the function
# that imports its rows into the vault ((vault, rows) -> files updated).
IMPORTERS = [
    {'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
    {'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
    {'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
    {'path': PATH_PLAYED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_games_played_content, group_category='game.name',) },
    {'path': PATH_WATCHED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_watched_series_content) },
]
||||
|
||||
def import_data(obsidian_path: Path, dry_run=True):
|
||||
vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
|
||||
|
||||
if False:
|
||||
data_path = Path('/home/jmaa/Notes/workout.csv')
|
||||
rows = load_csv_file(data_path)
|
||||
for import_def in IMPORTERS:
|
||||
if not import_def['path'].exists():
|
||||
logger.warning('Skipping %s: %s is missing', import_def['import_rows'], import_def['path'])
|
||||
continue
|
||||
rows = load_csv_file(import_def['path'])
|
||||
logger.info('Loaded CSV with %d lines', len(rows))
|
||||
num_updated = import_workout_csv(vault, rows)
|
||||
logger.info('Updated %d files', num_updated)
|
||||
num_files_updated = import_def['import_rows'](vault, rows)
|
||||
logger.info('Updated %d files', num_files_updated)
|
||||
del import_def, rows
|
||||
|
||||
if False:
|
||||
data_path = Path(
|
||||
'/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
|
||||
)
|
||||
rows = load_csv_file(data_path)
|
||||
logger.info('Loaded CSV with %d lines', len(rows))
|
||||
num_updated = import_step_counts_csv(vault, rows)
|
||||
logger.info('Updated %d files', num_updated)
|
||||
|
||||
import_played_games_csv_from_file(vault)
|
||||
import_watched_series_csv_from_file(vault)
|
||||
|
||||
num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
|
||||
logger.info('dirty files in cache: %d', num_dirty)
|
||||
|
|
|
@ -324,7 +324,7 @@ def parse_event_string(
|
|||
start_time = datetime.time.fromisoformat(m.group(1))
|
||||
end_time = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start_time
|
||||
else:
|
||||
logger.info('Could not parse format: %s', event_str)
|
||||
logger.debug('Could not parse format: %s', event_str)
|
||||
return Event(None, None, None, None, event_str)
|
||||
|
||||
start = datetime.datetime.combine(date, start_time, timezone).astimezone(
|
||||
|
|
Loading…
Reference in New Issue
Block a user