Implement SMS to message conversion
This commit is contained in:
parent
c45726d666
commit
57cac8daa1
|
@ -1,19 +1,28 @@
|
|||
import argparse
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from collections.abc import Iterable
|
||||
|
||||
from . import (
|
||||
filter_useless_messages,
|
||||
format_messages,
|
||||
libpurple,
|
||||
merge_adjacent_messages,
|
||||
synctech_sms,
|
||||
)
|
||||
from .data import Message
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def group_messages_by_period(messages: list[Message]) -> dict[str, list[Message]]:
|
||||
def group_messages_by_chat_id(messages: Iterable[Message]) -> dict[str, list[Message]]:
    """Group messages by the chat they belong to.

    Args:
        messages: Messages to group. The iterable is consumed exactly once,
            so a generator is fine.

    Returns:
        Mapping from ``chat_id`` to the messages of that chat, in the order
        they were encountered. Empty input gives an empty dict.
    """
    by_chat_id: dict[str, list[Message]] = {}
    for msg in messages:
        by_chat_id.setdefault(msg.chat_id, []).append(msg)
    # No trailing ``del msg``: the loop variable is never bound when
    # ``messages`` is empty, so deleting it after the loop would raise
    # UnboundLocalError.
    return by_chat_id
|
||||
|
||||
def group_messages_by_period(messages: Iterable[Message]) -> dict[str, list[Message]]:
|
||||
by_period: dict[str, list[Message]] = {}
|
||||
for msg in messages:
|
||||
period_key = f'{msg.sent_at.year}-{msg.sent_at.month:02}'
|
||||
|
@ -24,7 +33,8 @@ def group_messages_by_period(messages: list[Message]) -> dict[str, list[Message]
|
|||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments.

    Returns:
        Namespace with ``purple_folder`` and ``synctech_sms_backup_file``
        (each a ``Path``, or ``None`` when the option was not given) and
        ``output`` (a ``Path``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--purple',
        type=Path,
        dest='purple_folder',
        help='libpurple chat log folder to convert',
    )
    parser.add_argument(
        '--synctech',
        type=Path,
        dest='synctech_sms_backup_file',
        help='SyncTech SMS backup XML file to convert',
    )
    # main() joins every output file name onto this path, so reject a missing
    # --output here instead of failing later with a TypeError on None.
    parser.add_argument(
        '--output',
        type=Path,
        required=True,
        help='folder that the generated Markdown files are written to',
    )
    return parser.parse_args()
|
||||
|
||||
|
@ -34,27 +44,42 @@ def main():
|
|||
logging.getLogger().setLevel('INFO')
|
||||
args = parse_args()
|
||||
|
||||
server = args.path.parent.name
|
||||
receipient = args.path.name
|
||||
if args.purple_folder:
|
||||
all_messages = libpurple.parse_messages_in_chat_folder(args.purple_folder)
|
||||
elif args.synctech_sms_backup_file:
|
||||
all_messages = synctech_sms.parse_messages_in_backup_xml_file(args.synctech_sms_backup_file)
|
||||
else:
|
||||
logger.fatal('No input file given!')
|
||||
return
|
||||
|
||||
all_messages = libpurple.parse_messages_in_chat_folder(args.path)
|
||||
all_messages = filter_useless_messages(all_messages)
|
||||
all_messages = merge_adjacent_messages(all_messages)
|
||||
all_messages = list(all_messages)
|
||||
logger.info('%d messages after loading', len(all_messages))
|
||||
|
||||
messages_by_period = group_messages_by_period(all_messages)
|
||||
all_messages = list(filter_useless_messages(all_messages))
|
||||
logger.info('%d messages after filtering', len(all_messages))
|
||||
|
||||
messages_by_chat_id = group_messages_by_chat_id(all_messages)
|
||||
logger.info('%d message groups', len(messages_by_chat_id))
|
||||
del all_messages
|
||||
|
||||
for chat_id, messages_in_chat_original in messages_by_chat_id.items():
|
||||
messages_in_chat = merge_adjacent_messages(messages_in_chat_original )
|
||||
|
||||
messages_by_period = group_messages_by_period(messages_in_chat)
|
||||
|
||||
for period_key, messages in messages_by_period.items():
|
||||
output_file = args.output / f'{server} - {receipient} - {period_key}.md'
|
||||
output_file = args.output / f'{chat_id} - {period_key}.md'
|
||||
logger.info('Writing % 5d messages to %s', len(messages), output_file)
|
||||
with open(output_file, 'w') as f:
|
||||
f.write(
|
||||
format_messages(
|
||||
messages,
|
||||
title=f'{server} - {receipient} - {period_key}',
|
||||
title=f'{chat_id} - {period_key}',
|
||||
),
|
||||
)
|
||||
|
||||
del period_key, messages, output_file
|
||||
del chat_id, messages_in_chat_original, messages_in_chat
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -2,13 +2,18 @@ import dataclasses
|
|||
import datetime
|
||||
|
||||
|
||||
MYSELF = 'MYSELF'
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True, order=True)
class Message:
    """A single chat message in the source-independent format.

    Instances are immutable and compare field by field in declaration order,
    so sorting a list of messages sorts primarily by ``sent_at``.

    Raises:
        ValueError: If any field is ``None``.
    """

    sent_at: datetime.datetime  # when the message was sent
    sender: str  # who sent the message
    text: str  # message body
    chat_id: str  # identifies the conversation the message belongs to

    def __post_init__(self) -> None:
        # Explicit checks rather than ``assert``: asserts are stripped under
        # ``python -O``, which would let None fields through silently.
        for field in dataclasses.fields(self):
            if getattr(self, field.name) is None:
                raise ValueError(f'Message.{field.name} must not be None')
|
||||
|
|
|
@ -26,7 +26,7 @@ def parse_timestamp(c) -> datetime.time:
|
|||
return datetime.time(int(m.group(1)), int(m.group(2)), int(m.group(3)))
|
||||
|
||||
|
||||
def parse_messages_in_chat_file(path: Path) -> list[Message]:
|
||||
def parse_messages_in_chat_file(path: Path, chat_id: str) -> list[Message]:
|
||||
logger.info('Parsing %s', path)
|
||||
chat_start = datetime.datetime.fromisoformat(
|
||||
path.stem.removesuffix('CEST').removesuffix('CET'),
|
||||
|
@ -74,7 +74,8 @@ def parse_messages_in_chat_file(path: Path) -> list[Message]:
|
|||
|
||||
elif c.name == 'br':
|
||||
if cur_sender:
|
||||
messages.append(Message(cur_sent_at, cur_sender, cur_text.strip()))
|
||||
messages.append(Message(cur_sent_at, cur_sender,
|
||||
cur_text.strip(), chat_id))
|
||||
cur_sent_at = None
|
||||
cur_sender = None
|
||||
cur_text = ''
|
||||
|
@ -94,8 +95,11 @@ def parse_messages_in_chat_file(path: Path) -> list[Message]:
|
|||
|
||||
def parse_messages_in_chat_folder(chat_folder_path: Path) -> list[Message]:
    """Parse every chat log file in a chat folder.

    The chat id given to each message is built from the folder's location:
    the parent folder's name (used as the server) and the folder's own name
    (used as the recipient).

    Args:
        chat_folder_path: Folder whose entries are each passed to
            ``parse_messages_in_chat_file``, in sorted path order.

    Returns:
        The messages of all files combined into one sorted list. An empty
        folder gives an empty list.
    """
    # Derive the id from the argument: ``args`` only exists inside main() of
    # the command-line module and is not in scope here.
    server = chat_folder_path.parent.name
    recipient = chat_folder_path.name
    chat_id = f'{server} - {recipient}'

    messages: list[Message] = []
    for file_path in sorted(chat_folder_path.iterdir()):
        messages.extend(parse_messages_in_chat_file(file_path, chat_id))

    messages.sort()
    return messages
|
||||
|
|
|
@ -7,22 +7,36 @@ standardized Message format.
|
|||
"""
|
||||
import datetime
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
|
||||
import bs4
|
||||
|
||||
from .data import Message
|
||||
from .data import Message, MYSELF
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def sms_soup_to_message(soup: bs4.BeautifulSoup) -> Message:
    """Build a ``Message`` from a single ``<sms>`` element.

    Reads the ``date``, ``type``, ``contact_name``, ``address`` and ``body``
    attributes. ``date`` is taken as milliseconds since the epoch. Elements
    whose ``type`` is ``'2'`` are attributed to ``MYSELF``; all others to
    ``contact_name``, falling back to ``address`` when that is missing or
    empty. The chat id is ``'SMS '`` followed by ``address``.
    """
    # TODO: Require myself
    epoch_seconds = int(soup['date']) / 1000
    sent_at = datetime.datetime.fromtimestamp(epoch_seconds)

    from_myself = soup['type'] == '2'
    sender = MYSELF if from_myself else (soup.get('contact_name') or soup['address'])

    return Message(sent_at, sender, soup['body'], chat_id='SMS ' + soup['address'])
|
||||
|
||||
def parse_messages_in_backup_xml_file(path: Path) -> Iterator[Message]:
    """Yield a ``Message`` for every ``<sms>`` element in a backup XML file.

    This is a generator: nothing is logged or read until iteration starts.
    The file is then parsed in full and closed before the first message is
    converted.

    Args:
        path: Backup XML file to read.

    Yields:
        One ``Message`` per ``<sms>`` element found by ``find_all``. A file
        without such elements yields nothing.
    """
    logger.info('Parsing %s', path)

    # Binary mode lets the XML parser pick the encoding from the document
    # itself instead of decoding with the platform's default text encoding.
    with open(path, 'rb') as f:
        soup = bs4.BeautifulSoup(f, 'lxml-xml')

    # No ``del sms`` / ``return []`` afterwards: the loop variable is unbound
    # when there are no elements, and a generator's return value is discarded
    # by ordinary iteration.
    for sms in soup.find_all('sms'):
        yield sms_soup_to_message(sms)
|
||||
|
|
Loading…
Reference in New Issue
Block a user