"""# Example Depository CLI Tool. This example script demonstrates how to initialize the depository fetchers, and how to access the downloaded data. """ import sys import requests from secret_loader import SecretLoader import fin_depo secret_loader = SecretLoader(ENV_KEY_PREFIX='CF_FD') DISABLE_NORDNET = True def setup_aggregate_depos() -> fin_depo.static.AggregateDepoFetcher: """Initializes a composite depository fetcher. The secret handling implemented here is robust, but possibly overkill for your application. """ session = requests.Session() depo_fetchers = [] if pbc_address := secret_loader.load('PBC_ACCOUNT_ADDRESS'): depo_fetchers.append( fin_depo.defi_partisia_blockchain.PartisiaBlockchainAccountDepoFetcher( session, pbc_address, ), ) del pbc_address if ( nordnet_username := secret_loader.load('NORDNET_USERNAME') and not DISABLE_NORDNET ): depo_fetchers.append( fin_depo.investbank_nordnet.NordnetDepoFetcher( session, nordnet_username, secret_loader.load_or_fail('NORDNET_PASSWORD'), ), ) del nordnet_username if kraken_key := secret_loader.load('KRAKEN_KEY'): depo_fetchers.append( fin_depo.defi_kraken.KrakenDepoFetcher( kraken_key, secret_loader.load_or_fail('KRAKEN_SECRET'), ), ) del kraken_key if kucoin_key := secret_loader.load('KUCOIN_KEY'): depo_fetchers.append( fin_depo.defi_kucoin.KucoinDepoFetcher( kucoin_key, secret_loader.load_or_fail('KUCOIN_SECRET'), secret_loader.load_or_fail('KUCOIN_PASS'), ), ) del kucoin_key return fin_depo.static.AggregateDepoFetcher('Aggregated', depo_fetchers) def _format_depo_tree_internal( depo: fin_depo.data.Depo, fmt: list[str], indent: str, more_whitespace: bool = False, ): """Internal string formatting for `format_depo_tree`.""" fmt.append(depo.name) fmt.append('\n') if isinstance(depo, fin_depo.data.DepoGroup): for idx, nested_depo in enumerate(depo.nested): is_last = idx == len(depo.nested) - 1 fmt.append(indent) indent_new = indent + (' ' if is_last else '│') + ' ' line = '└─ ' if is_last else '├─ ' fmt.append(line) _format_depo_tree_internal(nested_depo, fmt, indent_new) else: depo_assets = list(depo.assets()) for idx, asset in enumerate(depo_assets): is_last = idx == len(depo_assets) - 1 amount = depo.get_amount_of_asset(asset) line = '└─ ' if is_last else '├─ ' derp = indent + line + asset.raw_short_name() amount_str = f'{amount: 25.10f}'.rstrip('0').removesuffix('.') fmt.append(f'{derp:20s}{amount_str}\n') if more_whitespace: fmt.append(f'{indent}\n') def format_depo_tree( depo: fin_depo.data.Depo, more_whitespace: bool = False, ) -> str: """Formats the given `Depo` to a tree structure.""" fmt: list[str] = [] _format_depo_tree_internal(depo, fmt, ' ', more_whitespace=more_whitespace) return ''.join(fmt) def main(): """Main function.""" aggregated_depos = setup_aggregate_depos() depo = aggregated_depos.get_depo() sys.stdout.write(format_depo_tree(depo)) if __name__ == '__main__': main()