170 lines
5.8 KiB
Python
170 lines
5.8 KiB
Python
import argparse
|
|
import dataclasses
|
|
import logging
|
|
import datetime
|
|
from collections.abc import Iterable, Iterator, Mapping
|
|
from typing import Callable
|
|
from pathlib import Path
|
|
|
|
from . import (
|
|
filter_useless_messages,
|
|
libpurple,
|
|
merge_adjacent_messages,
|
|
synctech_sms,
|
|
)
|
|
from .data import MYSELF, Message
|
|
from .markdown import format_messages
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def group_messages(messages: Iterable[Message], key_fn: Callable[[Message], str]) -> dict[str, list[Message]]:
    """Group messages into lists that share the same key.

    Args:
        messages: Messages to group. Iterated exactly once, so a generator
            is fine. May be empty.
        key_fn: Called once per message; its return value is the group key.

    Returns:
        A plain dict mapping each key to the messages that produced it. Keys
        appear in order of first occurrence and every list keeps the input
        order. An empty input gives an empty dict.
    """
    by_key: dict[str, list[Message]] = {}
    for msg in messages:
        by_key.setdefault(key_fn(msg), []).append(msg)
    # NOTE: No `del msg` here; the loop variable is never bound when
    # `messages` is empty, so deleting it after the loop would raise.
    return by_key
|
|
|
|
|
|
def group_messages_by_chat_id(messages: Iterable[Message]) -> dict[str, list[Message]]:
    """Group messages by the `chat_id` attribute of each message.

    Thin wrapper around `group_messages`; the returned dict maps each chat id
    to the messages carrying that id.
    """

    def chat_id_of(message: Message) -> str:
        return message.chat_id

    return group_messages(messages, key_fn=chat_id_of)
|
|
|
|
|
|
def year_and_month_period_key(msg: Message):
    """Return the period key `YYYY-MM` for the month the message was sent in.

    The month is zero-padded to two digits, e.g. `2023-05`.
    """
    sent_at = msg.sent_at
    return '{}-{:02}'.format(sent_at.year, sent_at.month)
|
|
|
|
|
|
def year_period_key(msg: Message):
    """Return the period key for the year the message was sent in, e.g. `2023`."""
    sent_year = msg.sent_at.year
    return str(sent_year)
|
|
|
|
|
|
def year_quarter_period_key(msg: Message):
    """Return the period key `YYYY-Qn` for the quarter the message was sent in.

    Months 1-3 map to Q1, 4-6 to Q2, 7-9 to Q3 and 10-12 to Q4.
    """
    sent_at = msg.sent_at
    # (month + 2) // 3 maps 1..12 onto 1..4 using integer arithmetic only.
    quarter = (sent_at.month + 2) // 3
    return f'{sent_at.year}-Q{quarter}'
|
|
|
|
|
|
# When the period granularity is chosen automatically, a granularity is
# accepted once the average number of messages per period is at most this.
MAX_AVERAGE_MESSAGES_PER_PERIOD = 120

# Chats with this many messages or fewer (after merging adjacent messages)
# are skipped entirely.
TOO_FEW_MESSAGES_TO_CARE = 2

# Period key functions by their command line name. The order matters:
# automatic selection tries the entries in insertion order.
PERIOD_KEYS_BY_NAME: Mapping[str, Callable[[Message], str]] = {
    'full': lambda msg: 'full',
    'year': year_period_key,
    'quarter': year_quarter_period_key,
    'month': year_and_month_period_key,
}
|
|
|
|
|
|
def group_messages_by_period(
    messages: Iterable[Message],
    period_key: str | None = None,
) -> tuple[dict[str, list[Message]], Callable[[Message], str]]:
    """Split messages into periods, picking the granularity if none is given.

    Args:
        messages: Messages to split. Any iterable, including a one-shot
            generator; it is materialised once internally.
        period_key: Name of an entry in `PERIOD_KEYS_BY_NAME`. When `None`,
            the entries are tried in order and the first one whose average
            number of messages per period does not exceed
            `MAX_AVERAGE_MESSAGES_PER_PERIOD` is used; if none qualifies the
            last entry is used.

    Returns:
        A tuple `(grouped, period_key_fn)`: the messages grouped by period
        key, and the key function that produced the grouping. For empty
        input the result is an empty dict together with the first candidate
        key function.

    Raises:
        KeyError: If `period_key` is not a name in `PERIOD_KEYS_BY_NAME`.
    """
    # Determine candidate key functions
    if period_key is not None:
        candidates = [PERIOD_KEYS_BY_NAME[period_key]]
    else:
        candidates = list(PERIOD_KEYS_BY_NAME.values())

    # Several candidates may each need a full pass over the messages, so a
    # one-shot iterator must be materialised before the first pass.
    messages = list(messages)
    if not messages:
        # Nothing to group; also avoids dividing by zero periods below.
        return {}, candidates[0]

    # Group by key, stopping at the first granularity that is fine enough
    for period_key_fn in candidates:
        grouped = group_messages(messages, key_fn=period_key_fn)
        # Every message lands in exactly one group, so the group sizes sum
        # to len(messages).
        if len(messages) / len(grouped) <= MAX_AVERAGE_MESSAGES_PER_PERIOD:
            break

    return grouped, period_key_fn
|
|
|
|
|
|
def replace_myself(messages: Iterable[Message], myself: str) -> Iterator[Message]:
    """Yield the messages with the `MYSELF` sender placeholder substituted.

    Messages whose `sender` equals `MYSELF` are yielded as copies (made with
    `dataclasses.replace`) whose sender is `myself`; all other messages are
    yielded unchanged. The input is consumed lazily.
    """
    for message in messages:
        sent_by_me = message.sender == MYSELF
        yield dataclasses.replace(message, sender=myself) if sent_by_me else message
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line arguments of the program.

    Returns:
        A namespace with the attributes `purple_folder`,
        `synctech_sms_backup_file`, `output`, `myself`, `overwrite_files`,
        `period_key` and `skip_this_period`.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--purple',
        type=Path,
        dest='purple_folder',
        help='folder with libpurple chat logs to read messages from',
    )
    parser.add_argument(
        '--synctech',
        type=Path,
        dest='synctech_sms_backup_file',
        help='SyncTech SMS backup XML file to read messages from',
    )
    # Required: every written file is placed below this folder, so running
    # without it can only fail later with an unhelpful TypeError.
    parser.add_argument(
        '--output',
        type=Path,
        required=True,
        help='folder to write the markdown files into',
    )
    parser.add_argument(
        '--myself',
        type=str,
        default='Myself',
        help='sender name to use for your own messages',
    )
    parser.add_argument(
        '--overwrite',
        action='store_true',
        dest='overwrite_files',
        help='overwrite markdown files that already exist',
    )
    parser.add_argument(
        '--period',
        dest='period_key',
        choices=list(PERIOD_KEYS_BY_NAME),
        help='period to split each chat into; chosen automatically when omitted',
    )
    parser.add_argument(
        '--skip-this-period',
        action='store_true',
        dest='skip_this_period',
        help='do not write the file for the period containing the current date',
    )
    return parser.parse_args()
|
|
|
|
def _export_chat(chat_id: str, messages_in_chat_original: list[Message], args: argparse.Namespace) -> None:
    """Write the markdown files of a single chat, one file per period."""
    messages_in_chat = merge_adjacent_messages(messages_in_chat_original)
    if len(messages_in_chat) <= TOO_FEW_MESSAGES_TO_CARE:
        logger.info(' "%s": Skipped due to too few messages', chat_id)
        return

    messages_by_period, period_key_fn = group_messages_by_period(messages_in_chat, args.period_key)
    logger.info(
        ' "%s": %d messages, %d periods (%d msg/period avg)',
        chat_id,
        len(messages_in_chat_original),
        len(messages_by_period),
        len(messages_in_chat_original) / len(messages_by_period),
    )

    # Key of the period containing "now", found by running the key function
    # on a placeholder message. Only needed when the current (still growing)
    # period is to be left out.
    this_period_name = None
    if args.skip_this_period:
        this_period_name = period_key_fn(Message(datetime.datetime.now(), '', '', ''))

    file_escaped_chat_id = chat_id.replace(' ', '-')

    for period_key_name, messages in messages_by_period.items():
        output_file = args.output / chat_id / f'{file_escaped_chat_id}-{period_key_name}.md'

        if args.skip_this_period and this_period_name == period_key_name:
            logger.info('Skipping due to --skip-this-period: %s', output_file)
            continue
        if output_file.exists() and not args.overwrite_files:
            logger.info('Skipping existing file: %s', output_file)
            continue

        logger.info('Writing % 5d messages to %s', len(messages), output_file)

        # Create folders and file. The encoding is fixed so that the output
        # does not depend on the platform's locale encoding.
        output_file.parent.mkdir(exist_ok=True, parents=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(
                format_messages(
                    messages,
                    title=f'{chat_id} - {period_key_name}',
                ),
            )


def main():
    """Convert a chat log backup into markdown files, one per chat and period.

    Reads messages from the libpurple folder or, failing that, the SyncTech
    SMS backup given on the command line. It then substitutes the own sender
    name, filters useless messages, groups the rest by chat and writes each
    chat below the output folder. Logs a fatal message and returns without
    writing anything when no input or no output location was given.
    """
    logging.basicConfig()
    logging.getLogger().setLevel('INFO')
    args = parse_args()

    if args.purple_folder:
        all_messages = libpurple.parse_messages_in_chat_folder(args.purple_folder)
    elif args.synctech_sms_backup_file:
        all_messages = synctech_sms.parse_messages_in_backup_xml_file(
            args.synctech_sms_backup_file,
        )
    else:
        logger.fatal('No input file given!')
        return

    if args.output is None:
        # Every output path is built from this folder.
        logger.fatal('No output folder given!')
        return

    all_messages = list(replace_myself(all_messages, myself=args.myself))
    logger.info('%d messages after loading', len(all_messages))

    all_messages = list(filter_useless_messages(all_messages))
    logger.info('%d messages after filtering', len(all_messages))

    messages_by_chat_id = group_messages_by_chat_id(all_messages)
    logger.info('%d message groups', len(messages_by_chat_id))
    del all_messages  # The per-chat lists are all that is needed from here on

    for chat_id, messages_in_chat_original in messages_by_chat_id.items():
        _export_chat(chat_id, messages_in_chat_original, args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|