142 lines
4.0 KiB
Python
142 lines
4.0 KiB
Python
'''
Utility module for reading binary streams of data.

Follows the ABI specification quite closely. The full ABI specification can be found at:

https://partisiablockchain.gitlab.io/documentation/abiv.html
'''
|
|
|
|
import io
|
|
|
|
|
|
class BinaryReader:
    '''
    Wrapper for io.BytesIO for iteratively parsing byte streams.

    Attributes:
        buf: The underlying binary stream that all reads are served from.
        size: Total number of bytes in the stream.
        position: Offset of the next byte to be read.
    '''

    def __init__(self, buf: bytes):
        '''
        Initializes BinaryReader.

        Args:
            buf: The data to parse. Bytes-like objects (bytes, bytearray,
                memoryview) are wrapped in an io.BytesIO. Anything else is
                treated as an already opened, seekable binary stream and is
                used as-is, starting from its current offset.
        '''
        if isinstance(buf, (bytes, bytearray, memoryview)):
            # bytes() normalises bytearray/memoryview input so that the size
            # is always counted in bytes.
            data = bytes(buf)
            self.buf = io.BytesIO(data)
            self.size = len(data)
            self.position = 0
        else:
            self.buf = buf
            # Measure the stream without disturbing its current offset.
            self.position = buf.tell()
            self.size = buf.seek(0, io.SEEK_END)
            buf.seek(self.position)

    def readBytes(self, num_bytes: int) -> bytes:
        '''
        Reads a number of bytes from stream.

        Args:
            num_bytes: Exact number of bytes to read; must not be negative.

        Returns:
            The next num_bytes bytes of the stream.

        Raises:
            ValueError: If num_bytes is negative.
            EOFError: If the stream holds fewer than num_bytes more bytes.
        '''
        if num_bytes < 0:
            # A negative count would make the stream read until its end.
            raise ValueError("Cannot read a negative number of bytes: {}".format(num_bytes))
        bts = self.buf.read(num_bytes)
        # Advance by what was actually consumed, so that position stays in
        # sync with the stream even when the read comes up short.
        self.position += len(bts)
        if len(bts) != num_bytes:
            raise EOFError("Could not read {read} (0x{read:02X}) bytes: Buffer only contained {size} (0x{size:02X}) bytes".format(read=num_bytes, size=len(bts)))
        return bts

    def readDynamicBytes(self, size_reader = None) -> bytes:
        '''
        Reads a dynamically sized list of bytes.

        The length prefix is read first, followed by that many bytes.

        Args:
            size_reader: Function taking a BinaryReader and returning the
                number of bytes that follow. Defaults to
                BinaryReader.readUInt32BigEndian.
        '''
        if size_reader is None:
            size_reader = BinaryReader.readUInt32BigEndian
        num_bytes = size_reader(self)
        return self.readBytes(num_bytes)

    def readString(self, size_reader = None) -> str:
        '''
        Reads a string from stream.

        The string is stored as a length-prefixed sequence of UTF-8 bytes.

        Args:
            size_reader: Function taking a BinaryReader and returning the
                number of encoded bytes. Defaults to
                BinaryReader.readUInt32BigEndian.
        '''
        return self.readDynamicBytes(size_reader).decode('utf8')

    def readUInt8(self) -> int:
        '''
        Reads a unsigned 8-bit integer from stream.
        '''
        return self.readBytes(1)[0]

    def readUInt32BigEndian(self) -> int:
        '''
        Reads a unsigned 32-bit big-endian integer from stream.
        '''
        return self.readUIntBigEndian(4)

    def readUInt32LittleEndian(self) -> int:
        '''
        Reads a unsigned 32-bit little-endian integer from stream.
        '''
        return self.readUIntLittleEndian(4)

    def readUIntBigEndian(self, num_bytes: int) -> int:
        '''
        Reads an unsigned big-endian integer of num_bytes bytes from stream.
        '''
        return int.from_bytes(self.readBytes(num_bytes), 'big')

    def readSignedIntBigEndian(self, num_bytes: int) -> int:
        '''
        Reads a signed big-endian integer of num_bytes bytes from stream.

        The value is interpreted as two's complement, so a set top bit
        yields a negative result.
        '''
        return int.from_bytes(self.readBytes(num_bytes), 'big', signed=True)

    def readUIntLittleEndian(self, num_bytes: int) -> int:
        '''
        Reads an unsigned little-endian integer of num_bytes bytes from stream.
        '''
        return int.from_bytes(self.readBytes(num_bytes), 'little')

    def readSignedIntLittleEndian(self, num_bytes: int) -> int:
        '''
        Reads a signed little-endian integer of num_bytes bytes from stream.

        The value is interpreted as two's complement, so a set top bit
        yields a negative result.
        '''
        return int.from_bytes(self.readBytes(num_bytes), 'little', signed=True)

    def readLeb128(self, max_bytes: int = 10) -> int:
        '''
        Reads a LEB-128 integer from stream.

        The value is decoded as unsigned LEB128: each byte contributes its
        low seven bits, least significant group first, and a set high bit
        means that another byte follows.

        Args:
            max_bytes: Upper bound on the encoded length, guarding against
                malformed input that never clears the continuation bit.
                The default of 10 is enough for any 64-bit value.

        Raises:
            ValueError: If the value is not terminated within max_bytes bytes.
            EOFError: If the stream ends in the middle of the value.
        '''
        result = 0
        for index in range(max_bytes):
            byte = self.readUInt8()
            result |= (byte & 0x7F) << (7 * index)
            if byte & 0x80 == 0:
                return result
        raise ValueError("LEB-128 integer is longer than {} bytes".format(max_bytes))

    def readSizedList(self, fn, num_elements: int) -> list[object]:
        '''
        Reads a fixed number of elements from stream.

        Args:
            fn: Function taking a BinaryReader and returning one element.
            num_elements: Number of times fn is applied.

        Returns:
            The elements in the order they were read.
        '''
        return [fn(self) for _ in range(num_elements)]

    def readList(self, fn, size_reader=None) -> list[object]:
        '''
        Reads a list of elements from stream, by applying the given function
        several times.

        Function must take a single argument of type BinaryReader.

        Args:
            fn: Function taking a BinaryReader and returning one element.
            size_reader: Function taking a BinaryReader and returning the
                number of elements that follow. Defaults to
                BinaryReader.readUInt32BigEndian.
        '''
        if size_reader is None:
            size_reader = BinaryReader.readUInt32BigEndian
        num_elements = size_reader(self)
        return self.readSizedList(fn, num_elements)
|