Compare commits

..

No commits in common. "main" and "v0.1.3" have entirely different histories.
main ... v0.1.3

13 changed files with 109 additions and 302 deletions

View File

@ -1,7 +1,3 @@
# WARNING!
# THIS IS AN AUTOGENERATED FILE!
# MANUAL CHANGES CAN AND WILL BE OVERWRITTEN!
name: Package Python
on:
push:
@ -10,24 +6,11 @@ on:
paths-ignore: ['README.md', '.gitignore', 'LICENSE', 'CONVENTIONS.md', 'ruff.toml']
jobs:
Package-Python-And-Publish:
runs-on: ubuntu-latest
container:
image: node:21-bookworm
steps:
- name: Setting up Python ${{ env.PYTHON_VERSION }} for ${{runner.arch}} ${{runner.os}}
run: |
apt-get update
apt-get install -y python3 python3-pip
- name: Check out repository code
if: success()
uses: actions/checkout@v3
- name: Installing Python Dependencies
if: success()
run: python3 -m pip install --upgrade pip setuptools wheel build twine pytest --break-system-packages
- name: Build
if: success()
run: python3 -m build
- name: Publish
if: success()
run: python3 -m twine upload --repository-url "https://gitfub.space/api/packages/jmaa/pypi" -u ${{ secrets.PIPY_REPO_USER }} -p ${{ secrets.PIPY_REPO_PASS }} dist/*
Package:
uses: jmaa/workflows/.gitea/workflows/python-package.yaml@v6.21
with:
REGISTRY_DOMAIN: gitfub.space
REGISTRY_ORGANIZATION: jmaa
secrets:
PIPY_REPO_USER: ${{ secrets.PIPY_REPO_USER }}
PIPY_REPO_PASS: ${{ secrets.PIPY_REPO_PASS }}

View File

@ -1,7 +1,3 @@
# WARNING!
# THIS IS AN AUTOGENERATED FILE!
# MANUAL CHANGES CAN AND WILL BE OVERWRITTEN!
name: Run Python tests (through Pytest)
on:

View File

@ -1,7 +1,3 @@
# WARNING!
# THIS IS AN AUTOGENERATED FILE!
# MANUAL CHANGES CAN AND WILL BE OVERWRITTEN!
name: Verify Python project can be installed, loaded and have version checked
on:

View File

@ -1,14 +1,3 @@
<!-- WARNING! -->
<!-- THIS IS AN AUTOGENERATED FILE! -->
<!-- MANUAL CHANGES CAN AND WILL BE OVERWRITTEN! -->
# Conventions
When contributing code to this project, you MUST follow the requirements
specified here.
## Code Conventions
When contributing code to this project, you MUST follow these principles:
- Code should be easy to read and understand.
@ -24,13 +13,3 @@ When contributing code to this project, you MUST follow these principles:
- Documentation should document semantics, not syntax.
- Prefer importing modules, not individual items from modules.
- Do not use f-strings in logging statements.
- Loop variables and walrus-expression-variables should be deleted when
unneeded to keep scope clean, and to avoid accidental use.
## Testing
When contributing test to this project, you MUST follow these principles:
- Do not use any testing libraries other than `pytest`.
- Mocking is the root of all evil. Writing your own stubs is much more
preferable.

View File

@ -1,6 +1,7 @@
<!-- WARNING! -->
<!-- THIS IS AN AUTOGENERATED FILE! -->
<!-- MANUAL CHANGES CAN AND WILL BE OVERWRITTEN! -->
<!--- WARNING --->
<!--- THIS IS AN AUTO-GENERATED FILE --->
<!--- MANUAL CHANGES CAN AND WILL BE OVERWRITTEN --->
# Partisia Blockchain Client
@ -16,16 +17,7 @@ Group ApS.
This project requires [Python](https://www.python.org/) 3.8 or newer.
All required libraries can be installed easily using:
```bash
pip install -r requirements.txt
```
Full list of requirements:
- [coincurve](https://pypi.org/project/coincurve/)
- [pbcabi](https://gitfub.space/Jmaa/pbcabi)
This project does not have any library requirements 😎
## Contributing

View File

@ -1,32 +1,29 @@
"""# Partisia Blockchain Client
""" # Partisia Blockchain Client
Unofficial library for reading contract states from Partisia Blockchain.
This library is not officially associated with Partisia Blockchain nor Partisia
Group ApS.
"""
import base64
import dataclasses
import datetime
import base64
import email.utils
import json
import logging
import time
import ecdsa
import hashlib
import pbcabi
from collections.abc import Mapping
import time
from decimal import Decimal
from typing import Any
import pbcabi
import requests
from frozendict import frozendict
from ._version import __version__ # noqa: F401
from .crypto import SenderAuthentication, sign_transaction
from .pbc_types import (
Address,
SignedTransaction,
)
from .pbc_types import (Address, Signature, HashSha256, Transaction,
SignedTransaction, SignedTransactionInnerPart)
logger = logging.getLogger(__name__)
@ -43,8 +40,6 @@ URL_NONCE = 'https://{hostname}/{shard}blockchain/account/{address}'
MPC_DECIMALS = 10000
TRANSACTION_VALIDITY_DURATION = 60
def shard_id_for_address(address: str | Address) -> str:
address = str(address)
@ -52,7 +47,7 @@ def shard_id_for_address(address: str | Address) -> str:
if address is None:
msg = 'Address must not be None'
raise TypeError(msg)
if address.endswith(('a', 'd', 'c')):
if address.endswith('a'):
return 'shards/Shard0/'
if address.endswith('2'):
return 'shards/Shard1/'
@ -65,6 +60,49 @@ class Balances:
mpc: Decimal
byoc: Mapping[str, Decimal]
@dataclasses.dataclass(frozen=True)
class SenderAuthentication:
secret_key: str
def sender_address(self) -> Address:
derp = self._signing_key().verifying_key.to_string()
hashed = HashSha256.of_bytes(derp)
print(derp.hex())
print(derp)
print(hashed)
return Address(b'\0' + hashed._bytes[-20:])
def sign_hash(self, _hash: HashSha256) -> Signature:
return Signature(self._signing_key().sign(_hash._bytes))
def _signing_key(self):
return ecdsa.SigningKey.from_string(bytearray.fromhex(self.secret_key), curve=ecdsa.SECP256k1)
TRANSACTION_VALIDITY_DURATION = 60
def sign_transaction(
sender_authentication: SenderAuthentication,
nonce: int,
gas_cost: int,
chain_id: str,
contract_address: Address | str,
transaction_rpc: bytes,
) -> SignedTransaction:
sender = sender_authentication.sender_address
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(nonce,
valid_to_time,
gas_cost,
Transaction(Address.from_string(contract_address),
transaction_rpc))
transaction_hash_bytes = inner.rpc_serialize() + chain_id.encode('utf8')
transaction_hash: HashSha256 = HashSha256.of_bytes(transaction_hash_bytes)
signature : Signature = sender_authentication.sign_hash(transaction_hash)
return SignedTransaction(inner, signature, transaction_hash)
@dataclasses.dataclass(frozen=True)
class PbcClient:
@ -79,45 +117,29 @@ class PbcClient:
if self.sender_authentication is not None:
assert isinstance(self.sender_authentication, SenderAuthentication)
def send_transaction(self, contract_address: str, rpc: bytes, gas_cost: int):
if self.sender_authentication is None:
msg = 'PbcClient.sender_authentication required for send_transaction'
msg = "PbcClient.sender_authentication required for send_transaction"
raise Exception(msg)
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
signed_transaction = sign_transaction(
self.sender_authentication,
self.get_sender_authentication_nonce(),
valid_to_time,
gas_cost,
self.get_chain_id(),
contract_address,
rpc,
)
assert len(signed_transaction.inner.transaction.rpc_serialize()) == 25
assert len(signed_transaction.signature.rpc_serialize()) == 65
assert len(signed_transaction.inner.rpc_serialize()) == 49
assert len(signed_transaction.rpc_serialize()) == 114
signed_transaction = sign_transaction(self.sender_authentication,
self.get_sender_authentication_nonce(),
gas_cost,
self.get_chain_id(),
contract_address, rpc)
return self.send_signed_transaction(signed_transaction)
def send_signed_transaction(self, signed_transaction: SignedTransaction):
url = URL_TRANSACTION.format(
hostname=self.hostname,
shard=shard_id_for_address(
signed_transaction.inner.transaction.contract_address,
),
shard=shard_id_for_address(signed_transaction.inner.transaction.contract_address),
)
transaction_payload: str = base64.b64encode(
signed_transaction.rpc_serialize(),
).decode('utf8')
self._get_json(
url,
data={'transactionPayload': transaction_payload},
method='PUT',
)
transaction_payload: str = base64.b64encode(signed_transaction.rpc_serialize()).decode('utf8')
self._get_json(url, data = {'transactionPayload':transaction_payload }, method = 'PUT')
def _get_json(
self,
@ -141,8 +163,6 @@ class PbcClient:
'date',
)
date = email.utils.parsedate_to_datetime(date_text)
if response.text == '':
return (None, date)
json_data = response.json()
if json_data is None:
msg = 'No result data for ' + url
@ -206,6 +226,7 @@ class PbcClient:
)
return self._get_json(url, method='GET')[0]['nonce']
def get_contract_state(self, address: str) -> tuple[dict, datetime.datetime]:
# TODO: Rename to get_contract_state_json
url = URL_CONTRACT_STATE.format(
@ -223,15 +244,11 @@ class PbcClient:
file_abi = self.get_contract_abi(address)
state, server_time = self.get_contract_state(address)
state_bytes = base64.b64decode(state['state']['data'])
state_deserialized = file_abi.contract.read_state(state_bytes)
state_deserialized = file_abi.contract.read_state(state_bytes)
return state_deserialized, server_time
def get_typed_contract_avl_tree(
self,
address: str,
avl_tree_id: pbcabi.model.AvlTreeId,
) -> tuple[dict, datetime.datetime]:
def get_typed_contract_avl_tree(self, address: str, avl_tree_id: pbcabi.model.AvlTreeId) -> tuple[dict, datetime.datetime]:
file_abi = self.get_contract_abi(address)
state, server_time = self.get_contract_state(address)
for avl_tree in state['avlTrees']:
@ -243,14 +260,8 @@ class PbcClient:
key_bytes = base64.b64decode(key_and_value['key']['data']['data'])
value_bytes = base64.b64decode(key_and_value['value']['data'])
key = file_abi.contract.read_state(
key_bytes,
avl_tree_id.type_spec.type_key,
)
value = file_abi.contract.read_state(
value_bytes,
avl_tree_id.type_spec.type_value,
)
key = file_abi.contract.read_state(key_bytes, avl_tree_id.type_spec.type_key)
value = file_abi.contract.read_state(value_bytes, avl_tree_id.type_spec.type_value)
data[key] = value
@ -258,6 +269,7 @@ class PbcClient:
return data, server_time
def get_contract_abi(self, address: str) -> pbcabi.model.FileAbi:
url = URL_CONTRACT_STATE.format(
hostname=self.hostname,

View File

@ -1 +1 @@
__version__ = '0.1.10'
__version__ = '0.1.3'

View File

@ -1,84 +0,0 @@
import dataclasses
import coincurve
from .pbc_types import (
Address,
HashSha256,
Signature,
SignedTransaction,
SignedTransactionInnerPart,
Transaction,
)
def find_recovery_id(
der_sig: bytes,
message_hash: HashSha256,
expected_public_key: coincurve.PublicKey,
) -> int:
r, s = coincurve.der.parse_signature(der_sig)
for recovery_id in range(4):
recovered_public_key = coincurve.PublicKey.from_signature_and_message(
signature=r + s + bytes([recovery_id]),
message=message_hash._bytes,
hasher=None,
)
if recovered_public_key.format() == expected_public_key.format():
return recovery_id
return None
@dataclasses.dataclass(frozen=True)
class SenderAuthentication:
_private_key_hex: str
def sender_address(self) -> Address:
verifying_key_repr = self._public_key().format(False)
assert len(verifying_key_repr) == 65
hashed = HashSha256.of_bytes(verifying_key_repr)
return Address(b'\0' + hashed._bytes[-20:])
def sign_hash(self, hash_to_sign: HashSha256) -> Signature:
private_key = self._private_key()
signature_wrong_location = private_key.sign_recoverable(
hash_to_sign._bytes,
hasher=lambda x: x,
)
signature = signature_wrong_location[64:] + signature_wrong_location[:64]
return Signature(signature)
def _public_key(self) -> coincurve.PublicKey:
return self._private_key().public_key
def _private_key(self) -> coincurve.PrivateKey:
return coincurve.PrivateKey.from_hex(
self._private_key_hex,
)
def sign_transaction(
sender_authentication: SenderAuthentication,
nonce: int,
valid_to_time: int,
gas_cost: int,
chain_id: str,
contract_address: Address | str,
transaction_rpc: bytes,
) -> SignedTransaction:
sender = sender_authentication.sender_address
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(
nonce,
valid_to_time,
gas_cost,
Transaction(Address.from_string(contract_address), transaction_rpc),
)
signature: Signature = sender_authentication.sign_hash(inner.hash(chain_id))
return SignedTransaction(inner, signature)

View File

@ -1,11 +1,22 @@
import dataclasses
import datetime
import base64
import email.utils
import json
import logging
import hashlib
import pbcabi
from collections.abc import Mapping
import time
from decimal import Decimal
from typing import Any
def size_prefixed(_bytes: bytes) -> bytes:
return len(_bytes).to_bytes(4, 'big') + _bytes
@dataclasses.dataclass(frozen=True)
class Address:
_bytes: bytes
@ -20,7 +31,7 @@ class Address:
return self._bytes.hex()
@staticmethod
def from_string(s: 'Address | str') -> 'Address':
def from_string(s: str) -> 'Address':
if isinstance(s, Address):
return s
return Address(bytes.fromhex(s))
@ -31,8 +42,7 @@ class Signature:
_bytes: bytes
def __post_init__(self):
assert len(self._bytes) == 65, len(self._bytes)
assert self._bytes[0] < 4, 'First byte is the recovery id. Must be less than 4'
assert len(self._bytes) == 64, len(self._bytes)
def rpc_serialize(self) -> bytes:
return self._bytes
@ -40,7 +50,6 @@ class Signature:
def __str__(self) -> str:
return self._bytes.hex()
@dataclasses.dataclass(frozen=True)
class HashSha256:
_bytes: bytes
@ -58,7 +67,6 @@ class HashSha256:
def __str__(self) -> str:
return self._bytes.hex()
@dataclasses.dataclass(frozen=True)
class Transaction:
contract_address: Address
@ -68,10 +76,7 @@ class Transaction:
assert isinstance(self.contract_address, Address), self.contract_address
def rpc_serialize(self) -> bytes:
return self.contract_address.rpc_serialize() + size_prefixed(
self.transaction_rpc,
)
return self.contract_address.rpc_serialize() + size_prefixed(self.transaction_rpc)
@dataclasses.dataclass(frozen=True)
class SignedTransactionInnerPart:
@ -81,26 +86,13 @@ class SignedTransactionInnerPart:
transaction: Transaction
def rpc_serialize(self) -> bytes:
return (
self.nonce.to_bytes(8, 'big')
+ self.valid_to_time.to_bytes(8, 'big')
+ self.gas_cost.to_bytes(8, 'big')
+ self.transaction.rpc_serialize()
)
def hash(self, chain_id: str) -> HashSha256:
return HashSha256.of_bytes(
self.rpc_serialize() + size_prefixed(chain_id.encode('utf8')),
)
return self.nonce.to_bytes(4, 'big') + self.valid_to_time.to_bytes(8, 'big') + self.gas_cost.to_bytes(8, 'big') + self.transaction.rpc_serialize()
@dataclasses.dataclass(frozen=True)
class SignedTransaction:
inner: SignedTransactionInnerPart
signature: Signature
def hash(self, chain_id: str) -> HashSha256:
return self.inner.hash(chain_id)
hash: HashSha256
def rpc_serialize(self) -> bytes:
return self.signature.rpc_serialize() + self.inner.rpc_serialize()
return self.inner.rpc_serialize() + self.signature.rpc_serialize() + self.hash.rpc_serialize()

View File

@ -1,2 +0,0 @@
coincurve
pbcabi @ git+https://gitfub.space/Jmaa/pbcabi

View File

@ -1,16 +1,17 @@
# WARNING!
# THIS IS AN AUTOGENERATED FILE!
# MANUAL CHANGES CAN AND WILL BE OVERWRITTEN!
# WARNING
#
# THIS IS AN AUTOGENERATED FILE.
#
# MANUAL CHANGES CAN AND WILL BE OVERWRITTEN.
import re
from pathlib import Path
from setuptools import setup
PACKAGE_NAME = 'pbc_client'
PACKAGE_DESCRIPTION = """
# Partisia Blockchain Client
# Partisia Blockchain Client
Unofficial library for reading contract states from Partisia Blockchain.
@ -23,36 +24,18 @@ Unofficial library for reading contract states from Partisia Blockchain.""".stri
def parse_version_file(text: str) -> str:
text = re.sub('^#.*', '', text, flags=re.MULTILINE)
match = re.match(r'^\s*__version__\s*=\s*(["\'])([\d\.]+)\1$', text)
match = re.match(r'^__version__\s*=\s*(["\'])([\d\.]+)\1$', text)
if match is None:
msg = 'Malformed _version.py file!'
raise Exception(msg)
return match.group(2)
def find_python_packages() -> list[str]:
"""
Find all python packages. (Directories containing __init__.py files.)
"""
root_path = Path(PACKAGE_NAME)
packages: set[str] = set([PACKAGE_NAME])
# Search recursively
for init_file in root_path.rglob('__init__.py'):
packages.add(str(init_file.parent).replace('/', '.'))
print(f'Found following packages: {packages}')
return sorted(packages)
with open(PACKAGE_NAME + '/_version.py') as f:
version = parse_version_file(f.read())
REQUIREMENTS_MAIN = [
'coincurve',
'pbcabi @ git+https://gitfub.space/Jmaa/pbcabi',
]
REQUIREMENTS_MAIN = []
REQUIREMENTS_TEST = []
@ -65,7 +48,7 @@ setup(
author='Jon Michael Aanes',
author_email='jonjmaa@gmail.com',
url='https://gitfub.space/Jmaa/' + PACKAGE_NAME,
packages=find_python_packages(),
packages=[PACKAGE_NAME],
install_requires=REQUIREMENTS_MAIN,
extras_require={
'test': REQUIREMENTS_TEST,

View File

@ -1,38 +0,0 @@
from pbc_client.crypto import SenderAuthentication, sign_transaction
from pbc_client.pbc_types import Address
def test_sender_authentication():
sender_authentication = SenderAuthentication('aa')
assert str(sender_authentication.sender_address()) == '00e72e44eab933faaf1fd4ce94bb57e08bff98a1ed'
def test_sender_authentication_2():
sender_authentication = SenderAuthentication('2')
assert str(sender_authentication.sender_address()) =='00b2e734b5d8da089318d0d2b076c19f59c450855a'
def test_sign():
sender_authentication = SenderAuthentication('01')
chain_id = 'SpecificChainIDLocatable'
contract_address = '000000000000000000000000000000000000000001'
signed = sign_transaction(
sender_authentication=sender_authentication,
nonce=2,
valid_to_time=3,
gas_cost=2,
chain_id=chain_id,
contract_address=contract_address,
transaction_rpc=bytes([0 for i in range(99)]),
)
print(signed)
assert (
str(signed.inner.rpc_serialize().hex())
== '00000000000000020000000000000003000000000000000200000000000000000000000000000000000000000100000063000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
)
assert (
str(signed.hash(chain_id))
== '1be2895a85862e8dd3cc75b290cc28a22e960ae02dd5c07a73b93716f9adbee8'
)
assert (
str(signed.signature)
== '01f0f7d8f44919466eedbebc76bcb369bbb4c6f4f076e25c0ffe8ec9285890e53b4e39098540d088878a019c345ad73963543ce813fb9ccf4b84b0c25770452bd1'
)

View File

@ -1,2 +0,0 @@
def test_init():
import pbc_client # noqa: F401