From a3b80838703f0bb23b9f381f6f1cdaee3dd61846 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Mon, 25 Oct 2021 14:54:30 -0400 Subject: [PATCH 1/4] Tests to drive development --- test/test_parsable/test_parsable.py | 144 ++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 test/test_parsable/test_parsable.py diff --git a/test/test_parsable/test_parsable.py b/test/test_parsable/test_parsable.py new file mode 100644 index 0000000..07ae304 --- /dev/null +++ b/test/test_parsable/test_parsable.py @@ -0,0 +1,144 @@ +from typing import Any, Dict, List, Optional, Tuple, Type +import pytest +from headerparser import ( + BodyField, + BodyNotAllowedError, + DuplicateFieldError, + ExtraFields, + Field, + MultiExtraFields, + MultiField, + UnknownFieldError, + decode_bool, + decode_value, + multidict, + parsable, + parse, +) + + +@parsable +class Simple: + simple: str + optional: Optional[str] = None + aliased: Optional[str] = Field(alias="alias", default=None) + multi: List[str] = MultiField() + extra: List[Tuple[str, str]] = ExtraFields() + body: Optional[str] = BodyField() + + +@parsable +class MultiExtra: + foo: int = Field(decoder=decode_value(int)) + bar: bool = Field(decoder=decode_bool) + extra: Dict[str, List[str]] = MultiExtraFields(decoder=multidict) + + +@parsable +class CrissCross: + one: str = Field(alias="two") + two: str = Field(alias="one") + + +@pytest.mark.parametrize( + "data,obj", + [ + ( + Simple, + "Simple: foobar\n", + Simple( + simple="foobar", + optional=None, + aliased=None, + multi=[], + extra=[], + body=None, + ), + ), + ( + Simple, + "Simple: foobar\nOptional: present\nAlias: unknown\nAliased: extra\n" + "Extra: overflow\nMulti: one\nMulti: two\nHyphen-Ated: Hyphen-Ated\n" + "\nThis is the body.\n", + Simple( + simple="foobar", + optional="present", + aliased="unknown", + multi=["one", "two"], + extra=[ + ("aliased", "extra"), + ("extra", "overflow"), + ("hyphen-ated", "Hyphen-Ated"), + ], + body="This is the body.\n", + ), + ), + ( + MultiExtra, + "Foo: 42\nBar: true\nExtra: one\nExtra: two\nOther: stuff\n", + MultiExtra( + foo=42, + bar=True, + extra={"extra": ["one", "two"], "other": ["stuff"]}, + ), + ), + (MultiExtra, "Bar: no\nFoo: 23\n", MultiExtra(foo=23, bar=False, extra={})), + ( + CrissCross, + "ONE: apple\nTWO: banana\n", + CrissCross(one="banana", two="apple"), + ), + ], +) +def test_parse(cls: type, data: str, obj: Any) -> None: + assert parse(cls, data) == obj + + +@pytest.mark.parametrize( + "data,exc_type,exc_str", + [ + ( + Simple, + "Simple: one\nSimple: two\n", + DuplicateFieldError, + "Header field 'simple' occurs more than once", + ), + ( + Simple, + "Simple: foobar\nExtra: one\nExtra: two\n", + DuplicateFieldError, + "Header field 'extra' occurs more than once", + ), + ( + Simple, + "", + TypeError, + "__init__() missing 1 required positional argument: 'simple'", + ), + ( + MultiExtra, + "Foo: forty-two\n", + ValueError, + "invalid literal for int() with base 10: 'forty-two'", + ), + (MultiExtra, "Bar: maybe\n", ValueError, "invalid boolean: 'maybe'"), + ( + MultiExtra, + "Some: field\n\nSome: body\n", + BodyNotAllowedError, + "Message body is present but not allowed", + ), + ( + CrissCross, + "Unknown: field\n", + UnknownFieldError, + "Unknown header field 'unknown'", + ), + ], +) +def test_parse_error( + cls: type, data: str, exc_type: Type[Exception], exc_str: str +) -> None: + with pytest.raises(exc_type) as excinfo: + parse(cls, data) + assert str(excinfo.value) == exc_str From 3f8c784fb596bdce3c5aace63ce48898bc38664d Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Mon, 25 Oct 2021 19:26:14 -0400 Subject: [PATCH 2/4] Most of an implementation --- setup.cfg | 1 + src/headerparser/__init__.py | 25 +- src/headerparser/errors.py | 7 + src/headerparser/mypy.py | 19 ++ src/headerparser/parscls.py | 373 ++++++++++++++++++++++++++++ src/headerparser/types.py | 28 ++- test/test_parsable/test_parsable.py | 31 ++- tox.ini | 5 +- 8 files changed, 469 insertions(+), 20 deletions(-) create mode 100644 src/headerparser/mypy.py create mode 100644 src/headerparser/parscls.py diff --git a/setup.cfg b/setup.cfg index 7b41c04..907a800 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,3 +72,4 @@ strict_equality = True warn_redundant_casts = True warn_return_any = True warn_unreachable = True +plugins = headerparser.mypy diff --git a/src/headerparser/__init__.py b/src/headerparser/__init__.py index 6ed0b6a..0107efe 100644 --- a/src/headerparser/__init__.py +++ b/src/headerparser/__init__.py @@ -15,6 +15,7 @@ from .errors import ( BodyNotAllowedError, + DuplicateBodyError, DuplicateFieldError, Error, FieldTypeError, @@ -29,6 +30,15 @@ UnknownFieldError, ) from .normdict import NormalizedDict +from .parscls import ( + BodyField, + ExtraFields, + Field, + MultiExtraFields, + MultiField, + parsable, + parse, +) from .parser import HeaderParser from .scanner import ( Scanner, @@ -39,7 +49,7 @@ scan_stanzas_string, scan_string, ) -from .types import BOOL, lower, unfold +from .types import BOOL, decode_bool, decode_value, lower, multidict, unfold __version__ = "0.5.1" __author__ = "John Thorvald Wodder II" @@ -49,15 +59,21 @@ __all__ = [ "BOOL", + "BodyField", "BodyNotAllowedError", + "DuplicateBodyError", "DuplicateFieldError", "Error", - "HeaderParser", + "ExtraFields", + "Field", "FieldTypeError", + "HeaderParser", "InvalidChoiceError", "MalformedHeaderError", "MissingBodyError", "MissingFieldError", + "MultiExtraFields", + "MultiField", "NormalizedDict", "ParserError", "Scanner", @@ -65,7 +81,12 @@ "ScannerError", "UnexpectedFoldingError", "UnknownFieldError", + "decode_bool", + "decode_value", "lower", + "multidict", + "parsable", + "parse", "scan", "scan_next_stanza", "scan_next_stanza_string", diff --git a/src/headerparser/errors.py b/src/headerparser/errors.py index f594877..fca7cb3 100644 --- a/src/headerparser/errors.py +++ b/src/headerparser/errors.py @@ -54,6 +54,13 @@ def __str__(self) -> str: return f"Header field {self.name!r} occurs more than once" +class DuplicateBodyError(ParserError): + """Raised when a body field occurs two or more times in the input""" + + def __str__(self) -> str: + return "Body field occurs more than once" + + class FieldTypeError(ParserError): """Raised when a ``type`` callable raises an exception""" diff --git a/src/headerparser/mypy.py b/src/headerparser/mypy.py new file mode 100644 index 0000000..97a4f57 --- /dev/null +++ b/src/headerparser/mypy.py @@ -0,0 +1,19 @@ +from typing import Any, Type +from mypy.plugin import Plugin +from mypy.plugins.attrs import attr_attrib_makers, attr_class_makers + +attr_class_makers.add("headerparser.parscls.parsable") + +attr_attrib_makers.add("headerparser.parscls.Field") +attr_attrib_makers.add("headerparser.parscls.MultiField") +attr_attrib_makers.add("headerparser.parscls.ExtraFields") +attr_attrib_makers.add("headerparser.parscls.MultiExtraFields") +attr_attrib_makers.add("headerparser.parscls.BodyField") + + +class HeaderParserPlugin(Plugin): + pass + + +def plugin(_version: Any) -> Type[Plugin]: + return HeaderParserPlugin diff --git a/src/headerparser/parscls.py b/src/headerparser/parscls.py new file mode 100644 index 0000000..f8ef1f9 --- /dev/null +++ b/src/headerparser/parscls.py @@ -0,0 +1,373 @@ +from abc import ABC, abstractmethod +from enum import Enum +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Mapping, + Optional, + Set, + Tuple, + Type, + TypeVar, + Union, +) +import attr +from .errors import ( + BodyNotAllowedError, + DuplicateBodyError, + DuplicateFieldError, + UnknownFieldError, +) +from .scanner import Scanner +from .types import decode_name + +CLS_ATTR_KEY = "__headerparser_spec__" +METADATA_KEY = "headerparser" + +T = TypeVar("T") + +FieldDecoder = Callable[[str, str], Any] +MultiFieldDecoder = Callable[[str, List[str]], Any] +ExtraFieldsDecoder = Callable[[List[Tuple[str, str]]], Any] +BodyDecoder = Callable[[str], Any] +NameDecoder = Callable[[str], str] + + +class InKey(Enum): + EXTRA = "extra" + BODY = "body" + + +@attr.define +class BaseFieldProcessor: + name: str + + @abstractmethod + def process(self, name: str, value: str) -> None: + ... + + @abstractmethod + def finalize(self, data: Dict[str, Any]) -> None: + ... + + +@attr.define +class FieldProcessor(BaseFieldProcessor): + name: str + in_key: str + decoder: Optional[FieldDecoder] + state: Optional[str] = None + + def process(self, _: str, value: str) -> None: + if self.state is None: + self.state = value + else: + raise DuplicateFieldError(self.in_key) + + def finalize(self, data: Dict[str, Any]) -> None: + if self.state is not None: + value: Any = self.state + if self.decoder is not None: + value = self.decoder(self.name, value) + data[self.name] = value + + +@attr.define +class MultiFieldProcessor(BaseFieldProcessor): + name: str + in_key: str + decoder: Optional[MultiFieldDecoder] + state: List[str] = attr.Factory(list) + + def process(self, _: str, value: str) -> None: + self.state.append(value) + + def finalize(self, data: Dict[str, Any]) -> None: + if self.state: + value: Any = self.state + if self.decoder is not None: + value = self.decoder(self.name, value) + data[self.name] = value + + +@attr.define +class ExtraFieldsProcessor(BaseFieldProcessor): + name: str + decoder: Optional[ExtraFieldsDecoder] + state: List[Tuple[str, str]] = attr.Factory(list) + seen: Set[str] = attr.Factory(set) + + def process(self, name: str, value: str) -> None: + if name in self.seen: + raise DuplicateFieldError(name) + self.state.append((name, value)) + self.seen.add(name) + + def finalize(self, data: Dict[str, Any]) -> None: + if self.state: + value: Any = self.state + if self.decoder is not None: + value = self.decoder(value) + data[self.name] = value + + +@attr.define +class MultiExtraFieldsProcessor(BaseFieldProcessor): + name: str + decoder: Optional[ExtraFieldsDecoder] + state: List[Tuple[str, str]] = attr.Factory(list) + + def process(self, name: str, value: str) -> None: + self.state.append((name, value)) + + def finalize(self, data: Dict[str, Any]) -> None: + if self.state: + value: Any = self.state + if self.decoder is not None: + value = self.decoder(value) + data[self.name] = value + + +@attr.define +class BodyProcessor(BaseFieldProcessor): + name: str + decoder: Optional[BodyDecoder] + state: Optional[str] = None + + def process(self, _: str, value: str) -> None: + if self.state is not None: + raise DuplicateBodyError() + self.state = value + + def finalize(self, data: Dict[str, Any]) -> None: + if self.state is not None: + value: Any = self.state + if self.decoder is not None: + value = self.decoder(value) + data[self.name] = value + + +@attr.define +class BaseFieldSpec(ABC): + name: str + + @property + @abstractmethod + def in_key(self) -> Union[str, InKey]: + ... + + @abstractmethod + def get_processor(self) -> BaseFieldProcessor: + ... + + +@attr.define +class FieldSpec(BaseFieldSpec): + alias: Optional[str] = None + decoder: Optional[FieldDecoder] = None + + @property + def in_key(self) -> str: + return self.alias if self.alias is not None else self.name + + def get_processor(self) -> BaseFieldProcessor: + return FieldProcessor(name=self.name, in_key=self.in_key, decoder=self.decoder) + + +@attr.define +class MultiFieldSpec(BaseFieldSpec): + alias: Optional[str] = None + decoder: Optional[MultiFieldDecoder] = None + + @property + def in_key(self) -> str: + return self.alias if self.alias is not None else self.name + + def get_processor(self) -> BaseFieldProcessor: + return MultiFieldProcessor( + name=self.name, in_key=self.in_key, decoder=self.decoder + ) + + +@attr.define +class ExtraFieldsSpec(BaseFieldSpec): + decoder: Optional[ExtraFieldsDecoder] = None + + @property + def in_key(self) -> InKey: + return InKey.EXTRA + + def get_processor(self) -> BaseFieldProcessor: + return ExtraFieldsProcessor(name=self.name, decoder=self.decoder) + + +class MultiExtraFieldsSpec(ExtraFieldsSpec): + def get_processor(self) -> BaseFieldProcessor: + return MultiExtraFieldsProcessor(name=self.name, decoder=self.decoder) + + +@attr.define +class BodySpec(BaseFieldSpec): + decoder: Optional[BodyDecoder] = None + + @property + def in_key(self) -> InKey: + return InKey.BODY + + def get_processor(self) -> BaseFieldProcessor: + return BodyProcessor(name=self.name, decoder=self.decoder) + + +def Field( + *, + alias: Optional[str] = None, + decoder: Optional[FieldDecoder] = None, + **kwargs: Any, +) -> attr.Attribute: + metadata = kwargs.get("metadata") + if metadata is None: + metadata = {} + metadata[METADATA_KEY] = { + "alias": alias, + "decoder": decoder, + "field_type": FieldSpec, + } + kwargs["metadata"] = metadata + return attr.field(**kwargs) + + +def MultiField( + *, + alias: Optional[str] = None, + decoder: Optional[MultiFieldDecoder] = None, + **kwargs: Any, +) -> attr.Attribute: + metadata = kwargs.get("metadata") + if metadata is None: + metadata = {} + metadata[METADATA_KEY] = { + "alias": alias, + "decoder": decoder, + "field_type": MultiFieldSpec, + } + kwargs["metadata"] = metadata + return attr.field(**kwargs) + + +def ExtraFields( + *, decoder: Optional[ExtraFieldsDecoder] = None, **kwargs: Any +) -> attr.Attribute: + metadata = kwargs.get("metadata") + if metadata is None: + metadata = {} + metadata[METADATA_KEY] = {"decoder": decoder, "field_type": ExtraFieldsSpec} + kwargs["metadata"] = metadata + return attr.field(**kwargs) + + +def MultiExtraFields( + *, decoder: Optional[ExtraFieldsDecoder] = None, **kwargs: Any +) -> attr.Attribute: + metadata = kwargs.get("metadata") + if metadata is None: + metadata = {} + metadata[METADATA_KEY] = {"decoder": decoder, "field_type": MultiExtraFieldsSpec} + kwargs["metadata"] = metadata + return attr.field(**kwargs) + + +def BodyField( + *, decoder: Optional[BodyDecoder] = None, **kwargs: Any +) -> attr.Attribute: + metadata = kwargs.get("metadata") + if metadata is None: + metadata = {} + metadata[METADATA_KEY] = {"decoder": decoder, "field_type": BodySpec} + kwargs["metadata"] = metadata + return attr.field(**kwargs) + + +def convert_name_decoder(decoder: Optional[NameDecoder]) -> NameDecoder: + return decoder if decoder is not None else decode_name + + +def convert_scanner_opts(opts: Optional[Mapping[str, Any]]) -> Dict[str, Any]: + return dict(opts) if opts is not None else {} + + +@attr.define +class ParsableSpec: + name_decoder: NameDecoder = attr.field(converter=convert_name_decoder) + scanner_options: Dict[str, Any] = attr.field(converter=convert_scanner_opts) + fields: Dict[Union[str, InKey], BaseFieldSpec] + + +def parsable( + cls: Type[T] = None, + *, + name_decoder: Optional[NameDecoder] = None, + scanner_options: Optional[Mapping[str, Any]] = None, + **kwargs: Any, +) -> Type[T]: + cls = attr.define(**kwargs)(cls) + fields: Dict[Union[str, InKey], BaseFieldSpec] = {} + for field in attr.fields(cls): + metadata = (field.metadata or {}).get(METADATA_KEY, {}) + assert isinstance(metadata, dict) + metadata = metadata.copy() + ftype = metadata.pop("field_type", FieldSpec) + assert issubclass(ftype, BaseFieldSpec) + spec = ftype(name=field.name, **metadata) + if spec.in_key in fields: + if isinstance(spec.in_key, str): + raise ValueError( + f"Multiple fields for header name {spec.in_key!r} registered" + ) + elif spec.in_key is InKey.EXTRA: + raise ValueError("Multiple extra fields registered") + elif spec.in_key is InKey.BODY: + raise ValueError("Multiple body fields registered") + else: + raise AssertionError( # pragma: no cover + f"Unhandled InKey {spec.in_key!r}" + ) + fields[spec.in_key] = spec + p = ParsableSpec( + name_decoder=name_decoder, + scanner_options=scanner_options, + fields=fields, + ) + setattr(cls, CLS_ATTR_KEY, p) + return cls + + +def parse(cls: Type[T], data: Union[str, Iterable[str]]) -> T: + p = getattr(cls, CLS_ATTR_KEY, None) + if not isinstance(p, ParsableSpec): + raise TypeError(f"{type(p).__name__} is not a parsable class") + sc = Scanner(data, **p.scanner_options) + processors = {k: v.get_processor() for k, v in p.fields.items()} + for (name, value) in sc.scan(): + if name is not None: + name = p.name_decoder(name) + try: + proc = processors[name] + except KeyError: + try: + proc = processors[InKey.EXTRA] + except KeyError: + raise UnknownFieldError(name) + else: + name = "" + try: + proc = processors[InKey.BODY] + except KeyError: + raise BodyNotAllowedError() + proc.process(name, value) + data: Dict[str, Any] = {} + for proc in processors.values(): + proc.finalize(data) + return cls(**data) diff --git a/src/headerparser/types.py b/src/headerparser/types.py index 64c4b28..0d34c00 100644 --- a/src/headerparser/types.py +++ b/src/headerparser/types.py @@ -1,5 +1,9 @@ import re -from typing import Any +from typing import Any, Callable, Dict, Iterable, List, Tuple, TypeVar + +T = TypeVar("T") +K = TypeVar("K") +V = TypeVar("V") TRUTHY = {"yes", "y", "on", "true", "1"} FALSEY = {"no", "n", "off", "false", "0"} @@ -55,3 +59,25 @@ def unfold(s: str) -> str: :rtype: string """ return re.sub(r"[ \t]*[\r\n][ \t\r\n]*", " ", s).strip(" ") + + +def decode_bool(_: str, value: str) -> bool: + return BOOL(value) + + +def decode_value(func: Callable[[str], T]) -> Callable[[str, str], T]: + def decoder(_: str, value: str) -> bool: + return func(value) + + return decoder + + +def multidict(values: Iterable[Tuple[K, V]]) -> Dict[K, List[V]]: + data: Dict[K, List[V]] = {} + for k, v in values: + data.setdefault(k, []).append(v) + return data + + +def decode_name(name: str) -> str: + return re.sub(r"\W", "_", name.lower()) diff --git a/test/test_parsable/test_parsable.py b/test/test_parsable/test_parsable.py index 07ae304..f6185fa 100644 --- a/test/test_parsable/test_parsable.py +++ b/test/test_parsable/test_parsable.py @@ -22,16 +22,16 @@ class Simple: simple: str optional: Optional[str] = None aliased: Optional[str] = Field(alias="alias", default=None) - multi: List[str] = MultiField() - extra: List[Tuple[str, str]] = ExtraFields() - body: Optional[str] = BodyField() + multi: List[str] = MultiField(factory=list) + extra: List[Tuple[str, str]] = ExtraFields(factory=list) + body: Optional[str] = BodyField(default=None) @parsable class MultiExtra: foo: int = Field(decoder=decode_value(int)) bar: bool = Field(decoder=decode_bool) - extra: Dict[str, List[str]] = MultiExtraFields(decoder=multidict) + extra: Dict[str, List[str]] = MultiExtraFields(decoder=multidict, factory=dict) @parsable @@ -41,7 +41,7 @@ class CrissCross: @pytest.mark.parametrize( - "data,obj", + "cls,data,obj", [ ( Simple, @@ -68,7 +68,7 @@ class CrissCross: extra=[ ("aliased", "extra"), ("extra", "overflow"), - ("hyphen-ated", "Hyphen-Ated"), + ("hyphen_ated", "Hyphen-Ated"), ], body="This is the body.\n", ), @@ -95,50 +95,49 @@ def test_parse(cls: type, data: str, obj: Any) -> None: @pytest.mark.parametrize( - "data,exc_type,exc_str", + "cls,data,exc_type,exc_match", [ ( Simple, "Simple: one\nSimple: two\n", DuplicateFieldError, - "Header field 'simple' occurs more than once", + "^Header field 'simple' occurs more than once$", ), ( Simple, "Simple: foobar\nExtra: one\nExtra: two\n", DuplicateFieldError, - "Header field 'extra' occurs more than once", + "^Header field 'extra' occurs more than once$", ), ( Simple, "", TypeError, - "__init__() missing 1 required positional argument: 'simple'", + "missing 1 required positional argument: 'simple'$", ), ( MultiExtra, "Foo: forty-two\n", ValueError, - "invalid literal for int() with base 10: 'forty-two'", + r"^invalid literal for int\(\) with base 10: 'forty-two'$", ), (MultiExtra, "Bar: maybe\n", ValueError, "invalid boolean: 'maybe'"), ( MultiExtra, "Some: field\n\nSome: body\n", BodyNotAllowedError, - "Message body is present but not allowed", + "^Message body is present but not allowed$", ), ( CrissCross, "Unknown: field\n", UnknownFieldError, - "Unknown header field 'unknown'", + "^Unknown header field 'unknown'$", ), ], ) def test_parse_error( - cls: type, data: str, exc_type: Type[Exception], exc_str: str + cls: type, data: str, exc_type: Type[Exception], exc_match: str ) -> None: - with pytest.raises(exc_type) as excinfo: + with pytest.raises(exc_type, match=exc_match): parse(cls, data) - assert str(excinfo.value) == exc_str diff --git a/tox.ini b/tox.ini index 75b0ff8..96f7b64 100644 --- a/tox.ini +++ b/tox.ini @@ -11,7 +11,10 @@ deps = pytest-mock commands = coverage erase - coverage run -m pytest {posargs} --doctest-modules --pyargs headerparser + # --doctest-ignore-import-errors is needed in order to not collect + # headerparser.mypy, which requires mypy to be installed, which is not an + # option on PyPy3.[67] + coverage run -m pytest {posargs} --doctest-modules --pyargs --doctest-ignore-import-errors headerparser coverage run -m pytest {posargs} test README.rst docs/index.rst coverage combine coverage report From bd8bd0e1746d34c8f0152ac541ec7a32a8e26187 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Mon, 25 Oct 2021 21:38:13 -0400 Subject: [PATCH 3/4] Get types to check --- src/headerparser/mypy.py | 8 ++--- src/headerparser/parscls.py | 58 +++++++++++++++++++++++++++---------- src/headerparser/types.py | 2 +- tox.ini | 4 +++ 4 files changed, 52 insertions(+), 20 deletions(-) diff --git a/src/headerparser/mypy.py b/src/headerparser/mypy.py index 97a4f57..cfecc78 100644 --- a/src/headerparser/mypy.py +++ b/src/headerparser/mypy.py @@ -1,8 +1,8 @@ -from typing import Any, Type +from typing import Type from mypy.plugin import Plugin -from mypy.plugins.attrs import attr_attrib_makers, attr_class_makers +from mypy.plugins.attrs import attr_attrib_makers, attr_define_makers -attr_class_makers.add("headerparser.parscls.parsable") +attr_define_makers.add("headerparser.parscls.parsable") attr_attrib_makers.add("headerparser.parscls.Field") attr_attrib_makers.add("headerparser.parscls.MultiField") @@ -15,5 +15,5 @@ class HeaderParserPlugin(Plugin): pass -def plugin(_version: Any) -> Type[Plugin]: +def plugin(_version: str) -> Type[Plugin]: return HeaderParserPlugin diff --git a/src/headerparser/parscls.py b/src/headerparser/parscls.py index f8ef1f9..1c414cf 100644 --- a/src/headerparser/parscls.py +++ b/src/headerparser/parscls.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from enum import Enum +from functools import partial from typing import ( Any, Callable, @@ -13,6 +14,7 @@ Type, TypeVar, Union, + overload, ) import attr from .errors import ( @@ -28,6 +30,7 @@ METADATA_KEY = "headerparser" T = TypeVar("T") +TT = TypeVar("TT", bound=type) FieldDecoder = Callable[[str, str], Any] MultiFieldDecoder = Callable[[str, List[str]], Any] @@ -226,7 +229,7 @@ def Field( alias: Optional[str] = None, decoder: Optional[FieldDecoder] = None, **kwargs: Any, -) -> attr.Attribute: +) -> Any: metadata = kwargs.get("metadata") if metadata is None: metadata = {} @@ -244,7 +247,7 @@ def MultiField( alias: Optional[str] = None, decoder: Optional[MultiFieldDecoder] = None, **kwargs: Any, -) -> attr.Attribute: +) -> Any: metadata = kwargs.get("metadata") if metadata is None: metadata = {} @@ -257,9 +260,7 @@ def MultiField( return attr.field(**kwargs) -def ExtraFields( - *, decoder: Optional[ExtraFieldsDecoder] = None, **kwargs: Any -) -> attr.Attribute: +def ExtraFields(*, decoder: Optional[ExtraFieldsDecoder] = None, **kwargs: Any) -> Any: metadata = kwargs.get("metadata") if metadata is None: metadata = {} @@ -270,7 +271,7 @@ def ExtraFields( def MultiExtraFields( *, decoder: Optional[ExtraFieldsDecoder] = None, **kwargs: Any -) -> attr.Attribute: +) -> Any: metadata = kwargs.get("metadata") if metadata is None: metadata = {} @@ -279,9 +280,7 @@ def MultiExtraFields( return attr.field(**kwargs) -def BodyField( - *, decoder: Optional[BodyDecoder] = None, **kwargs: Any -) -> attr.Attribute: +def BodyField(*, decoder: Optional[BodyDecoder] = None, **kwargs: Any) -> Any: metadata = kwargs.get("metadata") if metadata is None: metadata = {} @@ -305,14 +304,43 @@ class ParsableSpec: fields: Dict[Union[str, InKey], BaseFieldSpec] +@overload +def parsable( + cls: None = None, + *, + name_decoder: Optional[NameDecoder] = None, + scanner_options: Optional[Mapping[str, Any]] = None, + **kwargs: Any, +) -> Callable[[TT], TT]: + ... + + +@overload def parsable( - cls: Type[T] = None, + cls: TT, *, name_decoder: Optional[NameDecoder] = None, scanner_options: Optional[Mapping[str, Any]] = None, **kwargs: Any, -) -> Type[T]: - cls = attr.define(**kwargs)(cls) +) -> TT: + ... + + +def parsable( + cls: Optional[TT] = None, + *, + name_decoder: Optional[NameDecoder] = None, + scanner_options: Optional[Mapping[str, Any]] = None, + **kwargs: Any, +) -> Union[TT, Callable[[TT], TT]]: + if cls is None: + return partial( # type: ignore[return-value] + parsable, + name_decoder=name_decoder, + scanner_options=scanner_options, + **kwargs, + ) + cls = attr.define(cls, **kwargs) fields: Dict[Union[str, InKey], BaseFieldSpec] = {} for field in attr.fields(cls): metadata = (field.metadata or {}).get(METADATA_KEY, {}) @@ -367,7 +395,7 @@ def parse(cls: Type[T], data: Union[str, Iterable[str]]) -> T: except KeyError: raise BodyNotAllowedError() proc.process(name, value) - data: Dict[str, Any] = {} + output: Dict[str, Any] = {} for proc in processors.values(): - proc.finalize(data) - return cls(**data) + proc.finalize(output) + return cls(**output) # type: ignore[call-arg] diff --git a/src/headerparser/types.py b/src/headerparser/types.py index 0d34c00..9406118 100644 --- a/src/headerparser/types.py +++ b/src/headerparser/types.py @@ -66,7 +66,7 @@ def decode_bool(_: str, value: str) -> bool: def decode_value(func: Callable[[str], T]) -> Callable[[str, str], T]: - def decoder(_: str, value: str) -> bool: + def decoder(_: str, value: str) -> T: return func(value) return decoder diff --git a/tox.ini b/tox.ini index 96f7b64..9630824 100644 --- a/tox.ini +++ b/tox.ini @@ -54,6 +54,10 @@ source = [coverage:report] precision = 2 show_missing = True +exclude_lines = + pragma: no cover + if TYPE_CHECKING: + \.\.\. [flake8] doctests = True From 2d7038a8e2df79dc8ebe603ce986a6e7f0acfe19 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Sat, 15 Oct 2022 19:40:03 -0400 Subject: [PATCH 4/4] Use more future annotations --- src/headerparser/mypy.py | 4 +- src/headerparser/parscls.py | 65 ++++++++++++----------------- src/headerparser/types.py | 8 ++-- test/test_parsable/test_parsable.py | 11 ++--- 4 files changed, 40 insertions(+), 48 deletions(-) diff --git a/src/headerparser/mypy.py b/src/headerparser/mypy.py index cfecc78..b9b44db 100644 --- a/src/headerparser/mypy.py +++ b/src/headerparser/mypy.py @@ -1,4 +1,4 @@ -from typing import Type +from __future__ import annotations from mypy.plugin import Plugin from mypy.plugins.attrs import attr_attrib_makers, attr_define_makers @@ -15,5 +15,5 @@ class HeaderParserPlugin(Plugin): pass -def plugin(_version: str) -> Type[Plugin]: +def plugin(_version: str) -> type[Plugin]: return HeaderParserPlugin diff --git a/src/headerparser/parscls.py b/src/headerparser/parscls.py index 1c414cf..a069fe8 100644 --- a/src/headerparser/parscls.py +++ b/src/headerparser/parscls.py @@ -1,21 +1,10 @@ +from __future__ import annotations from abc import ABC, abstractmethod +from collections.abc import Callable, Iterable, Mapping from enum import Enum from functools import partial -from typing import ( - Any, - Callable, - Dict, - Iterable, - List, - Mapping, - Optional, - Set, - Tuple, - Type, - TypeVar, - Union, - overload, -) +import typing as ty +from typing import Any, List, Optional, Tuple, TypeVar, overload import attr from .errors import ( BodyNotAllowedError, @@ -32,11 +21,11 @@ T = TypeVar("T") TT = TypeVar("TT", bound=type) -FieldDecoder = Callable[[str, str], Any] -MultiFieldDecoder = Callable[[str, List[str]], Any] -ExtraFieldsDecoder = Callable[[List[Tuple[str, str]]], Any] -BodyDecoder = Callable[[str], Any] -NameDecoder = Callable[[str], str] +FieldDecoder = ty.Callable[[str, str], Any] +MultiFieldDecoder = ty.Callable[[str, List[str]], Any] +ExtraFieldsDecoder = ty.Callable[[List[Tuple[str, str]]], Any] +BodyDecoder = ty.Callable[[str], Any] +NameDecoder = ty.Callable[[str], str] class InKey(Enum): @@ -53,7 +42,7 @@ def process(self, name: str, value: str) -> None: ... @abstractmethod - def finalize(self, data: Dict[str, Any]) -> None: + def finalize(self, data: dict[str, Any]) -> None: ... @@ -70,7 +59,7 @@ def process(self, _: str, value: str) -> None: else: raise DuplicateFieldError(self.in_key) - def finalize(self, data: Dict[str, Any]) -> None: + def finalize(self, data: dict[str, Any]) -> None: if self.state is not None: value: Any = self.state if self.decoder is not None: @@ -83,12 +72,12 @@ class MultiFieldProcessor(BaseFieldProcessor): name: str in_key: str decoder: Optional[MultiFieldDecoder] - state: List[str] = attr.Factory(list) + state: list[str] = attr.Factory(list) def process(self, _: str, value: str) -> None: self.state.append(value) - def finalize(self, data: Dict[str, Any]) -> None: + def finalize(self, data: dict[str, Any]) -> None: if self.state: value: Any = self.state if self.decoder is not None: @@ -100,8 +89,8 @@ def finalize(self, data: Dict[str, Any]) -> None: class ExtraFieldsProcessor(BaseFieldProcessor): name: str decoder: Optional[ExtraFieldsDecoder] - state: List[Tuple[str, str]] = attr.Factory(list) - seen: Set[str] = attr.Factory(set) + state: list[tuple[str, str]] = attr.Factory(list) + seen: set[str] = attr.Factory(set) def process(self, name: str, value: str) -> None: if name in self.seen: @@ -109,7 +98,7 @@ def process(self, name: str, value: str) -> None: self.state.append((name, value)) self.seen.add(name) - def finalize(self, data: Dict[str, Any]) -> None: + def finalize(self, data: dict[str, Any]) -> None: if self.state: value: Any = self.state if self.decoder is not None: @@ -121,12 +110,12 @@ def finalize(self, data: Dict[str, Any]) -> None: class MultiExtraFieldsProcessor(BaseFieldProcessor): name: str decoder: Optional[ExtraFieldsDecoder] - state: List[Tuple[str, str]] = attr.Factory(list) + state: list[tuple[str, str]] = attr.Factory(list) def process(self, name: str, value: str) -> None: self.state.append((name, value)) - def finalize(self, data: Dict[str, Any]) -> None: + def finalize(self, data: dict[str, Any]) -> None: if self.state: value: Any = self.state if self.decoder is not None: @@ -145,7 +134,7 @@ def process(self, _: str, value: str) -> None: raise DuplicateBodyError() self.state = value - def finalize(self, data: Dict[str, Any]) -> None: + def finalize(self, data: dict[str, Any]) -> None: if self.state is not None: value: Any = self.state if self.decoder is not None: @@ -159,7 +148,7 @@ class BaseFieldSpec(ABC): @property @abstractmethod - def in_key(self) -> Union[str, InKey]: + def in_key(self) -> str | InKey: ... @abstractmethod @@ -293,15 +282,15 @@ def convert_name_decoder(decoder: Optional[NameDecoder]) -> NameDecoder: return decoder if decoder is not None else decode_name -def convert_scanner_opts(opts: Optional[Mapping[str, Any]]) -> Dict[str, Any]: +def convert_scanner_opts(opts: Optional[Mapping[str, Any]]) -> dict[str, Any]: return dict(opts) if opts is not None else {} @attr.define class ParsableSpec: name_decoder: NameDecoder = attr.field(converter=convert_name_decoder) - scanner_options: Dict[str, Any] = attr.field(converter=convert_scanner_opts) - fields: Dict[Union[str, InKey], BaseFieldSpec] + scanner_options: dict[str, Any] = attr.field(converter=convert_scanner_opts) + fields: dict[str | InKey, BaseFieldSpec] @overload @@ -332,7 +321,7 @@ def parsable( name_decoder: Optional[NameDecoder] = None, scanner_options: Optional[Mapping[str, Any]] = None, **kwargs: Any, -) -> Union[TT, Callable[[TT], TT]]: +) -> TT | Callable[[TT], TT]: if cls is None: return partial( # type: ignore[return-value] parsable, @@ -341,7 +330,7 @@ def parsable( **kwargs, ) cls = attr.define(cls, **kwargs) - fields: Dict[Union[str, InKey], BaseFieldSpec] = {} + fields: dict[str | InKey, BaseFieldSpec] = {} for field in attr.fields(cls): metadata = (field.metadata or {}).get(METADATA_KEY, {}) assert isinstance(metadata, dict) @@ -372,7 +361,7 @@ def parsable( return cls -def parse(cls: Type[T], data: Union[str, Iterable[str]]) -> T: +def parse(cls: type[T], data: str | Iterable[str]) -> T: p = getattr(cls, CLS_ATTR_KEY, None) if not isinstance(p, ParsableSpec): raise TypeError(f"{type(p).__name__} is not a parsable class") @@ -395,7 +384,7 @@ def parse(cls: Type[T], data: Union[str, Iterable[str]]) -> T: except KeyError: raise BodyNotAllowedError() proc.process(name, value) - output: Dict[str, Any] = {} + output: dict[str, Any] = {} for proc in processors.values(): proc.finalize(output) return cls(**output) # type: ignore[call-arg] diff --git a/src/headerparser/types.py b/src/headerparser/types.py index 9406118..67c7451 100644 --- a/src/headerparser/types.py +++ b/src/headerparser/types.py @@ -1,5 +1,7 @@ +from __future__ import annotations +from collections.abc import Callable, Iterable import re -from typing import Any, Callable, Dict, Iterable, List, Tuple, TypeVar +from typing import Any, TypeVar T = TypeVar("T") K = TypeVar("K") @@ -72,8 +74,8 @@ def decoder(_: str, value: str) -> T: return decoder -def multidict(values: Iterable[Tuple[K, V]]) -> Dict[K, List[V]]: - data: Dict[K, List[V]] = {} +def multidict(values: Iterable[tuple[K, V]]) -> dict[K, list[V]]: + data: dict[K, list[V]] = {} for k, v in values: data.setdefault(k, []).append(v) return data diff --git a/test/test_parsable/test_parsable.py b/test/test_parsable/test_parsable.py index f6185fa..6585d5b 100644 --- a/test/test_parsable/test_parsable.py +++ b/test/test_parsable/test_parsable.py @@ -1,4 +1,5 @@ -from typing import Any, Dict, List, Optional, Tuple, Type +from __future__ import annotations +from typing import Any, Optional import pytest from headerparser import ( BodyField, @@ -22,8 +23,8 @@ class Simple: simple: str optional: Optional[str] = None aliased: Optional[str] = Field(alias="alias", default=None) - multi: List[str] = MultiField(factory=list) - extra: List[Tuple[str, str]] = ExtraFields(factory=list) + multi: list[str] = MultiField(factory=list) + extra: list[tuple[str, str]] = ExtraFields(factory=list) body: Optional[str] = BodyField(default=None) @@ -31,7 +32,7 @@ class Simple: class MultiExtra: foo: int = Field(decoder=decode_value(int)) bar: bool = Field(decoder=decode_bool) - extra: Dict[str, List[str]] = MultiExtraFields(decoder=multidict, factory=dict) + extra: dict[str, list[str]] = MultiExtraFields(decoder=multidict, factory=dict) @parsable @@ -137,7 +138,7 @@ def test_parse(cls: type, data: str, obj: Any) -> None: ], ) def test_parse_error( - cls: type, data: str, exc_type: Type[Exception], exc_match: str + cls: type, data: str, exc_type: type[Exception], exc_match: str ) -> None: with pytest.raises(exc_type, match=exc_match): parse(cls, data)