diff --git a/setup.cfg b/setup.cfg
index 7b41c04..907a800 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -72,3 +72,4 @@ strict_equality = True
 warn_redundant_casts = True
 warn_return_any = True
 warn_unreachable = True
+plugins = headerparser.mypy
diff --git a/src/headerparser/__init__.py b/src/headerparser/__init__.py
index 6ed0b6a..0107efe 100644
--- a/src/headerparser/__init__.py
+++ b/src/headerparser/__init__.py
@@ -15,6 +15,7 @@
 
 from .errors import (
     BodyNotAllowedError,
+    DuplicateBodyError,
     DuplicateFieldError,
     Error,
     FieldTypeError,
@@ -29,6 +30,15 @@
     UnknownFieldError,
 )
 from .normdict import NormalizedDict
+from .parscls import (
+    BodyField,
+    ExtraFields,
+    Field,
+    MultiExtraFields,
+    MultiField,
+    parsable,
+    parse,
+)
 from .parser import HeaderParser
 from .scanner import (
     Scanner,
@@ -39,7 +49,7 @@
     scan_stanzas_string,
     scan_string,
 )
-from .types import BOOL, lower, unfold
+from .types import BOOL, decode_bool, decode_value, lower, multidict, unfold
 
 __version__ = "0.5.1"
 __author__ = "John Thorvald Wodder II"
@@ -49,15 +59,21 @@
 
 __all__ = [
     "BOOL",
+    "BodyField",
     "BodyNotAllowedError",
+    "DuplicateBodyError",
     "DuplicateFieldError",
     "Error",
-    "HeaderParser",
+    "ExtraFields",
+    "Field",
     "FieldTypeError",
+    "HeaderParser",
     "InvalidChoiceError",
     "MalformedHeaderError",
     "MissingBodyError",
     "MissingFieldError",
+    "MultiExtraFields",
+    "MultiField",
     "NormalizedDict",
     "ParserError",
     "Scanner",
@@ -65,7 +81,12 @@
     "ScannerError",
     "UnexpectedFoldingError",
     "UnknownFieldError",
+    "decode_bool",
+    "decode_value",
     "lower",
+    "multidict",
+    "parsable",
+    "parse",
     "scan",
     "scan_next_stanza",
     "scan_next_stanza_string",
diff --git a/src/headerparser/errors.py b/src/headerparser/errors.py
index f594877..fca7cb3 100644
--- a/src/headerparser/errors.py
+++ b/src/headerparser/errors.py
@@ -54,6 +54,13 @@ def __str__(self) -> str:
         return f"Header field {self.name!r} occurs more than once"
 
 
+class DuplicateBodyError(ParserError):
+    """Raised when a body field occurs two or more times in the input"""
+
+    def __str__(self) -> str:
+        return "Body field occurs more than once"
+
+
 class FieldTypeError(ParserError):
     """Raised when a ``type`` callable raises an exception"""
 
diff --git a/src/headerparser/mypy.py b/src/headerparser/mypy.py
new file mode 100644
index 0000000..b9b44db
--- /dev/null
+++ b/src/headerparser/mypy.py
@@ -0,0 +1,19 @@
+from __future__ import annotations
+from mypy.plugin import Plugin
+from mypy.plugins.attrs import attr_attrib_makers, attr_define_makers
+
+attr_define_makers.add("headerparser.parscls.parsable")
+
+attr_attrib_makers.add("headerparser.parscls.Field")
+attr_attrib_makers.add("headerparser.parscls.MultiField")
+attr_attrib_makers.add("headerparser.parscls.ExtraFields")
+attr_attrib_makers.add("headerparser.parscls.MultiExtraFields")
+attr_attrib_makers.add("headerparser.parscls.BodyField")
+
+
+class HeaderParserPlugin(Plugin):
+    pass
+
+
+def plugin(_version: str) -> type[Plugin]:
+    return HeaderParserPlugin
diff --git a/src/headerparser/parscls.py b/src/headerparser/parscls.py
new file mode 100644
index 0000000..a069fe8
--- /dev/null
+++ b/src/headerparser/parscls.py
@@ -0,0 +1,436 @@
+from __future__ import annotations
+from abc import ABC, abstractmethod
+from collections.abc import Callable, Iterable, Mapping
+from enum import Enum
+from functools import partial
+import typing as ty
+from typing import Any, List, Optional, Tuple, TypeVar, overload
+import attr
+from .errors import (
+    BodyNotAllowedError,
+    DuplicateBodyError,
+    DuplicateFieldError,
+    UnknownFieldError,
+)
+from .scanner import Scanner
+from .types import decode_name
+
+CLS_ATTR_KEY = "__headerparser_spec__"
+METADATA_KEY = "headerparser"
+
+T = TypeVar("T")
+TT = TypeVar("TT", bound=type)
+
+FieldDecoder = ty.Callable[[str, str], Any]
+MultiFieldDecoder = ty.Callable[[str, List[str]], Any]
+ExtraFieldsDecoder = ty.Callable[[List[Tuple[str, str]]], Any]
+BodyDecoder = ty.Callable[[str], Any]
+NameDecoder = ty.Callable[[str], str]
+
+
+class InKey(Enum):
+    EXTRA = "extra"
+    BODY = "body"
+
+
+@attr.define
+class BaseFieldProcessor(ABC):
+    """Abstract base for objects that collect one field's values during `parse()`"""
+
+    name: str
+
+    @abstractmethod
+    def process(self, name: str, value: str) -> None:
+        ...
+
+    @abstractmethod
+    def finalize(self, data: dict[str, Any]) -> None:
+        ...
+
+
+@attr.define
+class FieldProcessor(BaseFieldProcessor):
+    name: str
+    in_key: str
+    decoder: Optional[FieldDecoder]
+    state: Optional[str] = None
+
+    def process(self, _: str, value: str) -> None:
+        if self.state is None:
+            self.state = value
+        else:
+            raise DuplicateFieldError(self.in_key)
+
+    def finalize(self, data: dict[str, Any]) -> None:
+        if self.state is not None:
+            value: Any = self.state
+            if self.decoder is not None:
+                value = self.decoder(self.name, value)
+            data[self.name] = value
+
+
+@attr.define
+class MultiFieldProcessor(BaseFieldProcessor):
+    name: str
+    in_key: str
+    decoder: Optional[MultiFieldDecoder]
+    state: list[str] = attr.Factory(list)
+
+    def process(self, _: str, value: str) -> None:
+        self.state.append(value)
+
+    def finalize(self, data: dict[str, Any]) -> None:
+        if self.state:
+            value: Any = self.state
+            if self.decoder is not None:
+                value = self.decoder(self.name, value)
+            data[self.name] = value
+
+
+@attr.define
+class ExtraFieldsProcessor(BaseFieldProcessor):
+    name: str
+    decoder: Optional[ExtraFieldsDecoder]
+    state: list[tuple[str, str]] = attr.Factory(list)
+    seen: set[str] = attr.Factory(set)
+
+    def process(self, name: str, value: str) -> None:
+        if name in self.seen:
+            raise DuplicateFieldError(name)
+        self.state.append((name, value))
+        self.seen.add(name)
+
+    def finalize(self, data: dict[str, Any]) -> None:
+        if self.state:
+            value: Any = self.state
+            if self.decoder is not None:
+                value = self.decoder(value)
+            data[self.name] = value
+
+
+@attr.define
+class MultiExtraFieldsProcessor(BaseFieldProcessor):
+    name: str
+    decoder: Optional[ExtraFieldsDecoder]
+    state: list[tuple[str, str]] = attr.Factory(list)
+
+    def process(self, name: str, value: str) -> None:
+        self.state.append((name, value))
+
+    def finalize(self, data: dict[str, Any]) -> None:
+        if self.state:
+            value: Any = self.state
+            if self.decoder is not None:
+                value = self.decoder(value)
+            data[self.name] = value
+
+
+@attr.define
+class BodyProcessor(BaseFieldProcessor):
+    name: str
+    decoder: Optional[BodyDecoder]
+    state: Optional[str] = None
+
+    def process(self, _: str, value: str) -> None:
+        if self.state is not None:
+            raise DuplicateBodyError()
+        self.state = value
+
+    def finalize(self, data: dict[str, Any]) -> None:
+        if self.state is not None:
+            value: Any = self.state
+            if self.decoder is not None:
+                value = self.decoder(value)
+            data[self.name] = value
+
+
+@attr.define
+class BaseFieldSpec(ABC):
+    name: str
+
+    @property
+    @abstractmethod
+    def in_key(self) -> str | InKey:
+        ...
+
+    @abstractmethod
+    def get_processor(self) -> BaseFieldProcessor:
+        ...
+
+
+@attr.define
+class FieldSpec(BaseFieldSpec):
+    alias: Optional[str] = None
+    decoder: Optional[FieldDecoder] = None
+
+    @property
+    def in_key(self) -> str:
+        return self.alias if self.alias is not None else self.name
+
+    def get_processor(self) -> BaseFieldProcessor:
+        return FieldProcessor(name=self.name, in_key=self.in_key, decoder=self.decoder)
+
+
+@attr.define
+class MultiFieldSpec(BaseFieldSpec):
+    alias: Optional[str] = None
+    decoder: Optional[MultiFieldDecoder] = None
+
+    @property
+    def in_key(self) -> str:
+        return self.alias if self.alias is not None else self.name
+
+    def get_processor(self) -> BaseFieldProcessor:
+        return MultiFieldProcessor(
+            name=self.name, in_key=self.in_key, decoder=self.decoder
+        )
+
+
+@attr.define
+class ExtraFieldsSpec(BaseFieldSpec):
+    decoder: Optional[ExtraFieldsDecoder] = None
+
+    @property
+    def in_key(self) -> InKey:
+        return InKey.EXTRA
+
+    def get_processor(self) -> BaseFieldProcessor:
+        return ExtraFieldsProcessor(name=self.name, decoder=self.decoder)
+
+
+@attr.define
+class MultiExtraFieldsSpec(ExtraFieldsSpec):
+    def get_processor(self) -> BaseFieldProcessor:
+        return MultiExtraFieldsProcessor(name=self.name, decoder=self.decoder)
+
+
+@attr.define
+class BodySpec(BaseFieldSpec):
+    decoder: Optional[BodyDecoder] = None
+
+    @property
+    def in_key(self) -> InKey:
+        return InKey.BODY
+
+    def get_processor(self) -> BaseFieldProcessor:
+        return BodyProcessor(name=self.name, decoder=self.decoder)
+
+
+def _make_field(spec_metadata: dict[str, Any], kwargs: dict[str, Any]) -> Any:
+    """
+    Create an ``attr.field()`` from ``kwargs`` with ``spec_metadata`` stored
+    under `METADATA_KEY` in the field's metadata
+    """
+    # Build a fresh dict instead of updating the caller's mapping in place: a
+    # mapping shared between several fields must not be modified, or each
+    # factory call would overwrite the entry written by the previous one (and
+    # the caller's own dict would be polluted with our key).
+    metadata = dict(kwargs.get("metadata") or {})
+    metadata[METADATA_KEY] = spec_metadata
+    kwargs["metadata"] = metadata
+    return attr.field(**kwargs)
+
+
+def Field(
+    *,
+    alias: Optional[str] = None,
+    decoder: Optional[FieldDecoder] = None,
+    **kwargs: Any,
+) -> Any:
+    """
+    Declare an attribute filled from a header field that may occur at most
+    once.  ``alias`` overrides the attribute name as the header name to match;
+    ``decoder`` is called with the attribute name and the field value.  All
+    other arguments are passed to ``attr.field()``.
+    """
+    return _make_field(
+        {"alias": alias, "decoder": decoder, "field_type": FieldSpec}, kwargs
+    )
+
+
+def MultiField(
+    *,
+    alias: Optional[str] = None,
+    decoder: Optional[MultiFieldDecoder] = None,
+    **kwargs: Any,
+) -> Any:
+    """
+    Declare an attribute filled from a header field that may occur any number
+    of times.  The values are collected into a list in input order; ``decoder``
+    is called with the attribute name and that list.  All other arguments are
+    passed to ``attr.field()``.
+    """
+    return _make_field(
+        {"alias": alias, "decoder": decoder, "field_type": MultiFieldSpec}, kwargs
+    )
+
+
+def ExtraFields(*, decoder: Optional[ExtraFieldsDecoder] = None, **kwargs: Any) -> Any:
+    """
+    Declare the attribute that receives every header field not matched by
+    another attribute, as a list of ``(name, value)`` pairs.  A name occurring
+    more than once is an error.  ``decoder`` is called with the list of pairs.
+    """
+    return _make_field({"decoder": decoder, "field_type": ExtraFieldsSpec}, kwargs)
+
+
+def MultiExtraFields(
+    *, decoder: Optional[ExtraFieldsDecoder] = None, **kwargs: Any
+) -> Any:
+    """
+    Like `ExtraFields()`, but a name may occur any number of times; each
+    occurrence becomes its own ``(name, value)`` pair.
+    """
+    return _make_field(
+        {"decoder": decoder, "field_type": MultiExtraFieldsSpec}, kwargs
+    )
+
+
+def BodyField(*, decoder: Optional[BodyDecoder] = None, **kwargs: Any) -> Any:
+    """
+    Declare the attribute that receives the message body.  ``decoder`` is
+    called with the body text.
+    """
+    return _make_field({"decoder": decoder, "field_type": BodySpec}, kwargs)
+
+
+def convert_name_decoder(decoder: Optional[NameDecoder]) -> NameDecoder:
+    return decoder if decoder is not None else decode_name
+
+
+def convert_scanner_opts(opts: Optional[Mapping[str, Any]]) -> dict[str, Any]:
+    return dict(opts) if opts is not None else {}
+
+
+@attr.define
+class ParsableSpec:
+    name_decoder: NameDecoder = attr.field(converter=convert_name_decoder)
+    scanner_options: dict[str, Any] = attr.field(converter=convert_scanner_opts)
+    fields: dict[str | InKey, BaseFieldSpec]
+
+
+@overload
+def parsable(
+    cls: None = None,
+    *,
+    name_decoder: Optional[NameDecoder] = None,
+    scanner_options: Optional[Mapping[str, Any]] = None,
+    **kwargs: Any,
+) -> Callable[[TT], TT]:
+    ...
+
+
+@overload
+def parsable(
+    cls: TT,
+    *,
+    name_decoder: Optional[NameDecoder] = None,
+    scanner_options: Optional[Mapping[str, Any]] = None,
+    **kwargs: Any,
+) -> TT:
+    ...
+
+
+def parsable(
+    cls: Optional[TT] = None,
+    *,
+    name_decoder: Optional[NameDecoder] = None,
+    scanner_options: Optional[Mapping[str, Any]] = None,
+    **kwargs: Any,
+) -> TT | Callable[[TT], TT]:
+    """
+    Class decorator that turns ``cls`` into an ``attrs`` class (forwarding
+    ``kwargs`` to ``attr.define()``) and records how `parse()` should populate
+    it.  May be used with or without arguments.
+
+    :param name_decoder:
+        callable applied to each header name before it is matched against the
+        class's fields; defaults to `decode_name`
+    :param scanner_options: keyword arguments for the `Scanner` used by `parse()`
+    :raises ValueError:
+        if two attributes claim the same header name, or if more than one
+        extra-fields or body attribute is declared
+    """
+    if cls is None:
+        return partial(  # type: ignore[return-value]
+            parsable,
+            name_decoder=name_decoder,
+            scanner_options=scanner_options,
+            **kwargs,
+        )
+    cls = attr.define(cls, **kwargs)
+    fields: dict[str | InKey, BaseFieldSpec] = {}
+    for field in attr.fields(cls):
+        metadata = (field.metadata or {}).get(METADATA_KEY, {})
+        assert isinstance(metadata, dict)
+        metadata = metadata.copy()
+        ftype = metadata.pop("field_type", FieldSpec)
+        assert issubclass(ftype, BaseFieldSpec)
+        spec = ftype(name=field.name, **metadata)
+        if spec.in_key in fields:
+            if isinstance(spec.in_key, str):
+                raise ValueError(
+                    f"Multiple fields for header name {spec.in_key!r} registered"
+                )
+            elif spec.in_key is InKey.EXTRA:
+                raise ValueError("Multiple extra fields registered")
+            elif spec.in_key is InKey.BODY:
+                raise ValueError("Multiple body fields registered")
+            else:
+                raise AssertionError(  # pragma: no cover
+                    f"Unhandled InKey {spec.in_key!r}"
+                )
+        fields[spec.in_key] = spec
+    p = ParsableSpec(
+        name_decoder=name_decoder,
+        scanner_options=scanner_options,
+        fields=fields,
+    )
+    setattr(cls, CLS_ATTR_KEY, p)
+    return cls
+
+
+def parse(cls: type[T], data: str | Iterable[str]) -> T:
+    """
+    Parse ``data`` and return an instance of ``cls`` built from its fields.
+
+    :param cls: a class decorated with `parsable()`
+    :param data: the input, passed as-is to `Scanner`
+    :raises TypeError: if ``cls`` is not a parsable class
+    :raises UnknownFieldError:
+        if a header field matches no attribute and ``cls`` has no extra-fields
+        attribute
+    :raises DuplicateFieldError: if a field that may occur only once is repeated
+    :raises BodyNotAllowedError:
+        if the input has a body and ``cls`` has no body attribute
+    """
+    p = getattr(cls, CLS_ATTR_KEY, None)
+    if not isinstance(p, ParsableSpec):
+        # Report the offending argument rather than the type of the failed
+        # lookup result (which is typically just "NoneType").
+        cls_name = getattr(cls, "__name__", repr(cls))
+        raise TypeError(f"{cls_name} is not a parsable class")
+    sc = Scanner(data, **p.scanner_options)
+    processors = {k: v.get_processor() for k, v in p.fields.items()}
+    for name, value in sc.scan():
+        if name is not None:
+            name = p.name_decoder(name)
+            try:
+                proc = processors[name]
+            except KeyError:
+                try:
+                    proc = processors[InKey.EXTRA]
+                except KeyError:
+                    # The failed dict lookups are an implementation detail
+                    raise UnknownFieldError(name) from None
+        else:
+            name = ""
+            try:
+                proc = processors[InKey.BODY]
+            except KeyError:
+                raise BodyNotAllowedError() from None
+        proc.process(name, value)
+    output: dict[str, Any] = {}
+    for proc in processors.values():
+        proc.finalize(output)
+    return cls(**output)  # type: ignore[call-arg]
diff --git a/src/headerparser/types.py b/src/headerparser/types.py
index 64c4b28..67c7451 100644
--- a/src/headerparser/types.py
+++ b/src/headerparser/types.py
@@ -1,5 +1,11 @@
+from __future__ import annotations
+from collections.abc import Callable, Iterable
 import re
-from typing import Any
+from typing import Any, TypeVar
+
+T = TypeVar("T")
+K = TypeVar("K")
+V = TypeVar("V")
 
 TRUTHY = {"yes", "y", "on", "true", "1"}
 FALSEY = {"no", "n", "off", "false", "0"}
@@ -55,3 +61,30 @@ def unfold(s: str) -> str:
     :rtype: string
     """
     return re.sub(r"[ \t]*[\r\n][ \t\r\n]*", " ", s).strip(" ")
+
+
+def decode_bool(_: str, value: str) -> bool:
+    """Field decoder that converts ``value`` with `BOOL`, ignoring the field name"""
+    return BOOL(value)
+
+
+def decode_value(func: Callable[[str], T]) -> Callable[[str, str], T]:
+    """Wrap a one-argument ``func`` as a field decoder that ignores the field name"""
+
+    def decoder(_: str, value: str) -> T:
+        return func(value)
+
+    return decoder
+
+
+def multidict(values: Iterable[tuple[K, V]]) -> dict[K, list[V]]:
+    """Group ``(key, value)`` pairs into a dict of per-key value lists"""
+    data: dict[K, list[V]] = {}
+    for k, v in values:
+        data.setdefault(k, []).append(v)
+    return data
+
+
+def decode_name(name: str) -> str:
+    """Lowercase ``name`` and replace each non-word character with an underscore"""
+    return re.sub(r"\W", "_", name.lower())
diff --git a/test/test_parsable/test_parsable.py b/test/test_parsable/test_parsable.py
new file mode 100644
index 0000000..6585d5b
--- /dev/null
+++ b/test/test_parsable/test_parsable.py
@@ -0,0 +1,163 @@
+from __future__ import annotations
+from typing import Any, Optional
+import pytest
+from headerparser import (
+    BodyField,
+    BodyNotAllowedError,
+    DuplicateFieldError,
+    ExtraFields,
+    Field,
+    MultiExtraFields,
+    MultiField,
+    UnknownFieldError,
+    decode_bool,
+    decode_value,
+    multidict,
+    parsable,
+    parse,
+)
+
+
+@parsable
+class Simple:
+    simple: str
+    optional: Optional[str] = None
+    aliased: Optional[str] = Field(alias="alias", default=None)
+    multi: list[str] = MultiField(factory=list)
+    extra: list[tuple[str, str]] = ExtraFields(factory=list)
+    body: Optional[str] = BodyField(default=None)
+
+
+@parsable
+class MultiExtra:
+    foo: int = Field(decoder=decode_value(int))
+    bar: bool = Field(decoder=decode_bool)
+    extra: dict[str, list[str]] = MultiExtraFields(decoder=multidict, factory=dict)
+
+
+@parsable
+class CrissCross:
+    one: str = Field(alias="two")
+    two: str = Field(alias="one")
+
+
+@pytest.mark.parametrize(
+    "cls,data,obj",
+    [
+        (
+            Simple,
+            "Simple: foobar\n",
+            Simple(
+                simple="foobar",
+                optional=None,
+                aliased=None,
+                multi=[],
+                extra=[],
+                body=None,
+            ),
+        ),
+        (
+            Simple,
+            "Simple: foobar\nOptional: present\nAlias: unknown\nAliased: extra\n"
+            "Extra: overflow\nMulti: one\nMulti: two\nHyphen-Ated: Hyphen-Ated\n"
+            "\nThis is the body.\n",
+            Simple(
+                simple="foobar",
+                optional="present",
+                aliased="unknown",
+                multi=["one", "two"],
+                extra=[
+                    ("aliased", "extra"),
+                    ("extra", "overflow"),
+                    ("hyphen_ated", "Hyphen-Ated"),
+                ],
+                body="This is the body.\n",
+            ),
+        ),
+        (
+            MultiExtra,
+            "Foo: 42\nBar: true\nExtra: one\nExtra: two\nOther: stuff\n",
+            MultiExtra(
+                foo=42,
+                bar=True,
+                extra={"extra": ["one", "two"], "other": ["stuff"]},
+            ),
+        ),
+        (MultiExtra, "Bar: no\nFoo: 23\n", MultiExtra(foo=23, bar=False, extra={})),
+        (
+            CrissCross,
+            "ONE: apple\nTWO: banana\n",
+            CrissCross(one="banana", two="apple"),
+        ),
+    ],
+)
+def test_parse(cls: type, data: str, obj: Any) -> None:
+    assert parse(cls, data) == obj
+
+
+@pytest.mark.parametrize(
+    "cls,data,exc_type,exc_match",
+    [
+        (
+            Simple,
+            "Simple: one\nSimple: two\n",
+            DuplicateFieldError,
+            "^Header field 'simple' occurs more than once$",
+        ),
+        (
+            Simple,
+            "Simple: foobar\nExtra: one\nExtra: two\n",
+            DuplicateFieldError,
+            "^Header field 'extra' occurs more than once$",
+        ),
+        (
+            Simple,
+            "",
+            TypeError,
+            "missing 1 required positional argument: 'simple'$",
+        ),
+        (
+            MultiExtra,
+            "Foo: forty-two\n",
+            ValueError,
+            r"^invalid literal for int\(\) with base 10: 'forty-two'$",
+        ),
+        (MultiExtra, "Bar: maybe\n", ValueError, "invalid boolean: 'maybe'"),
+        (
+            MultiExtra,
+            "Some: field\n\nSome: body\n",
+            BodyNotAllowedError,
+            "^Message body is present but not allowed$",
+        ),
+        (
+            CrissCross,
+            "Unknown: field\n",
+            UnknownFieldError,
+            "^Unknown header field 'unknown'$",
+        ),
+    ],
+)
+def test_parse_error(
+    cls: type, data: str, exc_type: type[Exception], exc_match: str
+) -> None:
+    with pytest.raises(exc_type, match=exc_match):
+        parse(cls, data)
+
+
+def test_parse_unparsable_class() -> None:
+    with pytest.raises(TypeError, match="^int is not a parsable class$"):
+        parse(int, "Foo: bar\n")
+
+
+def test_shared_field_metadata() -> None:
+    shared: dict[str, Any] = {"note": "shared"}
+
+    @parsable
+    class Shared:
+        single: str = Field(metadata=shared)
+        multi: list[str] = MultiField(metadata=shared, factory=list)
+
+    assert shared == {"note": "shared"}
+    assert parse(Shared, "Single: a\nMulti: b\nMulti: c\n") == Shared(
+        single="a", multi=["b", "c"]
+    )
diff --git a/tox.ini b/tox.ini
index 75b0ff8..9630824 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,7 +11,10 @@ deps =
     pytest-mock
 commands =
     coverage erase
-    coverage run -m pytest {posargs} --doctest-modules --pyargs headerparser
+    # --doctest-ignore-import-errors is needed in order to not collect
+    # headerparser.mypy, which requires mypy to be installed, which is not an
+    # option on PyPy3.[67]
+    coverage run -m pytest {posargs} --doctest-modules --pyargs --doctest-ignore-import-errors headerparser
     coverage run -m pytest {posargs} test README.rst docs/index.rst
     coverage combine
     coverage report
@@ -51,6 +54,10 @@ source =
 [coverage:report]
 precision = 2
 show_missing = True
+exclude_lines =
+    pragma: no cover
+    if TYPE_CHECKING:
+    \.\.\.
 
 [flake8]
 doctests = True