1
0
pbcabi/pbcabi/binaryreader.py
Jon Michael Aanes ae1c7d984a
Some checks failed
Python Package / Package (push) Failing after 24s
Fixed shard computation
2024-05-03 10:06:05 +02:00

150 lines
4.3 KiB
Python

'''
Utility module for reading binary streams of data.
Follows the ABI specification quite closely. The full ABI specification can be found at:
https://partisiablockchain.gitlab.io/documentation/abiv.html
'''
import io
class BinaryReader:
    '''
    Wrapper for io.BytesIO for iteratively parsing byte streams.

    Attributes:
        buf: The underlying io.BytesIO stream being consumed.
        size: Total number of bytes held by the underlying buffer.
        position: Number of bytes consumed from the stream so far.
    '''

    # Upper bound on the encoded length accepted by readLeb128. Kept at six
    # bytes so every encoding length accepted previously is still accepted.
    _LEB128_MAX_BYTES = 6

    def __init__(self, buf: bytes):
        '''
        Initializes BinaryReader.

        Args:
            buf: Either a bytes object, or an already constructed io.BytesIO.

        Raises:
            Exception: If buf is neither bytes nor io.BytesIO.
        '''
        if isinstance(buf, bytes):
            buf = io.BytesIO(buf)
        if not isinstance(buf, io.BytesIO):
            raise Exception(f'Incorrect reader type: {type(buf)}')
        self.buf = buf
        # len() is not defined for io.BytesIO, so measure the wrapped buffer
        # instead of the original argument. The view is released right away,
        # since a live export blocks resizing/closing of the BytesIO.
        with buf.getbuffer() as view:
            self.size = view.nbytes
        # A BytesIO handed in by the caller may already be partially consumed.
        self.position = buf.tell()

    def __repr__(self) -> str:
        return 'BinaryReader[{} / {}]'.format(self.position, self.size)

    def __str__(self) -> str:
        return repr(self)

    def readBytes(self, num_bytes: int) -> bytes:
        '''
        Reads a number of bytes from stream.

        Raises:
            Exception: If the stream did not yield exactly num_bytes bytes.
        '''
        bts = self.buf.read(num_bytes)
        # Advance by what was actually consumed, so position stays in sync
        # with the stream even when the read comes up short.
        self.position += len(bts)
        if len(bts) != num_bytes:
            raise Exception("Could not read {read} (0x{read:02X}) bytes: Buffer only contained {size} (0x{size:02X}) bytes".format(read=num_bytes, size=len(bts)))
        return bts

    def readDynamicBytes(self, size_reader=None) -> bytes:
        '''
        Reads a dynamically sized list of bytes.

        The length prefix is read with size_reader, which is called with this
        reader as its only argument. Defaults to readUInt32BigEndian.
        '''
        size_reader = size_reader or BinaryReader.readUInt32BigEndian
        num_bytes = size_reader(self)
        return self.readBytes(num_bytes)

    def readString(self, size_reader=None) -> str:
        '''
        Reads a length-prefixed UTF-8 string from stream.

        The length prefix is read as in readDynamicBytes.
        '''
        return self.readDynamicBytes(size_reader).decode('utf8')

    def readUInt8(self) -> int:
        '''
        Reads an unsigned 8-bit integer from stream.
        '''
        return self.readBytes(1)[0]

    def readUInt32BigEndian(self) -> int:
        '''
        Reads an unsigned 32-bit big-endian integer from stream.
        '''
        return self.readUIntBigEndian(4)

    def readUInt32LittleEndian(self) -> int:
        '''
        Reads an unsigned 32-bit little-endian integer from stream.
        '''
        return self.readUIntLittleEndian(4)

    def readUIntBigEndian(self, num_bytes: int) -> int:
        '''
        Reads an unsigned big-endian integer of num_bytes bytes from stream.
        '''
        return int.from_bytes(self.readBytes(num_bytes), 'big')

    def readSignedIntBigEndian(self, num_bytes: int) -> int:
        '''
        Reads a signed (two's complement) big-endian integer of num_bytes
        bytes from stream.
        '''
        return int.from_bytes(self.readBytes(num_bytes), 'big', signed=True)

    def readUIntLittleEndian(self, num_bytes: int) -> int:
        '''
        Reads an unsigned little-endian integer of num_bytes bytes from stream.
        '''
        return int.from_bytes(self.readBytes(num_bytes), 'little')

    def readSignedIntLittleEndian(self, num_bytes: int) -> int:
        '''
        Reads a signed (two's complement) little-endian integer of num_bytes
        bytes from stream.
        '''
        return int.from_bytes(self.readBytes(num_bytes), 'little', signed=True)

    def readLeb128(self) -> int:
        '''
        Reads an unsigned LEB-128 integer from stream.

        Each byte contributes its low 7 bits, least significant group first;
        the high bit signals that another byte follows.

        Raises:
            Exception: If the encoding is longer than _LEB128_MAX_BYTES bytes,
                or the stream ends before the final byte.
        '''
        result = 0
        for group in range(self._LEB128_MAX_BYTES):
            byte = self.readUInt8()
            result |= (byte & 0x7F) << (7 * group)
            if byte & 0x80 == 0:
                return result
        raise Exception('LEB-128 value longer than {} bytes'.format(self._LEB128_MAX_BYTES))

    def readSizedList(self, fn, num_elements: int) -> list[object]:
        '''
        Reads exactly num_elements elements from stream, by applying the
        given function once per element.

        Function must take a single argument of type BinaryReader.
        '''
        return [fn(self) for _ in range(num_elements)]

    def readList(self, fn, size_reader=None) -> list[object]:
        '''
        Reads a list of elements from stream, by applying the given function
        several times.

        Function must take a single argument of type BinaryReader. The element
        count is read with size_reader, which defaults to readUInt32BigEndian.
        '''
        size_reader = size_reader or BinaryReader.readUInt32BigEndian
        num_elements = size_reader(self)
        return self.readSizedList(fn, num_elements)