simplify SizedBytes and StructStream (#11429)

* simplify SizedBytes and StructStream

* lint

* super()  !!!

* do not pass parameter up to super().__init__()

* Update chia/util/struct_stream.py

Co-authored-by: Arvid Norberg <arvid@libtorrent.org>

* parse fixed-width int data from class name

* add int512 and uint128 .from_bytes(), test .parse() failures

* test serialization against struct.pack()

* use typing_extensions for final

* override ignore

* stop using struct for StructStream

oh the irony

* fixup .to_bytes() to accept parameters again for where we use that

* bring back signed parameter

* format

* adjust tests for new exception

* eliminate custom coding for uint128 and int512

* tidy

* remove unused StructStream.PACK attribute

* add direct tests for parse_metadata_from_name()

* stricter hinting

* remove no-longer-needed typeshed work-around

* apply strict type checking to all touched files

* remove StructStream override of .to_bytes()

* tidy

* types touchup

* add unused parameter comments

Co-authored-by: Arvid Norberg <arvid@libtorrent.org>
Co-authored-by: wjblanke <wjb98672@gmail.com>
This commit is contained in:
Kyle Altendorf 2022-05-14 05:05:27 -04:00 committed by GitHub
parent c3272fa84c
commit 7c91f470f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 299 additions and 137 deletions

View File

@ -1,5 +1,6 @@
import io
from typing import BinaryIO, Type, TypeVar, TYPE_CHECKING
from typing import BinaryIO, Iterable, SupportsBytes, Type, TypeVar, Union
from typing_extensions import SupportsIndex
_T_SizedBytes = TypeVar("_T_SizedBytes", bound="SizedBytes")
@ -20,29 +21,28 @@ class SizedBytes(bytes):
_size = 0
@staticmethod
def __new__(cls: Type[_T_SizedBytes], v) -> _T_SizedBytes:
v = bytes(v)
if not isinstance(v, bytes) or len(v) != cls._size:
raise ValueError("bad %s initializer %s" % (cls.__name__, v))
return bytes.__new__(cls, v)
# This is just a partial exposure of the underlying int constructor. Liskov...
# https://github.com/python/typeshed/blob/f8547a3f3131de90aa47005358eb3394e79cfa13/stdlib/builtins.pyi#L483-L493
def __init__(self, v: Union[Iterable[SupportsIndex], SupportsBytes]) -> None:
# v is unused here and that is ok since .__new__() seems to have already
# processed the parameter when creating the instance of the class. We have no
# additional special action to take here beyond verifying that the newly
# created instance satisfies the length limitation of the particular subclass.
super().__init__()
if len(self) != self._size:
raise ValueError("bad %s initializer %s" % (type(self).__name__, v))
@classmethod
def parse(cls: Type[_T_SizedBytes], f: BinaryIO) -> _T_SizedBytes:
b = f.read(cls._size)
assert len(b) == cls._size
return cls(b)
def stream(self, f):
def stream(self, f: BinaryIO) -> None:
f.write(self)
@classmethod
def from_bytes(cls: Type[_T_SizedBytes], blob: bytes) -> _T_SizedBytes:
# pylint: disable=no-member
f = io.BytesIO(blob)
result = cls.parse(f)
assert f.read() == b""
return result
return cls(blob)
@classmethod
def from_hexstr(cls: Type[_T_SizedBytes], input_str: str) -> _T_SizedBytes:
@ -50,21 +50,8 @@ class SizedBytes(bytes):
return cls.fromhex(input_str[2:])
return cls.fromhex(input_str)
def __bytes__(self) -> bytes:
f = io.BytesIO()
self.stream(f)
return bytes(f.getvalue())
def __str__(self):
def __str__(self) -> str:
return self.hex()
def __repr__(self):
def __repr__(self) -> str:
return "<%s: %s>" % (self.__class__.__name__, str(self))
if TYPE_CHECKING:
# TODO: This stub implements a fix already merged into typeshed but not yet
# released in a new mypy version. Once released this should be removed.
# https://github.com/python/typeshed/pull/6201
@classmethod
def fromhex(cls: Type[_T_SizedBytes], __s: str) -> _T_SizedBytes:
...

View File

@ -1,79 +1,63 @@
from typing import Any, BinaryIO
from __future__ import annotations
from chia.util.struct_stream import StructStream
from chia.util.struct_stream import StructStream, parse_metadata_from_name
@parse_metadata_from_name
class int8(StructStream):
PACK = "!b"
pass
@parse_metadata_from_name
class uint8(StructStream):
PACK = "!B"
pass
@parse_metadata_from_name
class int16(StructStream):
PACK = "!h"
pass
@parse_metadata_from_name
class uint16(StructStream):
PACK = "!H"
pass
@parse_metadata_from_name
class int32(StructStream):
PACK = "!l"
pass
@parse_metadata_from_name
class uint32(StructStream):
PACK = "!L"
pass
@parse_metadata_from_name
class int64(StructStream):
PACK = "!q"
pass
@parse_metadata_from_name
class uint64(StructStream):
PACK = "!Q"
pass
class uint128(int):
def __new__(cls: Any, value: int):
value = int(value)
if value > (2 ** 128) - 1 or value < 0:
raise ValueError(f"Value {value} of does not fit into uint128")
return int.__new__(cls, value)
@classmethod
def parse(cls, f: BinaryIO) -> Any:
read_bytes = f.read(16)
assert len(read_bytes) == 16
n = int.from_bytes(read_bytes, "big", signed=False)
assert n <= (2 ** 128) - 1 and n >= 0
return cls(n)
def stream(self, f):
assert self <= (2 ** 128) - 1 and self >= 0
f.write(self.to_bytes(16, "big", signed=False))
@parse_metadata_from_name
class uint128(StructStream):
pass
class int512(int):
def __new__(cls: Any, value: int):
value = int(value)
# note that the boundaries for int512 is not what you might expect. We
# encode these with one extra byte, but only allow a range of
# [-INT512_MAX, INT512_MAX]
if value >= (2 ** 512) or value <= -(2 ** 512):
raise ValueError(f"Value {value} of does not fit into in512")
return int.__new__(cls, value)
class int512(StructStream):
PACK = None
# Uses 65 bytes to fit in the sign bit
@classmethod
def parse(cls, f: BinaryIO) -> Any:
read_bytes = f.read(65)
assert len(read_bytes) == 65
n = int.from_bytes(read_bytes, "big", signed=True)
assert n < (2 ** 512) and n > -(2 ** 512)
return cls(n)
SIZE = 65
BITS = 512
SIGNED = True
def stream(self, f):
assert self < (2 ** 512) and self > -(2 ** 512)
f.write(self.to_bytes(65, "big", signed=True))
# note that the boundaries for int512 is not what you might expect. We
# encode these with one extra byte, but only allow a range of
# [-INT512_MAX, INT512_MAX]
MAXIMUM_EXCLUSIVE = 2 ** BITS
MINIMUM = -(2 ** BITS) + 1

View File

@ -1,6 +1,4 @@
import io
import struct
from typing import Any, BinaryIO, SupportsInt, Type, TypeVar, Union
from typing import BinaryIO, SupportsInt, Type, TypeVar, Union
from typing_extensions import Protocol, SupportsIndex
@ -13,8 +11,45 @@ class SupportsTrunc(Protocol):
...
def parse_metadata_from_name(cls: Type[_T_StructStream]) -> Type[_T_StructStream]:
# TODO: turn this around to calculate the PACK from the size and signedness
name_signedness, _, name_bit_size = cls.__name__.partition("int")
cls.SIGNED = False if name_signedness == "u" else True
try:
cls.BITS = int(name_bit_size)
except ValueError as e:
raise ValueError(f"expected integer suffix but got: {name_bit_size!r}") from e
if cls.BITS <= 0:
raise ValueError(f"bit size must greater than zero but got: {cls.BITS}")
expected_name = f"{'' if cls.SIGNED else 'u'}int{cls.BITS}"
if cls.__name__ != expected_name:
raise ValueError(f"expected class name is {expected_name} but got: {cls.__name__}")
cls.SIZE, remainder = divmod(cls.BITS, 8)
if remainder != 0:
# There may be a good use case for removing this but until the details are
# thought through we should avoid such cases.
raise ValueError(f"cls.BITS must be a multiple of 8: {cls.BITS}")
if cls.SIGNED:
cls.MAXIMUM_EXCLUSIVE = 2 ** (cls.BITS - 1)
cls.MINIMUM = -(2 ** (cls.BITS - 1))
else:
cls.MAXIMUM_EXCLUSIVE = 2 ** cls.BITS
cls.MINIMUM = 0
return cls
class StructStream(int):
PACK = ""
SIZE = 0
BITS = 0
SIGNED = False
MAXIMUM_EXCLUSIVE = 0
MINIMUM = 0
"""
Create a class that can parse and stream itself based on a struct.pack template string.
@ -22,39 +57,28 @@ class StructStream(int):
# This is just a partial exposure of the underlying int constructor. Liskov...
# https://github.com/python/typeshed/blob/5d07ebc864577c04366fcc46b84479dbec033921/stdlib/builtins.pyi#L181-L185
def __new__(
cls: Type[_T_StructStream], value: Union[str, bytes, SupportsInt, SupportsIndex, SupportsTrunc]
) -> _T_StructStream:
value = int(value)
try:
v1 = struct.unpack(cls.PACK, struct.pack(cls.PACK, value))[0]
if value != v1:
raise ValueError(f"Value {value} does not fit into {cls.__name__}")
except Exception:
bits = struct.calcsize(cls.PACK) * 8
raise ValueError(
f"Value {value} of size {value.bit_length()} does not fit into " f"{cls.__name__} of size {bits}"
)
return int.__new__(cls, value)
def __init__(self, value: Union[str, bytes, SupportsInt, SupportsIndex, SupportsTrunc]) -> None:
# v is unused here and that is ok since .__new__() seems to have already
# processed the parameter when creating the instance of the class. We have no
# additional special action to take here beyond verifying that the newly
# created instance satisfies the bounds limitations of the particular subclass.
super().__init__()
if not (self.MINIMUM <= self < self.MAXIMUM_EXCLUSIVE):
raise ValueError(f"Value {self} does not fit into {type(self).__name__}")
@classmethod
def parse(cls: Any, f: BinaryIO) -> Any:
bytes_to_read = struct.calcsize(cls.PACK)
read_bytes = f.read(bytes_to_read)
assert read_bytes is not None and len(read_bytes) == bytes_to_read
return cls(*struct.unpack(cls.PACK, read_bytes))
def parse(cls: Type[_T_StructStream], f: BinaryIO) -> _T_StructStream:
read_bytes = f.read(cls.SIZE)
return cls.from_bytes(read_bytes)
def stream(self, f):
f.write(struct.pack(self.PACK, self))
def stream(self, f: BinaryIO) -> None:
f.write(bytes(self))
@classmethod
def from_bytes(cls: Any, blob: bytes) -> Any: # type: ignore
f = io.BytesIO(blob)
result = cls.parse(f)
assert f.read() == b""
return result
def from_bytes(cls: Type[_T_StructStream], blob: bytes) -> _T_StructStream: # type: ignore[override]
if len(blob) != cls.SIZE:
raise ValueError(f"{cls.__name__}.from_bytes() requires {cls.SIZE} bytes but got: {len(blob)}")
return cls(int.from_bytes(blob, "big", signed=cls.SIGNED))
def __bytes__(self: Any) -> bytes:
f = io.BytesIO()
self.stream(f)
return bytes(f.getvalue())
def __bytes__(self) -> bytes:
return super().to_bytes(length=self.SIZE, byteorder="big", signed=self.SIGNED)

File diff suppressed because one or more lines are too long

View File

@ -303,7 +303,7 @@ def test_ambiguous_deserialization_int() -> None:
a: uint32
# Does not have the required uint size
with pytest.raises(AssertionError):
with pytest.raises(ValueError):
TestClassUint.from_bytes(b"\x00\x00")
@ -314,7 +314,7 @@ def test_ambiguous_deserialization_list() -> None:
a: List[uint8]
# Does not have the required elements
with pytest.raises(AssertionError):
with pytest.raises(ValueError):
TestClassList.from_bytes(bytes([0, 0, 100, 24]))

View File

@ -1,12 +1,112 @@
from __future__ import annotations
from dataclasses import dataclass
from decimal import Decimal
import pytest
import struct
import io
from typing import Iterable, List, Optional, Type
import pytest
# TODO: update after resolution in https://github.com/pytest-dev/pytest/issues/7469
from _pytest.mark.structures import ParameterSet
# TODO: update after resolution in https://github.com/pytest-dev/pytest/issues/7469
from _pytest.fixtures import SubRequest
from typing_extensions import final
from chia.util.ints import int8, uint8, int16, uint16, int32, uint32, int64, uint64, uint128, int512
from chia.util.struct_stream import StructStream, parse_metadata_from_name
def dataclass_parameter(instance: object) -> ParameterSet:
return pytest.param(instance, id=repr(instance)[len(type(instance).__name__) + 1 : -1])
def dataclass_parameters(instances: Iterable[object]) -> List[ParameterSet]:
return [dataclass_parameter(instance) for instance in instances]
@dataclass(frozen=True)
class BadName:
name: str
error: str
@final
@dataclass(frozen=True)
class Good:
name: str
cls: Type[StructStream]
size: int
bits: int
signed: bool
maximum_exclusive: int
minimum: int
@classmethod
def create(
cls,
name: str,
size: int,
signed: bool,
maximum_exclusive: int,
minimum: int,
) -> Good:
raw_class: Type[StructStream] = type(name, (StructStream,), {})
parsed_cls = parse_metadata_from_name(raw_class)
return cls(
name=name,
cls=parsed_cls,
size=size,
bits=size * 8,
signed=signed,
maximum_exclusive=maximum_exclusive,
minimum=minimum,
)
good_classes = [
Good.create(name="uint8", size=1, signed=False, maximum_exclusive=0xFF + 1, minimum=0),
Good.create(name="int8", size=1, signed=True, maximum_exclusive=0x80, minimum=-0x80),
Good.create(name="uint16", size=2, signed=False, maximum_exclusive=0xFFFF + 1, minimum=0),
Good.create(name="int16", size=2, signed=True, maximum_exclusive=0x8000, minimum=-0x8000),
Good.create(name="uint24", size=3, signed=False, maximum_exclusive=0xFFFFFF + 1, minimum=0),
Good.create(name="int24", size=3, signed=True, maximum_exclusive=0x800000, minimum=-0x800000),
Good.create(
name="uint128",
size=16,
signed=False,
maximum_exclusive=0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF + 1,
minimum=0,
),
Good.create(
name="int128",
size=16,
signed=True,
maximum_exclusive=0x80000000000000000000000000000000,
minimum=-0x80000000000000000000000000000000,
),
]
@pytest.fixture(
name="good",
params=dataclass_parameters(good_classes),
)
def good_fixture(request: SubRequest) -> Good:
return request.param # type: ignore[no-any-return]
class TestStructStream:
def _test_impl(self, cls, upper_boundary, lower_boundary):
def _test_impl(
self,
cls: Type[StructStream],
upper_boundary: int,
lower_boundary: int,
length: int,
struct_format: Optional[str],
) -> None:
with pytest.raises(ValueError):
t = cls(upper_boundary + 1)
@ -23,44 +123,69 @@ class TestStructStream:
t = cls(0)
assert t == 0
def test_int512(self):
with pytest.raises(ValueError):
cls.parse(io.BytesIO(b"\0" * (length - 1)))
with pytest.raises(ValueError):
cls.from_bytes(b"\0" * (length - 1))
with pytest.raises(ValueError):
cls.from_bytes(b"\0" * (length + 1))
if struct_format is not None:
bytes_io = io.BytesIO()
cls(lower_boundary).stream(bytes_io)
assert bytes_io.getvalue() == struct.pack(struct_format, lower_boundary)
bytes_io = io.BytesIO()
cls(upper_boundary).stream(bytes_io)
assert bytes_io.getvalue() == struct.pack(struct_format, upper_boundary)
with pytest.raises(struct.error):
struct.pack(struct_format, lower_boundary - 1)
with pytest.raises(struct.error):
struct.pack(struct_format, upper_boundary + 1)
def test_int512(self) -> None:
# int512 is special. it uses 65 bytes to allow positive and negative
# "uint512"
self._test_impl(
int512,
0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, # noqa: E501
-0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, # noqa: E501
length=65,
struct_format=None,
)
def test_uint128(self):
self._test_impl(uint128, 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, 0)
def test_uint128(self) -> None:
self._test_impl(uint128, 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, 0, length=16, struct_format=None)
def test_uint64(self):
self._test_impl(uint64, 0xFFFFFFFFFFFFFFFF, 0)
def test_uint64(self) -> None:
self._test_impl(uint64, 0xFFFFFFFFFFFFFFFF, 0, length=8, struct_format="!Q")
def test_int64(self):
self._test_impl(int64, 0x7FFFFFFFFFFFFFFF, -0x8000000000000000)
def test_int64(self) -> None:
self._test_impl(int64, 0x7FFFFFFFFFFFFFFF, -0x8000000000000000, length=8, struct_format="!q")
def test_uint32(self):
self._test_impl(uint32, 0xFFFFFFFF, 0)
def test_uint32(self) -> None:
self._test_impl(uint32, 0xFFFFFFFF, 0, length=4, struct_format="!L")
def test_int32(self):
self._test_impl(int32, 0x7FFFFFFF, -0x80000000)
def test_int32(self) -> None:
self._test_impl(int32, 0x7FFFFFFF, -0x80000000, length=4, struct_format="!l")
def test_uint16(self):
self._test_impl(uint16, 0xFFFF, 0)
def test_uint16(self) -> None:
self._test_impl(uint16, 0xFFFF, 0, length=2, struct_format="!H")
def test_int16(self):
self._test_impl(int16, 0x7FFF, -0x8000)
def test_int16(self) -> None:
self._test_impl(int16, 0x7FFF, -0x8000, length=2, struct_format="!h")
def test_uint8(self):
self._test_impl(uint8, 0xFF, 0)
def test_uint8(self) -> None:
self._test_impl(uint8, 0xFF, 0, length=1, struct_format="!B")
def test_int8(self):
self._test_impl(int8, 0x7F, -0x80)
def test_int8(self) -> None:
self._test_impl(int8, 0x7F, -0x80, length=1, struct_format="!b")
def test_roundtrip(self):
def roundtrip(v):
def test_roundtrip(self) -> None:
def roundtrip(v: StructStream) -> None:
s = io.BytesIO()
v.stream(s)
s.seek(0)
@ -119,3 +244,45 @@ class TestStructStream:
def test_uint32_from_bytes(self) -> None:
assert uint32(b"273") == 273
def test_struct_stream_cannot_be_instantiated_directly(self) -> None:
with pytest.raises(ValueError, match="does not fit"):
StructStream(0)
@pytest.mark.parametrize(
argnames="bad_name",
argvalues=dataclass_parameters(
instances=[
BadName(name="uint", error="expected integer suffix but got: ''"),
BadName(name="blue", error="expected integer suffix but got"),
BadName(name="blue8", error="expected integer suffix but got: ''"),
BadName(name="sint8", error="expected class name"),
BadName(name="redint8", error="expected class name"),
BadName(name="int7", error="must be a multiple of 8"),
BadName(name="int9", error="must be a multiple of 8"),
BadName(name="int31", error="must be a multiple of 8"),
BadName(name="int0", error="bit size must greater than zero"),
# below could not happen in a hard coded class name, but testing for good measure
BadName(name="int-1", error="bit size must greater than zero"),
],
),
)
def test_parse_metadata_from_name_raises(self, bad_name: BadName) -> None:
cls = type(bad_name.name, (StructStream,), {})
with pytest.raises(ValueError, match=bad_name.error):
parse_metadata_from_name(cls)
def test_parse_metadata_from_name_correct_size(self, good: Good) -> None:
assert good.cls.SIZE == good.size
def test_parse_metadata_from_name_correct_bits(self, good: Good) -> None:
assert good.cls.BITS == good.bits
def test_parse_metadata_from_name_correct_signedness(self, good: Good) -> None:
assert good.cls.SIGNED == good.signed
def test_parse_metadata_from_name_correct_maximum(self, good: Good) -> None:
assert good.cls.MAXIMUM_EXCLUSIVE == good.maximum_exclusive
def test_parse_metadata_from_name_correct_minimum(self, good: Good) -> None:
assert good.cls.MINIMUM == good.minimum