1
0

Kraken use fin_depo

This commit is contained in:
Jon Michael Aanes 2024-06-02 18:00:55 +02:00
parent 526e5eee02
commit ff6e5110a4
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
3 changed files with 43 additions and 56 deletions

View File

@ -1,43 +0,0 @@
import dataclasses
import datetime
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal
import krakenex
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
logger = logging.getLogger(__name__)
# TODO: Move these into secrets!
client = krakenex.API(
secrets.KRAKEN_KEY,
secrets.KRAKEN_SECRET,
)
@dataclasses.dataclass(frozen=True)
class KrakenAccountBalances(Scraper):
dataset_name = 'defi_kraken_balance'
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
def scrape(self) -> Iterator[Mapping[str, object]]:
data_point = {
'account.update_time': datetime.datetime.now(tz=datetime.UTC),
}
result = client.query_private('Balance')
for account, balance_str in result['result'].items():
key = f'balance.{account}'
balance = Decimal(balance_str)
balance = balance if balance != 0 else Decimal(0)
data_point[key] = data_point.get(key, Decimal(0)) + balance
del key, balance
yield frozendict(data_point)

View File

@ -3,10 +3,11 @@ import datetime
import logging import logging
from collections.abc import Iterator, Mapping from collections.abc import Iterator, Mapping
from decimal import Decimal from decimal import Decimal
import abc
import kucoin.client import kucoin.client
from frozendict import frozendict from frozendict import frozendict
from fin_depo.defi_kucoin import KucoinDepoFetcher import fin_depo
from personal_data.data import DeduplicateMode, Scraper from personal_data.data import DeduplicateMode, Scraper
@ -23,19 +24,14 @@ client = kucoin.client.Client(
) )
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class KucoinAccountBalances(Scraper): class FinanceDepoScraper(Scraper):
dataset_name = 'defi_kucoin_balance'
deduplicate_mode = DeduplicateMode.ONLY_LATEST @abc.abstractmethod
deduplicate_ignore_columns = ['account.update_time'] def get_depo_fetcher(self):
pass
def scrape(self) -> Iterator[Mapping[str, object]]: def scrape(self) -> Iterator[Mapping[str, object]]:
fetcher = KucoinDepoFetcher( depo = self.get_depo_fetcher().get_depo()
kucoin_key = secrets.KUCOIN_KEY,
kucoin_secret = secrets.KUCOIN_SECRET,
kucoin_pass = secrets.KUCOIN_PASS,
)
depo = fetcher.get_depo()
data_point = { data_point = {
'account.update_time': depo.updated_time, 'account.update_time': depo.updated_time,
@ -50,6 +46,31 @@ class KucoinAccountBalances(Scraper):
yield frozendict(data_point) yield frozendict(data_point)
@dataclasses.dataclass(frozen=True)
class KrakenAccountBalances(FinanceDepoScraper):
dataset_name = 'defi_kraken_balance'
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
def get_depo_fetcher(self):
return fin_depo.defi_kraken.KrakenDepoFetcher(
kraken_key = secrets.KRAKEN_KEY,
kraken_secret = secrets.KRAKEN_SECRET,
)
@dataclasses.dataclass(frozen=True)
class KucoinAccountBalances(FinanceDepoScraper):
dataset_name = 'defi_kucoin_balance'
deduplicate_mode = DeduplicateMode.ONLY_LATEST
deduplicate_ignore_columns = ['account.update_time']
def get_depo_fetcher(self):
return fin_depo.defi_kucoin.KucoinDepoFetcher(
kucoin_key = secrets.KUCOIN_KEY,
kucoin_secret = secrets.KUCOIN_SECRET,
kucoin_pass = secrets.KUCOIN_PASS,
)
def addresses_to_data_points(addresses: list[dict[str,str]]) -> frozendict: def addresses_to_data_points(addresses: list[dict[str,str]]) -> frozendict:
data_point = {} data_point = {}
data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC) data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC)

View File

@ -1,5 +1,6 @@
import csv import csv
import datetime import datetime
import inspect
import decimal import decimal
import io import io
import logging import logging
@ -217,7 +218,15 @@ def send_notification(
def available_scrapers() -> list[type[personal_data.data.Scraper]]: def available_scrapers() -> list[type[personal_data.data.Scraper]]:
return personal_data.data.Scraper.__subclasses__() subclasses = []
class_queue = [personal_data.data.Scraper]
while class_queue:
clazz = class_queue.pop()
if inspect.isabstract(clazz):
class_queue.extend(clazz.__subclasses__())
else:
subclasses.append(clazz)
return subclasses
def available_scraper_names() -> list[str]: def available_scraper_names() -> list[str]: