This commit is contained in:
parent
0ffe623d89
commit
c854200a3d
|
@ -14,7 +14,7 @@ class BinaryReader:
|
||||||
Wrapper for io.BytesIO for iteratively parsing byte streams.
|
Wrapper for io.BytesIO for iteratively parsing byte streams.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
def __init__(self, buf):
|
def __init__(self, buf: bytes):
|
||||||
'''
|
'''
|
||||||
Initializes BinaryReader.
|
Initializes BinaryReader.
|
||||||
'''
|
'''
|
||||||
|
@ -24,17 +24,19 @@ class BinaryReader:
|
||||||
self.size = len(buf)
|
self.size = len(buf)
|
||||||
self.position = 0
|
self.position = 0
|
||||||
|
|
||||||
def readBytes(self, num_bytes):
|
def readBytes(self, num_bytes: int) -> bytes:
|
||||||
'''
|
'''
|
||||||
Reads a number of bytes from stream.
|
Reads a number of bytes from stream.
|
||||||
'''
|
'''
|
||||||
self.position += num_bytes
|
self.position += num_bytes
|
||||||
bts = self.buf.read(num_bytes)
|
bts = self.buf.read(num_bytes)
|
||||||
if len(bts) != num_bytes:
|
if len(bts) != num_bytes:
|
||||||
raise Exception("Not enough bytes in buffer: Got {}, but expected {}".format(len(bts), num_bytes))
|
raise Exception("Could not read {read} (0x{read:02X}) bytes: Buffer only contained {size} (0x{size:02X}) bytes".format(read=num_bytes, size=len(bts)))
|
||||||
|
assert len(bts) == num_bytes
|
||||||
|
print(' ', bts)
|
||||||
return bts
|
return bts
|
||||||
|
|
||||||
def readDynamicBytes(self, size_reader = None):
|
def readDynamicBytes(self, size_reader = None) -> bytes:
|
||||||
'''
|
'''
|
||||||
Reads a dynamically sized list of bytes.
|
Reads a dynamically sized list of bytes.
|
||||||
'''
|
'''
|
||||||
|
@ -42,32 +44,32 @@ class BinaryReader:
|
||||||
num_bytes = size_reader(self)
|
num_bytes = size_reader(self)
|
||||||
return self.readBytes(num_bytes)
|
return self.readBytes(num_bytes)
|
||||||
|
|
||||||
def readString(self, size_reader = None):
|
def readString(self, size_reader = None) -> str:
|
||||||
'''
|
'''
|
||||||
Reads a string from stream.
|
Reads a string from stream.
|
||||||
'''
|
'''
|
||||||
return self.readDynamicBytes(size_reader).decode('utf8')
|
return self.readDynamicBytes(size_reader).decode('utf8')
|
||||||
|
|
||||||
def readUInt8(self):
|
def readUInt8(self) -> int:
|
||||||
'''
|
'''
|
||||||
Reads a unsigned 8-bit integer from stream.
|
Reads a unsigned 8-bit integer from stream.
|
||||||
'''
|
'''
|
||||||
bytes = self.readBytes(1)
|
bytes = self.readBytes(1)
|
||||||
return bytes[0]
|
return bytes[0]
|
||||||
|
|
||||||
def readUInt32BigEndian(self):
|
def readUInt32BigEndian(self) -> int:
|
||||||
'''
|
'''
|
||||||
Reads a unsigned 32-bit integer from stream.
|
Reads a unsigned 32-bit integer from stream.
|
||||||
'''
|
'''
|
||||||
return self.readUIntBigEndian(4)
|
return self.readUIntBigEndian(4)
|
||||||
|
|
||||||
def readUInt32LittleEndian(self):
|
def readUInt32LittleEndian(self) -> int:
|
||||||
'''
|
'''
|
||||||
Reads a unsigned 32-bit integer from stream.
|
Reads a unsigned 32-bit integer from stream.
|
||||||
'''
|
'''
|
||||||
return self.readUIntLittleEndian(4)
|
return self.readUIntLittleEndian(4)
|
||||||
|
|
||||||
def readUIntBigEndian(self, num_bytes):
|
def readUIntBigEndian(self, num_bytes: int) -> int:
|
||||||
'''
|
'''
|
||||||
Reads an unsigned N-bit integer from stream.
|
Reads an unsigned N-bit integer from stream.
|
||||||
'''
|
'''
|
||||||
|
@ -77,7 +79,7 @@ class BinaryReader:
|
||||||
c += bytes[num_bytes - i - 1] * 2**(i * 8)
|
c += bytes[num_bytes - i - 1] * 2**(i * 8)
|
||||||
return c
|
return c
|
||||||
|
|
||||||
def readSignedIntBigEndian(self, num_bytes):
|
def readSignedIntBigEndian(self, num_bytes: int) -> int:
|
||||||
'''
|
'''
|
||||||
Reads an signed N-bit integer from stream.
|
Reads an signed N-bit integer from stream.
|
||||||
'''
|
'''
|
||||||
|
@ -88,7 +90,7 @@ class BinaryReader:
|
||||||
result -= half
|
result -= half
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def readUIntLittleEndian(self, num_bytes):
|
def readUIntLittleEndian(self, num_bytes: int) -> int:
|
||||||
'''
|
'''
|
||||||
Reads an unsigned N-bit integer from stream.
|
Reads an unsigned N-bit integer from stream.
|
||||||
'''
|
'''
|
||||||
|
@ -98,7 +100,7 @@ class BinaryReader:
|
||||||
c += bytes[i] * 2**(i * 8)
|
c += bytes[i] * 2**(i * 8)
|
||||||
return c
|
return c
|
||||||
|
|
||||||
def readSignedIntLittleEndian(self, num_bytes):
|
def readSignedIntLittleEndian(self, num_bytes: int) -> int:
|
||||||
'''
|
'''
|
||||||
Reads an signed N-bit integer from stream.
|
Reads an signed N-bit integer from stream.
|
||||||
'''
|
'''
|
||||||
|
@ -109,7 +111,7 @@ class BinaryReader:
|
||||||
result -= half
|
result -= half
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def readLeb128(self):
|
def readLeb128(self) -> int:
|
||||||
'''
|
'''
|
||||||
Reads a LEB-128 integer from stream.
|
Reads a LEB-128 integer from stream.
|
||||||
'''
|
'''
|
||||||
|
@ -122,7 +124,13 @@ class BinaryReader:
|
||||||
assert count < 6, "TODO: " + str(v)
|
assert count < 6, "TODO: " + str(v)
|
||||||
return v
|
return v
|
||||||
|
|
||||||
def readList(self, fn, size_reader=None):
|
def readSizedList(self, fn, num_elements: int) -> list[object]:
|
||||||
|
ls = []
|
||||||
|
for i in range(0, num_elements):
|
||||||
|
ls.append(fn(self))
|
||||||
|
return ls
|
||||||
|
|
||||||
|
def readList(self, fn, size_reader=None) -> list[object]:
|
||||||
'''
|
'''
|
||||||
Reads a list of elements from stream, by applying the given function
|
Reads a list of elements from stream, by applying the given function
|
||||||
several times.
|
several times.
|
||||||
|
@ -130,8 +138,5 @@ class BinaryReader:
|
||||||
Function must take a single argument of type BinaryReader.
|
Function must take a single argument of type BinaryReader.
|
||||||
'''
|
'''
|
||||||
size_reader = size_reader or BinaryReader.readUInt32BigEndian
|
size_reader = size_reader or BinaryReader.readUInt32BigEndian
|
||||||
length = size_reader(self)
|
num_elements = size_reader(self)
|
||||||
ls = []
|
return self.readSizedList(fn, num_elements)
|
||||||
for i in range(0, length):
|
|
||||||
ls.append(fn(self))
|
|
||||||
return ls
|
|
||||||
|
|
|
@ -87,7 +87,8 @@ class NamedTypeRef(TypeSpec):
|
||||||
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
||||||
'''
|
'''
|
||||||
assert reader.readUInt8() == NamedTypeRef.DISCRIMINANT
|
assert reader.readUInt8() == NamedTypeRef.DISCRIMINANT
|
||||||
return NamedTypeRef.read_from_inner(reader)
|
type_spec = NamedTypeRef.read_from_inner(reader)
|
||||||
|
return type_spec
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def read_from_inner(reader: BinaryReader):
|
def read_from_inner(reader: BinaryReader):
|
||||||
|
@ -203,7 +204,8 @@ class VecTypeSpec(TypeSpec):
|
||||||
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
||||||
'''
|
'''
|
||||||
type_elements = TypeSpec.read_from(reader)
|
type_elements = TypeSpec.read_from(reader)
|
||||||
return VecTypeSpec(type_elements)
|
type_spec = VecTypeSpec(type_elements)
|
||||||
|
return type_spec
|
||||||
|
|
||||||
def read_element_from(self, reader: BinaryReader, type_env,
|
def read_element_from(self, reader: BinaryReader, type_env,
|
||||||
mode: SerializeMode):
|
mode: SerializeMode):
|
||||||
|
@ -255,7 +257,8 @@ class InlineMapTypeSpec(MapTypeSpec):
|
||||||
'''
|
'''
|
||||||
type_key = TypeSpec.read_from(reader)
|
type_key = TypeSpec.read_from(reader)
|
||||||
type_value = TypeSpec.read_from(reader)
|
type_value = TypeSpec.read_from(reader)
|
||||||
return MapTypeSpec(type_key, type_value)
|
type_spec = MapTypeSpec(type_key, type_value)
|
||||||
|
return type_spec
|
||||||
|
|
||||||
def read_element_from(self, reader: BinaryReader, type_env,
|
def read_element_from(self, reader: BinaryReader, type_env,
|
||||||
mode: SerializeMode):
|
mode: SerializeMode):
|
||||||
|
@ -288,7 +291,8 @@ class SetTypeSpec(TypeSpec):
|
||||||
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
||||||
'''
|
'''
|
||||||
type_elements = TypeSpec.read_from(reader)
|
type_elements = TypeSpec.read_from(reader)
|
||||||
return SetTypeSpec(type_elements)
|
type_spec = SetTypeSpec(type_elements)
|
||||||
|
return type_spec
|
||||||
|
|
||||||
def read_element_from(self, reader: BinaryReader, type_env,
|
def read_element_from(self, reader: BinaryReader, type_env,
|
||||||
mode: SerializeMode):
|
mode: SerializeMode):
|
||||||
|
@ -320,7 +324,8 @@ class ArrayTypeSpec(TypeSpec):
|
||||||
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
||||||
'''
|
'''
|
||||||
length = reader.readUInt8()
|
length = reader.readUInt8()
|
||||||
return ArrayTypeSpec(length)
|
type_spec = ArrayTypeSpec(length)
|
||||||
|
return type_spec
|
||||||
|
|
||||||
def read_element_from(self, reader: BinaryReader, type_env,
|
def read_element_from(self, reader: BinaryReader, type_env,
|
||||||
mode: SerializeMode):
|
mode: SerializeMode):
|
||||||
|
@ -354,7 +359,8 @@ class OptionTypeSpec(TypeSpec):
|
||||||
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
Deserialize this 'TypeSpec' type from 'BinaryReader'.
|
||||||
'''
|
'''
|
||||||
type_elements = TypeSpec.read_from(reader)
|
type_elements = TypeSpec.read_from(reader)
|
||||||
return OptionTypeSpec(type_elements)
|
type_spec = OptionTypeSpec(type_elements)
|
||||||
|
return type_spec
|
||||||
|
|
||||||
def read_element_from(self, reader: BinaryReader, type_env,
|
def read_element_from(self, reader: BinaryReader, type_env,
|
||||||
mode: SerializeMode):
|
mode: SerializeMode):
|
||||||
|
@ -524,7 +530,8 @@ class StructTypeSpec(NamedTypeSpec):
|
||||||
'''
|
'''
|
||||||
name = reader.readString()
|
name = reader.readString()
|
||||||
variants = reader.readList(FieldAbi.read_from)
|
variants = reader.readList(FieldAbi.read_from)
|
||||||
return StructTypeSpec(name, type_index, variants)
|
type_spec = StructTypeSpec(name, type_index, variants)
|
||||||
|
return type_spec
|
||||||
|
|
||||||
def read_element_from(self, reader: BinaryReader, type_env,
|
def read_element_from(self, reader: BinaryReader, type_env,
|
||||||
mode: SerializeMode):
|
mode: SerializeMode):
|
||||||
|
@ -555,8 +562,8 @@ class EnumVariant:
|
||||||
Deserialize an 'EnumVariant' from 'BinaryReader'.
|
Deserialize an 'EnumVariant' from 'BinaryReader'.
|
||||||
'''
|
'''
|
||||||
discriminant = reader.readUInt8()
|
discriminant = reader.readUInt8()
|
||||||
type = NamedTypeRef.read_from_inner(reader)
|
named_type = NamedTypeRef.read_from(reader)
|
||||||
return EnumVariant(discriminant, type)
|
return EnumVariant(discriminant, named_type)
|
||||||
|
|
||||||
|
|
||||||
@enforce_typing.enforce_types
|
@enforce_typing.enforce_types
|
||||||
|
@ -579,7 +586,9 @@ class EnumTypeSpec(NamedTypeSpec):
|
||||||
variants = reader.readList(EnumVariant.read_from)
|
variants = reader.readList(EnumVariant.read_from)
|
||||||
variants_by_discriminant = frozendict(
|
variants_by_discriminant = frozendict(
|
||||||
{v.discriminant: v for v in variants})
|
{v.discriminant: v for v in variants})
|
||||||
return EnumTypeSpec(name, type_index, variants_by_discriminant)
|
assert len(variants_by_discriminant) == len(variants), 'Duplicant discriminants'
|
||||||
|
type_spec = EnumTypeSpec(name, type_index, variants_by_discriminant)
|
||||||
|
return type_spec
|
||||||
|
|
||||||
def read_element_from(self, reader: BinaryReader, type_env,
|
def read_element_from(self, reader: BinaryReader, type_env,
|
||||||
mode: SerializeMode):
|
mode: SerializeMode):
|
||||||
|
@ -640,7 +649,8 @@ class ContractAbi:
|
||||||
num_named_types = reader.readUInt32BigEndian()
|
num_named_types = reader.readUInt32BigEndian()
|
||||||
named_types = []
|
named_types = []
|
||||||
for type_index in range(0, num_named_types):
|
for type_index in range(0, num_named_types):
|
||||||
named_types.append(NamedTypeSpec.read_from(reader, type_index))
|
named_type = NamedTypeSpec.read_from(reader, type_index)
|
||||||
|
named_types.append(named_type)
|
||||||
|
|
||||||
hooks = reader.readList(FnAbi.read_from)
|
hooks = reader.readList(FnAbi.read_from)
|
||||||
state_type = TypeSpec.read_from(reader)
|
state_type = TypeSpec.read_from(reader)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user