1
0
libpurple-to-markdown/libpurple_to_markdown/__main__.py
2024-11-03 17:15:03 +01:00

145 lines
4.5 KiB
Python

import argparse
import dataclasses
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path
from . import (
filter_useless_messages,
format_messages,
libpurple,
merge_adjacent_messages,
synctech_sms,
)
from .data import MYSELF, Message
# Module-level logger, named after this module's ``__name__``.
logger = logging.getLogger(__name__)
def group_messages(messages: Iterable[Message], key) -> dict[str, list[Message]]:
    """Bucket messages by the value that ``key`` computes for each of them.

    Args:
        messages: The messages to distribute into buckets.
        key: Callable invoked once per message; its result is the bucket key.

    Returns:
        A dict mapping each key to the messages that produced it. Buckets
        appear in order of first occurrence, and messages keep their input
        order within a bucket.
    """
    buckets: dict[str, list[Message]] = {}
    for message in messages:
        bucket_key = key(message)
        if bucket_key not in buckets:
            buckets[bucket_key] = []
        buckets[bucket_key].append(message)
    return buckets
def group_messages_by_chat_id(messages: Iterable[Message]) -> dict[str, list[Message]]:
    """Bucket messages by their ``chat_id`` attribute.

    Returns:
        A dict mapping each chat id to the messages carrying that id.
    """

    def chat_id_of(message):
        return message.chat_id

    return group_messages(messages, key=chat_id_of)
def year_and_month_period_key(msg: Message):
    """Return the ``YYYY-MM`` period key for the message's ``sent_at``."""
    year = msg.sent_at.year
    month = msg.sent_at.month
    return f'{year}-{month:02}'
def year_period_key(msg: Message):
    """Return the year of the message's ``sent_at`` as a string period key."""
    sent_year = msg.sent_at.year
    return f'{sent_year}'
def year_quarter_period_key(msg: Message):
    """Return the ``YYYY-Qn`` period key for the message's ``sent_at``.

    Months 1-3 map to Q1, 4-6 to Q2, 7-9 to Q3 and 10-12 to Q4.
    """
    sent_at = msg.sent_at
    # Zero-based month index, three months per quarter, one-based result.
    quarter = (sent_at.month - 1) // 3 + 1
    year = sent_at.year
    return f'{year}-Q{quarter:01}'
# Threshold used by group_messages_by_period: it settles on the first period
# granularity whose average number of messages per group does not exceed this.
MAX_AVERAGE_MESSAGES_PER_PERIOD = 120
def group_messages_by_period(messages: Iterable[Message]) -> dict[str, list[Message]]:
    """Split messages into the coarsest periods that keep groups small.

    Granularities are tried from coarsest to finest: everything in one group
    (keyed ``'full'``), per year, per quarter, per month. The first one whose
    average group size is at most ``MAX_AVERAGE_MESSAGES_PER_PERIOD`` wins.
    If none qualifies, the finest (monthly) grouping is returned.

    Args:
        messages: The messages to split; may be any iterable, including a
            one-shot iterator.

    Returns:
        A dict mapping period keys to the messages sent in that period, or
        an empty dict when there are no messages.
    """
    # Materialise once: the input is grouped repeatedly, and a one-shot
    # iterator would be exhausted after the first pass.
    messages = list(messages)
    if not messages:
        # Nothing to group; also avoids dividing by zero groups below.
        return {}

    possible_period_keys = [
        (lambda msg: 'full'),
        year_period_key,
        year_quarter_period_key,
        year_and_month_period_key,
    ]
    grouped: dict[str, list[Message]] = {}
    for period_key in possible_period_keys:
        grouped = group_messages(messages, key=period_key)
        # Every message lands in exactly one group, so the group sizes sum
        # to the total message count.
        average_num_messages = len(messages) / len(grouped)
        if average_num_messages <= MAX_AVERAGE_MESSAGES_PER_PERIOD:
            break
    return grouped
def replace_myself(messages: Iterable[Message], myself: str) -> Iterator[Message]:
    """Lazily yield the messages with the ``MYSELF`` sender swapped out.

    Args:
        messages: The messages to pass through.
        myself: The sender name to use in place of ``MYSELF``.

    Yields:
        Each input message in order. Messages whose ``sender`` equals
        ``MYSELF`` are yielded as copies with ``sender`` set to ``myself``;
        all others are yielded unchanged.
    """
    for message in messages:
        if message.sender != MYSELF:
            yield message
            continue
        yield dataclasses.replace(message, sender=myself)
def parse_args():
    """Parse the command-line options of the converter.

    Returns:
        The parsed namespace with ``purple_folder``,
        ``synctech_sms_backup_file`` and ``output`` (each a ``Path`` or
        ``None``) and ``myself`` (a string, ``'Myself'`` by default).
    """
    parser = argparse.ArgumentParser()

    # All path-valued options share the same type; only the destination
    # attribute differs.
    path_options = (
        ('--purple', {'dest': 'purple_folder'}),
        ('--synctech', {'dest': 'synctech_sms_backup_file'}),
        ('--output', {}),
    )
    for flag, extra in path_options:
        parser.add_argument(flag, type=Path, **extra)

    parser.add_argument('--myself', type=str, default='Myself')
    return parser.parse_args()
def main():
    """Convert chat logs into Markdown files, one per chat and period.

    Messages are read from either a libpurple chat folder (``--purple``) or
    a SyncTech SMS backup file (``--synctech``); the former wins when both
    are given. They are filtered, grouped by chat id, merged, split into
    periods, and written below the ``--output`` folder as
    ``<chat_id>/<escaped-chat-id>-<period>.md``.

    Logs a critical message and returns without writing anything when no
    input or no output location was given.
    """
    logging.basicConfig()
    logging.getLogger().setLevel('INFO')

    args = parse_args()

    # Validate the arguments before doing any parsing work.
    if not args.purple_folder and not args.synctech_sms_backup_file:
        logger.critical('No input file given!')
        return
    if args.output is None:
        # Without this check the path join below fails with a TypeError.
        logger.critical('No output folder given!')
        return

    if args.purple_folder:
        all_messages = libpurple.parse_messages_in_chat_folder(args.purple_folder)
    else:
        all_messages = synctech_sms.parse_messages_in_backup_xml_file(
            args.synctech_sms_backup_file,
        )

    all_messages = list(replace_myself(all_messages, myself=args.myself))
    logger.info('%d messages after loading', len(all_messages))

    all_messages = list(filter_useless_messages(all_messages))
    logger.info('%d messages after filtering', len(all_messages))

    messages_by_chat_id = group_messages_by_chat_id(all_messages)
    logger.info('%d message groups', len(messages_by_chat_id))
    # The flat list is no longer needed once the messages are grouped.
    del all_messages

    for chat_id, messages_in_chat_original in messages_by_chat_id.items():
        messages_in_chat = merge_adjacent_messages(messages_in_chat_original)
        if len(messages_in_chat) <= 2:
            logger.info(' "%s": Skipped due to too few messages', chat_id)
            continue

        messages_by_period = group_messages_by_period(messages_in_chat)
        logger.info(
            ' "%s": %d messages, %d periods (%d msg/period avg)',
            chat_id,
            len(messages_in_chat_original),
            len(messages_by_period),
            len(messages_in_chat_original) / len(messages_by_period),
        )

        # Both values are the same for every period of this chat.
        file_escaped_chat_id = chat_id.replace(' ', '-')
        chat_folder = args.output / chat_id
        # parents=True: the output root itself may not exist yet.
        chat_folder.mkdir(parents=True, exist_ok=True)

        for period_key, messages in messages_by_period.items():
            output_file = chat_folder / f'{file_escaped_chat_id}-{period_key}.md'
            logger.info('Writing % 5d messages to %s', len(messages), output_file)
            # Explicit UTF-8 so that non-ASCII chat text is written the same
            # way regardless of the platform's default encoding.
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(
                    format_messages(
                        messages,
                        title=f'{chat_id} - {period_key}',
                    ),
                )
# Run the converter only when this module is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()