1
0

Parse invocation
Some checks failed
Python Package / Package (push) Failing after 23s

This commit is contained in:
Jon Michael Aanes 2024-05-03 00:27:33 +02:00
parent 8c333ada57
commit cae659638a
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
3 changed files with 38 additions and 0 deletions

View File

@ -21,6 +21,8 @@ class BinaryReader:
self.buf = buf
if isinstance(self.buf, bytes):
self.buf = io.BytesIO(self.buf)
if not isinstance(self.buf, io.BytesIO):
raise Exception(f'Incorrect reader type: {type(self.buf)}')
self.size = len(buf)
self.position = 0

View File

@ -11,6 +11,7 @@ import dataclasses
from typing import Mapping, Collection, Optional, Union
from enum import Enum
import enforce_typing
from .binaryreader import BinaryReader
def hexformat(bts: bytes, sep=''):
return sep.join('{:02X}'.format(c) for c in bts)
@ -93,6 +94,12 @@ class BlockchainAddress(ByteData):
def __str__(self):
return repr(self)
def shard_id(self, num_shards: int) -> str:
reader = BinaryReader(self.data)
reader.readBytes(17)
shard = reader.readUInt32BigEndian()
shard = abs(shard) % num_shards
return f'Shard{shard}'
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)

View File

@ -628,6 +628,20 @@ class FnAbi:
return FnAbi(kind, name, shortname, arguments, secret_argument)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class ParsedInvocation:
shortname: int
arguments: list[object]
fn_abi: FnAbi
def arguments_map(self) -> Mapping[str, object]:
arguments = {}
for idx, arg_abi in enumerate(self.fn_abi.arguments):
arguments[arg_abi.name] = self.arguments[idx]
return frozendict(arguments)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)
class ContractAbi:
@ -673,6 +687,21 @@ class ContractAbi:
return self.state_type.read_element_from(reader, self.named_types_by_idx,
SerializeMode.STATE)
def get_fn_abi_from_shortname(self, shortname: int) -> FnAbi:
for derp in self.hooks.values():
if derp.shortname == shortname:
return derp
def read_invocation(self, reader: BinaryReader) -> ParsedInvocation:
if not isinstance(reader, BinaryReader):
reader = BinaryReader(reader)
shortname = reader.readLeb128()
fn_abi = self.get_fn_abi_from_shortname(shortname)
arguments = []
for arg_abi in fn_abi.arguments:
arguments.append(arg_abi.type.read_element_from(reader, self.named_types_by_idx, SerializeMode.RPC))
return ParsedInvocation(shortname, arguments, fn_abi)
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, slots=True)