From 0182fdec7c33f91a9f3c9fb4c465a34f8308faac Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sat, 28 Dec 2024 00:07:02 +0000 Subject: [PATCH 1/8] iterators: use `time.perf_counter` instead of `time.time` --- streamable/iterators.py | 26 +++++++++++++++----------- tests/test_readme.py | 8 ++++---- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/streamable/iterators.py b/streamable/iterators.py index 712d5f6..b6f1661 100644 --- a/streamable/iterators.py +++ b/streamable/iterators.py @@ -192,11 +192,13 @@ def __init__( self._current_group: List[T] = [] def _interval_seconds_have_elapsed(self) -> bool: - return (time.time() - self._last_group_yielded_at) >= self._interval_seconds + return ( + time.perf_counter() - self._last_group_yielded_at + ) >= self._interval_seconds def __next__(self) -> List[T]: if not self._last_group_yielded_at: - self._last_group_yielded_at = time.time() + self._last_group_yielded_at = time.perf_counter() if self._to_be_raised: e, self._to_be_raised = self._to_be_raised, None raise e @@ -211,7 +213,7 @@ def __next__(self) -> List[T]: self._to_be_raised = e group, self._current_group = self._current_group, [] - self._last_group_yielded_at = time.time() + self._last_group_yielded_at = time.perf_counter() return group @@ -229,7 +231,9 @@ def __init__( self._groups_by: DefaultDict[U, List[T]] = defaultdict(list) def _interval_seconds_have_elapsed(self) -> bool: - return (time.time() - self._last_group_yielded_at) >= self._interval_seconds + return ( + time.perf_counter() - self._last_group_yielded_at + ) >= self._interval_seconds def _group_next_elem(self) -> None: elem = next(self.iterator) @@ -255,12 +259,12 @@ def _pop_largest_group(self) -> Tuple[U, List[T]]: return largest_group_key, self._groups_by.pop(largest_group_key) def _return_group(self, group: Tuple[U, List[T]]) -> Tuple[U, List[T]]: - self._last_group_yielded_at = time.time() + self._last_group_yielded_at = time.perf_counter() return group def __next__(self) -> Tuple[U, List[T]]: if not self._last_group_yielded_at: - self._last_group_yielded_at = time.time() + self._last_group_yielded_at = time.perf_counter() if self._is_exhausted: if self._groups_by: return self._return_group(self._pop_first_group()) @@ -351,12 +355,12 @@ def __init__(self, iterator: Iterator[T], what: str, base: int = 2) -> None: self._n_yields = 0 self._n_errors = 0 self._logged_n_calls = 0 - self._start_time = time.time() + self._start_time = time.perf_counter() def _log(self) -> None: get_logger().info( "[%s %s] %s", - f"duration={datetime.datetime.fromtimestamp(time.time()) - datetime.datetime.fromtimestamp(self._start_time)}", + f"duration={datetime.datetime.fromtimestamp(time.perf_counter()) - datetime.datetime.fromtimestamp(self._start_time)}", f"errors={self._n_errors}", f"{self._n_yields} {self.what} yielded", ) @@ -407,10 +411,10 @@ def __init__(self, iterator: Iterator[T], interval: datetime.timedelta) -> None: def __next__(self) -> T: elem, catched_error = self.safe_next() if self._last_yield_at: - elapsed_time = time.time() - self._last_yield_at + elapsed_time = time.perf_counter() - self._last_yield_at if elapsed_time < self._interval_seconds: time.sleep(self._interval_seconds - elapsed_time) - self._last_yield_at = time.time() + self._last_yield_at = time.perf_counter() if catched_error: raise catched_error @@ -436,7 +440,7 @@ def __init__( def __next__(self) -> T: elem, catched_error = self.safe_next() - now = time.time() + now = time.perf_counter() if not self._offset: self._offset = now now -= self._offset diff --git a/tests/test_readme.py b/tests/test_readme.py index aa69fe8..4957b09 100644 --- a/tests/test_readme.py +++ b/tests/test_readme.py @@ -105,10 +105,10 @@ def test_throttle_example(self) -> None: integers_5_per_sec: Stream[int] = integers.throttle(per_second=3) - start = time.time() + start = time.perf_counter() # takes 3s: ceil(10 integers / 3 per_second) - 1 assert list(integers_5_per_sec) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - assert 2.99 < time.time() - start < 3.25 + assert 2.99 < time.perf_counter() - start < 3.25 from datetime import timedelta @@ -117,10 +117,10 @@ def test_throttle_example(self) -> None: .throttle(interval=timedelta(milliseconds=100)) ) - start = time.time() + start = time.perf_counter() # takes 900 millis: (10 integers - 1) * 100 millis assert list(integers_every_100_millis) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - assert 0.89 < time.time() - start < 0.95 + assert 0.89 < time.perf_counter() - start < 0.95 def test_group_example(self) -> None: global integers_by_parity From 0336d1dece84c7439f4ba3096cf5de0f6ecf236f Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sat, 28 Dec 2024 21:31:51 +0000 Subject: [PATCH 2/8] `.distinct`: do not support dedup on non-hashable values --- streamable/iterators.py | 28 ++++++---------------------- streamable/stream.py | 4 ++-- tests/test_stream.py | 16 ++++++---------- 3 files changed, 14 insertions(+), 34 deletions(-) diff --git a/streamable/iterators.py b/streamable/iterators.py index b6f1661..aac92f2 100644 --- a/streamable/iterators.py +++ b/streamable/iterators.py @@ -99,31 +99,15 @@ def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> N validate_iterator(iterator) self.iterator = iterator self.by = noop_stopiteration(by) if by else None - self._already_seen_set: Set[Any] = set() - self._already_seen_list: List[Any] = list() - - def _value(self, elem): - return self.by(elem) if self.by else elem - - def _see(self, elem: Any): - value = self._value(elem) - try: - self._already_seen_set.add(value) - except TypeError: - self._already_seen_list.append(value) - - def _has_been_seen(self, elem: Any): - value = self._value(elem) - try: - return value in self._already_seen_set - except TypeError: - return value in self._already_seen_list + self._already_seen: Set[Any] = set() def __next__(self) -> T: - elem = next(self.iterator) - while self._has_been_seen(elem): + while True: elem = next(self.iterator) - self._see(elem) + value = self.by(elem) if self.by else elem + if value not in self._already_seen: + break + self._already_seen.add(value) return elem diff --git a/streamable/stream.py b/streamable/stream.py index 7373205..de51be0 100644 --- a/streamable/stream.py +++ b/streamable/stream.py @@ -167,8 +167,8 @@ def distinct( self, by: Optional[Callable[[T], Any]] = None, consecutive_only: bool = False ) -> "Stream": """ - Filters the stream to yield only distinct elements, `foo` and `bar` considered duplicates if `foo == bar`. - If `by` is specified, `foo` and `bar` are considered duplicates if `by(foo) == by(bar)`. + Filters the stream to yield only distinct elements, `foo` and `bar` considered duplicates if `hash(foo) == hash(bar)`. + If `by` is specified, `foo` and `bar` are considered duplicates if `hash(by(foo)) == hash(by(bar))`. Among duplicates, the first encountered occurence in upstream order is yielded. diff --git a/tests/test_stream.py b/tests/test_stream.py index 9b08300..d7ea9be 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -1253,16 +1253,12 @@ def test_distinct(self) -> None: [], msg="`distinct` should yield zero elements on empty stream", ) - self.assertEqual( - list(Stream([[1], [2], [1], [2]]).distinct()), - [[1], [2]], - msg="`distinct` should work with non-hashable elements", - ) - self.assertEqual( - list(Stream([[1], "foo", [2], [1], [2], "foo"]).distinct()), - [[1], "foo", [2]], - msg="`distinct` should work with a mix of hashable and non-hashable elements", - ) + with self.assertRaisesRegex( + TypeError, + "unhashable type: 'list'", + msg="`distinct` should raise for non-hashable elements", + ): + list(Stream([[1]]).distinct()) def test_catch(self) -> None: self.assertEqual( From 557dbfc2a1aead3373cbb4f6cff7753ecd0128fa Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sat, 28 Dec 2024 21:43:18 +0000 Subject: [PATCH 3/8] `.group[by]`: avoid calling `perf_counter` if `interval` is not set --- streamable/iterators.py | 54 ++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/streamable/iterators.py b/streamable/iterators.py index aac92f2..0054526 100644 --- a/streamable/iterators.py +++ b/streamable/iterators.py @@ -148,7 +148,7 @@ def __next__(self) -> U: self._current_iterator_elem = noop_stopiteration(iter)(iterable_elem) -class _GroupIteratorInitMixin(Generic[T]): +class GroupIteratorMixin(Generic[T]): def __init__( self, iterator: Iterator[T], @@ -160,12 +160,28 @@ def __init__( validate_group_interval(interval) self.iterator = iterator self.size = size or cast(int, float("inf")) + self.interval = interval self._interval_seconds = interval.total_seconds() if interval else float("inf") self._to_be_raised: Optional[Exception] = None self._last_group_yielded_at: float = 0 + def _interval_seconds_have_elapsed(self) -> bool: + if not self.interval: + return False + return ( + time.perf_counter() - self._last_group_yielded_at + ) >= self._interval_seconds + + def _remember_group_time(self) -> None: + if self.interval: + self._last_group_yielded_at = time.perf_counter() -class GroupIterator(_GroupIteratorInitMixin[T], Iterator[List[T]]): + def _init_last_group_time(self) -> None: + if self.interval and not self._last_group_yielded_at: + self._last_group_yielded_at = time.perf_counter() + + +class GroupIterator(GroupIteratorMixin[T], Iterator[List[T]]): def __init__( self, iterator: Iterator[T], @@ -175,14 +191,8 @@ def __init__( super().__init__(iterator, size, interval) self._current_group: List[T] = [] - def _interval_seconds_have_elapsed(self) -> bool: - return ( - time.perf_counter() - self._last_group_yielded_at - ) >= self._interval_seconds - def __next__(self) -> List[T]: - if not self._last_group_yielded_at: - self._last_group_yielded_at = time.perf_counter() + self._init_last_group_time() if self._to_be_raised: e, self._to_be_raised = self._to_be_raised, None raise e @@ -197,11 +207,11 @@ def __next__(self) -> List[T]: self._to_be_raised = e group, self._current_group = self._current_group, [] - self._last_group_yielded_at = time.perf_counter() + self._remember_group_time() return group -class GroupbyIterator(_GroupIteratorInitMixin[T], Iterator[Tuple[U, List[T]]]): +class GroupbyIterator(GroupIteratorMixin[T], Iterator[Tuple[U, List[T]]]): def __init__( self, iterator: Iterator[T], @@ -214,11 +224,6 @@ def __init__( self._is_exhausted = False self._groups_by: DefaultDict[U, List[T]] = defaultdict(list) - def _interval_seconds_have_elapsed(self) -> bool: - return ( - time.perf_counter() - self._last_group_yielded_at - ) >= self._interval_seconds - def _group_next_elem(self) -> None: elem = next(self.iterator) self._groups_by[self.by(elem)].append(elem) @@ -242,21 +247,17 @@ def _pop_largest_group(self) -> Tuple[U, List[T]]: return largest_group_key, self._groups_by.pop(largest_group_key) - def _return_group(self, group: Tuple[U, List[T]]) -> Tuple[U, List[T]]: - self._last_group_yielded_at = time.perf_counter() - return group - def __next__(self) -> Tuple[U, List[T]]: - if not self._last_group_yielded_at: - self._last_group_yielded_at = time.perf_counter() + self._init_last_group_time() if self._is_exhausted: if self._groups_by: - return self._return_group(self._pop_first_group()) + return self._pop_first_group() raise StopIteration if self._to_be_raised: if self._groups_by: - return self._return_group(self._pop_first_group()) + self._remember_group_time() + return self._pop_first_group() e, self._to_be_raised = self._to_be_raised, None raise e @@ -268,9 +269,8 @@ def __next__(self) -> Tuple[U, List[T]]: self._group_next_elem() full_group = self._pop_full_group() - if full_group: - return self._return_group(full_group) - return self._return_group(self._pop_largest_group()) + self._remember_group_time() + return full_group or self._pop_largest_group() except StopIteration: self._is_exhausted = True From ded0e22cb78495f25a364f0d3581751867979b65 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sat, 28 Dec 2024 21:43:58 +0000 Subject: [PATCH 4/8] make lint: log the coverage report --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 397cdf7..f29b0d3 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,7 @@ venv: test: $(VENV_DIR)/bin/python -m coverage run -m unittest --failfast + $(VENV_DIR)/bin/coverage report $(VENV_DIR)/bin/coverage html type-check: From 11d88b377bc678c6c469beb6c47184b8af2683b3 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sat, 28 Dec 2024 22:50:21 +0000 Subject: [PATCH 5/8] README: toggle --- README.md | 89 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index fe76f9c..3c6d1f8 100644 --- a/README.md +++ b/README.md @@ -82,15 +82,13 @@ Iterate over a `Stream[T]` just as you would over any other `Iterable[T]`, eleme 1.0 ``` - - ---- - # 📒 ***Operations*** *A dozen expressive lazy operations and that’s it!* -# `.map` +## `.map` + +
expand doc > Applies a transformation on elements: @@ -179,7 +177,11 @@ zeros: Stream[int] = ( assert list(zeros) == [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] ``` -# `.foreach` +
+ +## `.foreach` + +
expand doc > Applies a side effect on elements: @@ -203,8 +205,11 @@ assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ### async-based concurrency > Like `.map` it has a sibling `.aforeach` operation for async. +
-# `.filter` +## `.filter` + +
expand doc > Keeps only the elements that satisfy a condition: @@ -213,8 +218,11 @@ even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0) assert list(even_integers) == [0, 2, 4, 6, 8] ``` +
+ +## `.throttle` -# `.throttle` +
expand doc > Limits the number of yields `per_second`/`per_minute`/`per_hour`: @@ -238,8 +246,11 @@ integers_every_100_millis = ( # takes 900 millis: (10 integers - 1) * 100 millis assert list(integers_every_100_millis) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ``` +
+ +## `.group` -# `.group` +
expand doc > Groups elements into `List`s: @@ -274,8 +285,11 @@ integers_by_parity_by_2: Stream[List[int]] = ( assert list(integers_by_parity_by_2) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]] ``` +
-## `.groupby` +### `.groupby` + +
expand doc > Like `.group`, but groups into `(key, elements)` tuples: ```python @@ -300,8 +314,11 @@ counts_by_parity: Stream[Tuple[str, int]] = ( assert list(counts_by_parity) == [("even", 5), ("odd", 5)] ``` +
+ +## `.flatten` -# `.flatten` +
expand doc > Ungroups elements assuming that they are `Iterable`s: @@ -323,8 +340,11 @@ mixed_ones_and_zeros: Stream[int] = ( assert list(mixed_ones_and_zeros) == [0, 1, 0, 1, 0, 1, 0, 1] ``` +
-# `.catch` +## `.catch` + +
expand doc > Catches a given type of exceptions, and optionally yields a `replacement` value: @@ -354,8 +374,11 @@ assert list(status_codes_ignoring_resolution_errors) == [200, 404] ``` > It has an optional `finally_raise: bool` parameter to raise the first catched exception when iteration ends. +
+ +## `.truncate` -# `.truncate` +
expand doc > Ends iteration once a given number of elements have been yielded: @@ -372,8 +395,11 @@ five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5) assert list(five_first_integers) == [0, 1, 2, 3, 4] ``` +
+ +## `.skip` -# `.skip` +
expand doc > Skips the first specified number of elements: @@ -382,8 +408,11 @@ integers_after_five: Stream[int] = integers.skip(5) assert list(integers_after_five) == [5, 6, 7, 8, 9] ``` +
-# `.distinct` +## `.distinct` + +
expand doc > Removes duplicates: @@ -415,8 +444,11 @@ consecutively_distinct_chars: Stream[str] = ( assert list(consecutively_distinct_chars) == ["f", "o", "b", "a", "r", "f", "o"] ``` +
+ +## `.observe` -# `.observe` +
expand doc > Logs the progress of iterations: ```python @@ -432,16 +464,22 @@ INFO: [duration=0:00:04.003852 errors=0] 10 integers yielded > [!NOTE] > The amount of logs will never be overwhelming because they are produced logarithmically (base 2): the 11th log will be produced after 1,024 elements have been yielded, the 21th log after 1,048,576 elements, ... +
-# `+` +## `+` + +
expand doc > Concatenates streams: ```python assert list(integers + integers) == [0, 1, 2, 3 ,4, 5, 6, 7, 8, 9, 0, 1, 2, 3 ,4, 5, 6, 7, 8, 9] ``` +
+ +## `zip` -# `zip` +
expand doc > [!TIP] > Use the standard `zip` function: @@ -456,21 +494,26 @@ cubes: Stream[int] = ( assert list(cubes) == [0, 1, 8, 27, 64, 125, 216, 343, 512, 729] ``` +
-# Shorthands for consuming the stream - +## Shorthands for consuming the stream > [!NOTE] > Although consuming the stream is beyond the scope of this library, it provides two basic shorthands to trigger an iteration: -## `.count` +### `.count` + +
expand doc > Iterates over the stream until exhaustion and returns the number of elements yielded: ```python assert integers.count() == 10 ``` +
+ +### `()` -## `()` +
expand doc > *Calling* the stream iterates over it until exhaustion and returns it: ```python @@ -479,8 +522,8 @@ appending_integers: Stream[int] = integers.foreach(state.append) assert appending_integers() is appending_integers assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ``` +
---- # 💡 Tips ## Extract-Transform-Load From 035996d28e85ee4fd7e1dbdee229768c3650b22c Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sat, 28 Dec 2024 22:34:22 +0000 Subject: [PATCH 6/8] introduce `WrappedError` to neutralize unexpected `StopIteration`s --- streamable/functions.py | 4 ++-- streamable/iterators.py | 22 ++++++++++++---------- streamable/util/exceptions.py | 2 -- streamable/util/functiontools.py | 28 +++++++++++++--------------- streamable/visitors/iterator.py | 4 ++-- tests/test_stream.py | 12 ++++++------ 6 files changed, 35 insertions(+), 37 deletions(-) delete mode 100644 streamable/util/exceptions.py diff --git a/streamable/functions.py b/streamable/functions.py index 3ee0a86..f9795ca 100644 --- a/streamable/functions.py +++ b/streamable/functions.py @@ -34,7 +34,7 @@ YieldsPerPeriodThrottleIterator, ) from streamable.util.constants import NO_REPLACEMENT -from streamable.util.functiontools import noop_stopiteration +from streamable.util.functiontools import wrap_error from streamable.util.validationtools import ( validate_concurrency, validate_count, @@ -130,7 +130,7 @@ def map( validate_iterator(iterator) validate_concurrency(concurrency) if concurrency == 1: - return builtins.map(noop_stopiteration(transformation), iterator) + return builtins.map(wrap_error(transformation, StopIteration), iterator) else: return OSConcurrentMapIterator( iterator, diff --git a/streamable/iterators.py b/streamable/iterators.py index 0054526..12d20f7 100644 --- a/streamable/iterators.py +++ b/streamable/iterators.py @@ -29,7 +29,7 @@ cast, ) -from streamable.util.functiontools import noop_stopiteration +from streamable.util.functiontools import wrap_error from streamable.util.loggertools import get_logger from streamable.util.validationtools import ( validate_buffersize, @@ -69,7 +69,7 @@ def __init__( validate_iterator(iterator) self.iterator = iterator self.kind = kind - self.when = noop_stopiteration(when) + self.when = wrap_error(when, StopIteration) self.replacement = replacement self.finally_raise = finally_raise self._to_be_finally_raised: Optional[Exception] = None @@ -98,7 +98,7 @@ class DistinctIterator(Iterator[T]): def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> None: validate_iterator(iterator) self.iterator = iterator - self.by = noop_stopiteration(by) if by else None + self.by = wrap_error(by, StopIteration) if by else None self._already_seen: Set[Any] = set() def __next__(self) -> T: @@ -115,7 +115,7 @@ class ConsecutiveDistinctIterator(Iterator[T]): def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> None: validate_iterator(iterator) self.iterator = iterator - self.by = noop_stopiteration(by) if by else None + self.by = wrap_error(by, StopIteration) if by else None self._has_yielded = False self._last_value: Any = None @@ -145,7 +145,9 @@ def __next__(self) -> U: return next(self._current_iterator_elem) except StopIteration: iterable_elem = next(self.iterator) - self._current_iterator_elem = noop_stopiteration(iter)(iterable_elem) + self._current_iterator_elem = wrap_error(iter, StopIteration)( + iterable_elem + ) class GroupIteratorMixin(Generic[T]): @@ -220,7 +222,7 @@ def __init__( interval: Optional[datetime.timedelta], ) -> None: super().__init__(iterator, size, interval) - self.by = noop_stopiteration(by) + self.by = wrap_error(by, StopIteration) self._is_exhausted = False self._groups_by: DefaultDict[U, List[T]] = defaultdict(list) @@ -317,7 +319,7 @@ class PredicateTruncateIterator(Iterator[T]): def __init__(self, iterator: Iterator[T], when: Callable[[T], Any]) -> None: validate_iterator(iterator) self.iterator = iterator - self.when = noop_stopiteration(when) + self.when = wrap_error(when, StopIteration) self._satisfied = False def __next__(self): @@ -531,7 +533,7 @@ def __init__( ) -> None: super().__init__(iterator, buffersize, ordered) validate_concurrency(concurrency) - self.transformation = noop_stopiteration(transformation) + self.transformation = wrap_error(transformation, StopIteration) self.concurrency = concurrency self.executor: Executor self.via = via @@ -603,7 +605,7 @@ def __init__( ordered: bool, ) -> None: super().__init__(iterator, buffersize, ordered) - self.transformation = noop_stopiteration(transformation) + self.transformation = wrap_error(transformation, StopIteration) async def _safe_transformation( self, elem: T @@ -700,7 +702,7 @@ def __iter__(self) -> Iterator[Union[T, _RaisingIterator.ExceptionContainer]]: except StopIteration: break try: - iterator = noop_stopiteration(iter)(iterable) + iterator = wrap_error(iter, StopIteration)(iterable) except Exception as e: yield _RaisingIterator.ExceptionContainer(e) continue diff --git a/streamable/util/exceptions.py b/streamable/util/exceptions.py deleted file mode 100644 index 82bd68b..0000000 --- a/streamable/util/exceptions.py +++ /dev/null @@ -1,2 +0,0 @@ -class NoopStopIteration(Exception): - pass diff --git a/streamable/util/functiontools.py b/streamable/util/functiontools.py index f8f2768..555cbb2 100644 --- a/streamable/util/functiontools.py +++ b/streamable/util/functiontools.py @@ -1,31 +1,29 @@ from typing import Any, Callable, Coroutine, Generic, Tuple, Type, TypeVar, overload -from streamable.util.exceptions import NoopStopIteration - T = TypeVar("T") R = TypeVar("R") -class _CatchAndRaiseAs(Generic[T, R]): - def __init__( - self, - func: Callable[[T], R], - catched_error: Type[Exception], - raised_error: Type[Exception], - ) -> None: +class WrappedError(Exception): + def __init__(self, error: Exception): + super().__init__(repr(error)) + self.error = error + + +class _ErrorWrappingDecorator(Generic[T, R]): + def __init__(self, func: Callable[[T], R], error_type: Type[Exception]) -> None: self.func = func - self.catched_error = catched_error - self.raised_error = raised_error + self.error_type = error_type def __call__(self, arg: T) -> R: try: return self.func(arg) - except self.catched_error as e: - raise self.raised_error(str(e)) from e + except self.error_type as e: + raise WrappedError(e) from e -def noop_stopiteration(func: Callable[[T], R]) -> Callable[[T], R]: - return _CatchAndRaiseAs(func, StopIteration, NoopStopIteration) +def wrap_error(func: Callable[[T], R], error_type: Type[Exception]) -> Callable[[T], R]: + return _ErrorWrappingDecorator(func, error_type) class _Sidify(Generic[T]): diff --git a/streamable/visitors/iterator.py b/streamable/visitors/iterator.py index a04a3ce..d6c8e62 100644 --- a/streamable/visitors/iterator.py +++ b/streamable/visitors/iterator.py @@ -18,7 +18,7 @@ ThrottleStream, TruncateStream, ) -from streamable.util.functiontools import async_sidify, noop_stopiteration, sidify +from streamable.util.functiontools import async_sidify, sidify, wrap_error from streamable.visitors import Visitor T = TypeVar("T") @@ -44,7 +44,7 @@ def visit_distinct_stream(self, stream: DistinctStream[T]) -> Iterator[T]: def visit_filter_stream(self, stream: FilterStream[T]) -> Iterator[T]: return filter( - noop_stopiteration(stream._when), + wrap_error(stream._when, StopIteration), cast(Iterable[T], stream.upstream.accept(self)), ) diff --git a/tests/test_stream.py b/tests/test_stream.py index d7ea9be..ec34adc 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -23,8 +23,7 @@ from parameterized import parameterized # type: ignore from streamable import Stream -from streamable.util.exceptions import NoopStopIteration -from streamable.util.functiontools import star +from streamable.util.functiontools import WrappedError, star T = TypeVar("T") R = TypeVar("R") @@ -541,7 +540,7 @@ def side_effect(x: int, func: Callable[[int], int]): ] for raised_exc, catched_exc in [ (TestError, TestError), - (StopIteration, (NoopStopIteration, RuntimeError)), + (StopIteration, (WrappedError, RuntimeError)), ] for concurrency in [1, 2] for method, throw_func_, throw_for_odd_func_ in [ @@ -673,7 +672,7 @@ def test_flatten_typing(self) -> None: [exception_type, mapped_exception_type, concurrency] for exception_type, mapped_exception_type in [ (TestError, TestError), - (StopIteration, NoopStopIteration), + (StopIteration, WrappedError), ] for concurrency in [1, 2] ] @@ -1086,8 +1085,9 @@ def f(i): [[0], [1]], msg="`group` should yield incomplete groups when `by` raises", ) - with self.assertRaises( - NoopStopIteration, + with self.assertRaisesRegex( + WrappedError, + "StopIteration()", msg="`group` should raise and skip `elem` if `by(elem)` raises", ): next(stream_iter) From 1facb3bdc51efad9de0c4b34afcd734971870429 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 29 Dec 2024 16:59:17 +0000 Subject: [PATCH 7/8] `ConsecutiveDistinctIterator`: refactor --- streamable/iterators.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/streamable/iterators.py b/streamable/iterators.py index 12d20f7..92c2dd2 100644 --- a/streamable/iterators.py +++ b/streamable/iterators.py @@ -119,15 +119,12 @@ def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> N self._has_yielded = False self._last_value: Any = None - def _next_elem_and_value(self) -> Tuple[T, Any]: - elem = next(self.iterator) - value = self.by(elem) if self.by else elem - return elem, value - def __next__(self) -> T: - elem, value = self._next_elem_and_value() - while self._has_yielded and value == self._last_value: - elem, value = self._next_elem_and_value() + while True: + elem = next(self.iterator) + value = self.by(elem) if self.by else elem + if not self._has_yielded or value != self._last_value: + break self._has_yielded = True self._last_value = value return elem From b1aea8bab841d132e4b6e2a0cab58aa697d48ac2 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 29 Dec 2024 01:00:35 +0000 Subject: [PATCH 8/8] 1.4.3: `.distinct` only hashable values --- version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.py b/version.py index 0540878..1017378 100644 --- a/version.py +++ b/version.py @@ -1,2 +1,2 @@ # to show the CHANGELOG: git log -- version.py -__version__ = "1.4.2" +__version__ = "1.4.3"