1
0

Ruff
Some checks failed
Test Python / Test (push) Failing after 26s

This commit is contained in:
Jon Michael Aanes 2024-07-27 03:14:12 +02:00
parent 5f89c5d6df
commit bcae3d4e29
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
6 changed files with 28 additions and 21 deletions

View File

@ -2,9 +2,9 @@
import datetime
import logging
import time
from decimal import Decimal
import time
import fin_defs
import kucoin.client
@ -138,9 +138,9 @@ class KucoinDepoFetcher(DepoFetcher):
# Determine order details
return self._get_order_details(response['orderId'], input_asset, output_asset)
def _get_order_details(self, order_id: str,
input_asset: fin_defs.Asset,
output_asset: fin_defs.Asset) -> TradeOrderDetails:
def _get_order_details(
self, order_id: str, input_asset: fin_defs.Asset, output_asset: fin_defs.Asset,
) -> TradeOrderDetails:
"""Determine the order details for the order with the given id.
Retries the order a few times, as KuCoin might not have propagated the
@ -164,13 +164,15 @@ class KucoinDepoFetcher(DepoFetcher):
fee_asset=fin_defs.WELL_KNOWN_SYMBOLS[order_details['feeCurrency']],
fee_amount=Decimal(order_details['fee']),
executed_time=datetime.datetime.fromtimestamp(
order_details['createdAt'] / 1000, tz=datetime.UTC
order_details['createdAt'] / 1000, tz=datetime.UTC,
),
order_id=order_id,
raw_order_details=order_details,
)
def _get_order_with_retries(self, order_id: str, *, num_retries: int, sleep_between_tries:float = 1.0) -> dict:
def _get_order_with_retries(
self, order_id: str, *, num_retries: int, sleep_between_tries: float = 1.0,
) -> dict:
"""Get the order details from KuCoin backend.
Retries the order a few times, as KuCoin might not have propagated the

View File

@ -13,12 +13,11 @@ import datetime
import logging
from collections.abc import Mapping
from decimal import Decimal
from typing import Any
from nordnet_api_client import NordnetClient
import fin_defs
import requests
from frozendict import frozendict
from nordnet_api_client import NordnetClient
from .data import Depo, DepoFetcher, DepoGroup, DepoSingle

View File

@ -4,18 +4,18 @@ This module does not represent an API or other integration. This module
contains aggregators and static depositories.
"""
import dataclasses
import datetime
import logging
from decimal import Decimal
import fin_defs
import krakenex
import dataclasses
from .data import Depo, DepoFetcher, DepoSingle, DepoGroup
from .data import DepoFetcher, DepoGroup, DepoSingle
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class StaticDepoFetcher(DepoFetcher):
"""Depository "fetcher" that doesn't do any fetching.
@ -25,7 +25,9 @@ class StaticDepoFetcher(DepoFetcher):
name: str
depo_assets: dict[fin_defs.Asset, Decimal]
last_updated: datetime.datetime = dataclasses.field(default_factory=lambda: datetime.datetime.now(tz=datetime.UTC))
last_updated: datetime.datetime = dataclasses.field(
default_factory=lambda: datetime.datetime.now(tz=datetime.UTC),
)
def get_depo(self) -> DepoSingle:
return DepoSingle(
@ -34,6 +36,7 @@ class StaticDepoFetcher(DepoFetcher):
updated_time=self.last_updated,
)
@dataclasses.dataclass(frozen=True)
class AggregateDepoFetcher(DepoFetcher):
"""Depository "fetcher" that delegates to the aggregated fetchers."""

View File

@ -1,5 +1,4 @@
import pytest
import requests
import fin_depo

View File

@ -1,8 +1,8 @@
import datetime
from decimal import Decimal
import fin_defs
import pytest
import datetime
import fin_depo
@ -19,6 +19,7 @@ fin_depo.defi_kucoin.logger.setLevel('INFO')
NOW = datetime.datetime.now(tz=datetime.UTC)
@needs_secrets
def test_get_depo():
"""Can inspect kucoin depository."""
@ -86,6 +87,7 @@ def test_place_buy_side_order():
assert NOW <= order_details.executed_time <= NOW + datetime.timedelta(minutes=10)
@needs_secrets
def test_place_sell_side_order():
"""Client can place sell side market orders."""

View File

@ -1,11 +1,13 @@
from decimal import Decimal
import fin_defs
import fin_depo
from decimal import Decimal
def test_get_depo():
fetcher = fin_depo.static.StaticDepoFetcher(
'Test', {fin_defs.BTC: Decimal(1000), fin_defs.USD: Decimal(2000)}
'Test', {fin_defs.BTC: Decimal(1000), fin_defs.USD: Decimal(2000)},
)
depo = fetcher.get_depo()