diff --git a/streamable/iterators.py b/streamable/iterators.py index 68e6b1b..74cbb56 100644 --- a/streamable/iterators.py +++ b/streamable/iterators.py @@ -680,38 +680,37 @@ def __iter__(self) -> Iterator[Union[T, _RaisingIterator.ExceptionContainer]]: element_to_yield: Deque[Union[T, _RaisingIterator.ExceptionContainer]] = ( deque(maxlen=1) ) - iterator_to_queue: Deque[Iterator[T]] = deque(maxlen=1) + iterator_to_queue: Optional[Iterator[T]] = None # wait, queue, yield (FIFO) while True: if iterator_and_future_pairs: iterator, future = iterator_and_future_pairs.popleft() try: element_to_yield.append(future.result()) - iterator_to_queue.append(iterator) + iterator_to_queue = iterator except StopIteration: pass except Exception as e: element_to_yield.append(_RaisingIterator.ExceptionContainer(e)) - iterator_to_queue.append(iterator) + iterator_to_queue = iterator # queue tasks up to buffersize while len(iterator_and_future_pairs) < self.buffersize: - if iterator_to_queue: - iterator = iterator_to_queue.pop() - else: + if not iterator_to_queue: try: iterable = next(self.iterables_iterator) except StopIteration: break try: - iterator = wrap_error(iter, StopIteration)(iterable) + iterator_to_queue = iter_wo_stopiteration(iterable) except Exception as e: yield _RaisingIterator.ExceptionContainer(e) continue future = executor.submit( - cast(Callable[[Iterable[T]], T], next), iterator + cast(Callable[[Iterable[T]], T], next), iterator_to_queue ) - iterator_and_future_pairs.append((iterator, future)) + iterator_and_future_pairs.append((iterator_to_queue, future)) + iterator_to_queue = None if element_to_yield: yield element_to_yield.pop() if not iterator_and_future_pairs: