diff --git a/pbcabi/__init__.py b/pbcabi/__init__.py index c5cd384..4c13dd8 100644 --- a/pbcabi/__init__.py +++ b/pbcabi/__init__.py @@ -39,9 +39,10 @@ print(token_state['balances'][my_address]) __all__ = ['model', 'binaryreader', '__version__'] -from . import model, binaryreader +from . import binaryreader, model from ._version import __version__ + def get_version(): """Returns a PEP 386-compliant version number from __version__.""" return __version__ diff --git a/pbcabi/binaryreader.py b/pbcabi/binaryreader.py index bbe81e8..1fa537e 100644 --- a/pbcabi/binaryreader.py +++ b/pbcabi/binaryreader.py @@ -1,23 +1,23 @@ -''' +""" Utility module for reading binary streams of data. Follows the ABI specification quite closely. The full ABI specification can be found at: https://partisiablockchain.gitlab.io/documentation/abiv.html -''' +""" import io class BinaryReader: - ''' + """ Wrapper for io.BytesIO for iteratively parsing byte streams. - ''' + """ def __init__(self, buf: bytes): - ''' + """ Initializes BinaryReader. - ''' + """ self.buf = buf if isinstance(self.buf, bytes): self.buf = io.BytesIO(self.buf) @@ -27,108 +27,113 @@ class BinaryReader: self.position = 0 def __repr__(self) -> str: - return 'BinaryReader[{} / {}]'.format(self.position, self.size) + return f'BinaryReader[{self.position} / {self.size}]' def __str__(self) -> str: return repr(self) def readBytes(self, num_bytes: int) -> bytes: - ''' + """ Reads a number of bytes from stream. - ''' + """ self.position += num_bytes bts = self.buf.read(num_bytes) if len(bts) != num_bytes: - raise Exception("Could not read {read} (0x{read:02X}) bytes: Buffer only contained {size} (0x{size:02X}) bytes".format(read=num_bytes, size=len(bts))) + raise Exception( + 'Could not read {read} (0x{read:02X}) bytes: Buffer only contained {size} (0x{size:02X}) bytes'.format( + read=num_bytes, + size=len(bts), + ), + ) assert len(bts) == num_bytes return bts - def readDynamicBytes(self, size_reader = None) -> bytes: - ''' + def readDynamicBytes(self, size_reader=None) -> bytes: + """ Reads a dynamically sized list of bytes. - ''' + """ size_reader = size_reader or BinaryReader.readUInt32BigEndian num_bytes = size_reader(self) return self.readBytes(num_bytes) - def readString(self, size_reader = None) -> str: - ''' + def readString(self, size_reader=None) -> str: + """ Reads a string from stream. - ''' + """ return self.readDynamicBytes(size_reader).decode('utf8') def readUInt8(self) -> int: - ''' + """ Reads a unsigned 8-bit integer from stream. - ''' + """ bytes = self.readBytes(1) return bytes[0] def readUInt32BigEndian(self) -> int: - ''' + """ Reads a unsigned 32-bit integer from stream. - ''' + """ return self.readUIntBigEndian(4) def readUInt32LittleEndian(self) -> int: - ''' + """ Reads a unsigned 32-bit integer from stream. - ''' + """ return self.readUIntLittleEndian(4) def readUIntBigEndian(self, num_bytes: int) -> int: - ''' + """ Reads an unsigned N-bit integer from stream. - ''' + """ bytes = self.readBytes(num_bytes) c = 0 for i in range(0, num_bytes): - c += bytes[num_bytes - i - 1] * 2**(i * 8) + c += bytes[num_bytes - i - 1] * 2 ** (i * 8) return c def readSignedIntBigEndian(self, num_bytes: int) -> int: - ''' + """ Reads an signed N-bit integer from stream. - ''' + """ # TODO: Test! result = self.readUIntBigEndian(num_bytes) - full = 2**(num_bytes * 8) - if result >= full//2: + full = 2 ** (num_bytes * 8) + if result >= full // 2: result -= full return result def readUIntLittleEndian(self, num_bytes: int) -> int: - ''' + """ Reads an unsigned N-bit integer from stream. - ''' + """ bytes = self.readBytes(num_bytes) c = 0 for i in range(0, num_bytes): - c += bytes[i] * 2**(i * 8) + c += bytes[i] * 2 ** (i * 8) return c def readSignedIntLittleEndian(self, num_bytes: int) -> int: - ''' + """ Reads an signed N-bit integer from stream. - ''' + """ # TODO: Test! result = self.readUIntLittleEndian(num_bytes) - full = 2**(num_bytes * 8) - if result >= full//2: + full = 2 ** (num_bytes * 8) + if result >= full // 2: result -= full return result def readLeb128(self) -> int: - ''' + """ Reads a LEB-128 integer from stream. - ''' + """ # TODO: Test! v = self.readUInt8() count = 0 while v & 0x80 != 0: v = self.readUInt8() count += 1 - assert count < 6, "TODO: " + str(v) + assert count < 6, 'TODO: ' + str(v) return v def readSizedList(self, fn, num_elements: int) -> list[object]: @@ -138,12 +143,12 @@ class BinaryReader: return ls def readList(self, fn, size_reader=None) -> list[object]: - ''' + """ Reads a list of elements from stream, by applying the given function several times. Function must take a single argument of type BinaryReader. - ''' + """ size_reader = size_reader or BinaryReader.readUInt32BigEndian num_elements = size_reader(self) return self.readSizedList(fn, num_elements) diff --git a/pbcabi/data.py b/pbcabi/data.py index 67ee7e5..46be6b5 100644 --- a/pbcabi/data.py +++ b/pbcabi/data.py @@ -1,25 +1,28 @@ -''' +""" Definitions of several useful types commonly seen on Partisia Blockchain. Follows the ABI specification quite closely. The full ABI specification can be found at: https://partisiablockchain.gitlab.io/documentation/abiv.html -''' +""" -import datetime import dataclasses -from typing import Mapping, Collection, Optional, Union from enum import Enum + import enforce_typing + from .binaryreader import BinaryReader + def hexformat(bts: bytes, sep=''): - return sep.join('{:02X}'.format(c) for c in bts) + return sep.join(f'{c:02X}' for c in bts) + class AddressType(Enum): - ''' + """ Type of a PBC BlockchainAddress. - ''' + """ + ACCOUNT = 0x00 CONTRACT_SYSTEM = 0x01 CONTRACT_PUBLIC = 0x02 @@ -27,9 +30,9 @@ class AddressType(Enum): CONTRACT_GOVERANCE = 0x04 def human_name(self): - ''' + """ Produces a human-readable name for self. - ''' + """ return ADDRESS_TYPES[self] @@ -44,32 +47,33 @@ ADDRESS_TYPES = { @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, order=True) -class ByteData(object): - ''' +class ByteData: + """ Supertype for several built-in PBC types. - ''' + """ + data: bytes def to_hex(self): - ''' + """ Formats this byte data as a hex string. - ''' + """ return hexformat(self.data) def short_str(self): - ''' + """ Formats this byte data as a short hex string. - ''' + """ long = str(self) - return '{}..{}'.format(long[:4], long[-4:]) + return f'{long[:4]}..{long[-4:]}' @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, order=True) class BlockchainAddress(ByteData): - ''' + """ Address on PBC. - ''' + """ BYTE_LEN = 21 @@ -99,12 +103,13 @@ class BlockchainAddress(ByteData): shard = abs(shard) % num_shards return f'Shard{shard}' + @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, order=True) class Hash(ByteData): - ''' + """ Hash. - ''' + """ data: bytes @@ -128,9 +133,9 @@ class Hash(ByteData): @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, order=True) class PublicKey(ByteData): - ''' + """ PBC public key. - ''' + """ data: bytes @@ -150,9 +155,9 @@ class PublicKey(ByteData): @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, order=True) class Signature(ByteData): - ''' + """ Message signature on PBC. - ''' + """ data: bytes @@ -172,9 +177,9 @@ class Signature(ByteData): @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, order=True) class BlsPublicKey(ByteData): - ''' + """ BLS Public Key. - ''' + """ data: bytes @@ -194,9 +199,9 @@ class BlsPublicKey(ByteData): @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, order=True) class BlsSignature(ByteData): - ''' + """ BLS signature. - ''' + """ data: bytes diff --git a/pbcabi/model.py b/pbcabi/model.py index 204477d..3f03be0 100644 --- a/pbcabi/model.py +++ b/pbcabi/model.py @@ -1,20 +1,23 @@ -''' +""" Specifies the ABI Spec model and how to deserialize the ABI, and additionally how specific type specifications can deserialize their elements. Follows the ABI specification quite closely. The full ABI specification can be found at: https://partisiablockchain.gitlab.io/documentation/abiv.html -''' +""" -from frozendict import frozendict import dataclasses -from typing import Mapping -from enum import Enum -import enforce_typing -from .binaryreader import BinaryReader -import pbcabi.data as data import logging +from collections.abc import Mapping +from enum import Enum + +import enforce_typing +from frozendict import frozendict + +from pbcabi import data + +from .binaryreader import BinaryReader logger = logging.getLogger(__name__) @@ -28,57 +31,58 @@ class SerializeMode(Enum): class SimpleType(Enum): - ''' + """ The specific simple type. - ''' + """ - U8 = 0X01 - U16 = 0X02 - U32 = 0X03 - U64 = 0X04 - U128 = 0X05 - U256 = 0X18 - I8 = 0X06 - I16 = 0X07 - I32 = 0X08 - I64 = 0X09 - I128 = 0X0a - STRING = 0X0b - BOOL = 0X0c - ADDRESS = 0X0d - HASH = 0X13 - PUBLIC_KEY = 0X14 - SIGNATURE = 0X15 - BLS_PUBLIC_KEY = 0X16 - BLS_SIGNATURE = 0X17 + U8 = 0x01 + U16 = 0x02 + U32 = 0x03 + U64 = 0x04 + U128 = 0x05 + U256 = 0x18 + I8 = 0x06 + I16 = 0x07 + I32 = 0x08 + I64 = 0x09 + I128 = 0x0A + STRING = 0x0B + BOOL = 0x0C + ADDRESS = 0x0D + HASH = 0x13 + PUBLIC_KEY = 0x14 + SIGNATURE = 0x15 + BLS_PUBLIC_KEY = 0x16 + BLS_SIGNATURE = 0x17 @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class TypeSpec: - ''' + """ Supertype of all 'TypeSpec'. - ''' + """ @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ discriminant = reader.readUInt8() if type_spec_read_from := TYPE_SPEC_SUBTYPE_READ_FROM_BY_DISCRIMINANT.get( - discriminant): + discriminant, + ): type_spec = type_spec_read_from(reader) return type_spec - assert False, "Unknown TypeSpec discriminant: 0x{:02x}".format(discriminant) + assert False, f'Unknown TypeSpec discriminant: 0x{discriminant:02x}' @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class NamedTypeRef(TypeSpec): - ''' + """ Reference to a named type. - ''' + """ idx: int @@ -86,25 +90,24 @@ class NamedTypeRef(TypeSpec): @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ assert reader.readUInt8() == NamedTypeRef.DISCRIMINANT type_spec = NamedTypeRef.read_from_inner(reader) return type_spec @staticmethod def read_from_inner(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ return NamedTypeRef(reader.readUInt8()) - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ return type_env[self.idx].read_element_from(reader, type_env, mode) @@ -158,24 +161,23 @@ READ_SIMPLE_TYPE[SerializeMode.STATE] = { @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class SimpleTypeSpec(TypeSpec): - ''' + """ Simple type spec. - ''' + """ type: SimpleType @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ return SimpleTypeSpec(SimpleType[reader.readUInt8()]) - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ return READ_SIMPLE_TYPE[mode][self.type](reader) @@ -185,36 +187,35 @@ SIZE_SIMPLE_TYPE_SPEC = SimpleTypeSpec(SimpleType.U32) @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class CompositeTypeSpec(TypeSpec): - ''' + """ Supertype of all composite type specs. - ''' + """ @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class VecTypeSpec(TypeSpec): - ''' + """ Type spec for a dynamically-sized vector of some some type. - ''' + """ type_elements: TypeSpec - DISCRIMINANT = 0x0e + DISCRIMINANT = 0x0E @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ type_elements = TypeSpec.read_from(reader) type_spec = VecTypeSpec(type_elements) return type_spec - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ length = SIZE_SIMPLE_TYPE_SPEC.read_element_from(reader, type_env, mode) if self.type_elements == SimpleTypeSpec(SimpleType.U8): @@ -230,67 +231,72 @@ class VecTypeSpec(TypeSpec): @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class MapTypeSpec(TypeSpec): - ''' + """ Type spec for a dynamically-sized map from some type to another type. - ''' + """ type_key: TypeSpec type_value: TypeSpec - - @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class AvlTreeTypeSpec(MapTypeSpec): - ''' + """ Type spec for AVL-tree. Avl trees does not store data inline. - ''' + """ + DISCRIMINANT = 0x19 @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ type_key = TypeSpec.read_from(reader) type_value = TypeSpec.read_from(reader) return AvlTreeTypeSpec(type_key, type_value) - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode) -> 'AvlTreeId': + def read_element_from( + self, + reader: BinaryReader, + type_env, + mode: SerializeMode, + ) -> 'AvlTreeId': avl_tree_id = SIZE_SIMPLE_TYPE_SPEC.read_element_from(reader, type_env, mode) return AvlTreeId(avl_tree_id, self) + @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class AvlTreeId: avl_tree_id: int type_spec: AvlTreeTypeSpec + @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class InlineMapTypeSpec(MapTypeSpec): - ''' + """ Type spec for a dynamically-sized map from some type to another type. - ''' - DISCRIMINANT = 0x0f + """ + + DISCRIMINANT = 0x0F @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ type_key = TypeSpec.read_from(reader) type_value = TypeSpec.read_from(reader) type_spec = MapTypeSpec(type_key, type_value) return type_spec - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ out = {} length = SIZE_SIMPLE_TYPE_SPEC.read_element_from(reader, type_env, mode) for i in range(0, length): @@ -303,9 +309,9 @@ class InlineMapTypeSpec(MapTypeSpec): @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class SetTypeSpec(TypeSpec): - ''' + """ Type spec for a dynamically-sized set. - ''' + """ type_elements: TypeSpec @@ -313,32 +319,30 @@ class SetTypeSpec(TypeSpec): @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ type_elements = TypeSpec.read_from(reader) type_spec = SetTypeSpec(type_elements) return type_spec - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ array = [] length = SIZE_SIMPLE_TYPE_SPEC.read_element_from(reader, type_env, mode) for i in range(0, length): - array.append(self.type_elements.read_element_from(reader, type_env), - mode) + array.append(self.type_elements.read_element_from(reader, type_env), mode) return frozenset(array) @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class ArrayTypeSpec(TypeSpec): - ''' + """ Type spec for byte-array. - ''' + """ length: int @@ -346,34 +350,32 @@ class ArrayTypeSpec(TypeSpec): @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ length = reader.readUInt8() type_spec = ArrayTypeSpec(length) return type_spec - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ return reader.readBytes(self.length) - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from state 'BinaryReader'. - ''' + """ return reader.readBytes(self.length) @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class OptionTypeSpec(TypeSpec): - ''' + """ Type spec for some option type containing a specific sub-type. - ''' + """ type_elements: TypeSpec @@ -381,29 +383,27 @@ class OptionTypeSpec(TypeSpec): @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ type_elements = TypeSpec.read_from(reader) type_spec = OptionTypeSpec(type_elements) return type_spec - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ discriminant = reader.readUInt8() if discriminant == 0: return None else: return self.type_elements.read_element_from(reader, type_env, mode) - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from state 'BinaryReader'. - ''' + """ discriminant = reader.readUInt8() if discriminant == 0: return None @@ -416,31 +416,41 @@ def simple_type_spec_read_from(variant): return lambda r: v -TYPE_SPEC_SUBTYPE_READ_FROM_BY_DISCRIMINANT = { - 0x00: NamedTypeRef.read_from_inner, -} | { - v.value: simple_type_spec_read_from(v) for v in SimpleType -} | { - t.DISCRIMINANT: t.read_from for t in - [VecTypeSpec, InlineMapTypeSpec, AvlTreeTypeSpec, SetTypeSpec, ArrayTypeSpec, OptionTypeSpec] -} +TYPE_SPEC_SUBTYPE_READ_FROM_BY_DISCRIMINANT = ( + { + 0x00: NamedTypeRef.read_from_inner, + } + | {v.value: simple_type_spec_read_from(v) for v in SimpleType} + | { + t.DISCRIMINANT: t.read_from + for t in [ + VecTypeSpec, + InlineMapTypeSpec, + AvlTreeTypeSpec, + SetTypeSpec, + ArrayTypeSpec, + OptionTypeSpec, + ] + } +) @enforce_typing.enforce_types -@dataclasses.dataclass(frozen=True, slots = True, order=True) +@dataclasses.dataclass(frozen=True, slots=True, order=True) class Version: - ''' + """ Version field. - ''' + """ + major: int minor: int patch: int @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this struct from 'BinaryReader'. - ''' + """ major = reader.readUInt8() minor = reader.readUInt8() patch = reader.readUInt8() @@ -448,9 +458,9 @@ class Version: class FnKind(Enum): - ''' + """ Function kinds that contract can be invoked with. - ''' + """ INIT = 0x01 ACTION = 0x02 @@ -467,31 +477,31 @@ class FnKind(Enum): @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this struct from 'BinaryReader'. - ''' + """ discriminant = reader.readUInt8() if found := [kind for kind in FnKind if kind.value == discriminant]: assert len(found) == 1 return found[0] - assert False, "Unknown FnKind discriminant: 0x{:02x}".format(discriminant) + assert False, f'Unknown FnKind discriminant: 0x{discriminant:02x}' @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class FieldAbi: - ''' + """ Field of a struct. - ''' + """ name: Identifier type: TypeSpec @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this struct from 'BinaryReader'. - ''' + """ name = reader.readString() type = TypeSpec.read_from(reader) return FieldAbi(name, type) @@ -500,17 +510,18 @@ class FieldAbi: @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class ArgumentAbi: - ''' + """ Argument of a function. - ''' + """ + name: Identifier type: TypeSpec @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize this struct from 'BinaryReader'. - ''' + """ name = reader.readString() type = TypeSpec.read_from(reader) return ArgumentAbi(name, type) @@ -519,18 +530,18 @@ class ArgumentAbi: @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class NamedTypeSpec: - ''' + """ Supertype of named types. - ''' + """ name: Identifier type_index: int @staticmethod def read_from(reader: BinaryReader, type_index: int): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ discriminant = reader.readUInt8() if discriminant == StructTypeSpec.DISCRIMINANT: return StructTypeSpec.read_from(reader, type_index) @@ -541,9 +552,9 @@ class NamedTypeSpec: @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class StructTypeSpec(NamedTypeSpec): - ''' + """ Struct type specification. - ''' + """ fields: list[FieldAbi] @@ -551,9 +562,9 @@ class StructTypeSpec(NamedTypeSpec): @staticmethod def read_from(reader: BinaryReader, type_index: int): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ logger.debug('Reading Struct') name = reader.readString() @@ -561,15 +572,13 @@ class StructTypeSpec(NamedTypeSpec): type_spec = StructTypeSpec(name, type_index, variants) return type_spec - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ result = {} for field in self.fields: - result[field.name] = field.type.read_element_from( - reader, type_env, mode) + result[field.name] = field.type.read_element_from(reader, type_env, mode) result['__type'] = self.name return result @@ -577,18 +586,18 @@ class StructTypeSpec(NamedTypeSpec): @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class EnumVariant: - ''' + """ Enum variant specification. - ''' + """ discriminant: int definition: NamedTypeRef @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize an 'EnumVariant' from 'BinaryReader'. - ''' + """ discriminant = reader.readUInt8() named_type = NamedTypeRef.read_from(reader) return EnumVariant(discriminant, named_type) @@ -597,9 +606,9 @@ class EnumVariant: @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class EnumTypeSpec(NamedTypeSpec): - ''' + """ Enum type specification. - ''' + """ variants_by_discriminant: Mapping[int, EnumVariant] @@ -607,22 +616,20 @@ class EnumTypeSpec(NamedTypeSpec): @staticmethod def read_from(reader: BinaryReader, type_index: int): - ''' + """ Deserialize this 'TypeSpec' type from 'BinaryReader'. - ''' + """ name = reader.readString() variants = reader.readList(EnumVariant.read_from) - variants_by_discriminant = frozendict( - {v.discriminant: v for v in variants}) + variants_by_discriminant = frozendict({v.discriminant: v for v in variants}) assert len(variants_by_discriminant) == len(variants), 'Duplicant discriminants' type_spec = EnumTypeSpec(name, type_index, variants_by_discriminant) return type_spec - def read_element_from(self, reader: BinaryReader, type_env, - mode: SerializeMode): - ''' + def read_element_from(self, reader: BinaryReader, type_env, mode: SerializeMode): + """ Deserialize elements for this 'TypeSpec' from RPC 'BinaryReader'. - ''' + """ discriminant = reader.readUInt8() variant = self.variants_by_discriminant[discriminant] return variant.definition.read_element_from(reader, type_env, mode) @@ -631,9 +638,9 @@ class EnumTypeSpec(NamedTypeSpec): @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class FnAbi: - ''' + """ Function definition. - ''' + """ kind: FnKind name: Identifier @@ -643,9 +650,9 @@ class FnAbi: @staticmethod def read_from(reader: BinaryReader): - ''' + """ Deserialize 'FnAbi' from 'BinaryReader'. - ''' + """ kind = FnKind.read_from(reader) name = reader.readString() shortname = reader.readLeb128() @@ -656,6 +663,7 @@ class FnAbi: return FnAbi(kind, name, shortname, arguments, secret_argument) + @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class ParsedInvocation: @@ -670,12 +678,13 @@ class ParsedInvocation: return frozendict(arguments) + @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class ContractAbi: - ''' + """ Contract definition. - ''' + """ named_types_by_id: Mapping[Identifier, NamedTypeSpec] named_types_by_idx: Mapping[int, NamedTypeSpec] @@ -684,9 +693,9 @@ class ContractAbi: @staticmethod def read_from(reader: BinaryReader | str): - ''' + """ Deserialize 'ContractAbi' from 'BinaryReader'. - ''' + """ num_named_types = reader.readUInt32BigEndian() named_types = [] @@ -696,26 +705,38 @@ class ContractAbi: hooks = reader.readList(FnAbi.read_from) state_type = TypeSpec.read_from(reader) - return ContractAbi(frozendict({t.name: t for t in named_types}), - frozendict({t.type_index: t for t in named_types}), - frozendict({t.name: t for t in hooks}), state_type) + return ContractAbi( + frozendict({t.name: t for t in named_types}), + frozendict({t.type_index: t for t in named_types}), + frozendict({t.name: t for t in hooks}), + state_type, + ) - def read_type_element_from_rpc(self, type_name: Identifier, - rpc: BinaryReader): - ''' + def read_type_element_from_rpc(self, type_name: Identifier, rpc: BinaryReader): + """ Reads element of the given type name from RPC 'BinaryReader'. - ''' + """ return self.named_types_by_id[type_name].read_element_from( - rpc, self.named_types_by_idx, SerializeMode.RPC) + rpc, + self.named_types_by_idx, + SerializeMode.RPC, + ) - def read_state(self, reader: BinaryReader | bytes, explicit_type: TypeSpec | None = None): + def read_state( + self, + reader: BinaryReader | bytes, + explicit_type: TypeSpec | None = None, + ): if not isinstance(reader, BinaryReader): reader = BinaryReader(reader) explicit_type = explicit_type or self.state_type - return explicit_type.read_element_from(reader, self.named_types_by_idx, - SerializeMode.STATE) + return explicit_type.read_element_from( + reader, + self.named_types_by_idx, + SerializeMode.STATE, + ) def get_fn_abi_from_shortname(self, shortname: int) -> FnAbi: for derp in self.hooks.values(): @@ -729,16 +750,23 @@ class ContractAbi: fn_abi = self.get_fn_abi_from_shortname(shortname) arguments = [] for arg_abi in fn_abi.arguments: - arguments.append(arg_abi.type.read_element_from(reader, self.named_types_by_idx, SerializeMode.RPC)) + arguments.append( + arg_abi.type.read_element_from( + reader, + self.named_types_by_idx, + SerializeMode.RPC, + ), + ) return ParsedInvocation(shortname, arguments, fn_abi) + @enforce_typing.enforce_types @dataclasses.dataclass(frozen=True, slots=True) class FileAbi: - ''' + """ File definition. - ''' + """ version_binder: Version version_client: Version @@ -746,9 +774,9 @@ class FileAbi: @staticmethod def read_from(reader: BinaryReader | bytes): - ''' + """ Deserialize 'FileAbi' from 'BinaryReader'. - ''' + """ if not isinstance(reader, BinaryReader): reader = BinaryReader(reader) diff --git a/setup.py b/setup.py index 2c85eed..c2386d4 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ print(token_state['balances'][my_address]) PACKAGE_DESCRIPTION_SHORT = """ Unofficial utility library for parsing and processing the Partisia Blockchain ABI Format.""".strip() + def parse_version_file(text: str) -> str: text = re.sub('^#.*', '', text, flags=re.MULTILINE) match = re.match(r'^\s*__version__\s*=\s*(["\'])([\d\.]+)\1$', text) @@ -59,6 +60,7 @@ def parse_version_file(text: str) -> str: raise Exception(msg) return match.group(2) + with open(PACKAGE_NAME + '/_version.py') as f: version = parse_version_file(f.read()) diff --git a/test/test_address.py b/test/test_address.py index 17c3898..7346cc0 100644 --- a/test/test_address.py +++ b/test/test_address.py @@ -1,19 +1,17 @@ import pytest -from pathlib import Path -from pbcabi.binaryreader import BinaryReader -import pbcabi.model + from pbcabi.data import BlockchainAddress EXAMPLE_SHARDS = [ - ("Shard1", 2, "000000000000000000000000000000000000000001"), - ("Shard0", 2, "000000000000000000000000000000000000000002"), - ("Shard0", 3, "025FA781D389D7C7CAAF836E5E47ABED6CEFD2D928"), - ("Shard1", 3, "04FE17D1009372C8ED3AC5B790B32E349359C2C7E9"), - ("Shard0", 3, "01A2020BB33EF9E0323C7A3210D5CB7FD492AA0D65"), + ('Shard1', 2, '000000000000000000000000000000000000000001'), + ('Shard0', 2, '000000000000000000000000000000000000000002'), + ('Shard0', 3, '025FA781D389D7C7CAAF836E5E47ABED6CEFD2D928'), + ('Shard1', 3, '04FE17D1009372C8ED3AC5B790B32E349359C2C7E9'), + ('Shard0', 3, '01A2020BB33EF9E0323C7A3210D5CB7FD492AA0D65'), ] + @pytest.mark.parametrize('shard_id,num_shards,address', EXAMPLE_SHARDS) -def test_parse_abi(shard_id: int, num_shards:int, address: str): +def test_parse_abi(shard_id: int, num_shards: int, address: str): address = BlockchainAddress.from_hex_hash(address) assert address.shard_id(num_shards) == shard_id - diff --git a/test/test_binaryreader.py b/test/test_binaryreader.py index 27e8c93..933fa98 100644 --- a/test/test_binaryreader.py +++ b/test/test_binaryreader.py @@ -1,12 +1,11 @@ -import pytest -from pathlib import Path from pbcabi.binaryreader import BinaryReader -import pbcabi.model + def assert_parse(hex, value): derp = bytes.fromhex(hex) assert BinaryReader(derp).readSignedIntBigEndian(1) == value + def test_parse(): assert_parse('00', 0) assert_parse('01', 1) @@ -15,4 +14,3 @@ def test_parse(): assert_parse('80', -0x80) assert_parse('F0', -16) assert_parse('FF', -1) - diff --git a/test/test_parse_abi.py b/test/test_parse_abi.py index 42c643c..b2b465a 100644 --- a/test/test_parse_abi.py +++ b/test/test_parse_abi.py @@ -1,16 +1,22 @@ -import pytest from pathlib import Path -from pbcabi.binaryreader import BinaryReader + +import pytest + import pbcabi.model +from pbcabi.binaryreader import BinaryReader EXAMPLE_ABIS_FOLDER = Path.cwd() / 'test' / 'example-abis' -EXAMPLE_ABIS = [str(p.relative_to(EXAMPLE_ABIS_FOLDER)) for p in EXAMPLE_ABIS_FOLDER.glob('*.abi')] +EXAMPLE_ABIS = [ + str(p.relative_to(EXAMPLE_ABIS_FOLDER)) for p in EXAMPLE_ABIS_FOLDER.glob('*.abi') +] + def test_parse_abi_num(): print(EXAMPLE_ABIS_FOLDER) print(EXAMPLE_ABIS) assert len(EXAMPLE_ABIS) > 0 + @pytest.mark.parametrize('abi_file_path', EXAMPLE_ABIS) def test_parse_abi(abi_file_path): with open(EXAMPLE_ABIS_FOLDER / abi_file_path, 'rb') as f: