"""Parse HTML chat logs into ``Message`` objects and render them as text."""

import dataclasses
import datetime
import logging
import re
from pathlib import Path

import bs4

logger = logging.getLogger(__name__)

# "(HH:MM:SS)" timestamp at the start of a message header.
_TIMESTAMP_RE = re.compile(r'\((\d+):(\d+):(\d+)\)')

# Trailing time-zone abbreviation (e.g. "CEST") appended to the ISO timestamp
# in a log file name.  Two or more letters, so an ISO "Z" designator is kept.
_TZ_ABBREVIATION_RE = re.compile(r'[A-Z]{2,}$')


@dataclasses.dataclass(frozen=True, order=True)
class Message:
    """A single chat message.

    Instances are immutable and, because of ``order=True``, compare by
    ``(sent_at, sender, text)``, so sorting a list of messages orders them
    chronologically first.

    Raises:
        ValueError: if any field is ``None``.
    """

    sent_at: datetime.datetime
    sender: str
    text: str

    def __post_init__(self):
        # Explicit checks rather than ``assert`` so that they are not
        # stripped when Python runs with ``-O``.
        for field in dataclasses.fields(self):
            if getattr(self, field.name) is None:
                raise ValueError(f'Message.{field.name} must not be None')


def datetime_sent(chat_start: datetime.datetime,
                  message_sent: datetime.time) -> datetime.datetime:
    """Combine a message's time of day with the date of the chat it belongs to.

    The result carries ``chat_start``'s ``tzinfo``.  A message cannot be sent
    before its chat started, so a time of day earlier than the chat's start
    time is taken to mean the chat ran past midnight and the message belongs
    to the following day.

    NOTE(review): this assumes no chat lasts longer than 24 hours.
    """
    sent_at = datetime.datetime.combine(
        chat_start.date(), message_sent, chat_start.tzinfo)
    if message_sent < chat_start.time():
        # Crossed midnight: move forward (not back) by one day.
        sent_at += datetime.timedelta(days=1)
    return sent_at


def _parse_message_header(header, chat_start: datetime.datetime,
                          path: Path) -> tuple[datetime.datetime, str]:
    """Return ``(sent_at, sender)`` read from a message's ``<font>`` header."""
    timestamp = header.font
    sender = header.b
    match = None
    if timestamp is not None:
        match = _TIMESTAMP_RE.match(timestamp.get_text())
    if match is None or sender is None:
        raise ValueError(f'Malformed message header in {path}: {header!r}')
    hour, minute, second = (int(part) for part in match.groups())
    sent_at = datetime_sent(chat_start, datetime.time(hour, minute, second))
    return sent_at, sender.get_text().strip().removesuffix(':')


def parse_messages_in_chat_file(path: Path) -> list[Message]:
    """Parse one HTML chat log into a list of messages, in document order.

    The chat's start time is read from the file name: the stem must be an
    ISO 8601 timestamp, optionally followed by a time-zone abbreviation such
    as ``CEST``, which is ignored.

    Within ``<body>``, a ``<font>`` element starts a message (it holds the
    timestamp and the sender), following text nodes and ``<a>`` elements
    build up its text (links are rendered as ``<href>``), and ``<br>`` ends
    the message.  ``<h3>`` (the log header) is skipped.

    Raises:
        ValueError: if the file name is not a valid timestamp, or the
            document has no ``<body>``, a malformed message header, a
            ``<br>`` without a preceding header, or an unexpected element.
    """
    logger.info('Parsing %s', path)
    chat_start = datetime.datetime.fromisoformat(
        _TZ_ABBREVIATION_RE.sub('', path.stem))

    # Binary mode lets Beautiful Soup detect the document's encoding instead
    # of relying on the locale default; naming the parser keeps the result
    # independent of which optional parsers happen to be installed.
    with open(path, 'rb') as f:
        soup = bs4.BeautifulSoup(f, 'html.parser')
    if soup.body is None:
        raise ValueError(f'No <body> element in {path}')

    messages: list[Message] = []
    cur_sent_at: datetime.datetime | None = None
    cur_sender: str = 'NOT DEFINED'
    cur_text: str = ''

    for c in soup.body.children:
        if c.name == 'font':
            cur_sent_at, cur_sender = _parse_message_header(
                c, chat_start, path)
            # The message text starts after the header; drop any stray
            # whitespace collected between messages.
            cur_text = ''
        elif c.name is None:
            # Text node.  Append (rather than overwrite) so that text and
            # links earlier in the same message are kept.
            cur_text += c.get_text()
        elif c.name == 'a':
            cur_text += '<' + c['href'] + '>'
        elif c.name == 'br':
            if cur_sent_at is None:
                raise ValueError(
                    f'<br> without a preceding message header in {path}')
            messages.append(Message(cur_sent_at, cur_sender, cur_text.strip()))
            cur_sent_at = None
            cur_sender = 'NOT DEFINED'
            cur_text = ''
        elif c.name == 'h3':
            pass  # Ignore log header
        else:
            raise ValueError(f'Unexpected element in {path}: {c!r}')

    return messages


def parse_messages_in_chat_folder(chat_folder_path: Path) -> list[Message]:
    """Parse every entry of a folder as a chat log and return all messages sorted.

    Every directory entry is handed to ``parse_messages_in_chat_file``; the
    combined list is sorted using ``Message``'s ordering.
    """
    messages: list[Message] = []
    for file_path in chat_folder_path.iterdir():
        messages.extend(parse_messages_in_chat_file(file_path))
    messages.sort()
    return messages


def format_messages(messages: list[Message], *, title: str = 'Chat 2018') -> str:
    """Render messages as text under a ``# <title>`` heading.

    Each message becomes the date and the sender in ``[[...]]`` brackets,
    the time of day, and the text after a ``> `` prefix.  An empty list
    yields just the heading.
    """
    out = [f'# {title}\n\n']
    # NOTE(review): the line break after the date is written as an escaped
    # newline here; confirm that this is the intended separator.
    out.extend(
        f'[[{msg.sent_at.date()}]] \n'
        f'{msg.sent_at.time()} [[{msg.sender}]]:\n> {msg.text}\n'
        for msg in messages
    )
    return ''.join(out)


# Largest gap between two messages that still counts as adjacent.
MSG_ADJACENTCY_DIST = datetime.timedelta(minutes=2)


def is_adjacent_messages(first: Message, second: Message) -> bool:
    """Return whether ``second`` has the same sender as ``first`` and was
    sent no more than ``MSG_ADJACENTCY_DIST`` after it."""
    return (first.sender == second.sender
            and second.sent_at - first.sent_at <= MSG_ADJACENTCY_DIST)


def merge_adjacent_messages(messages: list[Message]) -> list[Message]:
    """Merge runs of adjacent messages into single messages.

    A message that is adjacent (see ``is_adjacent_messages``) to the last
    output message is folded into it, the texts joined by a blank line.  The
    merged message keeps the timestamp of the first message of the run, so
    the gap is always measured from the start of the run.  The input list is
    not modified.
    """
    out: list[Message] = []
    for msg in messages:
        if out and is_adjacent_messages(out[-1], msg):
            out[-1] = dataclasses.replace(
                out[-1], text=out[-1].text + '\n\n' + msg.text)
        else:
            out.append(msg)
    return out