1
0

Kraken and Kucoin depos
Some checks failed
Test Python / Test (push) Failing after 20s

This commit is contained in:
Jon Michael Aanes 2024-05-29 22:29:15 +02:00
parent 6d44aec4bc
commit 58b5a204f4
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
4 changed files with 97 additions and 2 deletions

View File

@ -4,11 +4,11 @@ Python library for automatic fetching of personal asset depo information.
Supports:
- None :)
- Kraken through API
- Kucoin through API
## TODO
- [ ] Kraken through API
- [ ] Partisia Blockchain Account
- [ ] Nordnet
- [ ] Bank Account (Open Banking)

23
fin_depo/data.py Normal file
View File

@ -0,0 +1,23 @@
import enforce_typing
import dataclasses
from decimal import Decimal
from collections.abc import Mapping
from fin_defs import Asset
import datetime
@enforce_typing.enforce_types
@dataclasses.dataclass
class Depo:
name: str
updated_time: datetime.datetime
@enforce_typing.enforce_types
@dataclasses.dataclass
class DepoSingle(Depo):
assets: Mapping[Asset, Decimal]
@enforce_typing.enforce_types
@dataclasses.dataclass
class DepoGroup(Depo):
nested: list[Depo]

35
fin_depo/defi_kraken.py Normal file
View File

@ -0,0 +1,35 @@
from .data import Depo, DepoSingle
from decimal import Decimal
import fin_defs
import datetime
import krakenex
import logging
logger = logging.getLogger(__name__)
class KrakenDepoFetcher:
def __init__(self, kraken_key: str, kraken_secret: str):
self.client = krakenex.API(
kraken_key,
kraken_secret,
)
def get_depo(self) -> Depo:
now = datetime.datetime.now(tz=datetime.UTC)
result = self.client.query_private('Balance')
assets: dict[fin_defs.Asset, Decimal] = {}
for account, balance_str in result['result'].items():
asset = fin_defs.WELL_KNOWN_SYMBOLS[account]
balance = Decimal(balance_str)
if balance != 0:
assets[asset] = assets.get(asset, Decimal(0)) + balance
del account, balance_str, asset, balance
return DepoSingle(
name = 'Kraken',
assets = assets,
updated_time = now,
)

37
fin_depo/defi_kucoin.py Normal file
View File

@ -0,0 +1,37 @@
from .data import DepoSingle, DepoGroup
import dataclasses
import datetime
import fin_defs
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal
import kucoin.client
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
logger = logging.getLogger(__name__)
class KucoinDepoFetcher:
def __init__(self, kucoin_key: str, kucoin_secret: str, kucoin_pass: str):
self.client = kucoin.client.Client(
kucoin_key,
kucoin_secret,
kucoin_pass,
)
def get_depo(self) -> DepoGroup:
now = datetime.datetime.now(tz=datetime.UTC)
assets_by_account_type: dict[str, dict[fin_defs.Asset, Decimal]] = {}
for account_data in self.client.get_accounts():
asset = fin_defs.WELL_KNOWN_SYMBOLS[account_data['currency']]
balance = Decimal( account_data['balance'])
assets_for_account_type = assets_by_account_type.get(account_data['type'], {})
assets_for_account_type[asset] = assets_for_account_type.get(asset, Decimal(0)) +balance
del account_data, asset, balance
return DepoGroup("Kucoin", now, [
DepoSingle("Kucoin " + account_type, now, assets) for account_type,
assets in assets_by_account_type.items()
])