1
0
personal-data/personal_data/fetchers/kucoin.py
Jon Michael Aanes c42092137e
Some checks failed
Build container / Package-Python (push) Failing after 23s
Build container / Package-Container (push) Successful in 1m17s
Kucoin balances with format similar to defi_mpc
2024-05-17 00:29:02 +02:00

48 lines
1.2 KiB
Python

import dataclasses
import datetime
import email.utils
import json
from frozendict import frozendict
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
import kucoin.client
logger = logging.getLogger(__name__)
HOSTNAME = 'api.kucoin.com'
URL_ACCOUNTS = 'https://{hostname}/api/v1/accounts'
# TODO: Move these into secrets!
client = kucoin.client.Client(
secrets.KUCOIN_KEY,
secrets.KUCOIN_SECRET,
secrets.KUCOIN_PASS,
)
@dataclasses.dataclass(frozen=True)
class KucoinAccountBalances(Scraper):
dataset_name = 'defi_kucoin_balance'
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
def scrape(self) -> Iterator[Mapping[str, object]]:
data_point = {
'account.update_time': datetime.datetime.now(tz=datetime.UTC),
}
for account in client.get_accounts():
key = f"balance.{account['currency']}"
data_point[key] = data_point.get(key, Decimal(0)) + Decimal(account['balance'])
yield frozendict(data_point)