62 lines
1.4 KiB
Python
62 lines
1.4 KiB
Python
import csv
|
|
import datetime
|
|
import decimal
|
|
import inspect
|
|
import io
|
|
import logging
|
|
from collections.abc import Iterable, Mapping, Sequence
|
|
from decimal import Decimal
|
|
|
|
import requests
|
|
import requests_cache
|
|
from frozendict import frozendict
|
|
|
|
import personal_data.data
|
|
import personal_data.fetchers
|
|
|
|
from . import mailgun
|
|
|
|
import enum
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
class NotificationType(enum.Enum):
    """Kinds of notification that can be requested for a scraper update."""

    # Handled by send_email_notification in this module.
    EMAIL = 1

    # Handled by play_loud_and_annoying_sound in this module.
    LOUD_SOUND = 2
|
|
|
|
|
|
|
|
def send_email_notification(
    session: requests.Session,
    scraper_name: str,
    latest_dict: frozendict,
) -> None:
    """Send an email describing the latest update of a scraper.

    The subject is ``Updated <scraper_name>``. The body is a headline line
    followed by one ``key: value`` line for every item of ``latest_dict``.

    Args:
        session: Session passed through unchanged to ``mailgun.send_email``.
        scraper_name: Name of the scraper the update belongs to.
        latest_dict: Mapping whose items are listed in the message body.
    """
    headline = f'A new update has occurred for {scraper_name}\n'
    # One line per field, in the mapping's own iteration order.
    field_lines = ''.join(f'{k}: {v}\n' for k, v in latest_dict.items())
    mailgun.send_email(session, f'Updated {scraper_name}', headline + field_lines)
|
|
|
|
|
|
def play_loud_and_annoying_sound(
    session: requests.Session,
    scraper_name: str,
    latest_dict: frozendict,
) -> None:
    """Placeholder notifier for ``NotificationType.LOUD_SOUND``.

    No sound playback is implemented. Instead of silently doing nothing, a
    warning is logged so that a requested but undelivered notification is
    visible in the logs.

    Args:
        session: Unused; accepted so all notifiers share one signature.
        scraper_name: Name of the scraper the update belongs to.
        latest_dict: Unused; accepted so all notifiers share one signature.
    """
    # TODO: implement actual sound playback.
    logger.warning(
        'Loud sound notification requested for %s, but it is not implemented',
        scraper_name,
    )
|
|
|
|
# Dispatch table from each NotificationType member to the function that
# delivers it. send_notifications calls every entry as
# notifier(session, scraper_name, latest_dict).
NOTIFICATION_TYPE_TO_NOTIFIER = {
    NotificationType.EMAIL: send_email_notification,
    NotificationType.LOUD_SOUND: play_loud_and_annoying_sound,
}
|
|
|
|
|
|
def send_notifications(
    session: requests.Session,
    scraper_name: str,
    latest_dict: frozendict,
    notification_types: set[NotificationType],
) -> None:
    """Deliver one scraper update through every requested notification type.

    Each requested type is looked up in ``NOTIFICATION_TYPE_TO_NOTIFIER`` and
    the resulting notifier is called with the same three arguments.

    Args:
        session: Session forwarded to each notifier.
        scraper_name: Name of the scraper the update belongs to.
        latest_dict: Latest record, forwarded to each notifier.
        notification_types: Notification types to trigger.

    Raises:
        KeyError: If a requested type has no entry in the dispatch table.
    """
    for requested_type in notification_types:
        notifier = NOTIFICATION_TYPE_TO_NOTIFIER[requested_type]
        notifier(session, scraper_name, latest_dict)
|
|
|