
All checks were successful
Ruff check
Build container / Package-Python (push) Successful in 26s
Build container / Package-Container (push) Successful in 1m21s

Jon Michael Aanes 2024-04-23 22:58:25 +02:00
parent afd2f4a0b3
commit 033f0dcf5b
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
10 changed files with 72 additions and 50 deletions

.gitignore vendored
View File

@@ -1,4 +1,5 @@
 *.pyc
+/output/
 __pycache__/
 /secrets/
 *.sqlite

View File

@@ -0,0 +1 @@
+from ._version import __version__  # noqa:F401

View File

@@ -1,19 +1,23 @@
-import personal_data.main
 import argparse
 import logging
+
+import personal_data.main

+
 def parse_arguments():
     parser = argparse.ArgumentParser()
     parser.add_argument('fetchers', metavar='FETCHER', type=str, nargs='+')
-    parser.add_argument('--cookiejar', action = 'store_true')
+    parser.add_argument('--cookiejar', action='store_true')
     return parser.parse_args()

+
 def main():
     logging.basicConfig()
     logging.getLogger('personal_data').setLevel('INFO')
     args = parse_arguments()
     scraper_filter = frozenset(args.fetchers)
-    personal_data.main.main(scraper_filter, use_cookiejar = args.cookiejar)
+    personal_data.main.main(scraper_filter, use_cookiejar=args.cookiejar)

+
 if __name__ == '__main__':
     main()

View File

@@ -1 +1 @@
-__version__ = '0.1.0'
+__version__ = '0.1.1'

View File

@@ -11,6 +11,7 @@ class DeduplicateMode(Enum):
     BY_ALL_COLUMNS = 2
     ONLY_LATEST = 3

+
 @dataclasses.dataclass(frozen=True)
 class Scraper(abc.ABC):
     session: requests.Session

View File

@@ -1,17 +1,12 @@
 import dataclasses
 import datetime
-import logging
-import re
-import json
-import secrets
-from decimal import Decimal
 import email.utils
+import json
+import logging
+from decimal import Decimal

-import bs4
-
-import personal_data.html_util
-import personal_data.parse_util
 from personal_data.data import DeduplicateMode, Scraper
+
 from .. import secrets

 logger = logging.getLogger(__name__)
@@ -26,9 +21,11 @@ HOSTNAME = 'reader.partisiablockchain.com'
 URL_ACCOUNT_PLUGIN = 'https://{hostname}/{shard}blockchain/accountPlugin/local'
 URL_ACCOUNT_PLUGIN_GLOBAL = 'https://{hostname}/{shard}blockchain/accountPlugin/global'

+
 def shard_id_for_address(address: str) -> str:
-    return 'shards/Shard2/' # TODO
+    return 'shards/Shard2/'  # TODO

+
 @dataclasses.dataclass(frozen=True)
 class MpcBalance(Scraper):
     dataset_name = 'defi_mpc_balance'
@@ -41,9 +38,11 @@ class MpcBalance(Scraper):
             'Accept': 'application/json',
         }

-        response = self.session.post(url, headers = headers, data=json.dumps(data))
+        response = self.session.post(url, headers=headers, data=json.dumps(data))
         response.raise_for_status()
-        date_text = response.headers.get('last-modified') or response.headers.get('date')
+        date_text = response.headers.get('last-modified') or response.headers.get(
+            'date',
+        )
         date = email.utils.parsedate_to_datetime(date_text)
         json_data = response.json()
         if json_data is None:
@@ -52,11 +51,11 @@ class MpcBalance(Scraper):
         return json_data, date

     def determine_coins(self) -> list[dict]:
-        data: dict = {'path':[]}
+        data: dict = {'path': []}

         url = URL_ACCOUNT_PLUGIN_GLOBAL.format(
-            hostname = HOSTNAME,
-            shard = '',
+            hostname=HOSTNAME,
+            shard='',
         )

         json_data, date = self.get_json(url, data=data)
@@ -67,11 +66,16 @@ class MpcBalance(Scraper):
         coins = self.determine_coins()

         url = URL_ACCOUNT_PLUGIN.format(
-            hostname = HOSTNAME,
-            shard = shard_id_for_address(address),
+            hostname=HOSTNAME,
+            shard=shard_id_for_address(address),
         )
-        data: dict = {'path':[{'type':'field','name':'accounts'},{'type':'avl','keyType':'BLOCKCHAIN_ADDRESS','key':address}]}
+        data: dict = {
+            'path': [
+                {'type': 'field', 'name': 'accounts'},
+                {'type': 'avl', 'keyType': 'BLOCKCHAIN_ADDRESS', 'key': address},
+            ],
+        }

         account_data, date = self.get_json(url, data=data)

         data_point = {
@@ -79,14 +83,14 @@ class MpcBalance(Scraper):
             'account.update_time': date,
         }

-        data_point['balance.MPC'] = str(Decimal(account_data['mpcTokens'])/1000)
+        data_point['balance.MPC'] = str(Decimal(account_data['mpcTokens']) / 1000)

         for coin_idx, amount_data in enumerate(account_data['accountCoins']):
             coin_data = coins[coin_idx]
-            byoc_balance = Decimal(amount_data ['balance'])
+            byoc_balance = Decimal(amount_data['balance'])
             denominator = Decimal(coin_data['conversionRate']['denominator'])
             native_balance = byoc_balance / denominator
-            data_point['balance.'+coin_data['symbol']] = str(native_balance)
+            data_point['balance.' + coin_data['symbol']] = str(native_balance)
             del coin_idx, coin_data

         yield data_point

View File

@@ -159,7 +159,9 @@ class PsnProfilesScraper(Scraper):
             yield d

     def scrape_game_trophies(
-        self, psnprofiles_id: int, game_name: str,
+        self,
+        psnprofiles_id: int,
+        game_name: str,
     ) -> Iterator[dict]:
         assert isinstance(psnprofiles_id, int), psnprofiles_id
         assert isinstance(game_name, str), game_name
@@ -167,7 +169,8 @@ class PsnProfilesScraper(Scraper):
         logger.info('Getting Game Trophies %s', psnprofiles_id)

         url = URL_USER_GAME_TROPHIES.format(
-            psn_id=secrets.PLAYSTATION_PSN_ID, game_id=psnprofiles_id,
+            psn_id=secrets.PLAYSTATION_PSN_ID,
+            game_id=psnprofiles_id,
         )
         response = self.session.get(url)
         response.raise_for_status()

View File

@@ -1,7 +1,9 @@
-import requests
-import personal_data.secrets as secrets
 import logging

+import requests
+
+from personal_data import secrets
+
 logger = logging.getLogger(__name__)

 MAILGUN_API_ENDPOINT = 'https://api.mailgun.net/v3/{mailgun_domain}/messages'
@@ -33,4 +35,3 @@ def send_email(session: requests.Session, subject: str, text: str):
     response.raise_for_status()
     logger.info('Email sent!')
     return response
-

View File

@@ -18,16 +18,13 @@ logger = logging.getLogger(__name__)
 import personal_data.data
 import personal_data.fetchers.crunchyroll
 import personal_data.fetchers.ffxiv_lodestone
+import personal_data.fetchers.partisia_blockchain
 import personal_data.fetchers.playstation
 import personal_data.fetchers.psnprofiles
-import personal_data.fetchers.partisia_blockchain
+from personal_data import mailgun
+from personal_data._version import __version__

 from . import mailgun
-import personal_data.mailgun as mailgun
-import personal_data.secrets as secrets

 CSV_DIALECT = 'one_true_dialect'
 csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
@@ -93,7 +90,7 @@ def extend_csv_file(
         del k
         del d

-    def equals_without_fields(a, b, fields = []):
+    def equals_without_fields(a, b, fields=[]):
         a = dict(a)
         b = dict(b)
@@ -102,14 +99,16 @@ def extend_csv_file(
         return frozendict(a) == frozendict(b)

     if deduplicate_mode == personal_data.data.DeduplicateMode.ONLY_LATEST:
-        while len(dicts) >= 2 and equals_without_fields(dicts[-1], dicts[-2], deduplicate_ignore_columns):
+        while len(dicts) >= 2 and equals_without_fields(
+            dicts[-1],
+            dicts[-2],
+            deduplicate_ignore_columns,
+        ):
             del dicts[-1]
     elif deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
         dicts = set(dicts)

     dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))

     csvfile_in_memory = io.StringIO()
@@ -151,6 +150,7 @@ STANDARD_HEADERS = {
 if cfscrape:
+
     class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
         pass
@@ -165,12 +165,18 @@ def get_session(cookiejar, *, with_cfscrape: bool) -> requests.Session:
         session.cookies.set_cookie(cookie)
     return session

-def send_notification(session: requests.Session, scraper_name: str, latest_dict: frozendict):
+
+def send_notification(
+    session: requests.Session,
+    scraper_name: str,
+    latest_dict: frozendict,
+):
     body = ['A new update has occured for ', scraper_name, '\n']
     for k, v in latest_dict.items():
         body.append(f'{k}: {v}\n')
     mailgun.send_email(session, f'Updated {scraper_name}', ''.join(body))

+
 def main(scraper_filter: frozenset[str], use_cookiejar: bool):
     if use_cookiejar:
         cookiejar = browsercookie.firefox()

View File

@@ -6,6 +6,7 @@ logger.setLevel(logging.INFO)

 ENV_KEY_PREFIX = 'CF_PD_'

+
 def load_secret(env_key: str) -> str:
     filepath = os.environ.get(ENV_KEY_PREFIX + env_key)
     if filepath is None:
@@ -22,7 +23,7 @@ def load_secret(env_key: str) -> str:

 # Crunchyroll
 CRUNCHYROLL_DEVICE_ID = load_secret('CRUNCHYROLL_DEVICE_ID')
-CRUNCHYROLL_AUTH =load_secret('CRUNCHYROLL_AUTH')
+CRUNCHYROLL_AUTH = load_secret('CRUNCHYROLL_AUTH')

 # FFXIV
 FFXIV_CHARACTER_ID = load_secret('FFXIV_CHARACTER_ID')