Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bytes Reader #77

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 93 additions & 84 deletions poetry.lock

Large diffs are not rendered by default.

15 changes: 11 additions & 4 deletions src/parsita/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
"parse_method",
]
import re
from typing import Any, Sequence
from typing import Any, Sequence, Union

from .state import Input, Output, Result, SequenceReader, StringReader
from .state import Input, Output, Reader, Result, SequenceReader, StringReader

# Global mutable state

Expand Down Expand Up @@ -43,10 +43,17 @@ def default_parse_method(self, source: str) -> Result[Output]:
return completely_parse_reader(self, reader)


def basic_parse(self, source: Sequence[Input]) -> Result[Output]:
def basic_parse(self, source: Union[Sequence[Input], Reader, bytes]) -> Result[Output]:
from .parsers import completely_parse_reader

reader = SequenceReader(source)
if isinstance(source, Reader):
reader = source
elif isinstance(source, bytes):
from parsita.state import BytesReader

reader = BytesReader(source)
else:
reader = SequenceReader(source)

return completely_parse_reader(self, reader)

Expand Down
2 changes: 1 addition & 1 deletion src/parsita/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ._optional import OptionalParser, opt
from ._predicate import PredicateParser, pred
from ._regex import RegexParser, reg
from ._repeated import RepeatedOnceParser, RepeatedParser, rep, rep1
from ._repeated import RepeatedOnceParser, RepeatedParser, rep, rep1, rep_n
from ._repeated_seperated import RepeatedOnceSeparatedParser, RepeatedSeparatedParser, rep1sep, repsep
from ._sequential import DiscardLeftParser, DiscardRightParser, SequentialParser
from ._success import FailureParser, SuccessParser, failure, success
Expand Down
12 changes: 11 additions & 1 deletion src/parsita/parsers/_repeated.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__all__ = ["RepeatedOnceParser", "rep1", "RepeatedParser", "rep"]
__all__ = ["RepeatedOnceParser", "rep1", "RepeatedParser", "rep", "rep_n"]

from typing import Generic, List, Optional, Sequence, Union

Expand Down Expand Up @@ -103,3 +103,13 @@ def rep(
if isinstance(parser, str):
parser = lit(parser)
return RepeatedParser(parser, min=min, max=max)


def rep_n(parser: Union[Parser, Sequence[Input]], *, n: int):
    """Match a parser exactly ``n`` times.

    This is shorthand for ``rep(parser, min=n, max=n)``: the lower and upper
    repetition bounds are both set to ``n``.

    Args:
        parser: Parser or literal to be repeated.
        n: Nonnegative integer giving the exact number of entries to match.

    Returns:
        The result of ``rep`` called with identical ``min`` and ``max``.
    """
    exact_count = n
    return rep(parser, min=exact_count, max=exact_count)
2 changes: 1 addition & 1 deletion src/parsita/state/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._exceptions import ParseError, RecursionError
from ._reader import Reader, SequenceReader, StringReader
from ._reader import BytesReader, Reader, SequenceReader, StringReader
from ._result import Failure, Result, Success
from ._state import Continue, Input, Output, State
102 changes: 97 additions & 5 deletions src/parsita/state/_reader.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from __future__ import annotations

__all__ = ["Reader", "SequenceReader", "StringReader"]
__all__ = ["Reader", "SequenceReader", "StringReader", "BytesReader"]

import re
from dataclasses import dataclass
from io import StringIO
from typing import Generic, Sequence, TypeVar
from typing import Generic, Sequence, Tuple, TypeVar

Input = TypeVar("Input")

Expand Down Expand Up @@ -117,14 +117,106 @@ def first(self) -> Input:

@property
def rest(self) -> SequenceReader[Input]:
return SequenceReader(self.source, self.position + 1)
return self.__class__(self.source, self.position + 1)

@property
def finished(self) -> bool:
return self.position >= len(self.source)

def drop(self, count: int) -> SequenceReader[Input]:
return SequenceReader(self.source, self.position + count)
return self.__class__(self.source, self.position + count)


@dataclass(frozen=True)
class BytesReader(SequenceReader):
    """A reader for ``bytes`` sources.

    Behaves like ``SequenceReader`` but produces error messages suited to
    binary data: since bytes have no notion of lines, the error context is a
    short window of bytes around the failing position.
    """

    @staticmethod
    def get_printable_form_of_byte(byte: int) -> str:
        """Return a display form of a single byte value.

        Values from 33 to 126 inclusive are shown as the corresponding
        character; every other value is shown as lowercase hex, zero-padded to
        at least two digits.

        Args:
            byte: Integer value of the byte.

        Returns:
            The character or hex representation of ``byte``.
        """
        if 33 <= byte <= 126:
            return chr(byte)
        return f"{byte:02x}"

    def get_error_feedback_for_bytes(self) -> Tuple[int, str, str]:
        """Build a two-line context window around the current position.

        Bytes have no newlines, so instead of a source line this reports the
        number of bytes preceding the window, the three bytes before the
        current position, the byte at the current position, the next ten
        bytes, and the number of bytes remaining after the window.

        Returns:
            A tuple ``(position, error_line, pointer_line)`` where ``position``
            is the current position plus one, ``error_line`` is the rendered
            window and ``pointer_line`` holds ``^`` markers under the current
            byte. Both lines end with a newline.

        Raises:
            TypeError: If the source is not a ``bytes`` object.
        """
        if not isinstance(self.source, bytes):
            raise TypeError("get_error_feedback_for_bytes can only be called on a BytesReader with a bytes source")

        current_position = self.position
        printable = self.get_printable_form_of_byte

        # Everything before the current position; only the last three bytes
        # are rendered, the rest is summarized as a count.
        prior_bytes = self.source[:current_position]
        prefix = ""
        if len(prior_bytes) > 3:
            prefix = f"{len(prior_bytes) - 3} Bytes …"
        shown_prior = " ".join(printable(b) for b in prior_bytes[-3:])
        prefix = f"{prefix} {shown_prior}"

        # Use >= (matching ``finished``) so a position past the end of the
        # source is reported as end-of-file instead of raising IndexError.
        if current_position >= len(self.source):
            printable_current_byte = "<EOF>"
        else:
            printable_current_byte = printable(self.source[current_position])

        # Everything after the current position; only the first ten bytes are
        # rendered, the rest is summarized as a count.
        following_bytes = self.source[current_position + 1 :]
        suffix = ""
        if len(following_bytes) > 10:
            suffix = f" … {len(following_bytes) - 10} Bytes"
        shown_following = " ".join(printable(b) for b in following_bytes[:10])
        suffix = f"{shown_following}{suffix}"

        error_line = f"{prefix} {printable_current_byte} {suffix}"

        # The pointer line mirrors the error line's layout with spaces so the
        # carets sit directly beneath the current byte.
        pointer_prefix = " " * len(prefix)
        pointer = "^" * len(printable_current_byte)
        pointer_suffix = " " * len(suffix)
        pointer_line = f"{pointer_prefix} {pointer} {pointer_suffix}"

        return current_position + 1, error_line + "\n", pointer_line + "\n"

    def expected_error(self, expected: Sequence[str]) -> str:
        """Generate a basic error to include the current state.

        A parser can supply only a representation of what it is expecting to
        this method and the reader will provide the context, including the
        position of the error (current position plus one) and a window of the
        surrounding bytes.

        Args:
            expected: A list of representations of what the parser is currently
                expecting

        Returns:
            A full error message: one summary line followed by the byte window
            and the pointer line.
        """
        current_position, error_line, pointer_line = self.get_error_feedback_for_bytes()

        expected_string = " or ".join(expected)

        if self.finished:
            message = f"Expected {expected_string} but found end of source"
        else:
            # NOTE(review): assumes next_token() yields the int at the current
            # position for a bytes source; confirm against SequenceReader.
            printable_next_token = self.get_printable_form_of_byte(self.next_token())
            message = f"Expected {expected_string} at position {current_position}, but found {printable_next_token}"

        return f"{message}\n{error_line}{pointer_line}"


# Python lacks character type, so "str" will be used for both the sequence and the elements
Expand Down Expand Up @@ -169,7 +261,7 @@ def next_token(self) -> str:
else:
return self.source[match.start() : match.end()]

def current_line(self):
def current_line(self) -> Tuple[int, int, str, str]:
characters_consumed = 0
for line_index, line in enumerate(StringIO(self.source)):
if characters_consumed + len(line) > self.position:
Expand Down
Loading