1
0
personal-data/personal_data/fetchers/kucoin.py

52 lines
1.3 KiB
Python
Raw Normal View History

2024-05-16 22:24:01 +00:00
import dataclasses
import datetime
import email.utils
import json
from frozendict import frozendict
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
import kucoin.client
logger = logging.getLogger(__name__)
HOSTNAME = 'api.kucoin.com'
URL_ACCOUNTS = 'https://{hostname}/api/v1/accounts'
# TODO: Move these into secrets!
client = kucoin.client.Client(
secrets.KUCOIN_KEY,
secrets.KUCOIN_SECRET,
secrets.KUCOIN_PASS,
)
@dataclasses.dataclass(frozen=True)
class KucoinAccountBalances(Scraper):
dataset_name = 'defi_kucoin_balance'
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
def scrape(self) -> Iterator[Mapping[str, object]]:
time = datetime.datetime.now()
for account in client.get_accounts():
print(account)
data_point = {
'account.id': account['id'],
'account.currency': account['currency'],
'account.type': account['type'],
'account.balance': account['balance'],
'account.update_time': time,
}
yield frozendict(data_point)