Compare commits

...

5 Commits

Author SHA1 Message Date
ff5550db95 Ruff
Some checks failed
Run Python tests (through Pytest) / Test (push) Failing after 24s
Verify Python project can be installed, loaded and have version checked / Test (push) Failing after 21s
2025-04-13 13:55:20 +02:00
fbded540e2 Remove debug prints 2025-04-13 13:54:58 +02:00
561e03ab3c Test vectors 2025-04-13 13:46:52 +02:00
99190e910b Use coincurve for crypto 2025-04-13 13:22:38 +02:00
9033c8df3d Messing around with signing 2025-04-13 12:17:18 +02:00
4 changed files with 140 additions and 66 deletions

View File

@ -10,7 +10,6 @@ import base64
import dataclasses
import datetime
import email.utils
import hashlib
import json
import logging
import time
@ -18,19 +17,15 @@ from collections.abc import Mapping
from decimal import Decimal
from typing import Any
import ecdsa
import pbcabi
import requests
from frozendict import frozendict
from ._version import __version__ # noqa: F401
from .crypto import SenderAuthentication, sign_transaction
from .pbc_types import (
Address,
HashSha256,
Signature,
SignedTransaction,
SignedTransactionInnerPart,
Transaction,
)
logger = logging.getLogger(__name__)
@ -48,6 +43,8 @@ URL_NONCE = 'https://{hostname}/{shard}blockchain/account/{address}'
MPC_DECIMALS = 10000
TRANSACTION_VALIDITY_DURATION = 60
def shard_id_for_address(address: str | Address) -> str:
address = str(address)
@ -55,7 +52,7 @@ def shard_id_for_address(address: str | Address) -> str:
if address is None:
msg = 'Address must not be None'
raise TypeError(msg)
if address.endswith('a'):
if address.endswith(('a', 'd', 'c')):
return 'shards/Shard0/'
if address.endswith('2'):
return 'shards/Shard1/'
@ -69,56 +66,6 @@ class Balances:
byoc: Mapping[str, Decimal]
@dataclasses.dataclass(frozen=True)
class SenderAuthentication:
secret_key: str
def sender_address(self) -> Address:
verifying_key_repr = (
self._signing_key().get_verifying_key().to_string('uncompressed')
)
hashed = HashSha256.of_bytes(verifying_key_repr)
return Address(b'\0' + hashed._bytes[-20:])
def sign_hash(self, _hash: HashSha256) -> Signature:
return Signature(self._signing_key().sign(_hash._bytes))
def _signing_key(self):
secret_exponent = int(self.secret_key, 16)
return ecdsa.SigningKey.from_secret_exponent(
secret_exponent,
curve=ecdsa.SECP256k1,
)
TRANSACTION_VALIDITY_DURATION = 60
def sign_transaction(
sender_authentication: SenderAuthentication,
nonce: int,
gas_cost: int,
chain_id: str,
contract_address: Address | str,
transaction_rpc: bytes,
) -> SignedTransaction:
sender = sender_authentication.sender_address
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(
nonce,
valid_to_time,
gas_cost,
Transaction(Address.from_string(contract_address), transaction_rpc),
)
transaction_hash_bytes = inner.rpc_serialize() + chain_id.encode('utf8')
transaction_hash: HashSha256 = HashSha256.of_bytes(transaction_hash_bytes)
signature: Signature = sender_authentication.sign_hash(transaction_hash)
return SignedTransaction(inner, signature, transaction_hash)
@dataclasses.dataclass(frozen=True)
class PbcClient:
session: requests.Session
@ -137,15 +84,22 @@ class PbcClient:
msg = 'PbcClient.sender_authentication required for send_transaction'
raise Exception(msg)
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
signed_transaction = sign_transaction(
self.sender_authentication,
self.get_sender_authentication_nonce(),
valid_to_time,
gas_cost,
self.get_chain_id(),
contract_address,
rpc,
)
assert len(signed_transaction.inner.transaction.rpc_serialize()) == 25
assert len(signed_transaction.signature.rpc_serialize()) == 65
assert len(signed_transaction.inner.rpc_serialize()) == 49
assert len(signed_transaction.rpc_serialize()) == 114
return self.send_signed_transaction(signed_transaction)
def send_signed_transaction(self, signed_transaction: SignedTransaction):
@ -187,6 +141,8 @@ class PbcClient:
'date',
)
date = email.utils.parsedate_to_datetime(date_text)
if response.text == '':
return (None, date)
json_data = response.json()
if json_data is None:
msg = 'No result data for ' + url

84
pbc_client/crypto.py Normal file
View File

@ -0,0 +1,84 @@
import dataclasses
import coincurve
from .pbc_types import (
Address,
HashSha256,
Signature,
SignedTransaction,
SignedTransactionInnerPart,
Transaction,
)
def find_recovery_id(
der_sig: bytes,
message_hash: HashSha256,
expected_public_key: coincurve.PublicKey,
) -> int:
r, s = coincurve.der.parse_signature(der_sig)
for recovery_id in range(4):
recovered_public_key = coincurve.PublicKey.from_signature_and_message(
signature=r + s + bytes([recovery_id]),
message=message_hash._bytes,
hasher=None,
)
if recovered_public_key.format() == expected_public_key.format():
return recovery_id
return None
@dataclasses.dataclass(frozen=True)
class SenderAuthentication:
_private_key_hex: str
def sender_address(self) -> Address:
verifying_key_repr = self._public_key().format(False)
assert len(verifying_key_repr) == 65
hashed = HashSha256.of_bytes(verifying_key_repr)
return Address(b'\0' + hashed._bytes[-20:])
def sign_hash(self, hash_to_sign: HashSha256) -> Signature:
private_key = self._private_key()
signature_wrong_location = private_key.sign_recoverable(
hash_to_sign._bytes,
hasher=lambda x: x,
)
signature = signature_wrong_location[64:] + signature_wrong_location[:64]
return Signature(signature)
def _public_key(self) -> coincurve.PublicKey:
return self._private_key().public_key
def _private_key(self) -> coincurve.PrivateKey:
return coincurve.PrivateKey.from_hex(
self._private_key_hex,
)
def sign_transaction(
sender_authentication: SenderAuthentication,
nonce: int,
valid_to_time: int,
gas_cost: int,
chain_id: str,
contract_address: Address | str,
transaction_rpc: bytes,
) -> SignedTransaction:
sender = sender_authentication.sender_address
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(
nonce,
valid_to_time,
gas_cost,
Transaction(Address.from_string(contract_address), transaction_rpc),
)
signature: Signature = sender_authentication.sign_hash(inner.hash(chain_id))
return SignedTransaction(inner, signature)

View File

@ -20,7 +20,7 @@ class Address:
return self._bytes.hex()
@staticmethod
def from_string(s: str) -> 'Address':
def from_string(s: 'Address | str') -> 'Address':
if isinstance(s, Address):
return s
return Address(bytes.fromhex(s))
@ -31,7 +31,8 @@ class Signature:
_bytes: bytes
def __post_init__(self):
assert len(self._bytes) == 64, len(self._bytes)
assert len(self._bytes) == 65, len(self._bytes)
assert self._bytes[0] < 4, 'First byte is the recovery id. Must be less than 4'
def rpc_serialize(self) -> bytes:
return self._bytes
@ -81,22 +82,25 @@ class SignedTransactionInnerPart:
def rpc_serialize(self) -> bytes:
return (
self.nonce.to_bytes(4, 'big')
self.nonce.to_bytes(8, 'big')
+ self.valid_to_time.to_bytes(8, 'big')
+ self.gas_cost.to_bytes(8, 'big')
+ self.transaction.rpc_serialize()
)
def hash(self, chain_id: str) -> HashSha256:
return HashSha256.of_bytes(
self.rpc_serialize() + size_prefixed(chain_id.encode('utf8')),
)
@dataclasses.dataclass(frozen=True)
class SignedTransaction:
inner: SignedTransactionInnerPart
signature: Signature
hash: HashSha256
def hash(self, chain_id: str) -> HashSha256:
return self.inner.hash(chain_id)
def rpc_serialize(self) -> bytes:
return (
self.inner.rpc_serialize()
+ self.signature.rpc_serialize()
+ self.hash.rpc_serialize()
)
return self.signature.rpc_serialize() + self.inner.rpc_serialize()

30
test/test_crypto.py Normal file
View File

@ -0,0 +1,30 @@
from pbc_client.crypto import SenderAuthentication, sign_transaction
def test_sign():
sender_authentication = SenderAuthentication('01')
chain_id = 'SpecificChainIDLocatable'
contract_address = '000000000000000000000000000000000000000001'
signed = sign_transaction(
sender_authentication=sender_authentication,
nonce=2,
valid_to_time=3,
gas_cost=2,
chain_id=chain_id,
contract_address=contract_address,
transaction_rpc=bytes([0 for i in range(99)]),
)
print(signed)
assert (
str(signed.inner.rpc_serialize().hex())
== '00000000000000020000000000000003000000000000000200000000000000000000000000000000000000000100000063000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
)
assert (
str(signed.hash(chain_id))
== '1be2895a85862e8dd3cc75b290cc28a22e960ae02dd5c07a73b93716f9adbee8'
)
assert (
str(signed.signature)
== '01f0f7d8f44919466eedbebc76bcb369bbb4c6f4f076e25c0ffe8ec9285890e53b4e39098540d088878a019c345ad73963543ce813fb9ccf4b84b0c25770452bd1'
)