Ruff
Some checks failed
Verify Python project can be installed, loaded and have version checked / Test (push) Waiting to run
Run Python tests (through Pytest) / Test (push) Has been cancelled

This commit is contained in:
Jon Michael Aanes 2025-05-21 00:55:57 +02:00
parent d49636fff7
commit 959ceff255
8 changed files with 326 additions and 283 deletions

View File

@ -39,9 +39,10 @@ print(token_state['balances'][my_address])
__all__ = ['model', 'binaryreader', '__version__']
from . import model, binaryreader
from . import binaryreader, model
from ._version import __version__
def get_version():
"""Returns a PEP 386-compliant version number from __version__."""
return __version__

View File

@ -1,23 +1,23 @@
'''
"""
Utility module for reading binary streams of data.
Follows the ABI specification quite closely. The full ABI specification can be found at:
https://partisiablockchain.gitlab.io/documentation/abiv.html
'''
"""
import io
class BinaryReader:
'''
"""
Wrapper for io.BytesIO for iteratively parsing byte streams.
'''
"""
def __init__(self, buf: bytes):
'''
"""
Initializes BinaryReader.
'''
"""
self.buf = buf
if isinstance(self.buf, bytes):
self.buf = io.BytesIO(self.buf)
@ -27,108 +27,113 @@ class BinaryReader:
self.position = 0
def __repr__(self) -> str:
return 'BinaryReader[{} / {}]'.format(self.position, self.size)
return f'BinaryReader[{self.position} / {self.size}]'
def __str__(self) -> str:
return repr(self)
def readBytes(self, num_bytes: int) -> bytes:
'''
"""
Reads a number of bytes from stream.
'''
"""
self.position += num_bytes
bts = self.buf.read(num_bytes)
if len(bts) != num_bytes:
raise Exception("Could not read {read} (0x{read:02X}) bytes: Buffer only contained {size} (0x{size:02X}) bytes".format(read=num_bytes, size=len(bts)))
raise Exception(
'Could not read {read} (0x{read:02X}) bytes: Buffer only contained {size} (0x{size:02X}) bytes'.format(
read=num_bytes,
size=len(bts),
),
)
assert len(bts) == num_bytes
return bts
def readDynamicBytes(self, size_reader = None) -> bytes:
'''
def readDynamicBytes(self, size_reader=None) -> bytes:
"""
Reads a dynamically sized list of bytes.
'''
"""
size_reader = size_reader or BinaryReader.readUInt32BigEndian
num_bytes = size_reader(self)
return self.readBytes(num_bytes)
def readString(self, size_reader = None) -> str:
'''
def readString(self, size_reader=None) -> str:
"""
Reads a string from stream.
'''
"""
return self.readDynamicBytes(size_reader).decode('utf8')
def readUInt8(self) -> int:
'''
"""
Reads a unsigned 8-bit integer from stream.
'''
"""
bytes = self.readBytes(1)
return bytes[0]
def readUInt32BigEndian(self) -> int:
'''
"""
Reads a unsigned 32-bit integer from stream.
'''
"""
return self.readUIntBigEndian(4)
def readUInt32LittleEndian(self) -> int:
'''
"""
Reads a unsigned 32-bit integer from stream.
'''
"""
return self.readUIntLittleEndian(4)
def readUIntBigEndian(self, num_bytes: int) -> int:
'''
"""
Reads an unsigned N-bit integer from stream.
'''
"""
bytes = self.readBytes(num_bytes)
c = 0
for i in range(0, num_bytes):
c += bytes[num_bytes - i - 1] * 2**(i * 8)
c += bytes[num_bytes - i - 1] * 2 ** (i * 8)
return c
def readSignedIntBigEndian(self, num_bytes: int) -> int:
'''
"""
Reads an signed N-bit integer from stream.
'''
"""
# TODO: Test!
result = self.readUIntBigEndian(num_bytes)
full = 2**(num_bytes * 8)
if result >= full//2:
full = 2 ** (num_bytes * 8)
if result >= full // 2:
result -= full
return result
def readUIntLittleEndian(self, num_bytes: int) -> int:
'''
"""
Reads an unsigned N-bit integer from stream.
'''
"""
bytes = self.readBytes(num_bytes)
c = 0
for i in range(0, num_bytes):
c += bytes[i] * 2**(i * 8)
c += bytes[i] * 2 ** (i * 8)
return c
def readSignedIntLittleEndian(self, num_bytes: int) -> int:
'''
"""
Reads an signed N-bit integer from stream.
'''
"""
# TODO: Test!
result = self.readUIntLittleEndian(num_bytes)
full = 2**(num_bytes * 8)
if result >= full//2:
full = 2 ** (num_bytes * 8)
if result >= full // 2:
result -= full
return result
def readLeb128(self) -> int:
'''
"""
Reads a LEB-128 integer from stream.
'''
"""
# TODO: Test!
v = self.readUInt8()
count = 0
while v & 0x80 != 0:
v = self.readUInt8()
count += 1
assert count < 6, "TODO: " + str(v)
assert count < 6, 'TODO: ' + str(v)
return v
def readSizedList(self, fn, num_elements: int) -> list[object]:
@ -138,12 +143,12 @@ class BinaryReader:
return ls
def readList(self, fn, size_reader=None) -> list[object]:
'''
"""
Reads a list of elements from stream, by applying the given function
several times.
Function must take a single argument of type BinaryReader.
'''
"""
size_reader = size_reader or BinaryReader.readUInt32BigEndian
num_elements = size_reader(self)
return self.readSizedList(fn, num_elements)

View File

@ -1,25 +1,28 @@
'''
"""
Definitions of several useful types commonly seen on Partisia Blockchain.
Follows the ABI specification quite closely. The full ABI specification can be found at:
https://partisiablockchain.gitlab.io/documentation/abiv.html
'''
"""
import datetime
import dataclasses
from typing import Mapping, Collection, Optional, Union
from enum import Enum
import enforce_typing
from .binaryreader import BinaryReader
def hexformat(bts: bytes, sep=''):
return sep.join('{:02X}'.format(c) for c in bts)
return sep.join(f'{c:02X}' for c in bts)
class AddressType(Enum):
'''
"""
Type of a PBC BlockchainAddress.
'''
"""
ACCOUNT = 0x00
CONTRACT_SYSTEM = 0x01
CONTRACT_PUBLIC = 0x02
@ -27,9 +30,9 @@ class AddressType(Enum):
CONTRACT_GOVERANCE = 0x04
def human_name(self):
'''
"""
Produces a human-readable name for self.
'''
"""
return ADDRESS_TYPES[self]
@ -44,32 +47,33 @@ ADDRESS_TYPES = {
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class ByteData(object):
'''
class ByteData:
"""
Supertype for several built-in PBC types.
'''
"""
data: bytes
def to_hex(self):
'''
"""
Formats this byte data as a hex string.
'''
"""
return hexformat(self.data)
def short_str(self):
'''
"""
Formats this byte data as a short hex string.
'''
"""
long = str(self)
return '{}..{}'.format(long[:4], long[-4:])
return f'{long[:4]}..{long[-4:]}'
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class BlockchainAddress(ByteData):
'''
"""
Address on PBC.
'''
"""
BYTE_LEN = 21
@ -99,12 +103,13 @@ class BlockchainAddress(ByteData):
shard = abs(shard) % num_shards
return f'Shard{shard}'
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class Hash(ByteData):
'''
"""
Hash.
'''
"""
data: bytes
@ -128,9 +133,9 @@ class Hash(ByteData):
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class PublicKey(ByteData):
'''
"""
PBC public key.
'''
"""
data: bytes
@ -150,9 +155,9 @@ class PublicKey(ByteData):
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class Signature(ByteData):
'''
"""
Message signature on PBC.
'''
"""
data: bytes
@ -172,9 +177,9 @@ class Signature(ByteData):
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class BlsPublicKey(ByteData):
'''
"""
BLS Public Key.
'''
"""
data: bytes
@ -194,9 +199,9 @@ class BlsPublicKey(ByteData):
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class BlsSignature(ByteData):
'''
"""
BLS signature.
'''
"""
data: bytes

View File

@ -1,20 +1,23 @@
'''
"""
Specifies the ABI Spec model and how to deserialize the ABI, and additionally
how specific type specifications can deserialize their elements.
Follows the ABI specification quite closely. The full ABI specification can be found at:
https://partisiablockchain.gitlab.io/documentation/abiv.html
'''
"""
from frozendict import frozendict
import dataclasses
from typing import Mapping
from enum import Enum
import enforce_typing
from .binaryreader import BinaryReader
import pbcabi.data as data
import logging
from collections.abc import Mapping
from enum import Enum
import enforce_typing
from frozendict import frozendict
from pbcabi import data
from .binaryreader import BinaryReader
logger = logging.getLogger(__name__)
@ -28,57 +31,58 @@ class SerializeMode(Enum):
class SimpleType(Enum):
'''
"""
The specific simple type.
'''
"""
U8 = 0X01
U16 = 0X02
U32 = 0X03
U64 = 0X04
U128 = 0X05
U256 = 0X18
I8 = 0X06
I16 = 0X07
I32 = 0X08
I64 = 0X09
I128 = 0X0a
STRING = 0X0b
BOOL = 0X0c
ADDRESS = 0X0d
HASH = 0X13
PUBLIC_KEY = 0X14
SIGNATURE = 0X15
BLS_PUBLIC_KEY = 0X16
BLS_SIGNATURE = 0X17
U8 = 0x01
U16 = 0x02
U32 = 0x03
U64 = 0x04
U128 = 0x05
U256 = 0x18
I8 = 0x06
I16 = 0x07
I32 = 0x08
I64 = 0x09
I128 = 0x0A
STRING = 0x0B
BOOL = 0x0C
ADDRESS = 0x0D
HASH = 0x13
PUBLIC_KEY = 0x14
SIGNATURE = 0x15
BLS_PUBLIC_KEY = 0x16
BLS_SIGNATURE = 0x17
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class TypeSpec:
'''
"""
Supertype of all 'TypeSpec'.
'''
"""
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
discriminant = reader.readUInt8()
if type_spec_read_from := TYPE_SPEC_SUBTYPE_READ_FROM_BY_DISCRIMINANT.get(
discriminant):
discriminant,
):
type_spec = type_spec_read_from(reader)
return type_spec
assert False, "Unknown TypeSpec discriminant: 0x{:02x}".format(discriminant)
assert False, f'Unknown TypeSpec discriminant: 0x{discriminant:02x}'
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class NamedTypeRef(TypeSpec):
'''
"""
Reference to a named type.
'''
"""
idx: int
@ -86,25 +90,24 @@ class NamedTypeRef(TypeSpec):
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
assert reader.readUInt8() == NamedTypeRef.DISCRIMINANT
type_spec = NamedTypeRef.read_from_inner(reader)
return type_spec
@staticmethod
def read_from_inner(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
return NamedTypeRef(reader.readUInt8())
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
return type_env[self.idx].read_element_from(reader, type_env, mode)
@ -158,24 +161,23 @@ READ_SIMPLE_TYPE[SerializeMode.STATE] = {
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class SimpleTypeSpec(TypeSpec):
'''
"""
Simple type spec.
'''
"""
type: SimpleType
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
return SimpleTypeSpec(SimpleType[reader.readUInt8()])
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
return READ_SIMPLE_TYPE[mode][self.type](reader)
@ -185,36 +187,35 @@ SIZE_SIMPLE_TYPE_SPEC = SimpleTypeSpec(SimpleType.U32)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class CompositeTypeSpec(TypeSpec):
'''
"""
Supertype of all composite type specs.
'''
"""
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class VecTypeSpec(TypeSpec):
'''
"""
Type spec for a dynamically-sized vector of some some type.
'''
"""
type_elements: TypeSpec
DISCRIMINANT = 0x0e
DISCRIMINANT = 0x0E
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
type_elements = TypeSpec.read_from(reader)
type_spec = VecTypeSpec(type_elements)
return type_spec
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
length = SIZE_SIMPLE_TYPE_SPEC.read_element_from(reader, type_env, mode)
if self.type_elements == SimpleTypeSpec(SimpleType.U8):
@ -230,67 +231,72 @@ class VecTypeSpec(TypeSpec):
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class MapTypeSpec(TypeSpec):
'''
"""
Type spec for a dynamically-sized map from some type to another type.
'''
"""
type_key: TypeSpec
type_value: TypeSpec
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class AvlTreeTypeSpec(MapTypeSpec):
'''
"""
Type spec for AVL-tree. Avl trees does not store data inline.
'''
"""
DISCRIMINANT = 0x19
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
type_key = TypeSpec.read_from(reader)
type_value = TypeSpec.read_from(reader)
return AvlTreeTypeSpec(type_key, type_value)
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode) -> 'AvlTreeId':
def read_element_from(
self,
reader: BinaryReader,
type_env,
mode: SerializeMode,
) -> 'AvlTreeId':
avl_tree_id = SIZE_SIMPLE_TYPE_SPEC.read_element_from(reader, type_env, mode)
return AvlTreeId(avl_tree_id, self)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class AvlTreeId:
avl_tree_id: int
type_spec: AvlTreeTypeSpec
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class InlineMapTypeSpec(MapTypeSpec):
'''
"""
Type spec for a dynamically-sized map from some type to another type.
'''
DISCRIMINANT = 0x0f
"""
DISCRIMINANT = 0x0F
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
type_key = TypeSpec.read_from(reader)
type_value = TypeSpec.read_from(reader)
type_spec = MapTypeSpec(type_key, type_value)
return type_spec
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
out = {}
length = SIZE_SIMPLE_TYPE_SPEC.read_element_from(reader, type_env, mode)
for i in range(0, length):
@ -303,9 +309,9 @@ class InlineMapTypeSpec(MapTypeSpec):
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class SetTypeSpec(TypeSpec):
'''
"""
Type spec for a dynamically-sized set.
'''
"""
type_elements: TypeSpec
@ -313,32 +319,30 @@ class SetTypeSpec(TypeSpec):
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
type_elements = TypeSpec.read_from(reader)
type_spec = SetTypeSpec(type_elements)
return type_spec
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
array = []
length = SIZE_SIMPLE_TYPE_SPEC.read_element_from(reader, type_env, mode)
for i in range(0, length):
array.append(self.type_elements.read_element_from(reader, type_env),
mode)
array.append(self.type_elements.read_element_from(reader, type_env), mode)
return frozenset(array)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class ArrayTypeSpec(TypeSpec):
'''
"""
Type spec for byte-array.
'''
"""
length: int
@ -346,34 +350,32 @@ class ArrayTypeSpec(TypeSpec):
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
length = reader.readUInt8()
type_spec = ArrayTypeSpec(length)
return type_spec
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
return reader.readBytes(self.length)
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from state 'BinaryReader'.
'''
"""
return reader.readBytes(self.length)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class OptionTypeSpec(TypeSpec):
'''
"""
Type spec for some option type containing a specific sub-type.
'''
"""
type_elements: TypeSpec
@ -381,29 +383,27 @@ class OptionTypeSpec(TypeSpec):
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
type_elements = TypeSpec.read_from(reader)
type_spec = OptionTypeSpec(type_elements)
return type_spec
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
discriminant = reader.readUInt8()
if discriminant == 0:
return None
else:
return self.type_elements.read_element_from(reader, type_env, mode)
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from state 'BinaryReader'.
'''
"""
discriminant = reader.readUInt8()
if discriminant == 0:
return None
@ -416,31 +416,41 @@ def simple_type_spec_read_from(variant):
return lambda r: v
TYPE_SPEC_SUBTYPE_READ_FROM_BY_DISCRIMINANT = {
0x00: NamedTypeRef.read_from_inner,
} | {
v.value: simple_type_spec_read_from(v) for v in SimpleType
} | {
t.DISCRIMINANT: t.read_from for t in
[VecTypeSpec, InlineMapTypeSpec, AvlTreeTypeSpec, SetTypeSpec, ArrayTypeSpec, OptionTypeSpec]
}
TYPE_SPEC_SUBTYPE_READ_FROM_BY_DISCRIMINANT = (
{
0x00: NamedTypeRef.read_from_inner,
}
| {v.value: simple_type_spec_read_from(v) for v in SimpleType}
| {
t.DISCRIMINANT: t.read_from
for t in [
VecTypeSpec,
InlineMapTypeSpec,
AvlTreeTypeSpec,
SetTypeSpec,
ArrayTypeSpec,
OptionTypeSpec,
]
}
)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots = True, order=True)
@dataclasses.dataclass(frozen=True, slots=True, order=True)
class Version:
'''
"""
Version field.
'''
"""
major: int
minor: int
patch: int
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this struct from 'BinaryReader'.
'''
"""
major = reader.readUInt8()
minor = reader.readUInt8()
patch = reader.readUInt8()
@ -448,9 +458,9 @@ class Version:
class FnKind(Enum):
'''
"""
Function kinds that contract can be invoked with.
'''
"""
INIT = 0x01
ACTION = 0x02
@ -467,31 +477,31 @@ class FnKind(Enum):
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this struct from 'BinaryReader'.
'''
"""
discriminant = reader.readUInt8()
if found := [kind for kind in FnKind if kind.value == discriminant]:
assert len(found) == 1
return found[0]
assert False, "Unknown FnKind discriminant: 0x{:02x}".format(discriminant)
assert False, f'Unknown FnKind discriminant: 0x{discriminant:02x}'
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class FieldAbi:
'''
"""
Field of a struct.
'''
"""
name: Identifier
type: TypeSpec
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this struct from 'BinaryReader'.
'''
"""
name = reader.readString()
type = TypeSpec.read_from(reader)
return FieldAbi(name, type)
@ -500,17 +510,18 @@ class FieldAbi:
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class ArgumentAbi:
'''
"""
Argument of a function.
'''
"""
name: Identifier
type: TypeSpec
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize this struct from 'BinaryReader'.
'''
"""
name = reader.readString()
type = TypeSpec.read_from(reader)
return ArgumentAbi(name, type)
@ -519,18 +530,18 @@ class ArgumentAbi:
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class NamedTypeSpec:
'''
"""
Supertype of named types.
'''
"""
name: Identifier
type_index: int
@staticmethod
def read_from(reader: BinaryReader, type_index: int):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
discriminant = reader.readUInt8()
if discriminant == StructTypeSpec.DISCRIMINANT:
return StructTypeSpec.read_from(reader, type_index)
@ -541,9 +552,9 @@ class NamedTypeSpec:
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class StructTypeSpec(NamedTypeSpec):
'''
"""
Struct type specification.
'''
"""
fields: list[FieldAbi]
@ -551,9 +562,9 @@ class StructTypeSpec(NamedTypeSpec):
@staticmethod
def read_from(reader: BinaryReader, type_index: int):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
logger.debug('Reading Struct')
name = reader.readString()
@ -561,15 +572,13 @@ class StructTypeSpec(NamedTypeSpec):
type_spec = StructTypeSpec(name, type_index, variants)
return type_spec
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
result = {}
for field in self.fields:
result[field.name] = field.type.read_element_from(
reader, type_env, mode)
result[field.name] = field.type.read_element_from(reader, type_env, mode)
result['__type'] = self.name
return result
@ -577,18 +586,18 @@ class StructTypeSpec(NamedTypeSpec):
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class EnumVariant:
'''
"""
Enum variant specification.
'''
"""
discriminant: int
definition: NamedTypeRef
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize an 'EnumVariant' from 'BinaryReader'.
'''
"""
discriminant = reader.readUInt8()
named_type = NamedTypeRef.read_from(reader)
return EnumVariant(discriminant, named_type)
@ -597,9 +606,9 @@ class EnumVariant:
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class EnumTypeSpec(NamedTypeSpec):
'''
"""
Enum type specification.
'''
"""
variants_by_discriminant: Mapping[int, EnumVariant]
@ -607,22 +616,20 @@ class EnumTypeSpec(NamedTypeSpec):
@staticmethod
def read_from(reader: BinaryReader, type_index: int):
'''
"""
Deserialize this 'TypeSpec' type from 'BinaryReader'.
'''
"""
name = reader.readString()
variants = reader.readList(EnumVariant.read_from)
variants_by_discriminant = frozendict(
{v.discriminant: v for v in variants})
variants_by_discriminant = frozendict({v.discriminant: v for v in variants})
assert len(variants_by_discriminant) == len(variants), 'Duplicant discriminants'
type_spec = EnumTypeSpec(name, type_index, variants_by_discriminant)
return type_spec
def read_element_from(self, reader: BinaryReader, type_env,
mode: SerializeMode):
'''
def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode):
"""
Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'.
'''
"""
discriminant = reader.readUInt8()
variant = self.variants_by_discriminant[discriminant]
return variant.definition.read_element_from(reader, type_env, mode)
@ -631,9 +638,9 @@ class EnumTypeSpec(NamedTypeSpec):
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class FnAbi:
'''
"""
Function definition.
'''
"""
kind: FnKind
name: Identifier
@ -643,9 +650,9 @@ class FnAbi:
@staticmethod
def read_from(reader: BinaryReader):
'''
"""
Deserialize 'FnAbi' from 'BinaryReader'.
'''
"""
kind = FnKind.read_from(reader)
name = reader.readString()
shortname = reader.readLeb128()
@ -656,6 +663,7 @@ class FnAbi:
return FnAbi(kind, name, shortname, arguments, secret_argument)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class ParsedInvocation:
@ -670,12 +678,13 @@ class ParsedInvocation:
return frozendict(arguments)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class ContractAbi:
'''
"""
Contract definition.
'''
"""
named_types_by_id: Mapping[Identifier, NamedTypeSpec]
named_types_by_idx: Mapping[int, NamedTypeSpec]
@ -684,9 +693,9 @@ class ContractAbi:
@staticmethod
def read_from(reader: BinaryReader | str):
'''
"""
Deserialize 'ContractAbi' from 'BinaryReader'.
'''
"""
num_named_types = reader.readUInt32BigEndian()
named_types = []
@ -696,26 +705,38 @@ class ContractAbi:
hooks = reader.readList(FnAbi.read_from)
state_type = TypeSpec.read_from(reader)
return ContractAbi(frozendict({t.name: t for t in named_types}),
frozendict({t.type_index: t for t in named_types}),
frozendict({t.name: t for t in hooks}), state_type)
return ContractAbi(
frozendict({t.name: t for t in named_types}),
frozendict({t.type_index: t for t in named_types}),
frozendict({t.name: t for t in hooks}),
state_type,
)
def read_type_element_from_rpc(self, type_name: Identifier,
rpc: BinaryReader):
'''
def read_type_element_from_rpc(self, type_name: Identifier, rpc: BinaryReader):
"""
Reads element of the given type name from RPC 'BinaryReader'.
'''
"""
return self.named_types_by_id[type_name].read_element_from(
rpc, self.named_types_by_idx, SerializeMode.RPC)
rpc,
self.named_types_by_idx,
SerializeMode.RPC,
)
def read_state(self, reader: BinaryReader | bytes, explicit_type: TypeSpec | None = None):
def read_state(
self,
reader: BinaryReader | bytes,
explicit_type: TypeSpec | None = None,
):
if not isinstance(reader, BinaryReader):
reader = BinaryReader(reader)
explicit_type = explicit_type or self.state_type
return explicit_type.read_element_from(reader, self.named_types_by_idx,
SerializeMode.STATE)
return explicit_type.read_element_from(
reader,
self.named_types_by_idx,
SerializeMode.STATE,
)
def get_fn_abi_from_shortname(self, shortname: int) -> FnAbi:
for derp in self.hooks.values():
@ -729,16 +750,23 @@ class ContractAbi:
fn_abi = self.get_fn_abi_from_shortname(shortname)
arguments = []
for arg_abi in fn_abi.arguments:
arguments.append(arg_abi.type.read_element_from(reader, self.named_types_by_idx, SerializeMode.RPC))
arguments.append(
arg_abi.type.read_element_from(
reader,
self.named_types_by_idx,
SerializeMode.RPC,
),
)
return ParsedInvocation(shortname, arguments, fn_abi)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class FileAbi:
'''
"""
File definition.
'''
"""
version_binder: Version
version_client: Version
@ -746,9 +774,9 @@ class FileAbi:
@staticmethod
def read_from(reader: BinaryReader | bytes):
'''
"""
Deserialize 'FileAbi' from 'BinaryReader'.
'''
"""
if not isinstance(reader, BinaryReader):
reader = BinaryReader(reader)

View File

@ -51,6 +51,7 @@ print(token_state['balances'][my_address])
PACKAGE_DESCRIPTION_SHORT = """
Unofficial utility library for parsing and processing the Partisia Blockchain ABI Format.""".strip()
def parse_version_file(text: str) -> str:
text = re.sub('^#.*', '', text, flags=re.MULTILINE)
match = re.match(r'^\s*__version__\s*=\s*(["\'])([\d\.]+)\1$', text)
@ -59,6 +60,7 @@ def parse_version_file(text: str) -> str:
raise Exception(msg)
return match.group(2)
with open(PACKAGE_NAME + '/_version.py') as f:
version = parse_version_file(f.read())

View File

@ -1,19 +1,17 @@
import pytest
from pathlib import Path
from pbcabi.binaryreader import BinaryReader
import pbcabi.model
from pbcabi.data import BlockchainAddress
EXAMPLE_SHARDS = [
("Shard1", 2, "000000000000000000000000000000000000000001"),
("Shard0", 2, "000000000000000000000000000000000000000002"),
("Shard0", 3, "025FA781D389D7C7CAAF836E5E47ABED6CEFD2D928"),
("Shard1", 3, "04FE17D1009372C8ED3AC5B790B32E349359C2C7E9"),
("Shard0", 3, "01A2020BB33EF9E0323C7A3210D5CB7FD492AA0D65"),
('Shard1', 2, '000000000000000000000000000000000000000001'),
('Shard0', 2, '000000000000000000000000000000000000000002'),
('Shard0', 3, '025FA781D389D7C7CAAF836E5E47ABED6CEFD2D928'),
('Shard1', 3, '04FE17D1009372C8ED3AC5B790B32E349359C2C7E9'),
('Shard0', 3, '01A2020BB33EF9E0323C7A3210D5CB7FD492AA0D65'),
]
@pytest.mark.parametrize('shard_id,num_shards,address', EXAMPLE_SHARDS)
def test_parse_abi(shard_id: int, num_shards:int, address: str):
def test_parse_abi(shard_id: int, num_shards: int, address: str):
address = BlockchainAddress.from_hex_hash(address)
assert address.shard_id(num_shards) == shard_id

View File

@ -1,12 +1,11 @@
import pytest
from pathlib import Path
from pbcabi.binaryreader import BinaryReader
import pbcabi.model
def assert_parse(hex, value):
derp = bytes.fromhex(hex)
assert BinaryReader(derp).readSignedIntBigEndian(1) == value
def test_parse():
assert_parse('00', 0)
assert_parse('01', 1)
@ -15,4 +14,3 @@ def test_parse():
assert_parse('80', -0x80)
assert_parse('F0', -16)
assert_parse('FF', -1)

View File

@ -1,16 +1,22 @@
import pytest
from pathlib import Path
from pbcabi.binaryreader import BinaryReader
import pytest
import pbcabi.model
from pbcabi.binaryreader import BinaryReader
EXAMPLE_ABIS_FOLDER = Path.cwd() / 'test' / 'example-abis'
EXAMPLE_ABIS = [str(p.relative_to(EXAMPLE_ABIS_FOLDER)) for p in EXAMPLE_ABIS_FOLDER.glob('*.abi')]
EXAMPLE_ABIS = [
str(p.relative_to(EXAMPLE_ABIS_FOLDER)) for p in EXAMPLE_ABIS_FOLDER.glob('*.abi')
]
def test_parse_abi_num():
print(EXAMPLE_ABIS_FOLDER)
print(EXAMPLE_ABIS)
assert len(EXAMPLE_ABIS) > 0
@pytest.mark.parametrize('abi_file_path', EXAMPLE_ABIS)
def test_parse_abi(abi_file_path):
with open(EXAMPLE_ABIS_FOLDER / abi_file_path, 'rb') as f: