93 lines
2.3 KiB
Python
93 lines
2.3 KiB
Python
|
import datetime
import logging
import re
from pathlib import Path

import bs4

from .data import Message
|
||
|
|
||
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
# Matches a "(hours:minutes:seconds)" stamp at the start of a log line.
# Compiled once at import time because it is applied to every log line.
_TIMESTAMP_RE = re.compile(r'\((\d+):(\d+):(\d+)\)')


def parse_timestamp(c) -> datetime.time:
    """Extract the time of day from the start of a log line element.

    Args:
        c: An object exposing ``get_text()`` whose text begins with a
            ``(hours:minutes:seconds)`` stamp. The caller in this module
            passes a BeautifulSoup ``font``/``span`` tag.

    Returns:
        The parsed time, carrying no date or timezone information.

    Raises:
        ValueError: If the text does not start with such a stamp, or if the
            numbers are out of range for ``datetime.time``.
    """
    # get_text() already includes the text of nested tags, so there is no
    # need to descend into an inner <font> before matching.
    text = c.get_text()
    match = _TIMESTAMP_RE.match(text)
    if match is None:
        raise ValueError(f'No (H:M:S) timestamp at start of {text!r}')
    hours, minutes, seconds = map(int, match.groups())
    return datetime.time(hours, minutes, seconds)
|
||
|
|
||
|
|
||
|
def _clean_sender_name(raw_name: str) -> str:
    """Reduce the bold sender label of a log line to the bare sender name.

    Strips surrounding whitespace, then a trailing ``:``, a leading ``***``
    and a trailing ``[m]``, in that order.
    """
    return (
        raw_name.strip()
        .removesuffix(':')
        .removeprefix('***')
        .removesuffix('[m]')
    )


def parse_messages_in_chat_file(path: Path) -> list[Message]:
    """Parse one HTML chat log file into a list of messages.

    The chat start time is read from the file name: ``path.stem`` with a
    trailing ``CEST``/``CET`` removed must be an ISO-format datetime.

    The log lines are the children of the first ``<p>`` inside ``<body>`` if
    there is one, otherwise the children of ``<body>`` itself. Within a line,
    the first ``font``/``span`` element supplies the timestamp and, if it
    contains a ``<b>``, the sender. Later text nodes, ``span``/``font``
    elements and links are appended to the message text. Each ``<br>`` ends
    the line: a message is emitted only if a sender was seen, so lines
    without a sender are dropped. Content after the final ``<br>`` is not
    emitted.

    Args:
        path: Path of the HTML log file.

    Returns:
        The messages in file order, built as
        ``Message(sent_at, sender, text)``. Empty if the document has no
        content or no ``<body>``.

    Raises:
        ValueError: If the file name is not a valid ISO datetime.
        AssertionError: If a line yields a second sender, or an element of
            an unexpected kind is encountered.
    """
    logger.info('Parsing %s', path)
    chat_start = datetime.datetime.fromisoformat(
        path.stem.removesuffix('CEST').removesuffix('CET'),
    )

    # NOTE(review): no explicit encoding is given, so decoding depends on the
    # platform default -- confirm this matches how the logs were written.
    with open(path) as f:
        soup = bs4.BeautifulSoup(f, 'lxml')

    if not soup.contents:
        logger.warning('File is empty?')
        return []

    body = soup.body
    if body is None:
        # A document can have content (e.g. only a comment) and still no
        # <body>; treat it like an empty log instead of crashing on `.p`.
        logger.warning('No <body> found in %s', path)
        return []

    messages: list[Message] = []

    # State of the log line currently being assembled.
    cur_sent_at: datetime.datetime | None = None
    cur_sender: str | None = None
    cur_text: str = ''

    if body.p:
        loglines = body.p.children
    else:
        loglines = body.children

    for c in loglines:
        if c.name in {'font', 'span'} and cur_sent_at is None:
            # First font/span of a line: carries the timestamp.
            # NOTE(review): datetime_sent is not defined or imported in the
            # visible part of this module -- confirm it is provided.
            cur_sent_at = datetime_sent(chat_start, parse_timestamp(c))

            # The sender, when present, is the bold label in that element.
            if c.b:
                # Raised explicitly rather than via `assert` so the check
                # still runs under `python -O`.
                if cur_sender is not None:
                    raise AssertionError(
                        f'Second sender on one log line in {path}: {c!r}'
                    )
                cur_sender = _clean_sender_name(c.b.get_text())

        elif c.name in {None, 'span', 'font'}:
            # Plain text nodes (name is None) and further inline elements.
            cur_text += c.get_text()

        elif c.name == 'a':
            # Keep only the link target, wrapped in angle brackets.
            cur_text += '<' + c['href'] + '>'

        elif c.name == 'br':
            # End of the log line: emit it if it had a sender, then reset
            # the state unconditionally so the next line starts clean.
            if cur_sender:
                messages.append(Message(cur_sent_at, cur_sender, cur_text.strip()))
            cur_sent_at = None
            cur_sender = None
            cur_text = ''

        elif c.name == 'b':
            # Indicates system message. Ignore
            pass

        elif c.name in {'h1', 'h3'}:
            pass  # Ignore log header

        else:
            # Same exception as the former `assert False, c`, but not
            # stripped under `python -O`.
            raise AssertionError(c)

    return messages
|
||
|
|
||
|
|
||
|
def parse_messages_in_chat_folder(chat_folder_path: Path) -> list[Message]:
    """Parse every entry of a chat folder and return all messages, sorted.

    Entries of ``chat_folder_path`` are visited in sorted path order and each
    one is handed to ``parse_messages_in_chat_file``. The combined list is
    then sorted using the ordering that ``Message`` itself defines.
    """
    log_files = sorted(chat_folder_path.iterdir())
    collected = [
        message
        for log_file in log_files
        for message in parse_messages_in_chat_file(log_file)
    ]
    return sorted(collected)
|