Ruff
Some checks failed
Run Python tests (through Pytest) / Test (push) Failing after 23s
Verify Python project can be installed, loaded and have version checked / Test (push) Failing after 22s

This commit is contained in:
Jon Michael Aanes 2025-04-13 01:19:47 +02:00
parent 5afd7e596f
commit dae2168d8f
3 changed files with 86 additions and 50 deletions

View File

@ -5,27 +5,33 @@ Unofficial library for reading contract states from Partisia Blockchain.
This library is not officially associated with Partisia Blockchain nor Partisia This library is not officially associated with Partisia Blockchain nor Partisia
Group ApS. Group ApS.
""" """
import base64
import dataclasses import dataclasses
import datetime import datetime
import base64
import email.utils import email.utils
import hashlib
import json import json
import logging import logging
import ecdsa
import hashlib
import pbcabi
from collections.abc import Mapping
import time import time
from collections.abc import Mapping
from decimal import Decimal from decimal import Decimal
from typing import Any from typing import Any
import ecdsa
import pbcabi
import requests import requests
from frozendict import frozendict from frozendict import frozendict
from .pbc_types import (Address, Signature, HashSha256, Transaction,
SignedTransaction, SignedTransactionInnerPart)
from ._version import __version__ # noqa: F401 from ._version import __version__ # noqa: F401
from .pbc_types import (
Address,
HashSha256,
Signature,
SignedTransaction,
SignedTransactionInnerPart,
Transaction,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -62,12 +68,15 @@ class Balances:
mpc: Decimal mpc: Decimal
byoc: Mapping[str, Decimal] byoc: Mapping[str, Decimal]
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class SenderAuthentication: class SenderAuthentication:
secret_key: str secret_key: str
def sender_address(self) -> Address: def sender_address(self) -> Address:
verifying_key_repr = self._signing_key().get_verifying_key().to_string("uncompressed") verifying_key_repr = (
self._signing_key().get_verifying_key().to_string('uncompressed')
)
hashed = HashSha256.of_bytes(verifying_key_repr) hashed = HashSha256.of_bytes(verifying_key_repr)
return Address(b'\0' + hashed._bytes[-20:]) return Address(b'\0' + hashed._bytes[-20:])
@ -76,10 +85,15 @@ class SenderAuthentication:
def _signing_key(self): def _signing_key(self):
secret_exponent = int(self.secret_key, 16) secret_exponent = int(self.secret_key, 16)
return ecdsa.SigningKey.from_secret_exponent(secret_exponent , curve=ecdsa.SECP256k1) return ecdsa.SigningKey.from_secret_exponent(
secret_exponent,
curve=ecdsa.SECP256k1,
)
TRANSACTION_VALIDITY_DURATION = 60 TRANSACTION_VALIDITY_DURATION = 60
def sign_transaction( def sign_transaction(
sender_authentication: SenderAuthentication, sender_authentication: SenderAuthentication,
nonce: int, nonce: int,
@ -91,11 +105,12 @@ def sign_transaction(
sender = sender_authentication.sender_address sender = sender_authentication.sender_address
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000 valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(nonce, inner: SignedTransactionInnerPart = SignedTransactionInnerPart(
nonce,
valid_to_time, valid_to_time,
gas_cost, gas_cost,
Transaction(Address.from_string(contract_address), Transaction(Address.from_string(contract_address), transaction_rpc),
transaction_rpc)) )
transaction_hash_bytes = inner.rpc_serialize() + chain_id.encode('utf8') transaction_hash_bytes = inner.rpc_serialize() + chain_id.encode('utf8')
@ -117,29 +132,38 @@ class PbcClient:
if self.sender_authentication is not None: if self.sender_authentication is not None:
assert isinstance(self.sender_authentication, SenderAuthentication) assert isinstance(self.sender_authentication, SenderAuthentication)
def send_transaction(self, contract_address: str, rpc: bytes, gas_cost: int): def send_transaction(self, contract_address: str, rpc: bytes, gas_cost: int):
if self.sender_authentication is None: if self.sender_authentication is None:
msg = "PbcClient.sender_authentication required for send_transaction" msg = 'PbcClient.sender_authentication required for send_transaction'
raise Exception(msg) raise Exception(msg)
signed_transaction = sign_transaction(self.sender_authentication, signed_transaction = sign_transaction(
self.sender_authentication,
self.get_sender_authentication_nonce(), self.get_sender_authentication_nonce(),
gas_cost, gas_cost,
self.get_chain_id(), self.get_chain_id(),
contract_address, rpc) contract_address,
rpc,
)
return self.send_signed_transaction(signed_transaction) return self.send_signed_transaction(signed_transaction)
def send_signed_transaction(self, signed_transaction: SignedTransaction): def send_signed_transaction(self, signed_transaction: SignedTransaction):
url = URL_TRANSACTION.format( url = URL_TRANSACTION.format(
hostname=self.hostname, hostname=self.hostname,
shard=shard_id_for_address(signed_transaction.inner.transaction.contract_address), shard=shard_id_for_address(
signed_transaction.inner.transaction.contract_address,
),
) )
transaction_payload: str = base64.b64encode(signed_transaction.rpc_serialize()).decode('utf8') transaction_payload: str = base64.b64encode(
self._get_json(url, data = {'transactionPayload':transaction_payload }, method = 'PUT') signed_transaction.rpc_serialize(),
).decode('utf8')
self._get_json(
url,
data={'transactionPayload': transaction_payload},
method='PUT',
)
def _get_json( def _get_json(
self, self,
@ -226,7 +250,6 @@ class PbcClient:
) )
return self._get_json(url, method='GET')[0]['nonce'] return self._get_json(url, method='GET')[0]['nonce']
def get_contract_state(self, address: str) -> tuple[dict, datetime.datetime]: def get_contract_state(self, address: str) -> tuple[dict, datetime.datetime]:
# TODO: Rename to get_contract_state_json # TODO: Rename to get_contract_state_json
url = URL_CONTRACT_STATE.format( url = URL_CONTRACT_STATE.format(
@ -248,7 +271,11 @@ class PbcClient:
return state_deserialized, server_time return state_deserialized, server_time
def get_typed_contract_avl_tree(self, address: str, avl_tree_id: pbcabi.model.AvlTreeId) -> tuple[dict, datetime.datetime]: def get_typed_contract_avl_tree(
self,
address: str,
avl_tree_id: pbcabi.model.AvlTreeId,
) -> tuple[dict, datetime.datetime]:
file_abi = self.get_contract_abi(address) file_abi = self.get_contract_abi(address)
state, server_time = self.get_contract_state(address) state, server_time = self.get_contract_state(address)
for avl_tree in state['avlTrees']: for avl_tree in state['avlTrees']:
@ -260,8 +287,14 @@ class PbcClient:
key_bytes = base64.b64decode(key_and_value['key']['data']['data']) key_bytes = base64.b64decode(key_and_value['key']['data']['data'])
value_bytes = base64.b64decode(key_and_value['value']['data']) value_bytes = base64.b64decode(key_and_value['value']['data'])
key = file_abi.contract.read_state(key_bytes, avl_tree_id.type_spec.type_key) key = file_abi.contract.read_state(
value = file_abi.contract.read_state(value_bytes, avl_tree_id.type_spec.type_value) key_bytes,
avl_tree_id.type_spec.type_key,
)
value = file_abi.contract.read_state(
value_bytes,
avl_tree_id.type_spec.type_value,
)
data[key] = value data[key] = value
@ -269,7 +302,6 @@ class PbcClient:
return data, server_time return data, server_time
def get_contract_abi(self, address: str) -> pbcabi.model.FileAbi: def get_contract_abi(self, address: str) -> pbcabi.model.FileAbi:
url = URL_CONTRACT_STATE.format( url = URL_CONTRACT_STATE.format(
hostname=self.hostname, hostname=self.hostname,

View File

@ -1,22 +1,11 @@
import dataclasses import dataclasses
import datetime
import base64
import email.utils
import json
import logging
import hashlib import hashlib
import pbcabi
from collections.abc import Mapping
import time
from decimal import Decimal
from typing import Any
def size_prefixed(_bytes: bytes) -> bytes: def size_prefixed(_bytes: bytes) -> bytes:
return len(_bytes).to_bytes(4, 'big') + _bytes return len(_bytes).to_bytes(4, 'big') + _bytes
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class Address: class Address:
_bytes: bytes _bytes: bytes
@ -50,6 +39,7 @@ class Signature:
def __str__(self) -> str: def __str__(self) -> str:
return self._bytes.hex() return self._bytes.hex()
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class HashSha256: class HashSha256:
_bytes: bytes _bytes: bytes
@ -67,6 +57,7 @@ class HashSha256:
def __str__(self) -> str: def __str__(self) -> str:
return self._bytes.hex() return self._bytes.hex()
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class Transaction: class Transaction:
contract_address: Address contract_address: Address
@ -76,7 +67,10 @@ class Transaction:
assert isinstance(self.contract_address, Address), self.contract_address assert isinstance(self.contract_address, Address), self.contract_address
def rpc_serialize(self) -> bytes: def rpc_serialize(self) -> bytes:
return self.contract_address.rpc_serialize() + size_prefixed(self.transaction_rpc) return self.contract_address.rpc_serialize() + size_prefixed(
self.transaction_rpc,
)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class SignedTransactionInnerPart: class SignedTransactionInnerPart:
@ -86,7 +80,13 @@ class SignedTransactionInnerPart:
transaction: Transaction transaction: Transaction
def rpc_serialize(self) -> bytes: def rpc_serialize(self) -> bytes:
return self.nonce.to_bytes(4, 'big') + self.valid_to_time.to_bytes(8, 'big') + self.gas_cost.to_bytes(8, 'big') + self.transaction.rpc_serialize() return (
self.nonce.to_bytes(4, 'big')
+ self.valid_to_time.to_bytes(8, 'big')
+ self.gas_cost.to_bytes(8, 'big')
+ self.transaction.rpc_serialize()
)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class SignedTransaction: class SignedTransaction:
@ -95,4 +95,8 @@ class SignedTransaction:
hash: HashSha256 hash: HashSha256
def rpc_serialize(self) -> bytes: def rpc_serialize(self) -> bytes:
return self.inner.rpc_serialize() + self.signature.rpc_serialize() + self.hash.rpc_serialize() return (
self.inner.rpc_serialize()
+ self.signature.rpc_serialize()
+ self.hash.rpc_serialize()
)