1
0
personal-data/personal_data/fetchers/defi_kucoin.py
Jon Michael Aanes 4be07a91bb
All checks were successful
Build container / Package-Python (push) Successful in 22s
Build container / Package-Container (push) Successful in 1m19s
Kucoin coverage
2024-05-18 22:06:08 +02:00

66 lines
1.8 KiB
Python

import dataclasses
import datetime
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal
import kucoin.client
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
logger = logging.getLogger(__name__)
# TODO: Move these into secrets!
client = kucoin.client.Client(
secrets.KUCOIN_KEY,
secrets.KUCOIN_SECRET,
secrets.KUCOIN_PASS,
)
@dataclasses.dataclass(frozen=True)
class KucoinAccountBalances(Scraper):
dataset_name = 'defi_kucoin_balance'
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
def scrape(self) -> Iterator[Mapping[str, object]]:
data_point = {
'account.update_time': datetime.datetime.now(tz=datetime.UTC),
}
for account in client.get_accounts():
key = f"balance.{account['currency']}"
data_point[key] = data_point.get(key, Decimal(0)) + Decimal(
account['balance'],
)
yield frozendict(data_point)
def addresses_to_data_points(addresses) -> frozendict:
data_point = {}
data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC)
data_point['account.num_deposit_addresses'] = len(addresses)
if len(addresses) > 0:
for k, v in addresses[-1].items():
data_point[f'deposit.{k}'] = v
return frozendict(data_point)
@dataclasses.dataclass(frozen=True)
class KucoinDepositAddresses(Scraper):
dataset_name = 'defi_kucoin_deposit_address'
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
def scrape(self) -> Iterator[Mapping[str, object]]:
addresses = client.get_deposit_address('MPC')
yield addresses_to_data_points(addresses)