74 lines
2.0 KiB
Python
74 lines
2.0 KiB
Python
import dataclasses
|
|
import datetime
|
|
import logging
|
|
from collections.abc import Iterator, Mapping
|
|
from decimal import Decimal
|
|
|
|
import kucoin.client
|
|
from frozendict import frozendict
|
|
from fin_depo.defi_kucoin import KucoinDepoFetcher
|
|
|
|
from personal_data.data import DeduplicateMode, Scraper
|
|
|
|
from .. import secrets
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# TODO: Move these into secrets!
|
|
client = kucoin.client.Client(
|
|
secrets.KUCOIN_KEY,
|
|
secrets.KUCOIN_SECRET,
|
|
secrets.KUCOIN_PASS,
|
|
)
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class KucoinAccountBalances(Scraper):
|
|
dataset_name = 'defi_kucoin_balance'
|
|
deduplicate_mode = DeduplicateMode.ONLY_LATEST
|
|
deduplicate_ignore_columns = ['account.update_time']
|
|
|
|
def scrape(self) -> Iterator[Mapping[str, object]]:
|
|
fetcher = KucoinDepoFetcher(
|
|
kucoin_key = secrets.KUCOIN_KEY,
|
|
kucoin_secret = secrets.KUCOIN_SECRET,
|
|
kucoin_pass = secrets.KUCOIN_PASS,
|
|
)
|
|
|
|
depo = fetcher.get_depo()
|
|
|
|
data_point = {
|
|
'account.update_time': depo.updated_time,
|
|
}
|
|
|
|
for asset in depo.assets():
|
|
key = f"balance.{asset}"
|
|
data_point[key] = depo.get_amount_of_asset(asset)
|
|
|
|
print(data_point)
|
|
|
|
yield frozendict(data_point)
|
|
|
|
|
|
def addresses_to_data_points(addresses: list[dict[str,str]]) -> frozendict:
|
|
data_point = {}
|
|
data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC)
|
|
data_point['account.num_deposit_addresses'] = len(addresses)
|
|
|
|
if len(addresses) > 0:
|
|
for k, v in addresses[-1].items():
|
|
data_point[f'deposit.{k}'] = v
|
|
|
|
return frozendict(data_point)
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class KucoinDepositAddresses(Scraper):
|
|
dataset_name = 'defi_kucoin_deposit_address'
|
|
deduplicate_mode = DeduplicateMode.ONLY_LATEST
|
|
deduplicate_ignore_columns = ['account.update_time']
|
|
|
|
def scrape(self) -> Iterator[Mapping[str, object]]:
|
|
addresses = client.get_deposit_address('MPC')
|
|
yield addresses_to_data_points(addresses)
|