1
0

Compare commits

...

2 Commits

Author SHA1 Message Date
5b188cf1aa
Ruff
Some checks failed
Test Python / Test (push) Failing after 20s
2024-06-04 22:15:21 +02:00
3f08b75a00
Nordnet testing 2024-06-04 22:14:58 +02:00
3 changed files with 45 additions and 14 deletions

View File

@ -148,7 +148,9 @@ class PartisiaBlockchainAccountDepoFetcher:
balances = self.client.get_account_balances(self.blockchain_address)
depo_mpc = DepoSingle(
'Native', balances.update_time, {fin_defs.MPC: balances.mpc}
'Native',
balances.update_time,
{fin_defs.MPC: balances.mpc},
)
depo_byoc = DepoSingle(
'BYOC',

View File

@ -15,7 +15,7 @@ from decimal import Decimal
import fin_defs
from .data import Depo, DepoSingle, DepoGroup
from .data import Depo, DepoGroup, DepoSingle
logger = logging.getLogger(__name__)
@ -38,6 +38,7 @@ def asset_from_instrument_json(json) -> fin_defs.Asset | None:
exchange_id = json['tradables'][0]['mic']
return fin_defs.Stock(symbol, fin_defs.EXCHANGES_BY_IDS[exchange_id])
class NordnetDepoFetcher:
def __init__(self, session, username: str, password: str):
assert session is not None, 'missing session'
@ -84,7 +85,9 @@ class NordnetDepoFetcher:
def get_depo(self) -> Depo:
self.login()
now = datetime.datetime.now(tz=datetime.UTC) # TODO: Use info from json requests
now = datetime.datetime.now(
tz=datetime.UTC,
) # TODO: Use info from json requests
json_accounts = self.get_json(API_ACCOUNTS)
@ -92,31 +95,39 @@ class NordnetDepoFetcher:
for json_account in json_accounts:
account_id = json_account['accid']
assets: dict[fin_defs.Asset, Decimal] = {
}
assets: dict[fin_defs.Asset, Decimal] = {}
# Determine amount of usable currency stored on Nordnet
if True:
json_account_info = self.get_json(API_ACCOUNT_INFO.format(accid = account_id))
asset = fin_defs.FiatCurrency(json_account_info[0]['account_sum']['currency'])
json_account_info = self.get_json(
API_ACCOUNT_INFO.format(accid=account_id),
)
asset = fin_defs.FiatCurrency(
json_account_info[0]['account_sum']['currency'],
)
amount = Decimal(json_account_info[0]['account_sum']['value'])
assets[asset] = amount
del asset, amount
# Determine positions on Nordnet
json_account_positions = self.get_json(API_ACCOUNT_POSITIONS.format(accid = account_id))
json_account_positions = self.get_json(
API_ACCOUNT_POSITIONS.format(accid=account_id),
)
for pos in json_account_positions:
asset = asset_from_instrument_json(pos['instrument'])
if asset is None: continue
if asset is None:
continue
amount = Decimal(pos['qty'])
assets[asset] = amount
del asset, amount
nested.append(DepoSingle(
name='{} {}'.format(json_account['type'], json_account['alias']),
_assets=assets,
updated_time=now,
))
nested.append(
DepoSingle(
name='{} {}'.format(json_account['type'], json_account['alias']),
_assets=assets,
updated_time=now,
),
)
return DepoGroup(
name=f'Nordnet',

18
test/test_nordnet.py Normal file
View File

@ -0,0 +1,18 @@
import requests
from fin_depo import investbank_nordnet
SECRET_USERNAME = None
SECRET_PASSWORD = None
def test_get_depo():
session = requests.Session()
nordnet = investbank_nordnet.NordnetDepoFetcher(
session,
SECRET_USERNAME,
SECRET_PASSWORD,
)
depo = nordnet.get_depo()
assert depo is not None