1
0
libpurple-to-markdown/libpurple_to_markdown/__init__.py
2024-10-26 14:53:31 +02:00

193 lines
5.3 KiB
Python

import dataclasses
import datetime
from collections.abc import Iterator, Iterable
import logging
import re
from pathlib import Path
import bs4
from ._version import __version__
# Only the version string is exported by a wildcard import of this module.
__all__ = ['__version__']

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@dataclasses.dataclass(order=True, frozen=True)
class Message:
    """One chat message: when it was sent, who sent it and what it said.

    Instances are immutable. They compare field by field in declaration
    order, so sorting a list of messages orders them by ``sent_at`` first.
    """

    sent_at: datetime.datetime
    sender: str
    text: str

    def __post_init__(self) -> None:
        # Sanity check: none of the three fields may be None.
        for field_name in ('sent_at', 'sender', 'text'):
            assert getattr(self, field_name) is not None
def datetime_sent(
    chat_start: datetime.datetime, message_sent: datetime.time,
) -> datetime.datetime:
    """Turn a message's time of day into a full datetime.

    The date and tzinfo are taken from *chat_start*. When *message_sent*
    is earlier in the day than the chat's start time, the message is
    placed on the following day (the chat ran past midnight).
    """
    same_day = datetime.datetime.combine(
        chat_start.date(), message_sent, tzinfo=chat_start.tzinfo,
    )
    if message_sent < chat_start.time():
        # Clock wrapped around midnight since the chat started.
        return same_day + datetime.timedelta(days=1)
    return same_day
# Matches a leading "(H:M:S)" timestamp; compiled once for reuse.
_TIMESTAMP_RE = re.compile(r'\((\d+):(\d+):(\d+)\)')


def parse_timestamp(c) -> datetime.time:
    """Extract the leading ``(HH:MM:SS)`` timestamp from a log element.

    Args:
        c: Any object with a ``get_text()`` method returning the
            element's text (callers in this module pass a parsed HTML
            element).

    Returns:
        The time of day encoded at the very start of the element's text.

    Raises:
        ValueError: If the text does not start with a ``(H:M:S)``
            timestamp, or the numbers do not form a valid time.
    """
    text = c.get_text()
    m = _TIMESTAMP_RE.match(text)
    if m is None:
        # Fail with the offending text instead of an AttributeError on None.
        raise ValueError(f'No (HH:MM:SS) timestamp at start of {text!r}')
    hour, minute, second = (int(part) for part in m.groups())
    return datetime.time(hour, minute, second)
def parse_messages_in_chat_file(path: Path) -> list[Message]:
    """Parse one HTML chat log file into a list of messages.

    The chat's start time is read from the file name: the stem, minus a
    trailing ``CEST``/``CET``, must be an ISO-format datetime. The log
    lines are the children of the body's first ``<p>`` if there is one,
    otherwise the children of ``<body>`` itself.

    A line is built up element by element and closed by ``<br>``:

    * the first ``<font>``/``<span>`` of a line carries the timestamp
      and, in a nested ``<b>``, the sender name;
    * text nodes and later ``<font>``/``<span>`` elements add to the text;
    * ``<a>`` elements add their ``href`` wrapped in angle brackets;
    * a line with no sender (e.g. a system message) is discarded.

    Args:
        path: Location of the log file.

    Returns:
        The messages in file order; empty if the document has no content.

    Raises:
        ValueError: If the file name or a timestamp cannot be parsed, or
            an element of an unexpected type is encountered.
    """
    logger.info('Parsing %s', path)
    chat_start = datetime.datetime.fromisoformat(
        path.stem.removesuffix('CEST').removesuffix('CET'),
    )

    # Pin the encoding so parsing does not depend on the machine's locale.
    # NOTE(review): assumes the logs are UTF-8 (believed to be what
    # libpurple writes) - confirm for logs produced by other clients.
    with open(path, encoding='utf-8') as f:
        soup = bs4.BeautifulSoup(f, 'lxml')

    if len(soup.contents) == 0:
        logger.warning('File is empty?')
        return []

    messages = []

    # State of the log line currently being assembled.
    cur_sent_at: datetime.datetime | None = None
    cur_sender: str | None = None
    cur_text: str = ''

    if soup.body.p:
        loglines = soup.body.p.children
    else:
        loglines = soup.body.children

    for c in loglines:
        if c.name in {'font', 'span'} and cur_sent_at is None:
            # Get timestamp
            cur_sent_at = datetime_sent(chat_start, parse_timestamp(c))

            # Get sender
            if c.b:
                assert cur_sender is None
                cur_sender = c.b.get_text().strip().removesuffix(':').removeprefix('***').removesuffix('[m]')
        elif c.name in {None, 'span', 'font'}:
            # Plain text node (name is None) or further styled text.
            cur_text += c.get_text()
        elif c.name == 'a':
            cur_text += '<' + c['href'] + '>'
        elif c.name == 'br':
            # End of the log line: keep it only if it had a sender.
            if cur_sender:
                messages.append(Message(cur_sent_at, cur_sender, cur_text.strip()))
            cur_sent_at = None
            cur_sender = None
            cur_text = ''
        elif c.name == 'b':
            # Indicates system message. Ignore
            pass
        elif c.name in {'h1', 'h3'}:
            pass  # Ignore log header
        else:
            # Raise explicitly: an assert would vanish under ``python -O``
            # and unknown elements would then be skipped silently.
            raise ValueError(f'Unexpected element in {path}: {c!r}')

    return messages
def parse_messages_in_chat_folder(chat_folder_path: Path) -> list[Message]:
    """Parse every entry of a chat folder and return all messages, sorted.

    Every directory entry is handed to ``parse_messages_in_chat_file`` in
    sorted path order; the combined result is then sorted using the
    messages' own ordering.
    """
    collected = [
        message
        for log_path in sorted(chat_folder_path.iterdir())
        for message in parse_messages_in_chat_file(log_path)
    ]
    collected.sort()
    return collected
def format_message_as_citation(out: list[str], msg: Message) -> None:
    """Append *msg* to *out* as a Markdown block quote.

    Emits a header line ``<date> <time> [[sender]]:`` followed by one
    ``> ``-prefixed line per line of the message text and a closing
    blank line. The pieces are appended to *out*; nothing is returned.
    """
    stamp = msg.sent_at
    out.append(f'{stamp.date()} {stamp.time()} [[{msg.sender}]]:')
    out.append('\n')

    for raw_line in msg.text.split('\n'):
        # Wrap tag-like ``<word>`` tokens, then ``$$$``, in backticks
        # (presumably so a Markdown renderer shows them literally).
        quoted = re.sub(
            r'(\$\$\$)', r'`\1`',
            re.sub(r'(<[\w ]+>)', r'`\1`', raw_line),
        )
        out.append(f'> {quoted}\n')

    out.append('\n')
def format_message_as_table(out: list[str], msg: Message) -> None:
    """Append *msg* to *out* as one row of a Markdown table.

    The row has three cells: send time, ``[[sender]]`` and the text.

    NOTE(review): the lines of a multi-line text are emitted back to back
    with no separator, and ``|`` inside the text is not escaped - confirm
    whether that is intended.
    """
    out.append(f'| {msg.sent_at} | [[{msg.sender}]] | ')
    out.extend(msg.text.split('\n'))
    out.append('|\n')
def format_messages(
    messages: list[Message], title: str, *, as_table: bool = False,
) -> str:
    """Render *messages* as a Markdown document.

    The document starts with a ``# title`` heading. Each time the date
    changes between consecutive messages (and before the first message) a
    horizontal rule and a ``## [[date]]`` heading are emitted.

    Args:
        messages: Messages in the order they should appear.
        title: Text of the top-level heading.
        as_table: If true, render each day as a Markdown table; otherwise
            (the default) render each message as a block quote.

    Returns:
        The complete Markdown text. An empty *messages* yields only the
        title heading.
    """
    out = ['# ', title, '\n\n']
    previous_date = None

    for msg in messages:
        current_date = msg.sent_at.date()
        if current_date != previous_date:
            # New day: start a new section.
            out.append('---\n')
            out.append(f'## [[{current_date}]]\n\n')
            if as_table:
                out.append('| sent at | sender | text |\n')
                out.append('| ------- | ------ | ---- |\n')
            previous_date = current_date

        if as_table:
            format_message_as_table(out, msg)
        else:
            format_message_as_citation(out, msg)

    return ''.join(out)
# Largest time gap at which is_adjacent_messages() still treats two
# messages from the same sender as adjacent.
MSG_ADJACENTCY_DIST = datetime.timedelta(minutes=2)
def is_useless_message(msg: Message) -> bool:
    """Tell whether *msg* should be dropped from the output.

    A message is useless when its sender is the empty string or ends
    with the ``<AUTO-REPLY>`` marker.
    """
    sender = msg.sender
    if sender == '':
        return True
    return sender.endswith('<AUTO-REPLY>')
def filter_useless_messages(messages: Iterable[Message]) -> Iterator[Message]:
    """Lazily yield the messages that ``is_useless_message`` does not reject.

    The text of every rejected message is printed to standard output as
    it is skipped.
    """
    for candidate in messages:
        if is_useless_message(candidate):
            print(candidate.text)
            continue
        yield candidate
def is_adjacent_messages(first: Message, second: Message) -> bool:
    """Tell whether *second* counts as a continuation of *first*.

    Both must have the same sender, and *second* must have been sent no
    more than ``MSG_ADJACENTCY_DIST`` after *first*.
    """
    if first.sender != second.sender:
        return False
    gap = second.sent_at - first.sent_at
    return gap <= MSG_ADJACENTCY_DIST
def merge_texts(text1: str, text2: str) -> str:
    """Join two message texts into one, with *text2* on a new line.

    If *text1* does not already end in punctuation (one of ``. ? ! , :``)
    a full stop is added after it, so the merged text reads as separate
    sentences.

    Args:
        text1: The earlier text.
        text2: The text to append.

    Returns:
        *text1*, a separator ending in a newline, then *text2*.
    """
    # endswith() needs a tuple to mean "any of these"; a single string
    # would only match that exact five-character suffix.
    punctuated = text1.endswith(('.', '?', '!', ',', ':'))
    return text1 + (' ' if punctuated else '. ') + '\n' + text2
def merge_adjacent_messages(messages: Iterable[Message]) -> list[Message]:
    """Collapse runs of adjacent messages into single messages.

    Each message that ``is_adjacent_messages`` accepts as a continuation
    of the last emitted message is folded into it via ``merge_texts``.
    The merged message keeps the sender and timestamp of the first
    message in the run, so adjacency is always measured from that one.
    """
    merged = []
    for incoming in messages:
        previous = merged[-1] if merged else None
        if previous is None or not is_adjacent_messages(previous, incoming):
            merged.append(incoming)
            continue
        merged[-1] = dataclasses.replace(
            previous, text=merge_texts(previous.text, incoming.text),
        )
    return merged