Compare commits

..

5 Commits

Author SHA1 Message Date
ff5550db95 Ruff
Some checks failed
Run Python tests (through Pytest) / Test (push) Failing after 24s
Verify Python project can be installed, loaded and have version checked / Test (push) Failing after 21s
2025-04-13 13:55:20 +02:00
fbded540e2 Remove debug prints 2025-04-13 13:54:58 +02:00
561e03ab3c Test vectors 2025-04-13 13:46:52 +02:00
99190e910b Use coincurve for crypto 2025-04-13 13:22:38 +02:00
9033c8df3d Messing around with signing 2025-04-13 12:17:18 +02:00
4 changed files with 140 additions and 66 deletions

View File

@ -10,7 +10,6 @@ import base64
import dataclasses import dataclasses
import datetime import datetime
import email.utils import email.utils
import hashlib
import json import json
import logging import logging
import time import time
@ -18,19 +17,15 @@ from collections.abc import Mapping
from decimal import Decimal from decimal import Decimal
from typing import Any from typing import Any
import ecdsa
import pbcabi import pbcabi
import requests import requests
from frozendict import frozendict from frozendict import frozendict
from ._version import __version__ # noqa: F401 from ._version import __version__ # noqa: F401
from .crypto import SenderAuthentication, sign_transaction
from .pbc_types import ( from .pbc_types import (
Address, Address,
HashSha256,
Signature,
SignedTransaction, SignedTransaction,
SignedTransactionInnerPart,
Transaction,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -48,6 +43,8 @@ URL_NONCE = 'https://{hostname}/{shard}blockchain/account/{address}'
MPC_DECIMALS = 10000 MPC_DECIMALS = 10000
TRANSACTION_VALIDITY_DURATION = 60
def shard_id_for_address(address: str | Address) -> str: def shard_id_for_address(address: str | Address) -> str:
address = str(address) address = str(address)
@ -55,7 +52,7 @@ def shard_id_for_address(address: str | Address) -> str:
if address is None: if address is None:
msg = 'Address must not be None' msg = 'Address must not be None'
raise TypeError(msg) raise TypeError(msg)
if address.endswith('a'): if address.endswith(('a', 'd', 'c')):
return 'shards/Shard0/' return 'shards/Shard0/'
if address.endswith('2'): if address.endswith('2'):
return 'shards/Shard1/' return 'shards/Shard1/'
@ -69,56 +66,6 @@ class Balances:
byoc: Mapping[str, Decimal] byoc: Mapping[str, Decimal]
@dataclasses.dataclass(frozen=True)
class SenderAuthentication:
secret_key: str
def sender_address(self) -> Address:
verifying_key_repr = (
self._signing_key().get_verifying_key().to_string('uncompressed')
)
hashed = HashSha256.of_bytes(verifying_key_repr)
return Address(b'\0' + hashed._bytes[-20:])
def sign_hash(self, _hash: HashSha256) -> Signature:
return Signature(self._signing_key().sign(_hash._bytes))
def _signing_key(self):
secret_exponent = int(self.secret_key, 16)
return ecdsa.SigningKey.from_secret_exponent(
secret_exponent,
curve=ecdsa.SECP256k1,
)
TRANSACTION_VALIDITY_DURATION = 60
def sign_transaction(
sender_authentication: SenderAuthentication,
nonce: int,
gas_cost: int,
chain_id: str,
contract_address: Address | str,
transaction_rpc: bytes,
) -> SignedTransaction:
sender = sender_authentication.sender_address
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(
nonce,
valid_to_time,
gas_cost,
Transaction(Address.from_string(contract_address), transaction_rpc),
)
transaction_hash_bytes = inner.rpc_serialize() + chain_id.encode('utf8')
transaction_hash: HashSha256 = HashSha256.of_bytes(transaction_hash_bytes)
signature: Signature = sender_authentication.sign_hash(transaction_hash)
return SignedTransaction(inner, signature, transaction_hash)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class PbcClient: class PbcClient:
session: requests.Session session: requests.Session
@ -137,15 +84,22 @@ class PbcClient:
msg = 'PbcClient.sender_authentication required for send_transaction' msg = 'PbcClient.sender_authentication required for send_transaction'
raise Exception(msg) raise Exception(msg)
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
signed_transaction = sign_transaction( signed_transaction = sign_transaction(
self.sender_authentication, self.sender_authentication,
self.get_sender_authentication_nonce(), self.get_sender_authentication_nonce(),
valid_to_time,
gas_cost, gas_cost,
self.get_chain_id(), self.get_chain_id(),
contract_address, contract_address,
rpc, rpc,
) )
assert len(signed_transaction.inner.transaction.rpc_serialize()) == 25
assert len(signed_transaction.signature.rpc_serialize()) == 65
assert len(signed_transaction.inner.rpc_serialize()) == 49
assert len(signed_transaction.rpc_serialize()) == 114
return self.send_signed_transaction(signed_transaction) return self.send_signed_transaction(signed_transaction)
def send_signed_transaction(self, signed_transaction: SignedTransaction): def send_signed_transaction(self, signed_transaction: SignedTransaction):
@ -187,6 +141,8 @@ class PbcClient:
'date', 'date',
) )
date = email.utils.parsedate_to_datetime(date_text) date = email.utils.parsedate_to_datetime(date_text)
if response.text == '':
return (None, date)
json_data = response.json() json_data = response.json()
if json_data is None: if json_data is None:
msg = 'No result data for ' + url msg = 'No result data for ' + url

84
pbc_client/crypto.py Normal file
View File

@ -0,0 +1,84 @@
import dataclasses
import coincurve
from .pbc_types import (
Address,
HashSha256,
Signature,
SignedTransaction,
SignedTransactionInnerPart,
Transaction,
)
def find_recovery_id(
der_sig: bytes,
message_hash: HashSha256,
expected_public_key: coincurve.PublicKey,
) -> int:
r, s = coincurve.der.parse_signature(der_sig)
for recovery_id in range(4):
recovered_public_key = coincurve.PublicKey.from_signature_and_message(
signature=r + s + bytes([recovery_id]),
message=message_hash._bytes,
hasher=None,
)
if recovered_public_key.format() == expected_public_key.format():
return recovery_id
return None
@dataclasses.dataclass(frozen=True)
class SenderAuthentication:
_private_key_hex: str
def sender_address(self) -> Address:
verifying_key_repr = self._public_key().format(False)
assert len(verifying_key_repr) == 65
hashed = HashSha256.of_bytes(verifying_key_repr)
return Address(b'\0' + hashed._bytes[-20:])
def sign_hash(self, hash_to_sign: HashSha256) -> Signature:
private_key = self._private_key()
signature_wrong_location = private_key.sign_recoverable(
hash_to_sign._bytes,
hasher=lambda x: x,
)
signature = signature_wrong_location[64:] + signature_wrong_location[:64]
return Signature(signature)
def _public_key(self) -> coincurve.PublicKey:
return self._private_key().public_key
def _private_key(self) -> coincurve.PrivateKey:
return coincurve.PrivateKey.from_hex(
self._private_key_hex,
)
def sign_transaction(
sender_authentication: SenderAuthentication,
nonce: int,
valid_to_time: int,
gas_cost: int,
chain_id: str,
contract_address: Address | str,
transaction_rpc: bytes,
) -> SignedTransaction:
sender = sender_authentication.sender_address
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(
nonce,
valid_to_time,
gas_cost,
Transaction(Address.from_string(contract_address), transaction_rpc),
)
signature: Signature = sender_authentication.sign_hash(inner.hash(chain_id))
return SignedTransaction(inner, signature)

View File

@ -20,7 +20,7 @@ class Address:
return self._bytes.hex() return self._bytes.hex()
@staticmethod @staticmethod
def from_string(s: str) -> 'Address': def from_string(s: 'Address | str') -> 'Address':
if isinstance(s, Address): if isinstance(s, Address):
return s return s
return Address(bytes.fromhex(s)) return Address(bytes.fromhex(s))
@ -31,7 +31,8 @@ class Signature:
_bytes: bytes _bytes: bytes
def __post_init__(self): def __post_init__(self):
assert len(self._bytes) == 64, len(self._bytes) assert len(self._bytes) == 65, len(self._bytes)
assert self._bytes[0] < 4, 'First byte is the recovery id. Must be less than 4'
def rpc_serialize(self) -> bytes: def rpc_serialize(self) -> bytes:
return self._bytes return self._bytes
@ -81,22 +82,25 @@ class SignedTransactionInnerPart:
def rpc_serialize(self) -> bytes: def rpc_serialize(self) -> bytes:
return ( return (
self.nonce.to_bytes(4, 'big') self.nonce.to_bytes(8, 'big')
+ self.valid_to_time.to_bytes(8, 'big') + self.valid_to_time.to_bytes(8, 'big')
+ self.gas_cost.to_bytes(8, 'big') + self.gas_cost.to_bytes(8, 'big')
+ self.transaction.rpc_serialize() + self.transaction.rpc_serialize()
) )
def hash(self, chain_id: str) -> HashSha256:
return HashSha256.of_bytes(
self.rpc_serialize() + size_prefixed(chain_id.encode('utf8')),
)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class SignedTransaction: class SignedTransaction:
inner: SignedTransactionInnerPart inner: SignedTransactionInnerPart
signature: Signature signature: Signature
hash: HashSha256
def hash(self, chain_id: str) -> HashSha256:
return self.inner.hash(chain_id)
def rpc_serialize(self) -> bytes: def rpc_serialize(self) -> bytes:
return ( return self.signature.rpc_serialize() + self.inner.rpc_serialize()
self.inner.rpc_serialize()
+ self.signature.rpc_serialize()
+ self.hash.rpc_serialize()
)

30
test/test_crypto.py Normal file
View File

@ -0,0 +1,30 @@
from pbc_client.crypto import SenderAuthentication, sign_transaction
def test_sign():
sender_authentication = SenderAuthentication('01')
chain_id = 'SpecificChainIDLocatable'
contract_address = '000000000000000000000000000000000000000001'
signed = sign_transaction(
sender_authentication=sender_authentication,
nonce=2,
valid_to_time=3,
gas_cost=2,
chain_id=chain_id,
contract_address=contract_address,
transaction_rpc=bytes([0 for i in range(99)]),
)
print(signed)
assert (
str(signed.inner.rpc_serialize().hex())
== '00000000000000020000000000000003000000000000000200000000000000000000000000000000000000000100000063000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
)
assert (
str(signed.hash(chain_id))
== '1be2895a85862e8dd3cc75b290cc28a22e960ae02dd5c07a73b93716f9adbee8'
)
assert (
str(signed.signature)
== '01f0f7d8f44919466eedbebc76bcb369bbb4c6f4f076e25c0ffe8ec9285890e53b4e39098540d088878a019c345ad73963543ce813fb9ccf4b84b0c25770452bd1'
)