1
0
fin-depo/fin_depo/defi_partisia_blockchain.py

82 lines
2.5 KiB
Python

"""See `PartisiaBlockchainAccountDepoFetcher` for documentation."""
import logging
import fin_defs
import pbc_client
import requests
from .data import (
DepoFetcher,
DepoGroup,
DepoSingle,
DepositDetails,
DoubleRegister,
WithdrawalDetails,
)
logger = logging.getLogger(__name__)

# Lookup from the BYOC symbol names used as keys of an account's BYOC
# balances to the matching `fin_defs` well-known symbol. Note that the
# `POLYGON_USDC` BYOC symbol is mapped to the plain `USDC` symbol.
BYOC_ASSETS = {
    byoc_symbol: fin_defs.WELL_KNOWN_SYMBOLS[well_known_symbol]
    for byoc_symbol, well_known_symbol in (
        ('ETH', 'ETH'),
        ('POLYGON_USDC', 'USDC'),
        ('BNB', 'BNB'),
        ('MATIC', 'MATIC'),
    )
}
class PartisiaBlockchainAccountDepoFetcher(DepoFetcher):
    """Depository fetcher for individual [Partisia
    Blockchain](https://partisiablockchain.com/) accounts, including
    [MPC](https://partisiablockchain.gitlab.io/documentation/pbc-fundamentals/mpc-token-model-and-account-elements.html)
    and
    [Bring-Your-Own-Coin](https://partisiablockchain.gitlab.io/documentation/pbc-fundamentals/byoc/introduction-to-byoc.html).

    Requirements for use:

    - Account on Partisia Blockchain.

    Depository structure: A nested depo named `Partisia Blockchain`
    containing both a `Native` depo (MPC balance) and a
    `Bring Your Own Coin` depo (BYOC balances).
    """

    def __init__(self, session: requests.Session, blockchain_address: str):
        """Create a fetcher for a single account.

        Args:
            session: HTTP session handed to the `pbc_client.PbcClient`.
            blockchain_address: Address of the account to fetch balances for.
        """
        self.assert_param('session', requests.Session, session)
        self.assert_param('blockchain_address', str, blockchain_address)
        self.client = pbc_client.PbcClient(session)
        self.blockchain_address = blockchain_address

    def get_depo(self) -> DepoGroup:
        """Fetch the account balances and return them as a nested depo.

        Returns:
            A `DepoGroup` named `Partisia Blockchain` holding a `Native`
            depo with the MPC balance and a `Bring Your Own Coin` depo with
            the BYOC balances, all stamped with the balances' update time.

        Raises:
            KeyError: If the account holds a BYOC symbol that is not present
                in `BYOC_ASSETS`.
        """
        balances = self.client.get_account_balances(self.blockchain_address)

        depo_mpc = DepoSingle(
            'Native',
            balances.update_time,
            {fin_defs.MPC: balances.mpc},
        )
        depo_byoc = DepoSingle(
            'Bring Your Own Coin',
            balances.update_time,
            # Translate BYOC symbol names into fin_defs assets; unknown
            # symbols deliberately fail loudly with a KeyError.
            {BYOC_ASSETS[k]: v for k, v in balances.byoc.items()},
        )
        return DepoGroup(
            'Partisia Blockchain',
            balances.update_time,
            [depo_mpc, depo_byoc],
        )

    def _get_withdrawals(self) -> list[WithdrawalDetails]:
        """Not yet implemented.

        Raises:
            NotImplementedError: Always.
        """
        # The builtin NotImplementedError is used because no
        # `UnsupportedOperationException` is imported or defined in this
        # module; raising that name would fail with a NameError instead.
        msg = 'TODO'
        raise NotImplementedError(msg)

    def _get_deposits(self) -> list[DepositDetails]:
        """Not yet implemented.

        Raises:
            NotImplementedError: Always.
        """
        msg = 'TODO'
        raise NotImplementedError(msg)

    def _get_double_registers(self) -> list[DoubleRegister]:
        """Return deposits and withdrawals merged and sorted by
        `executed_time`.

        Raises:
            NotImplementedError: Currently always, because fetching deposits
                and withdrawals is not implemented yet.
        """
        double_registers: list[DoubleRegister] = []
        double_registers += self._get_deposits()
        double_registers += self._get_withdrawals()
        double_registers.sort(key=lambda register: register.executed_time)
        return double_registers