"""Compact binary serialization for a small set of Python primitives.

Supported values: ``None``, ``bool``, ``int``, ``float``, ``str``, ``bytes``,
``dict``, ``list`` and ``set``.

Wire format (all multi-byte numbers use ``struct``'s ``!`` network byte order):

* every value starts with a one-byte type tag (``_OT_*``);
* ``int``: sub-type byte, then the unsigned magnitude in 1/2/4/8 bytes (the
  sub-type carries the sign), or, for magnitudes wider than 64 bits, a
  complete ``str`` value holding the decimal text;
* ``float``: 8-byte double;
* ``str``: a ``bytes`` payload (including its own tag) of the UTF-8 encoding;
* ``bytes``: 4-byte length followed by the raw data;
* ``dict`` / ``list`` / ``set``: a link byte, then either a 4-byte id of a
  container that was already written (``_OT_LINK``) or a 4-byte item count
  followed by the items (``_OT_NOT_LINK``).

Container ids are assigned in order of first appearance, and both the writer
and the reader register a container before processing its children, so shared
and self-referencing containers survive a round trip.
"""

import io
import struct
from functools import wraps

# Top-level type tags.
_OT_NONE = 0
_OT_TRUE = 1
_OT_FALSE = 2
_OT_INT = 3
_OT_REAL = 4
_OT_STR = 5
_OT_BYTES = 6
_OT_LIST = 7
_OT_MAP = 8
_OT_SET = 9

# Integer sub-type tags: sign (N = negative, P = non-negative) and width.
_OT_NINT_BYTE = 0
_OT_NINT_SHORT = 1
_OT_NINT_INT = 2
_OT_NINT_LONG = 3
_OT_PINT_BYTE = 4
_OT_PINT_SHORT = 5
_OT_PINT_INT = 6
_OT_PINT_LONG = 7
_OT_INT_STR = 8

# Container link flags.
_OT_NOT_LINK = 0
_OT_LINK = 1

# (max bit length, negative tag, non-negative tag, struct format), narrowest
# encoding first so _dump_int picks the smallest one that fits.
_INT_ENCODINGS = (
    (8, _OT_NINT_BYTE, _OT_PINT_BYTE, "!B"),
    (16, _OT_NINT_SHORT, _OT_PINT_SHORT, "!H"),
    (32, _OT_NINT_INT, _OT_PINT_INT, "!I"),
    (64, _OT_NINT_LONG, _OT_PINT_LONG, "!Q"),
)


class BinarySerializationDeserializationException(Exception):
    """Base class for every error raised by this module."""


class BinarySerializationException(BinarySerializationDeserializationException):
    """Raised by ``dump`` when a value cannot be serialized."""


class BinaryDeserializationException(BinarySerializationDeserializationException):
    """Raised by ``load`` when the input data is malformed or truncated."""


def _check_bounds(offset, bytes_count, maxlen):
    """Raise BinaryDeserializationException if the read would pass ``maxlen``."""
    if offset + bytes_count > maxlen:
        raise BinaryDeserializationException(
            "Attempting to read past data, maxlen %s, read attempt for %s at %s"
            % (str(maxlen), str(bytes_count), str(offset)))


def _boundary_check(bytes_count):
    """Decorate a reader so it first verifies ``bytes_count`` bytes are available.

    The decorated function must take ``(bytedata, offset, maxlen, ...)`` as its
    leading positional parameters.
    """
    def _boundary_check_decorator(fnc):
        @wraps(fnc)
        def _boundary_check_wrapper(bytedata, offset, maxlen, *args, **kwargs):
            _check_bounds(offset, bytes_count, maxlen)
            return fnc(bytedata, offset, maxlen, *args, **kwargs)
        return _boundary_check_wrapper
    return _boundary_check_decorator


def dump(py_primitive):
    """Serialize ``py_primitive`` and return the encoded ``bytes``.

    Raises:
        BinarySerializationException: if an unsupported type is encountered.
    """
    with io.BytesIO() as writer:
        _dump(writer, py_primitive, _ValueCache())
        return writer.getvalue()


def _dump(w, py_value, cache):
    """Write one value of any supported type to the writer ``w``."""
    # Booleans are matched by identity before the int check because bool is a
    # subclass of int.
    if py_value is None:
        w.write(_get_type_as_byte(_OT_NONE))
    elif py_value is True:
        w.write(_get_type_as_byte(_OT_TRUE))
    elif py_value is False:
        w.write(_get_type_as_byte(_OT_FALSE))
    elif isinstance(py_value, int):
        _dump_int(w, py_value)
    elif isinstance(py_value, float):
        _dump_float(w, py_value)
    elif isinstance(py_value, str):
        _dump_str(w, py_value)
    elif isinstance(py_value, bytes):
        _dump_bytes(w, py_value)
    elif isinstance(py_value, dict):
        _dump_dict(w, py_value, cache)
    elif isinstance(py_value, list):
        _dump_list(w, py_value, cache)
    elif isinstance(py_value, set):
        _dump_set(w, py_value, cache)
    else:
        raise BinarySerializationException(
            "Not supported type %s: %s" % (type(py_value), str(py_value)))


def _dump_int(w, i):
    """Write an int as sign-carrying sub-type plus magnitude.

    Magnitudes that do not fit in 64 bits are written as decimal text.
    """
    w.write(_get_type_as_byte(_OT_INT))
    is_negative = i < 0
    magnitude = abs(i)
    bit_len = magnitude.bit_length()
    for max_bits, negative_tag, positive_tag, fmt in _INT_ENCODINGS:
        if bit_len <= max_bits:
            tag = negative_tag if is_negative else positive_tag
            w.write(_get_type_as_byte(tag))
            w.write(struct.pack(fmt, magnitude))
            return
    w.write(_get_type_as_byte(_OT_INT_STR))
    _dump_str(w, ("-" if is_negative else "") + str(magnitude))


def _dump_float(w, f):
    """Write a float as an 8-byte double."""
    w.write(_get_type_as_byte(_OT_REAL))
    w.write(struct.pack("!d", f))


def _dump_str(w, string):
    """Write a str as its UTF-8 encoding wrapped in a bytes payload."""
    w.write(_get_type_as_byte(_OT_STR))
    _dump_bytes(w, string.encode("utf-8"))


def _dump_bytes(w, bytestring):
    """Write a bytes value: tag, 4-byte length, raw data."""
    w.write(_get_type_as_byte(_OT_BYTES))
    w.write(struct.pack("!I", len(bytestring)))
    w.write(bytestring)


def _dump_container_header(w, type_tag, container, cache):
    """Write the type tag and link header of a container.

    Returns True when the container was already written earlier; in that case
    a link to it has been emitted and the caller must not write its items.
    """
    w.write(_get_type_as_byte(type_tag))
    cache_id, already_cached = cache.append_to_cache(container)
    if already_cached:
        w.write(_get_type_as_byte(_OT_LINK))
        w.write(struct.pack("!I", cache_id))
        return True
    w.write(_get_type_as_byte(_OT_NOT_LINK))
    return False


def _dump_dict(w, d, cache):
    """Write a dict as item count followed by alternating keys and values."""
    if _dump_container_header(w, _OT_MAP, d, cache):
        return
    w.write(struct.pack("!I", len(d)))
    for key, value in d.items():
        _dump(w, key, cache)
        _dump(w, value, cache)


def _dump_list(w, l, cache):
    """Write a list as item count followed by the items in order."""
    if _dump_container_header(w, _OT_LIST, l, cache):
        return
    w.write(struct.pack("!I", len(l)))
    for py_value in l:
        _dump(w, py_value, cache)


def _dump_set(w, s, cache):
    """Write a set as item count followed by the items in iteration order."""
    if _dump_container_header(w, _OT_SET, s, cache):
        return
    w.write(struct.pack("!I", len(s)))
    for py_value in s:
        _dump(w, py_value, cache)


def _get_type_as_byte(t):
    """Return the tag ``t`` as a single-byte ``bytes`` object."""
    return bytes((t,))


def load(bytedata):
    """Deserialize the first value encoded in ``bytedata`` and return it.

    Raises:
        BinaryDeserializationException: if the data is empty, truncated or
            contains an unknown tag.
    """
    if len(bytedata) == 0:
        raise BinaryDeserializationException("empty datasource")
    return _load(bytedata, 0, len(bytedata), _ValueCache())[0]


@_boundary_check(1)
def _load(bytedata, offset, maxlen, cache):
    """Read one value at ``offset``; return ``(value, next_offset)``."""
    py_type = bytedata[offset]
    offset += 1
    if py_type == _OT_NONE:
        return None, offset
    elif py_type == _OT_TRUE:
        return True, offset
    elif py_type == _OT_FALSE:
        return False, offset
    elif py_type == _OT_INT:
        return _load_int(bytedata, offset, maxlen)
    elif py_type == _OT_REAL:
        return _load_float(bytedata, offset, maxlen)
    elif py_type == _OT_STR:
        return _load_str(bytedata, offset, maxlen)
    elif py_type == _OT_BYTES:
        return _load_bytes(bytedata, offset, maxlen)
    elif py_type == _OT_MAP:
        return _load_dict(bytedata, offset, maxlen, cache)
    elif py_type == _OT_LIST:
        return _load_list(bytedata, offset, maxlen, cache)
    elif py_type == _OT_SET:
        return _load_set(bytedata, offset, maxlen, cache)
    else:
        raise BinaryDeserializationException(
            "wrong type specifier %s" % str(py_type))


@_boundary_check(1)
def _load_int(bytedata, offset, maxlen):
    """Read an int body (sub-type byte plus payload) after the ``_OT_INT`` tag."""
    subtype = bytedata[offset]
    offset += 1
    if subtype == _OT_NINT_BYTE or subtype == _OT_PINT_BYTE:
        return _load_int_byte(bytedata, offset, maxlen, subtype == _OT_NINT_BYTE)
    elif subtype == _OT_NINT_SHORT or subtype == _OT_PINT_SHORT:
        return _load_int_short(bytedata, offset, maxlen, subtype == _OT_NINT_SHORT)
    elif subtype == _OT_NINT_INT or subtype == _OT_PINT_INT:
        return _load_int_int(bytedata, offset, maxlen, subtype == _OT_NINT_INT)
    elif subtype == _OT_NINT_LONG or subtype == _OT_PINT_LONG:
        return _load_int_long(bytedata, offset, maxlen, subtype == _OT_NINT_LONG)
    elif subtype == _OT_INT_STR:
        return _load_int_str(bytedata, offset, maxlen)
    else:
        raise BinaryDeserializationException(
            "incorrect byte for int type %s" % (str(subtype)))


@_boundary_check(1)
def _load_int_str(bytedata, offset, maxlen):
    """Read an int that was written as decimal text by ``_dump_int``.

    The payload is a complete str value, so it starts with the ``_OT_STR`` tag.
    """
    tag = bytedata[offset]
    offset += 1
    if tag != _OT_STR:
        raise BinaryDeserializationException(
            "incorrect byte for str type %s" % (str(tag)))
    text, offset = _load_str(bytedata, offset, maxlen)
    try:
        return int(text), offset
    except ValueError as err:
        # Only a prefix is reported so a huge bogus payload cannot bloat the message.
        raise BinaryDeserializationException(
            "incorrect decimal text for int type %r" % (text[:32],)) from err


@_boundary_check(1)
def _load_int_byte(bytedata, offset, _maxlen, is_negative):
    """Read a 1-byte int magnitude."""
    return _load_int_value(bytedata, offset, "!B", 1, is_negative)


@_boundary_check(2)
def _load_int_short(bytedata, offset, _maxlen, is_negative):
    """Read a 2-byte int magnitude."""
    return _load_int_value(bytedata, offset, "!H", 2, is_negative)


@_boundary_check(4)
def _load_int_int(bytedata, offset, _maxlen, is_negative):
    """Read a 4-byte int magnitude."""
    return _load_int_value(bytedata, offset, "!I", 4, is_negative)


@_boundary_check(8)
def _load_int_long(bytedata, offset, _maxlen, is_negative):
    """Read an 8-byte int magnitude."""
    return _load_int_value(bytedata, offset, "!Q", 8, is_negative)


def _load_int_value(bytedata, offset, decode_frm, move_offset, is_negative):
    """Unpack an unsigned magnitude and apply the sign from the sub-type.

    The caller is responsible for the bounds check.
    """
    value = struct.unpack_from(decode_frm, bytedata, offset)[0]
    offset += move_offset
    if is_negative:
        value = -value
    return value, offset


@_boundary_check(8)
def _load_float(bytedata, offset, _maxlen):
    """Read an 8-byte double."""
    return struct.unpack_from("!d", bytedata, offset)[0], offset + 8


@_boundary_check(1)
def _load_str(bytedata, offset, maxlen):
    """Read a str body: a tagged bytes payload decoded as UTF-8."""
    subtype = bytedata[offset]
    offset += 1
    if subtype != _OT_BYTES:
        raise BinaryDeserializationException(
            "incorrect byte for bytes type %s" % (str(subtype)))
    b, offset = _load_bytes(bytedata, offset, maxlen)
    return str(b, "utf-8"), offset


@_boundary_check(4)
def _load_bytes(bytedata, offset, maxlen):
    """Read a bytes body: 4-byte length followed by that many raw bytes."""
    length = struct.unpack_from("!I", bytedata, offset)[0]
    offset += 4
    # The payload size is only known now, so it is checked explicitly here.
    _check_bounds(offset, length, maxlen)
    end = offset + length
    return bytes(bytedata[offset:end]), end


@_boundary_check(1)
def _load_cached(bytedata, offset, maxlen, cache, loader):
    """Read a container link byte and either resolve the link or call ``loader``."""
    cached = bytedata[offset]
    offset += 1
    if cached == _OT_LINK:
        return _get_cached(bytedata, offset, maxlen, cache)
    else:
        return loader(bytedata, offset, maxlen, cache)


@_boundary_check(4)
def _get_cached(bytedata, offset, _maxlen, cache):
    """Read a 4-byte container id and return the already-loaded container."""
    cache_id = struct.unpack_from("!I", bytedata, offset)[0]
    return cache.get_cached(cache_id), offset + 4


def _load_dict(bytedata, offset, maxlen, cache):
    """Read a dict body (link header included)."""
    return _load_cached(bytedata, offset, maxlen, cache, _load_dict_actual)


@_boundary_check(4)
def _load_dict_actual(bytedata, offset, maxlen, cache):
    """Read the item count and key/value pairs of a dict."""
    v = dict()
    # Registered before the children are read so nested links can refer to it.
    cache.append_to_cache(v)
    length = struct.unpack_from("!I", bytedata, offset)[0]
    offset += 4
    for _ix in range(length):
        key, offset = _load(bytedata, offset, maxlen, cache)
        value, offset = _load(bytedata, offset, maxlen, cache)
        v[key] = value
    return v, offset


def _load_list(bytedata, offset, maxlen, cache):
    """Read a list body (link header included)."""
    return _load_cached(bytedata, offset, maxlen, cache, _load_list_actual)


@_boundary_check(4)
def _load_list_actual(bytedata, offset, maxlen, cache):
    """Read the item count and items of a list."""
    v = list()
    # Registered before the children are read so nested links can refer to it.
    cache.append_to_cache(v)
    length = struct.unpack_from("!I", bytedata, offset)[0]
    offset += 4
    for _ix in range(length):
        value, offset = _load(bytedata, offset, maxlen, cache)
        v.append(value)
    return v, offset


def _load_set(bytedata, offset, maxlen, cache):
    """Read a set body (link header included)."""
    return _load_cached(bytedata, offset, maxlen, cache, _load_set_actual)


@_boundary_check(4)
def _load_set_actual(bytedata, offset, maxlen, cache):
    """Read the item count and items of a set."""
    v = set()
    # Registered before the children are read so nested links can refer to it.
    cache.append_to_cache(v)
    length = struct.unpack_from("!I", bytedata, offset)[0]
    offset += 4
    for _ix in range(length):
        value, offset = _load(bytedata, offset, maxlen, cache)
        v.add(value)
    return v, offset


class _ValueCache:
    """Assigns sequential ids to containers, keyed by object identity.

    Used while dumping to detect containers that were already written and
    while loading to resolve links back to the container they refer to.
    """

    def __init__(self):
        self._objectmap = {}  # id(obj) -> cache id
        self._valmap = {}     # cache id -> obj (also keeps obj alive)
        self._nextid = 0

    def append_to_cache(self, obj):
        """Register ``obj``; return ``(cache_id, already_cached)``."""
        key = id(obj)
        if key in self._objectmap:
            return self._objectmap[key], True
        new_id = self._nextid
        self._nextid += 1
        self._objectmap[key] = new_id
        self._valmap[new_id] = obj
        return new_id, False

    def get_cached(self, oid):
        """Return the object registered under ``oid``.

        Raises:
            BinaryDeserializationException: if ``oid`` was never registered.
        """
        try:
            return self._valmap[oid]
        except KeyError:
            raise BinaryDeserializationException("object not in cache!") from None