1
0

Kucoin deposit addresses

This commit is contained in:
Jon Michael Aanes 2024-05-18 21:35:58 +02:00
parent dfed3a204f
commit 34ba384265
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
3 changed files with 66 additions and 31 deletions

View File

@ -40,3 +40,23 @@ class KucoinAccountBalances(Scraper):
)
yield frozendict(data_point)
@dataclasses.dataclass(frozen=True)
class KucoinDepositAddresses(Scraper):
dataset_name = 'defi_kucoin_deposit_address'
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
def scrape(self) -> Iterator[Mapping[str, object]]:
data_point = {}
addresses = client.get_deposit_address('MPC')
data_point['account.num_deposit_addresses'] = len(addresses)
data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC)
for k, v in addresses[-1].items():
data_point[f'deposit.{k}'] = v
yield frozendict(data_point)

View File

@ -6,10 +6,11 @@ from collections.abc import Iterator
import bs4
import personal_data.html_util
import personal_data.parse_util
from personal_data import secrets
from personal_data.data import DeduplicateMode, Scraper
from .. import parse_util
logger = logging.getLogger(__name__)
URL_PROFILE = 'https://psnprofiles.com/{psn_id}'
@ -54,7 +55,7 @@ class PsnProfilesScraper(Scraper):
response = self.session.get(url)
response.raise_for_status()
NOW = personal_data.parse_util.parse_response_datetime(response)
NOW = parse_util.parse_response_datetime(response)
# Parse data
soup = bs4.BeautifulSoup(response.content, 'lxml')
@ -76,7 +77,7 @@ class PsnProfilesScraper(Scraper):
gotten_at = (
cells[2].get_text().strip().removesuffix(' in').removesuffix(' ago')
)
gotten_at = personal_data.parse_util.parse_duration(gotten_at)
gotten_at = parse_util.parse_duration(gotten_at)
time_acquired = NOW - gotten_at
yield {
@ -117,7 +118,7 @@ class PsnProfilesScraper(Scraper):
if len(small_infos) > 2:
time_played_div = small_infos[2]
time_played_div.sup.extract()
time_played = personal_data.parse_util.parse_date(
time_played = parse_util.parse_date(
time_played_div.get_text(),
)
else:
@ -179,7 +180,7 @@ class PsnProfilesScraper(Scraper):
trophy_icon = cells[0].img['src']
cells[2].span.span.nobr.sup.extract()
gotten_at = parse_time(cells[2].get_text())
gotten_at = parse_util.parse_time(cells[2].get_text())
yield {
'game.name': game_name,

View File

@ -3,6 +3,7 @@ import datetime
import decimal
import io
import logging
from collections.abc import Sequence
from decimal import Decimal
import requests
@ -62,33 +63,11 @@ def to_value(s: str) -> object:
return s
def extend_csv_file(
filename: str,
new_dicts: dict,
def deduplicate_dicts(
dicts: Sequence[dict],
deduplicate_mode: personal_data.data.DeduplicateMode,
deduplicate_ignore_columns: list[str],
) -> dict:
dicts = []
try:
with open(filename) as csvfile:
reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = to_value(orig)
if row[k] is None:
del row[k]
del k, orig
dicts.append(frozendict(row))
del row
del csvfile
except FileNotFoundError as e:
logger.info('Creating file: %s', filename)
original_num_dicts = len(dicts)
dicts += [frozendict(d) for d in new_dicts]
del new_dicts
) -> tuple[Sequence[dict], list[str]]:
fieldnames = []
for d in dicts:
for k in d.keys():
@ -117,6 +96,41 @@ def extend_csv_file(
dicts = set(dicts)
dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
return dicts, fieldnames
def extend_csv_file(
filename: str,
new_dicts: list[dict],
deduplicate_mode: personal_data.data.DeduplicateMode,
deduplicate_ignore_columns: list[str],
) -> dict:
dicts = []
try:
with open(filename) as csvfile:
reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = to_value(orig)
if row[k] is None:
del row[k]
del k, orig
dicts.append(frozendict(row))
del row
del csvfile
except FileNotFoundError as e:
logger.info('Creating file: %s', filename)
original_num_dicts = len(dicts)
dicts += [frozendict(d) for d in new_dicts]
del new_dicts
dicts, fieldnames = deduplicate_dicts(
dicts,
deduplicate_mode,
deduplicate_ignore_columns,
)
csvfile_in_memory = io.StringIO()
writer = csv.DictWriter(
@ -227,7 +241,7 @@ def main(
logger.exception('Failed in running %s', scraper_cls.__name__)
continue
status = extend_csv_file(
'output/' + scraper.dataset_name,
f'output/{scraper.dataset_name}.csv',
result_rows,
deduplicate_mode=scraper.deduplicate_mode,
deduplicate_ignore_columns=scraper.deduplicate_ignore_columns,