1
0

Compare commits

...

4 Commits

Author SHA1 Message Date
88717fbb8e 🤖 Bumped version to 0.1.29
Some checks failed
Package Python / Package (push) Successful in 25s
Test Python / Test (push) Failing after 26s
This commit was automatically generated by a script: https://gitfub.space/Jmaa/python-omni
2024-08-02 15:10:20 +02:00
e5ba8f371f
nordnet_client is now gated behind soft import 2024-08-02 07:44:21 +02:00
3eead4bd35
Avoid throwing error if nordnet_api_client is not present 2024-08-02 07:38:16 +02:00
de619e2548
Type adjustments 2024-08-02 05:28:56 +02:00
5 changed files with 47 additions and 14 deletions

View File

@ -55,12 +55,31 @@ __all__ = [
'static',
]
import importlib
import logging
from . import (
data,
defi_kraken,
defi_kucoin,
defi_partisia_blockchain,
investbank_nordnet,
static,
)
from ._version import __version__
logger = logging.getLogger(__name__)
def load_backend(name: str) -> object | None:
try:
return importlib.import_module(f'{__name__}.{name}')
except Exception:
logger.exception(
'Backend %s could not be imported? Are all module dependencies installed?',
name,
)
return None
# Import modules
defi_kraken = load_backend('defi_kraken')
defi_kucoin = load_backend('defi_kucoin')
investbank_nordnet = load_backend('investbank_nordnet')
defi_partisia_blockchain = load_backend('defi_partisia_blockchain')

View File

@ -1 +1 @@
__version__ = '0.1.28'
__version__ = '0.1.29'

View File

@ -139,7 +139,10 @@ class KucoinDepoFetcher(DepoFetcher):
return self._get_order_details(response['orderId'], input_asset, output_asset)
def _get_order_details(
self, order_id: str, input_asset: fin_defs.Asset, output_asset: fin_defs.Asset,
self,
order_id: str,
input_asset: fin_defs.Asset,
output_asset: fin_defs.Asset,
) -> TradeOrderDetails:
"""Determine the order details for the order with the given id.
@ -164,14 +167,19 @@ class KucoinDepoFetcher(DepoFetcher):
fee_asset=fin_defs.WELL_KNOWN_SYMBOLS[order_details['feeCurrency']],
fee_amount=Decimal(order_details['fee']),
executed_time=datetime.datetime.fromtimestamp(
order_details['createdAt'] / 1000, tz=datetime.UTC,
order_details['createdAt'] / 1000,
tz=datetime.UTC,
),
order_id=order_id,
raw_order_details=order_details,
)
def _get_order_with_retries(
self, order_id: str, *, num_retries: int, sleep_between_tries: float = 1.0,
self,
order_id: str,
*,
num_retries: int,
sleep_between_tries: float = 1.0,
) -> dict:
"""Get the order details from KuCoin backend.

View File

@ -18,18 +18,23 @@ import fin_defs
import requests
from frozendict import frozendict
from nordnet_api_client import NordnetClient
from nordnet_api_client.data import Instrument
from .data import Depo, DepoFetcher, DepoGroup, DepoSingle
logger = logging.getLogger(__name__)
def asset_from_instrument(intrument) -> fin_defs.Asset | None:
if intrument.instrument_group_type == 'FND':
def asset_from_instrument(instrument: Instrument) -> fin_defs.Asset | None:
if instrument.instrument_group_type == 'FND':
return None
symbol = intrument.symbol
exchange_id = intrument.tradables[0].mic
return fin_defs.Stock(symbol, fin_defs.EXCHANGES_BY_IDS[exchange_id])
symbol = instrument.symbol
exchange_id = instrument.tradables[0].mic
return fin_defs.Stock(
symbol,
fin_defs.EXCHANGES_BY_IDS[exchange_id],
nordnet_id=instrument.instrument_id,
)
EMPTY_DICT: Mapping[str, str | int] = frozendict()

View File

@ -7,7 +7,8 @@ import fin_depo
def test_get_depo():
fetcher = fin_depo.static.StaticDepoFetcher(
'Test', {fin_defs.BTC: Decimal(1000), fin_defs.USD: Decimal(2000)},
'Test',
{fin_defs.BTC: Decimal(1000), fin_defs.USD: Decimal(2000)},
)
depo = fetcher.get_depo()