"""Command-line entry point that exports parsed chat messages to Markdown.

Messages are loaded from either a libpurple chat folder or a synctech SMS
backup file, grouped per chat and per time period, and each group is written
to its own ``.md`` file below the output folder.
"""

import argparse
import dataclasses
import datetime
import logging
from collections.abc import Callable, Iterable, Iterator, Mapping
from pathlib import Path

from . import (
    filter_useless_messages,
    libpurple,
    merge_adjacent_messages,
    synctech_sms,
)
from .data import MYSELF, Message
from .markdown import format_messages

logger = logging.getLogger(__name__)


def group_messages(
    messages: Iterable[Message],
    key_fn: Callable[[Message], str],
) -> dict[str, list[Message]]:
    """Group messages into lists keyed by ``key_fn(message)``.

    Keys appear in first-seen order and messages keep their input order
    within each group. An empty input yields an empty dict.
    """
    by_key: dict[str, list[Message]] = {}
    for msg in messages:
        by_key.setdefault(key_fn(msg), []).append(msg)
    return by_key


def group_messages_by_chat_id(messages: Iterable[Message]) -> dict[str, list[Message]]:
    """Group messages by their ``chat_id`` attribute."""
    return group_messages(messages, key_fn=lambda msg: msg.chat_id)


def year_and_month_period_key(msg: Message) -> str:
    """Return the period key ``YYYY-MM`` for the message's ``sent_at``."""
    return f'{msg.sent_at.year}-{msg.sent_at.month:02}'


def year_period_key(msg: Message) -> str:
    """Return the period key ``YYYY`` for the message's ``sent_at``."""
    return f'{msg.sent_at.year}'


def year_quarter_period_key(msg: Message) -> str:
    """Return the period key ``YYYY-Qn`` (n in 1..4) for the message's ``sent_at``."""
    quarter = (msg.sent_at.month - 1) // 3 + 1
    return f'{msg.sent_at.year}-Q{quarter:01}'


# Upper bound on the average group size accepted when the period granularity
# is chosen automatically.
MAX_AVERAGE_MESSAGES_PER_PERIOD = 120

# Chats with this many messages or fewer (after merging) are not exported.
TOO_FEW_MESSAGES_TO_CARE = 2

# Ordered from coarsest to finest; automatic selection tries them in this order.
PERIOD_KEYS_BY_NAME: Mapping[str, Callable[[Message], str]] = {
    'full': (lambda msg: 'full'),
    'year': year_period_key,
    'quarter': year_quarter_period_key,
    'month': year_and_month_period_key,
}


def group_messages_by_period(
    messages: Iterable[Message],
    period_key: str | None = None,
) -> tuple[dict[str, list[Message]], Callable[[Message], str]]:
    """Group messages by time period.

    Args:
        messages: Messages to group. Consumed once, so one-shot iterators
            are accepted.
        period_key: Name of an entry in ``PERIOD_KEYS_BY_NAME``. When
            ``None``, the candidates are tried in mapping order and the first
            one whose average group size does not exceed
            ``MAX_AVERAGE_MESSAGES_PER_PERIOD`` is used; if none qualifies,
            the last candidate is used.

    Returns:
        The grouped messages and the key function that produced the grouping.

    Raises:
        KeyError: If ``period_key`` is not a known period name.
    """
    if period_key is None:
        candidates: Iterable[Callable[[Message], str]] = PERIOD_KEYS_BY_NAME.values()
    else:
        candidates = [PERIOD_KEYS_BY_NAME[period_key]]

    # Grouping happens once per candidate, so the input must be re-iterable.
    messages = list(messages)

    for period_key_fn in candidates:
        grouped = group_messages(messages, key_fn=period_key_fn)
        # No groups means no messages: any granularity is acceptable, and
        # this also avoids dividing by zero.
        if not grouped or len(messages) / len(grouped) <= MAX_AVERAGE_MESSAGES_PER_PERIOD:
            break
    return grouped, period_key_fn


def replace_myself(messages: Iterable[Message], myself: str) -> Iterator[Message]:
    """Yield the messages with the ``MYSELF`` sender replaced by ``myself``.

    Messages from other senders are yielded unchanged; replaced messages are
    copies made with ``dataclasses.replace``.
    """
    for msg in messages:
        if msg.sender == MYSELF:
            yield dataclasses.replace(msg, sender=myself)
        else:
            yield msg


def parse_args() -> argparse.Namespace:
    """Parse and return the command-line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--purple', type=Path, dest='purple_folder')
    parser.add_argument('--synctech', type=Path, dest='synctech_sms_backup_file')
    parser.add_argument('--output', type=Path)
    parser.add_argument('--myself', type=str, default='Myself')
    parser.add_argument('--overwrite', action='store_true', dest='overwrite_files')
    parser.add_argument(
        '--period',
        dest='period_key',
        choices=list(PERIOD_KEYS_BY_NAME.keys()),
    )
    parser.add_argument(
        '--skip-this-period',
        action='store_true',
        dest='skip_this_period',
    )
    return parser.parse_args()


def main() -> None:
    """Load messages, group them per chat and period, and write Markdown files."""
    logging.basicConfig()
    logging.getLogger().setLevel('INFO')
    args = parse_args()

    # Every output path is built from args.output; report a missing value
    # here instead of failing with a TypeError once the first file is due.
    if args.output is None:
        logger.fatal('No output folder given!')
        return

    if args.purple_folder:
        all_messages = libpurple.parse_messages_in_chat_folder(args.purple_folder)
    elif args.synctech_sms_backup_file:
        all_messages = synctech_sms.parse_messages_in_backup_xml_file(
            args.synctech_sms_backup_file,
        )
    else:
        logger.fatal('No input file given!')
        return

    all_messages = list(replace_myself(all_messages, myself=args.myself))
    logger.info('%d messages after loading', len(all_messages))

    all_messages = list(filter_useless_messages(all_messages))
    logger.info('%d messages after filtering', len(all_messages))

    messages_by_chat_id = group_messages_by_chat_id(all_messages)
    logger.info('%d message groups', len(messages_by_chat_id))
    # Everything below works per chat; the flat list is no longer needed.
    del all_messages

    for chat_id, messages_in_chat_original in messages_by_chat_id.items():
        messages_in_chat = merge_adjacent_messages(messages_in_chat_original)
        if len(messages_in_chat) <= TOO_FEW_MESSAGES_TO_CARE:
            logger.info(' "%s": Skipped due to too few messages', chat_id)
            continue

        messages_by_period, period_key_fn = group_messages_by_period(
            messages_in_chat,
            args.period_key,
        )
        logger.info(
            ' "%s": %d messages, %d periods (%d msg/period avg)',
            chat_id,
            len(messages_in_chat_original),
            len(messages_by_period),
            len(messages_in_chat_original) / len(messages_by_period),
        )

        # Key of the period containing "now", computed with the same key
        # function as the grouping so the two are directly comparable.
        this_period_name = period_key_fn(Message(datetime.datetime.now(), '', '', ''))

        for period_key_name, messages in messages_by_period.items():
            file_escaped_chat_id = chat_id.replace(' ', '-')
            output_file = (
                args.output / chat_id / f'{file_escaped_chat_id}-{period_key_name}.md'
            )

            # Only skip the current period when the user asked for it.
            if args.skip_this_period and this_period_name == period_key_name:
                logger.info('Skipping due to --skip-this-period: %s', output_file)
                continue
            if output_file.exists() and not args.overwrite_files:
                logger.info('Skipping existing file: %s', output_file)
                continue

            logger.info('Writing % 5d messages to %s', len(messages), output_file)

            # Create folders and file
            output_file.parent.mkdir(exist_ok=True, parents=True)
            # Explicit encoding: chat text is not limited to the platform's
            # default code page.
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(
                    format_messages(
                        messages,
                        title=f'{chat_id} - {period_key_name}',
                    ),
                )


if __name__ == '__main__':
    main()