1
0

Ruff format

This commit is contained in:
Jon Michael Aanes 2024-06-02 23:16:11 +02:00
parent be8f727c74
commit 7b905911af
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
8 changed files with 26 additions and 42 deletions

View File

@ -17,7 +17,9 @@ def parse_arguments():
) )
parser.add_argument('--cookiejar', action='store_true') parser.add_argument('--cookiejar', action='store_true')
parser.add_argument('--email', action='store_true', dest='send_email_notification') parser.add_argument('--email', action='store_true', dest='send_email_notification')
parser.add_argument('--loud-sound', action='store_true', dest='trigger_loud_and_annoying_sound') parser.add_argument(
'--loud-sound', action='store_true', dest='trigger_loud_and_annoying_sound',
)
return parser.parse_args() return parser.parse_args()

View File

@ -1,10 +1,8 @@
import abc
import dataclasses import dataclasses
import datetime import datetime
import logging import logging
from collections.abc import Iterator, Mapping from collections.abc import Iterator, Mapping
import fin_depo
import kucoin.client import kucoin.client
from frozendict import frozendict from frozendict import frozendict
@ -22,6 +20,7 @@ client = kucoin.client.Client(
secrets.KUCOIN_PASS, secrets.KUCOIN_PASS,
) )
def addresses_to_data_points(addresses: list[dict[str, str]]) -> frozendict: def addresses_to_data_points(addresses: list[dict[str, str]]) -> frozendict:
data_point = {} data_point = {}
data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC) data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC)

View File

@ -1,18 +1,10 @@
import dataclasses import dataclasses
import datetime
import email.utils
import json
import logging import logging
from collections.abc import Iterator, Mapping from collections.abc import Iterator, Mapping
from decimal import Decimal
import requests
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
import fin_depo import fin_depo
from .. import secrets from personal_data.data import DeduplicateMode, Scraper
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -1,11 +1,9 @@
import abc import abc
import dataclasses import dataclasses
import datetime
import logging import logging
from collections.abc import Iterator, Mapping from collections.abc import Iterator, Mapping
import fin_depo import fin_depo
import kucoin.client
from frozendict import frozendict from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper from personal_data.data import DeduplicateMode, Scraper
@ -60,6 +58,7 @@ class KucoinAccountBalances(FinanceDepoScraper):
kucoin_pass=secrets.KUCOIN_PASS, kucoin_pass=secrets.KUCOIN_PASS,
) )
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class MpcBalance(FinanceDepoScraper): class MpcBalance(FinanceDepoScraper):
dataset_name = 'defi_mpc_balance' dataset_name = 'defi_mpc_balance'
@ -67,6 +66,5 @@ class MpcBalance(FinanceDepoScraper):
def get_depo_fetcher(self): def get_depo_fetcher(self):
return fin_depo.defi_partisia_blockchain.PartisiaBlockchainAccountDepoFetcher( return fin_depo.defi_partisia_blockchain.PartisiaBlockchainAccountDepoFetcher(
self.session, self.session,
blockchain_address = secrets.PBC_ACCOUNT_ADDRESS blockchain_address=secrets.PBC_ACCOUNT_ADDRESS,
) )

View File

@ -28,6 +28,7 @@ except ImportError:
import personal_data.data import personal_data.data
import personal_data.fetchers import personal_data.fetchers
from . import notification from . import notification
CSV_DIALECT = 'one_true_dialect' CSV_DIALECT = 'one_true_dialect'
@ -235,7 +236,7 @@ def main(
logger.warning('No cookiejar is used') logger.warning('No cookiejar is used')
if len(notification_types) == 0: if len(notification_types) == 0:
logger.info('Email not enabled: Notifications will not be sent!') logger.info('No notifications enabled: Notifications will not be sent!')
for scraper_cls in available_scrapers(): for scraper_cls in available_scrapers():
session = get_session(cookiejar, with_cfscrape=scraper_cls.requires_cfscrape()) session = get_session(cookiejar, with_cfscrape=scraper_cls.requires_cfscrape())
@ -264,9 +265,8 @@ def main(
logger.info('Scraper done: %s', scraper.dataset_name) logger.info('Scraper done: %s', scraper.dataset_name)
if status['extended']: if status['extended']:
notification.send_notifications(session, notification.send_notifications(
scraper_cls.__name__, session, scraper_cls.__name__, status['dicts'][-1], notification_types,
status['dicts'][-1], )
notification_types)
del scraper, session del scraper, session

View File

@ -1,31 +1,19 @@
import csv import enum
import datetime
import decimal
import inspect
import io
import logging import logging
from collections.abc import Iterable, Mapping, Sequence
from decimal import Decimal
import requests import requests
import requests_cache
from frozendict import frozendict from frozendict import frozendict
import personal_data.data
import personal_data.fetchers
from . import mailgun from . import mailgun
import enum
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NotificationType(enum.Enum): class NotificationType(enum.Enum):
EMAIL = 1 EMAIL = 1
LOUD_SOUND = 2 LOUD_SOUND = 2
def send_email_notification( def send_email_notification(
session: requests.Session, session: requests.Session,
scraper_name: str, scraper_name: str,
@ -44,6 +32,7 @@ def play_loud_and_annoying_sound(
) -> None: ) -> None:
pass pass
NOTIFICATION_TYPE_TO_NOTIFIER = { NOTIFICATION_TYPE_TO_NOTIFIER = {
NotificationType.EMAIL: send_email_notification, NotificationType.EMAIL: send_email_notification,
NotificationType.LOUD_SOUND: play_loud_and_annoying_sound, NotificationType.LOUD_SOUND: play_loud_and_annoying_sound,
@ -57,5 +46,6 @@ def send_notifications(
notification_types: set[NotificationType], notification_types: set[NotificationType],
) -> None: ) -> None:
for notification_type in notification_types: for notification_type in notification_types:
NOTIFICATION_TYPE_TO_NOTIFIER[notification_type](session, scraper_name, latest_dict) NOTIFICATION_TYPE_TO_NOTIFIER[notification_type](
session, scraper_name, latest_dict,
)

View File

@ -15,6 +15,7 @@ PACKAGE_NAME = 'personal_data'
with open('README.md') as f: with open('README.md') as f:
readme = f.read() readme = f.read()
def parse_version_file(text: str) -> str: def parse_version_file(text: str) -> str:
match = re.match(r'^__version__\s*=\s*(["\'])([\d\.]+)\1$', text) match = re.match(r'^__version__\s*=\s*(["\'])([\d\.]+)\1$', text)
if match is None: if match is None:
@ -22,9 +23,11 @@ def parse_version_file(text: str) -> str:
raise Exception(msg) raise Exception(msg)
return match.group(2) return match.group(2)
with open(PACKAGE_NAME + '/_version.py') as f: with open(PACKAGE_NAME + '/_version.py') as f:
version = parse_version_file(f.read()) version = parse_version_file(f.read())
def parse_requirements(text: str) -> list[str]: def parse_requirements(text: str) -> list[str]:
return text.strip().split('\n') return text.strip().split('\n')