1
0
fin-depo/fin_depo/__main__.py
Jon Michael Aanes ba988fed6c
Some checks failed
Verify Python project can be installed, loaded and have version checked / Test (push) Waiting to run
Python Ruff Code Quality / ruff (push) Successful in 23s
Run Python tests (through Pytest) / Test (push) Has been cancelled
Example CLI Application
2024-11-29 00:45:31 +01:00

123 lines
3.5 KiB
Python

"""# Example Depository CLI Tool.
This example script demonstrates how to initialize the depository fetchers, and
how to access the downloaded data.
"""
import sys
import requests
from secret_loader import SecretLoader
import fin_depo
secret_loader = SecretLoader(ENV_KEY_PREFIX='CF_FD')
DISABLE_NORDNET = True
def setup_aggregate_depos() -> fin_depo.static.AggregateDepoFetcher:
"""Initializes a composite depository fetcher.
The secret handling implemented here is robust, but possibly overkill for
your application.
"""
session = requests.Session()
depo_fetchers = []
if pbc_address := secret_loader.load('PBC_ACCOUNT_ADDRESS'):
depo_fetchers.append(
fin_depo.defi_partisia_blockchain.PartisiaBlockchainAccountDepoFetcher(
session,
pbc_address,
),
)
del pbc_address
if (
nordnet_username := secret_loader.load('NORDNET_USERNAME')
and not DISABLE_NORDNET
):
depo_fetchers.append(
fin_depo.investbank_nordnet.NordnetDepoFetcher(
session,
nordnet_username,
secret_loader.load_or_fail('NORDNET_PASSWORD'),
),
)
del nordnet_username
if kraken_key := secret_loader.load('KRAKEN_KEY'):
depo_fetchers.append(
fin_depo.defi_kraken.KrakenDepoFetcher(
kraken_key,
secret_loader.load_or_fail('KRAKEN_SECRET'),
),
)
del kraken_key
if kucoin_key := secret_loader.load('KUCOIN_KEY'):
depo_fetchers.append(
fin_depo.defi_kucoin.KucoinDepoFetcher(
kucoin_key,
secret_loader.load_or_fail('KUCOIN_SECRET'),
secret_loader.load_or_fail('KUCOIN_PASS'),
),
)
del kucoin_key
return fin_depo.static.AggregateDepoFetcher('Aggregated', depo_fetchers)
def _format_depo_tree_internal(
depo: fin_depo.data.Depo,
fmt: list[str],
indent: str,
more_whitespace: bool = False,
):
"""Internal string formatting for `format_depo_tree`."""
fmt.append(depo.name)
fmt.append('\n')
if isinstance(depo, fin_depo.data.DepoGroup):
for idx, nested_depo in enumerate(depo.nested):
is_last = idx == len(depo.nested) - 1
fmt.append(indent)
indent_new = indent + (' ' if is_last else '') + ' '
line = '└─ ' if is_last else '├─ '
fmt.append(line)
_format_depo_tree_internal(nested_depo, fmt, indent_new)
else:
depo_assets = list(depo.assets())
for idx, asset in enumerate(depo_assets):
is_last = idx == len(depo_assets) - 1
amount = depo.get_amount_of_asset(asset)
line = '└─ ' if is_last else '├─ '
derp = indent + line + asset.raw_short_name()
amount_str = f'{amount: 25.10f}'.rstrip('0').removesuffix('.')
fmt.append(f'{derp:20s}{amount_str}\n')
if more_whitespace:
fmt.append(f'{indent}\n')
def format_depo_tree(
depo: fin_depo.data.Depo,
more_whitespace: bool = False,
) -> str:
"""Formats the given `Depo` to a tree structure."""
fmt: list[str] = []
_format_depo_tree_internal(depo, fmt, ' ', more_whitespace=more_whitespace)
return ''.join(fmt)
def main():
"""Main function."""
aggregated_depos = setup_aggregate_depos()
depo = aggregated_depos.get_depo()
sys.stdout.write(format_depo_tree(depo))
if __name__ == '__main__':
main()