Use coincurve for crypto

This commit is contained in:
Jon Michael Aanes 2025-04-13 13:22:38 +02:00
parent 9033c8df3d
commit 99190e910b
3 changed files with 87 additions and 60 deletions

View File

@ -10,15 +10,12 @@ import base64
import dataclasses import dataclasses
import datetime import datetime
import email.utils import email.utils
import hashlib
import json import json
import logging import logging
import time
from collections.abc import Mapping from collections.abc import Mapping
from decimal import Decimal from decimal import Decimal
from typing import Any from typing import Any
import ecdsa
import pbcabi import pbcabi
import requests import requests
from frozendict import frozendict from frozendict import frozendict
@ -26,12 +23,9 @@ from frozendict import frozendict
from ._version import __version__ # noqa: F401 from ._version import __version__ # noqa: F401
from .pbc_types import ( from .pbc_types import (
Address, Address,
HashSha256,
Signature,
SignedTransaction, SignedTransaction,
SignedTransactionInnerPart,
Transaction,
) )
from .crypto import sign_transaction, SenderAuthentication
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -69,58 +63,6 @@ class Balances:
byoc: Mapping[str, Decimal] byoc: Mapping[str, Decimal]
@dataclasses.dataclass(frozen=True)
class SenderAuthentication:
secret_key: str
def sender_address(self) -> Address:
verifying_key_repr = (
self._signing_key().get_verifying_key().to_string('uncompressed')
)
hashed = HashSha256.of_bytes(verifying_key_repr)
return Address(b'\0' + hashed._bytes[-20:])
def sign_hash(self, _hash: HashSha256) -> Signature:
signing_key = self._signing_key()
signature = signing_key.sign_digest_deterministic(_hash._bytes)
return Signature(b'\0' + signature)
def _signing_key(self):
secret_exponent = int(self.secret_key, 16)
return ecdsa.SigningKey.from_secret_exponent(
secret_exponent,
curve=ecdsa.SECP256k1,
)
TRANSACTION_VALIDITY_DURATION = 60
def sign_transaction(
sender_authentication: SenderAuthentication,
nonce: int,
gas_cost: int,
chain_id: str,
contract_address: Address | str,
transaction_rpc: bytes,
) -> SignedTransaction:
print(contract_address)
sender = sender_authentication.sender_address
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(
nonce,
valid_to_time,
gas_cost,
Transaction(Address.from_string(contract_address), transaction_rpc),
)
signature: Signature = sender_authentication.sign_hash(inner.hash(chain_id))
print(signature)
return SignedTransaction(inner, signature)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class PbcClient: class PbcClient:
session: requests.Session session: requests.Session

84
pbc_client/crypto.py Normal file
View File

@ -0,0 +1,84 @@
import coincurve
import hashlib
import time
from .pbc_types import (
Address,
HashSha256,
Signature,
SignedTransaction,
SignedTransactionInnerPart,
Transaction,
)
import dataclasses
TRANSACTION_VALIDITY_DURATION = 60
def find_recovery_id(der_sig: bytes, message_hash: HashSha256, expected_public_key: coincurve.PublicKey) -> int:
r, s = coincurve.der.parse_signature(der_sig)
for recovery_id in range(4):
recovered_public_key = coincurve.PublicKey.from_signature_and_message(
signature=r + s + bytes([recovery_id]),
message=message_hash._bytes,
hasher=None,
)
if recovered_public_key.format() == expected_public_key.format():
return recovery_id
return None
@dataclasses.dataclass(frozen=True)
class SenderAuthentication:
_private_key_hex: str
def sender_address(self) -> Address:
verifying_key_repr = self._public_key().format(False)
assert len(verifying_key_repr) == 65
hashed = HashSha256.of_bytes(verifying_key_repr)
print(hashed)
return Address(b'\0' + hashed._bytes[-20:])
def sign_hash(self, hash_to_sign: HashSha256) -> Signature:
private_key = self._private_key()
signature_wrong_location = private_key.sign_recoverable(hash_to_sign._bytes, hasher = lambda x: x)
signature = signature_wrong_location[64:] + signature_wrong_location[:64]
print('Sig', signature)
print(len(signature))
return Signature(signature)
def _public_key(self) -> coincurve.PublicKey:
return self._private_key().public_key
def _private_key(self) -> coincurve.PrivateKey:
return coincurve.PrivateKey.from_hex(
self._private_key_hex,
)
def sign_transaction(
sender_authentication: SenderAuthentication,
nonce: int,
gas_cost: int,
chain_id: str,
contract_address: Address | str,
transaction_rpc: bytes,
) -> SignedTransaction:
print(contract_address)
sender = sender_authentication.sender_address
valid_to_time: int = int(time.time() + TRANSACTION_VALIDITY_DURATION) * 1000
inner: SignedTransactionInnerPart = SignedTransactionInnerPart(
nonce,
valid_to_time,
gas_cost,
Transaction(Address.from_string(contract_address), transaction_rpc),
)
signature: Signature = sender_authentication.sign_hash(inner.hash(chain_id))
print(signature)
return SignedTransaction(inner, signature)

View File

@ -20,7 +20,7 @@ class Address:
return self._bytes.hex() return self._bytes.hex()
@staticmethod @staticmethod
def from_string(s: str) -> 'Address': def from_string(s: 'Address | str') -> 'Address':
if isinstance(s, Address): if isinstance(s, Address):
return s return s
return Address(bytes.fromhex(s)) return Address(bytes.fromhex(s))
@ -32,6 +32,7 @@ class Signature:
def __post_init__(self): def __post_init__(self):
assert len(self._bytes) == 65, len(self._bytes) assert len(self._bytes) == 65, len(self._bytes)
assert self._bytes[0] < 4, 'First byte is the recovery id. Must be less than 4'
def rpc_serialize(self) -> bytes: def rpc_serialize(self) -> bytes:
return self._bytes return self._bytes