1
0

Ruff
Some checks failed
Test Python / Test (push) Failing after 20s

This commit is contained in:
Jon Michael Aanes 2024-06-04 22:15:21 +02:00
parent 3f08b75a00
commit 5b188cf1aa
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
3 changed files with 33 additions and 17 deletions

View File

@ -148,7 +148,9 @@ class PartisiaBlockchainAccountDepoFetcher:
balances = self.client.get_account_balances(self.blockchain_address) balances = self.client.get_account_balances(self.blockchain_address)
depo_mpc = DepoSingle( depo_mpc = DepoSingle(
'Native', balances.update_time, {fin_defs.MPC: balances.mpc} 'Native',
balances.update_time,
{fin_defs.MPC: balances.mpc},
) )
depo_byoc = DepoSingle( depo_byoc = DepoSingle(
'BYOC', 'BYOC',

View File

@ -15,7 +15,7 @@ from decimal import Decimal
import fin_defs import fin_defs
from .data import Depo, DepoSingle, DepoGroup from .data import Depo, DepoGroup, DepoSingle
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -38,6 +38,7 @@ def asset_from_instrument_json(json) -> fin_defs.Asset | None:
exchange_id = json['tradables'][0]['mic'] exchange_id = json['tradables'][0]['mic']
return fin_defs.Stock(symbol, fin_defs.EXCHANGES_BY_IDS[exchange_id]) return fin_defs.Stock(symbol, fin_defs.EXCHANGES_BY_IDS[exchange_id])
class NordnetDepoFetcher: class NordnetDepoFetcher:
def __init__(self, session, username: str, password: str): def __init__(self, session, username: str, password: str):
assert session is not None, 'missing session' assert session is not None, 'missing session'
@ -84,7 +85,9 @@ class NordnetDepoFetcher:
def get_depo(self) -> Depo: def get_depo(self) -> Depo:
self.login() self.login()
now = datetime.datetime.now(tz=datetime.UTC) # TODO: Use info from json requests now = datetime.datetime.now(
tz=datetime.UTC,
) # TODO: Use info from json requests
json_accounts = self.get_json(API_ACCOUNTS) json_accounts = self.get_json(API_ACCOUNTS)
@ -92,31 +95,39 @@ class NordnetDepoFetcher:
for json_account in json_accounts: for json_account in json_accounts:
account_id = json_account['accid'] account_id = json_account['accid']
assets: dict[fin_defs.Asset, Decimal] = { assets: dict[fin_defs.Asset, Decimal] = {}
}
# Determine amount of usable currency stored on Nordnet # Determine amount of usable currency stored on Nordnet
if True: if True:
json_account_info = self.get_json(API_ACCOUNT_INFO.format(accid = account_id)) json_account_info = self.get_json(
asset = fin_defs.FiatCurrency(json_account_info[0]['account_sum']['currency']) API_ACCOUNT_INFO.format(accid=account_id),
)
asset = fin_defs.FiatCurrency(
json_account_info[0]['account_sum']['currency'],
)
amount = Decimal(json_account_info[0]['account_sum']['value']) amount = Decimal(json_account_info[0]['account_sum']['value'])
assets[asset] = amount assets[asset] = amount
del asset, amount del asset, amount
# Determine positions on Nordnet # Determine positions on Nordnet
json_account_positions = self.get_json(API_ACCOUNT_POSITIONS.format(accid = account_id)) json_account_positions = self.get_json(
API_ACCOUNT_POSITIONS.format(accid=account_id),
)
for pos in json_account_positions: for pos in json_account_positions:
asset = asset_from_instrument_json(pos['instrument']) asset = asset_from_instrument_json(pos['instrument'])
if asset is None: continue if asset is None:
continue
amount = Decimal(pos['qty']) amount = Decimal(pos['qty'])
assets[asset] = amount assets[asset] = amount
del asset, amount del asset, amount
nested.append(DepoSingle( nested.append(
name='{} {}'.format(json_account['type'], json_account['alias']), DepoSingle(
_assets=assets, name='{} {}'.format(json_account['type'], json_account['alias']),
updated_time=now, _assets=assets,
)) updated_time=now,
),
)
return DepoGroup( return DepoGroup(
name=f'Nordnet', name=f'Nordnet',

View File

@ -1,14 +1,17 @@
import pytest
from fin_depo import investbank_nordnet
import requests import requests
from fin_depo import investbank_nordnet
SECRET_USERNAME = None SECRET_USERNAME = None
SECRET_PASSWORD = None SECRET_PASSWORD = None
def test_get_depo(): def test_get_depo():
session = requests.Session() session = requests.Session()
nordnet = investbank_nordnet.NordnetDepoFetcher( nordnet = investbank_nordnet.NordnetDepoFetcher(
session, SECRET_USERNAME, SECRET_PASSWORD session,
SECRET_USERNAME,
SECRET_PASSWORD,
) )
depo = nordnet.get_depo() depo = nordnet.get_depo()