import argparse import dataclasses import logging from collections.abc import Iterable, Iterator from pathlib import Path from . import ( filter_useless_messages, libpurple, merge_adjacent_messages, synctech_sms, ) from .data import MYSELF, Message from .markdown import format_messages logger = logging.getLogger(__name__) def group_messages(messages: Iterable[Message], key) -> dict[str, list[Message]]: by_key: dict[str, list[Message]] = {} for msg in messages: by_key.setdefault(key(msg), []).append(msg) del msg return by_key def group_messages_by_chat_id(messages: Iterable[Message]) -> dict[str, list[Message]]: return group_messages(messages, key=lambda msg: msg.chat_id) def year_and_month_period_key(msg: Message): return f'{msg.sent_at.year}-{msg.sent_at.month:02}' def year_period_key(msg: Message): return f'{msg.sent_at.year}' def year_quarter_period_key(msg: Message): quarter = int((msg.sent_at.month - 1) / 3) + 1 return f'{msg.sent_at.year}-Q{quarter:01}' MAX_AVERAGE_MESSAGES_PER_PERIOD = 120 def group_messages_by_period(messages: Iterable[Message]) -> dict[str, list[Message]]: possible_period_keys = [ (lambda msg: 'full'), year_period_key, year_quarter_period_key, year_and_month_period_key, ] for period_key in possible_period_keys: grouped = group_messages(messages, key=period_key) average_num_messages = sum(len(grouped[k]) for k in grouped) / len(grouped) if average_num_messages <= MAX_AVERAGE_MESSAGES_PER_PERIOD: break del period_key, average_num_messages return grouped def replace_myself(messages: Iterable[Message], myself: str) -> Iterator[Message]: for msg in messages: if msg.sender == MYSELF: yield dataclasses.replace(msg, sender=myself) else: yield msg def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--purple', type=Path, dest='purple_folder') parser.add_argument('--synctech', type=Path, dest='synctech_sms_backup_file') parser.add_argument('--output', type=Path) parser.add_argument('--myself', type=str, default='Myself') return parser.parse_args() def main(): logging.basicConfig() logging.getLogger().setLevel('INFO') args = parse_args() if args.purple_folder: all_messages = libpurple.parse_messages_in_chat_folder(args.purple_folder) elif args.synctech_sms_backup_file: all_messages = synctech_sms.parse_messages_in_backup_xml_file( args.synctech_sms_backup_file, ) else: logger.fatal('No input file given!') return all_messages = replace_myself(all_messages, myself=args.myself) all_messages = list(all_messages) logger.info('%d messages after loading', len(all_messages)) all_messages = list(filter_useless_messages(all_messages)) logger.info('%d messages after filtering', len(all_messages)) messages_by_chat_id = group_messages_by_chat_id(all_messages) logger.info('%d message groups', len(messages_by_chat_id)) del all_messages for chat_id, messages_in_chat_original in messages_by_chat_id.items(): messages_in_chat = merge_adjacent_messages(messages_in_chat_original) if len(messages_in_chat) <= 2: logger.info(' "%s": Skipped due to too few messages', chat_id) continue messages_by_period = group_messages_by_period(messages_in_chat) logger.info( ' "%s": %d messages, %d periods (%d msg/period avg)', chat_id, len(messages_in_chat_original), len(messages_by_period), len(messages_in_chat_original) / len(messages_by_period), ) for period_key, messages in messages_by_period.items(): file_escaped_chat_id = chat_id.replace(' ', '-') output_file = ( args.output / chat_id / f'{file_escaped_chat_id}-{period_key}.md' ) output_file.parent.mkdir(exist_ok=True) logger.info('Writing % 5d messages to %s', len(messages), output_file) with open(output_file, 'w') as f: f.write( format_messages( messages, title=f'{chat_id} - {period_key}', ), ) del period_key, messages, output_file del chat_id, messages_in_chat_original, messages_in_chat if __name__ == '__main__': main()