From 57cac8daa1e368499874f2c40222318c3cfb90fc Mon Sep 17 00:00:00 2001 From: Jon Michael Aanes Date: Thu, 31 Oct 2024 20:36:38 +0100 Subject: [PATCH] Implement SMS to message conversion --- libpurple_to_markdown/__main__.py | 63 +++++++++++++++++++-------- libpurple_to_markdown/data.py | 5 +++ libpurple_to_markdown/libpurple.py | 10 +++-- libpurple_to_markdown/synctech_sms.py | 22 ++++++++-- 4 files changed, 74 insertions(+), 26 deletions(-) diff --git a/libpurple_to_markdown/__main__.py b/libpurple_to_markdown/__main__.py index 1d05056..d8c3829 100644 --- a/libpurple_to_markdown/__main__.py +++ b/libpurple_to_markdown/__main__.py @@ -1,19 +1,28 @@ import argparse import logging from pathlib import Path +from collections.abc import Iterable from . import ( filter_useless_messages, format_messages, libpurple, merge_adjacent_messages, + synctech_sms, ) from .data import Message logger = logging.getLogger(__name__) -def group_messages_by_period(messages: list[Message]) -> dict[str, list[Message]]: +def group_messages_by_chat_id(messages: Iterable[Message]) -> dict[str, list[Message]]: + by_period: dict[str, list[Message]] = {} + for msg in messages: + by_period.setdefault(msg.chat_id, []).append(msg) + del msg + return by_period + +def group_messages_by_period(messages: Iterable[Message]) -> dict[str, list[Message]]: by_period: dict[str, list[Message]] = {} for msg in messages: period_key = f'{msg.sent_at.year}-{msg.sent_at.month:02}' @@ -24,7 +33,8 @@ def group_messages_by_period(messages: list[Message]) -> dict[str, list[Message] def parse_args(): parser = argparse.ArgumentParser() - parser.add_argument('path', type=Path) + parser.add_argument('--purple', type=Path, dest='purple_folder') + parser.add_argument('--synctech', type=Path, dest='synctech_sms_backup_file') parser.add_argument('--output', type=Path) return parser.parse_args() @@ -34,27 +44,42 @@ def main(): logging.getLogger().setLevel('INFO') args = parse_args() - server = args.path.parent.name - 
receipient = args.path.name + if args.purple_folder: + all_messages = libpurple.parse_messages_in_chat_folder(args.purple_folder) + elif args.synctech_sms_backup_file: + all_messages = synctech_sms.parse_messages_in_backup_xml_file(args.synctech_sms_backup_file) + else: + logger.fatal('No input file given!') + return - all_messages = libpurple.parse_messages_in_chat_folder(args.path) - all_messages = filter_useless_messages(all_messages) - all_messages = merge_adjacent_messages(all_messages) + all_messages = list(all_messages) + logger.info('%d messages after loading', len(all_messages)) - messages_by_period = group_messages_by_period(all_messages) + all_messages = list(filter_useless_messages(all_messages)) + logger.info('%d messages after filtering', len(all_messages)) - for period_key, messages in messages_by_period.items(): - output_file = args.output / f'{server} - {receipient} - {period_key}.md' - logger.info('Writing % 5d messages to %s', len(messages), output_file) - with open(output_file, 'w') as f: - f.write( - format_messages( - messages, - title=f'{server} - {receipient} - {period_key}', - ), - ) + messages_by_chat_id = group_messages_by_chat_id(all_messages) + logger.info('%d message groups', len(messages_by_chat_id)) + del all_messages - del period_key, messages, output_file + for chat_id, messages_in_chat_original in messages_by_chat_id.items(): + messages_in_chat = merge_adjacent_messages(messages_in_chat_original ) + + messages_by_period = group_messages_by_period(messages_in_chat) + + for period_key, messages in messages_by_period.items(): + output_file = args.output / f'{chat_id} - {period_key}.md' + logger.info('Writing % 5d messages to %s', len(messages), output_file) + with open(output_file, 'w') as f: + f.write( + format_messages( + messages, + title=f'{chat_id} - {period_key}', + ), + ) + + del period_key, messages, output_file + del chat_id, messages_in_chat_original, messages_in_chat if __name__ == '__main__': diff --git 
a/libpurple_to_markdown/data.py b/libpurple_to_markdown/data.py index 7614de8..edbff58 100644 --- a/libpurple_to_markdown/data.py +++ b/libpurple_to_markdown/data.py @@ -2,13 +2,18 @@ import dataclasses import datetime +MYSELF = 'MYSELF' + + @dataclasses.dataclass(frozen=True, order=True) class Message: sent_at: datetime.datetime sender: str text: str + chat_id: str def __post_init__(self): assert self.sent_at is not None assert self.sender is not None assert self.text is not None + assert self.chat_id is not None diff --git a/libpurple_to_markdown/libpurple.py b/libpurple_to_markdown/libpurple.py index 02403c0..c2f265d 100644 --- a/libpurple_to_markdown/libpurple.py +++ b/libpurple_to_markdown/libpurple.py @@ -26,7 +26,7 @@ def parse_timestamp(c) -> datetime.time: return datetime.time(int(m.group(1)), int(m.group(2)), int(m.group(3))) -def parse_messages_in_chat_file(path: Path) -> list[Message]: +def parse_messages_in_chat_file(path: Path, chat_id: str) -> list[Message]: logger.info('Parsing %s', path) chat_start = datetime.datetime.fromisoformat( path.stem.removesuffix('CEST').removesuffix('CET'), @@ -74,7 +74,8 @@ def parse_messages_in_chat_file(path: Path) -> list[Message]: elif c.name == 'br': if cur_sender: - messages.append(Message(cur_sent_at, cur_sender, cur_text.strip())) + messages.append(Message(cur_sent_at, cur_sender, + cur_text.strip(), chat_id)) cur_sent_at = None cur_sender = None cur_text = '' @@ -94,8 +95,11 @@ def parse_messages_in_chat_file(path: Path) -> list[Message]: def parse_messages_in_chat_folder(chat_folder_path: Path) -> list[Message]: messages = [] + server = args.purple_folder.parent.name + receipient = args.purple_folder.name + chat_id = f'{server} - {receipient}' for file_path in sorted(chat_folder_path.iterdir()): - messages.extend(parse_messages_in_chat_file(file_path)) + messages.extend(parse_messages_in_chat_file(file_path, chat_id)) messages.sort() return messages diff --git a/libpurple_to_markdown/synctech_sms.py 
b/libpurple_to_markdown/synctech_sms.py index 4b949a1..ec29161 100644 --- a/libpurple_to_markdown/synctech_sms.py +++ b/libpurple_to_markdown/synctech_sms.py @@ -7,22 +7,36 @@ standardized Message format. """ import datetime import logging +from collections.abc import Iterator from pathlib import Path import bs4 -from .data import Message +from .data import Message, MYSELF logger = logging.getLogger(__name__) +def sms_soup_to_message(soup: bs4.BeautifulSoup) -> Message: + # TODO: Require myself + sent_at = datetime.datetime.fromtimestamp(int(soup['date'])/1000) -def parse_messages_in_backup_xml_file(path: Path) -> list[Message]: + if soup['type'] == '2': + sender=MYSELF + else: + sender=soup.get('contact_name') or soup['address'] + + text = soup['body'] + chat_id = 'SMS ' + soup['address'] + return Message(sent_at,sender, text, chat_id = chat_id) + +def parse_messages_in_backup_xml_file(path: Path) -> Iterator[Message]: logger.info('Parsing %s', path) with open(path) as f: soup = bs4.BeautifulSoup(f, 'lxml-xml') - # TODO: Implement message parsing + for sms in soup.find_all('sms'): + yield sms_soup_to_message(sms) + del sms - return []