diff --git a/Makefile b/Makefile
index 397cdf7..f29b0d3 100644
--- a/Makefile
+++ b/Makefile
@@ -17,6 +17,7 @@ venv:
test:
$(VENV_DIR)/bin/python -m coverage run -m unittest --failfast
+ $(VENV_DIR)/bin/coverage report
$(VENV_DIR)/bin/coverage html
type-check:
diff --git a/README.md b/README.md
index fe76f9c..3c6d1f8 100644
--- a/README.md
+++ b/README.md
@@ -82,15 +82,13 @@ Iterate over a `Stream[T]` just as you would over any other `Iterable[T]`, eleme
1.0
```
-
-
----
-
# 📒 ***Operations***
*A dozen expressive lazy operations and that’s it!*
-# `.map`
+## `.map`
+
+expand doc
> Applies a transformation on elements:
@@ -179,7 +177,11 @@ zeros: Stream[int] = (
assert list(zeros) == [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```
-# `.foreach`
+
+
+## `.foreach`
+
+expand doc
> Applies a side effect on elements:
@@ -203,8 +205,11 @@ assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
### async-based concurrency
> Like `.map` it has a sibling `.aforeach` operation for async.
+
-# `.filter`
+## `.filter`
+
+expand doc
> Keeps only the elements that satisfy a condition:
@@ -213,8 +218,11 @@ even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)
assert list(even_integers) == [0, 2, 4, 6, 8]
```
+
+
+## `.throttle`
-# `.throttle`
+expand doc
> Limits the number of yields `per_second`/`per_minute`/`per_hour`:
@@ -238,8 +246,11 @@ integers_every_100_millis = (
# takes 900 millis: (10 integers - 1) * 100 millis
assert list(integers_every_100_millis) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
+
+
+## `.group`
-# `.group`
+expand doc
> Groups elements into `List`s:
@@ -274,8 +285,11 @@ integers_by_parity_by_2: Stream[List[int]] = (
assert list(integers_by_parity_by_2) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]]
```
+
-## `.groupby`
+### `.groupby`
+
+expand doc
> Like `.group`, but groups into `(key, elements)` tuples:
```python
@@ -300,8 +314,11 @@ counts_by_parity: Stream[Tuple[str, int]] = (
assert list(counts_by_parity) == [("even", 5), ("odd", 5)]
```
+
+
+## `.flatten`
-# `.flatten`
+expand doc
> Ungroups elements assuming that they are `Iterable`s:
@@ -323,8 +340,11 @@ mixed_ones_and_zeros: Stream[int] = (
assert list(mixed_ones_and_zeros) == [0, 1, 0, 1, 0, 1, 0, 1]
```
+
-# `.catch`
+## `.catch`
+
+expand doc
> Catches a given type of exceptions, and optionally yields a `replacement` value:
@@ -354,8 +374,11 @@ assert list(status_codes_ignoring_resolution_errors) == [200, 404]
```
> It has an optional `finally_raise: bool` parameter to raise the first catched exception when iteration ends.
+
+
+## `.truncate`
-# `.truncate`
+expand doc
> Ends iteration once a given number of elements have been yielded:
@@ -372,8 +395,11 @@ five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)
assert list(five_first_integers) == [0, 1, 2, 3, 4]
```
+
+
+## `.skip`
-# `.skip`
+expand doc
> Skips the first specified number of elements:
@@ -382,8 +408,11 @@ integers_after_five: Stream[int] = integers.skip(5)
assert list(integers_after_five) == [5, 6, 7, 8, 9]
```
+
-# `.distinct`
+## `.distinct`
+
+expand doc
> Removes duplicates:
@@ -415,8 +444,11 @@ consecutively_distinct_chars: Stream[str] = (
assert list(consecutively_distinct_chars) == ["f", "o", "b", "a", "r", "f", "o"]
```
+
+
+## `.observe`
-# `.observe`
+expand doc
> Logs the progress of iterations:
```python
@@ -432,16 +464,22 @@ INFO: [duration=0:00:04.003852 errors=0] 10 integers yielded
> [!NOTE]
> The amount of logs will never be overwhelming because they are produced logarithmically (base 2): the 11th log will be produced after 1,024 elements have been yielded, the 21th log after 1,048,576 elements, ...
+
-# `+`
+## `+`
+
+expand doc
> Concatenates streams:
```python
assert list(integers + integers) == [0, 1, 2, 3 ,4, 5, 6, 7, 8, 9, 0, 1, 2, 3 ,4, 5, 6, 7, 8, 9]
```
+
+
+## `zip`
-# `zip`
+expand doc
> [!TIP]
> Use the standard `zip` function:
@@ -456,21 +494,26 @@ cubes: Stream[int] = (
assert list(cubes) == [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
```
+
-# Shorthands for consuming the stream
-
+## Shorthands for consuming the stream
> [!NOTE]
> Although consuming the stream is beyond the scope of this library, it provides two basic shorthands to trigger an iteration:
-## `.count`
+### `.count`
+
+expand doc
> Iterates over the stream until exhaustion and returns the number of elements yielded:
```python
assert integers.count() == 10
```
+
+
+### `()`
-## `()`
+expand doc
> *Calling* the stream iterates over it until exhaustion and returns it:
```python
@@ -479,8 +522,8 @@ appending_integers: Stream[int] = integers.foreach(state.append)
assert appending_integers() is appending_integers
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
+
----
# 💡 Tips
## Extract-Transform-Load
diff --git a/streamable/functions.py b/streamable/functions.py
index 3ee0a86..f9795ca 100644
--- a/streamable/functions.py
+++ b/streamable/functions.py
@@ -34,7 +34,7 @@
YieldsPerPeriodThrottleIterator,
)
from streamable.util.constants import NO_REPLACEMENT
-from streamable.util.functiontools import noop_stopiteration
+from streamable.util.functiontools import wrap_error
from streamable.util.validationtools import (
validate_concurrency,
validate_count,
@@ -130,7 +130,7 @@ def map(
validate_iterator(iterator)
validate_concurrency(concurrency)
if concurrency == 1:
- return builtins.map(noop_stopiteration(transformation), iterator)
+ return builtins.map(wrap_error(transformation, StopIteration), iterator)
else:
return OSConcurrentMapIterator(
iterator,
diff --git a/streamable/iterators.py b/streamable/iterators.py
index 712d5f6..92c2dd2 100644
--- a/streamable/iterators.py
+++ b/streamable/iterators.py
@@ -29,7 +29,7 @@
cast,
)
-from streamable.util.functiontools import noop_stopiteration
+from streamable.util.functiontools import wrap_error
from streamable.util.loggertools import get_logger
from streamable.util.validationtools import (
validate_buffersize,
@@ -69,7 +69,7 @@ def __init__(
validate_iterator(iterator)
self.iterator = iterator
self.kind = kind
- self.when = noop_stopiteration(when)
+ self.when = wrap_error(when, StopIteration)
self.replacement = replacement
self.finally_raise = finally_raise
self._to_be_finally_raised: Optional[Exception] = None
@@ -98,32 +98,16 @@ class DistinctIterator(Iterator[T]):
def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> None:
validate_iterator(iterator)
self.iterator = iterator
- self.by = noop_stopiteration(by) if by else None
- self._already_seen_set: Set[Any] = set()
- self._already_seen_list: List[Any] = list()
-
- def _value(self, elem):
- return self.by(elem) if self.by else elem
-
- def _see(self, elem: Any):
- value = self._value(elem)
- try:
- self._already_seen_set.add(value)
- except TypeError:
- self._already_seen_list.append(value)
-
- def _has_been_seen(self, elem: Any):
- value = self._value(elem)
- try:
- return value in self._already_seen_set
- except TypeError:
- return value in self._already_seen_list
+ self.by = wrap_error(by, StopIteration) if by else None
+ self._already_seen: Set[Any] = set()
def __next__(self) -> T:
- elem = next(self.iterator)
- while self._has_been_seen(elem):
+ while True:
elem = next(self.iterator)
- self._see(elem)
+ value = self.by(elem) if self.by else elem
+ if value not in self._already_seen:
+ break
+ self._already_seen.add(value)
return elem
@@ -131,19 +115,16 @@ class ConsecutiveDistinctIterator(Iterator[T]):
def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> None:
validate_iterator(iterator)
self.iterator = iterator
- self.by = noop_stopiteration(by) if by else None
+ self.by = wrap_error(by, StopIteration) if by else None
self._has_yielded = False
self._last_value: Any = None
- def _next_elem_and_value(self) -> Tuple[T, Any]:
- elem = next(self.iterator)
- value = self.by(elem) if self.by else elem
- return elem, value
-
def __next__(self) -> T:
- elem, value = self._next_elem_and_value()
- while self._has_yielded and value == self._last_value:
- elem, value = self._next_elem_and_value()
+ while True:
+ elem = next(self.iterator)
+ value = self.by(elem) if self.by else elem
+ if not self._has_yielded or value != self._last_value:
+ break
self._has_yielded = True
self._last_value = value
return elem
@@ -161,10 +142,12 @@ def __next__(self) -> U:
return next(self._current_iterator_elem)
except StopIteration:
iterable_elem = next(self.iterator)
- self._current_iterator_elem = noop_stopiteration(iter)(iterable_elem)
+ self._current_iterator_elem = wrap_error(iter, StopIteration)(
+ iterable_elem
+ )
-class _GroupIteratorInitMixin(Generic[T]):
+class GroupIteratorMixin(Generic[T]):
def __init__(
self,
iterator: Iterator[T],
@@ -176,12 +159,28 @@ def __init__(
validate_group_interval(interval)
self.iterator = iterator
self.size = size or cast(int, float("inf"))
+ self.interval = interval
self._interval_seconds = interval.total_seconds() if interval else float("inf")
self._to_be_raised: Optional[Exception] = None
self._last_group_yielded_at: float = 0
+ def _interval_seconds_have_elapsed(self) -> bool:
+ if not self.interval:
+ return False
+ return (
+ time.perf_counter() - self._last_group_yielded_at
+ ) >= self._interval_seconds
+
+ def _remember_group_time(self) -> None:
+ if self.interval:
+ self._last_group_yielded_at = time.perf_counter()
-class GroupIterator(_GroupIteratorInitMixin[T], Iterator[List[T]]):
+ def _init_last_group_time(self) -> None:
+ if self.interval and not self._last_group_yielded_at:
+ self._last_group_yielded_at = time.perf_counter()
+
+
+class GroupIterator(GroupIteratorMixin[T], Iterator[List[T]]):
def __init__(
self,
iterator: Iterator[T],
@@ -191,12 +190,8 @@ def __init__(
super().__init__(iterator, size, interval)
self._current_group: List[T] = []
- def _interval_seconds_have_elapsed(self) -> bool:
- return (time.time() - self._last_group_yielded_at) >= self._interval_seconds
-
def __next__(self) -> List[T]:
- if not self._last_group_yielded_at:
- self._last_group_yielded_at = time.time()
+ self._init_last_group_time()
if self._to_be_raised:
e, self._to_be_raised = self._to_be_raised, None
raise e
@@ -211,11 +206,11 @@ def __next__(self) -> List[T]:
self._to_be_raised = e
group, self._current_group = self._current_group, []
- self._last_group_yielded_at = time.time()
+ self._remember_group_time()
return group
-class GroupbyIterator(_GroupIteratorInitMixin[T], Iterator[Tuple[U, List[T]]]):
+class GroupbyIterator(GroupIteratorMixin[T], Iterator[Tuple[U, List[T]]]):
def __init__(
self,
iterator: Iterator[T],
@@ -224,13 +219,10 @@ def __init__(
interval: Optional[datetime.timedelta],
) -> None:
super().__init__(iterator, size, interval)
- self.by = noop_stopiteration(by)
+ self.by = wrap_error(by, StopIteration)
self._is_exhausted = False
self._groups_by: DefaultDict[U, List[T]] = defaultdict(list)
- def _interval_seconds_have_elapsed(self) -> bool:
- return (time.time() - self._last_group_yielded_at) >= self._interval_seconds
-
def _group_next_elem(self) -> None:
elem = next(self.iterator)
self._groups_by[self.by(elem)].append(elem)
@@ -254,21 +246,17 @@ def _pop_largest_group(self) -> Tuple[U, List[T]]:
return largest_group_key, self._groups_by.pop(largest_group_key)
- def _return_group(self, group: Tuple[U, List[T]]) -> Tuple[U, List[T]]:
- self._last_group_yielded_at = time.time()
- return group
-
def __next__(self) -> Tuple[U, List[T]]:
- if not self._last_group_yielded_at:
- self._last_group_yielded_at = time.time()
+ self._init_last_group_time()
if self._is_exhausted:
if self._groups_by:
- return self._return_group(self._pop_first_group())
+ return self._pop_first_group()
raise StopIteration
if self._to_be_raised:
if self._groups_by:
- return self._return_group(self._pop_first_group())
+ self._remember_group_time()
+ return self._pop_first_group()
e, self._to_be_raised = self._to_be_raised, None
raise e
@@ -280,9 +268,8 @@ def __next__(self) -> Tuple[U, List[T]]:
self._group_next_elem()
full_group = self._pop_full_group()
- if full_group:
- return self._return_group(full_group)
- return self._return_group(self._pop_largest_group())
+ self._remember_group_time()
+ return full_group or self._pop_largest_group()
except StopIteration:
self._is_exhausted = True
@@ -329,7 +316,7 @@ class PredicateTruncateIterator(Iterator[T]):
def __init__(self, iterator: Iterator[T], when: Callable[[T], Any]) -> None:
validate_iterator(iterator)
self.iterator = iterator
- self.when = noop_stopiteration(when)
+ self.when = wrap_error(when, StopIteration)
self._satisfied = False
def __next__(self):
@@ -351,12 +338,12 @@ def __init__(self, iterator: Iterator[T], what: str, base: int = 2) -> None:
self._n_yields = 0
self._n_errors = 0
self._logged_n_calls = 0
- self._start_time = time.time()
+ self._start_time = time.perf_counter()
def _log(self) -> None:
get_logger().info(
"[%s %s] %s",
- f"duration={datetime.datetime.fromtimestamp(time.time()) - datetime.datetime.fromtimestamp(self._start_time)}",
+ f"duration={datetime.datetime.fromtimestamp(time.perf_counter()) - datetime.datetime.fromtimestamp(self._start_time)}",
f"errors={self._n_errors}",
f"{self._n_yields} {self.what} yielded",
)
@@ -407,10 +394,10 @@ def __init__(self, iterator: Iterator[T], interval: datetime.timedelta) -> None:
def __next__(self) -> T:
elem, catched_error = self.safe_next()
if self._last_yield_at:
- elapsed_time = time.time() - self._last_yield_at
+ elapsed_time = time.perf_counter() - self._last_yield_at
if elapsed_time < self._interval_seconds:
time.sleep(self._interval_seconds - elapsed_time)
- self._last_yield_at = time.time()
+ self._last_yield_at = time.perf_counter()
if catched_error:
raise catched_error
@@ -436,7 +423,7 @@ def __init__(
def __next__(self) -> T:
elem, catched_error = self.safe_next()
- now = time.time()
+ now = time.perf_counter()
if not self._offset:
self._offset = now
now -= self._offset
@@ -543,7 +530,7 @@ def __init__(
) -> None:
super().__init__(iterator, buffersize, ordered)
validate_concurrency(concurrency)
- self.transformation = noop_stopiteration(transformation)
+ self.transformation = wrap_error(transformation, StopIteration)
self.concurrency = concurrency
self.executor: Executor
self.via = via
@@ -615,7 +602,7 @@ def __init__(
ordered: bool,
) -> None:
super().__init__(iterator, buffersize, ordered)
- self.transformation = noop_stopiteration(transformation)
+ self.transformation = wrap_error(transformation, StopIteration)
async def _safe_transformation(
self, elem: T
@@ -712,7 +699,7 @@ def __iter__(self) -> Iterator[Union[T, _RaisingIterator.ExceptionContainer]]:
except StopIteration:
break
try:
- iterator = noop_stopiteration(iter)(iterable)
+ iterator = wrap_error(iter, StopIteration)(iterable)
except Exception as e:
yield _RaisingIterator.ExceptionContainer(e)
continue
diff --git a/streamable/stream.py b/streamable/stream.py
index 7373205..de51be0 100644
--- a/streamable/stream.py
+++ b/streamable/stream.py
@@ -167,8 +167,8 @@ def distinct(
self, by: Optional[Callable[[T], Any]] = None, consecutive_only: bool = False
) -> "Stream":
"""
- Filters the stream to yield only distinct elements, `foo` and `bar` considered duplicates if `foo == bar`.
- If `by` is specified, `foo` and `bar` are considered duplicates if `by(foo) == by(bar)`.
+ Filters the stream to yield only distinct elements, `foo` and `bar` considered duplicates if `hash(foo) == hash(bar)`.
+ If `by` is specified, `foo` and `bar` are considered duplicates if `hash(by(foo)) == hash(by(bar))`.
Among duplicates, the first encountered occurence in upstream order is yielded.
diff --git a/streamable/util/exceptions.py b/streamable/util/exceptions.py
deleted file mode 100644
index 82bd68b..0000000
--- a/streamable/util/exceptions.py
+++ /dev/null
@@ -1,2 +0,0 @@
-class NoopStopIteration(Exception):
- pass
diff --git a/streamable/util/functiontools.py b/streamable/util/functiontools.py
index f8f2768..555cbb2 100644
--- a/streamable/util/functiontools.py
+++ b/streamable/util/functiontools.py
@@ -1,31 +1,29 @@
from typing import Any, Callable, Coroutine, Generic, Tuple, Type, TypeVar, overload
-from streamable.util.exceptions import NoopStopIteration
-
T = TypeVar("T")
R = TypeVar("R")
-class _CatchAndRaiseAs(Generic[T, R]):
- def __init__(
- self,
- func: Callable[[T], R],
- catched_error: Type[Exception],
- raised_error: Type[Exception],
- ) -> None:
+class WrappedError(Exception):
+ def __init__(self, error: Exception):
+ super().__init__(repr(error))
+ self.error = error
+
+
+class _ErrorWrappingDecorator(Generic[T, R]):
+ def __init__(self, func: Callable[[T], R], error_type: Type[Exception]) -> None:
self.func = func
- self.catched_error = catched_error
- self.raised_error = raised_error
+ self.error_type = error_type
def __call__(self, arg: T) -> R:
try:
return self.func(arg)
- except self.catched_error as e:
- raise self.raised_error(str(e)) from e
+ except self.error_type as e:
+ raise WrappedError(e) from e
-def noop_stopiteration(func: Callable[[T], R]) -> Callable[[T], R]:
- return _CatchAndRaiseAs(func, StopIteration, NoopStopIteration)
+def wrap_error(func: Callable[[T], R], error_type: Type[Exception]) -> Callable[[T], R]:
+ return _ErrorWrappingDecorator(func, error_type)
class _Sidify(Generic[T]):
diff --git a/streamable/visitors/iterator.py b/streamable/visitors/iterator.py
index a04a3ce..d6c8e62 100644
--- a/streamable/visitors/iterator.py
+++ b/streamable/visitors/iterator.py
@@ -18,7 +18,7 @@
ThrottleStream,
TruncateStream,
)
-from streamable.util.functiontools import async_sidify, noop_stopiteration, sidify
+from streamable.util.functiontools import async_sidify, sidify, wrap_error
from streamable.visitors import Visitor
T = TypeVar("T")
@@ -44,7 +44,7 @@ def visit_distinct_stream(self, stream: DistinctStream[T]) -> Iterator[T]:
def visit_filter_stream(self, stream: FilterStream[T]) -> Iterator[T]:
return filter(
- noop_stopiteration(stream._when),
+ wrap_error(stream._when, StopIteration),
cast(Iterable[T], stream.upstream.accept(self)),
)
diff --git a/tests/test_readme.py b/tests/test_readme.py
index aa69fe8..4957b09 100644
--- a/tests/test_readme.py
+++ b/tests/test_readme.py
@@ -105,10 +105,10 @@ def test_throttle_example(self) -> None:
integers_5_per_sec: Stream[int] = integers.throttle(per_second=3)
- start = time.time()
+ start = time.perf_counter()
# takes 3s: ceil(10 integers / 3 per_second) - 1
assert list(integers_5_per_sec) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- assert 2.99 < time.time() - start < 3.25
+ assert 2.99 < time.perf_counter() - start < 3.25
from datetime import timedelta
@@ -117,10 +117,10 @@ def test_throttle_example(self) -> None:
.throttle(interval=timedelta(milliseconds=100))
)
- start = time.time()
+ start = time.perf_counter()
# takes 900 millis: (10 integers - 1) * 100 millis
assert list(integers_every_100_millis) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- assert 0.89 < time.time() - start < 0.95
+ assert 0.89 < time.perf_counter() - start < 0.95
def test_group_example(self) -> None:
global integers_by_parity
diff --git a/tests/test_stream.py b/tests/test_stream.py
index 9b08300..ec34adc 100644
--- a/tests/test_stream.py
+++ b/tests/test_stream.py
@@ -23,8 +23,7 @@
from parameterized import parameterized # type: ignore
from streamable import Stream
-from streamable.util.exceptions import NoopStopIteration
-from streamable.util.functiontools import star
+from streamable.util.functiontools import WrappedError, star
T = TypeVar("T")
R = TypeVar("R")
@@ -541,7 +540,7 @@ def side_effect(x: int, func: Callable[[int], int]):
]
for raised_exc, catched_exc in [
(TestError, TestError),
- (StopIteration, (NoopStopIteration, RuntimeError)),
+ (StopIteration, (WrappedError, RuntimeError)),
]
for concurrency in [1, 2]
for method, throw_func_, throw_for_odd_func_ in [
@@ -673,7 +672,7 @@ def test_flatten_typing(self) -> None:
[exception_type, mapped_exception_type, concurrency]
for exception_type, mapped_exception_type in [
(TestError, TestError),
- (StopIteration, NoopStopIteration),
+ (StopIteration, WrappedError),
]
for concurrency in [1, 2]
]
@@ -1086,8 +1085,9 @@ def f(i):
[[0], [1]],
msg="`group` should yield incomplete groups when `by` raises",
)
- with self.assertRaises(
- NoopStopIteration,
+ with self.assertRaisesRegex(
+ WrappedError,
+ "StopIteration()",
msg="`group` should raise and skip `elem` if `by(elem)` raises",
):
next(stream_iter)
@@ -1253,16 +1253,12 @@ def test_distinct(self) -> None:
[],
msg="`distinct` should yield zero elements on empty stream",
)
- self.assertEqual(
- list(Stream([[1], [2], [1], [2]]).distinct()),
- [[1], [2]],
- msg="`distinct` should work with non-hashable elements",
- )
- self.assertEqual(
- list(Stream([[1], "foo", [2], [1], [2], "foo"]).distinct()),
- [[1], "foo", [2]],
- msg="`distinct` should work with a mix of hashable and non-hashable elements",
- )
+ with self.assertRaisesRegex(
+ TypeError,
+ "unhashable type: 'list'",
+ msg="`distinct` should raise for non-hashable elements",
+ ):
+ list(Stream([[1]]).distinct())
def test_catch(self) -> None:
self.assertEqual(
diff --git a/version.py b/version.py
index 0540878..1017378 100644
--- a/version.py
+++ b/version.py
@@ -1,2 +1,2 @@
# to show the CHANGELOG: git log -- version.py
-__version__ = "1.4.2"
+__version__ = "1.4.3"