1
0
pbcabi/pbcabi/binaryreader.py
2023-06-20 11:15:13 +02:00

138 lines
3.7 KiB
Python

'''
Utility module for reading binary streams of data.
Follows the ABI specification quite closely. The full ABI specification can be found at:
https://partisiablockchain.gitlab.io/documentation/abiv.html
'''
import io
class BinaryReader:
'''
Wrapper for io.BytesIO for iteratively parsing byte streams.
'''
def __init__(self, buf):
'''
Initializes BinaryReader.
'''
self.buf = buf
if isinstance(self.buf, bytes):
self.buf = io.BytesIO(self.buf)
self.size = len(buf)
self.position = 0
def readBytes(self, num_bytes):
'''
Reads a number of bytes from stream.
'''
self.position += num_bytes
bts = self.buf.read(num_bytes)
if len(bts) != num_bytes:
raise Exception("Not enough bytes in buffer: Got {}, but expected {}".format(len(bts), num_bytes))
return bts
def readDynamicBytes(self, size_reader = None):
'''
Reads a dynamically sized list of bytes.
'''
size_reader = size_reader or BinaryReader.readUInt32BigEndian
num_bytes = size_reader(self)
return self.readBytes(num_bytes)
def readString(self, size_reader = None):
'''
Reads a string from stream.
'''
return self.readDynamicBytes(size_reader).decode('utf8')
def readUInt8(self):
'''
Reads a unsigned 8-bit integer from stream.
'''
bytes = self.readBytes(1)
return bytes[0]
def readUInt32BigEndian(self):
'''
Reads a unsigned 32-bit integer from stream.
'''
return self.readUIntBigEndian(4)
def readUInt32LittleEndian(self):
'''
Reads a unsigned 32-bit integer from stream.
'''
return self.readUIntLittleEndian(4)
def readUIntBigEndian(self, num_bytes):
'''
Reads an unsigned N-bit integer from stream.
'''
bytes = self.readBytes(num_bytes)
c = 0
for i in range(0, num_bytes):
c += bytes[num_bytes - i - 1] * 2**(i * 8)
return c
def readSignedIntBigEndian(self, num_bytes):
'''
Reads an signed N-bit integer from stream.
'''
# TODO: Test!
result = self.readUIntBigEndian(num_bytes)
half = 2**(num_bytes * 8)
if result > half:
result -= half
return result
def readUIntLittleEndian(self, num_bytes):
'''
Reads an unsigned N-bit integer from stream.
'''
bytes = self.readBytes(num_bytes)
c = 0
for i in range(0, num_bytes):
c += bytes[i] * 2**(i * 8)
return c
def readSignedIntLittleEndian(self, num_bytes):
'''
Reads an signed N-bit integer from stream.
'''
# TODO: Test!
result = self.readUIntLittleEndian(num_bytes)
half = 2**(num_bytes * 8)
if result > half:
result -= half
return result
def readLeb128(self):
'''
Reads a LEB-128 integer from stream.
'''
# TODO: Test!
v = self.readUInt8()
count = 0
while v & 0x80 != 0:
v = self.readUInt8()
count += 1
assert count < 6, "TODO: " + str(v)
return v
def readList(self, fn, size_reader=None):
'''
Reads a list of elements from stream, by applying the given function
several times.
Function must take a single argument of type BinaryReader.
'''
size_reader = size_reader or BinaryReader.readUInt32BigEndian
length = size_reader(self)
ls = []
for i in range(0, length):
ls.append(fn(self))
return ls