Skip to content

Commit

Permalink
_ConcurrentFlattenIterable: improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Jan 3, 2025
1 parent 5c001e5 commit b46faf2
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions streamable/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,38 +680,37 @@ def __iter__(self) -> Iterator[Union[T, _RaisingIterator.ExceptionContainer]]:
element_to_yield: Deque[Union[T, _RaisingIterator.ExceptionContainer]] = (
deque(maxlen=1)
)
iterator_to_queue: Deque[Iterator[T]] = deque(maxlen=1)
iterator_to_queue: Optional[Iterator[T]] = None
# wait, queue, yield (FIFO)
while True:
if iterator_and_future_pairs:
iterator, future = iterator_and_future_pairs.popleft()
try:
element_to_yield.append(future.result())
iterator_to_queue.append(iterator)
iterator_to_queue = iterator
except StopIteration:
pass
except Exception as e:
element_to_yield.append(_RaisingIterator.ExceptionContainer(e))
iterator_to_queue.append(iterator)
iterator_to_queue = iterator

# queue tasks up to buffersize
while len(iterator_and_future_pairs) < self.buffersize:
if iterator_to_queue:
iterator = iterator_to_queue.pop()
else:
if not iterator_to_queue:
try:
iterable = next(self.iterables_iterator)
except StopIteration:
break
try:
iterator = wrap_error(iter, StopIteration)(iterable)
iterator_to_queue = iter_wo_stopiteration(iterable)
except Exception as e:
yield _RaisingIterator.ExceptionContainer(e)
continue
future = executor.submit(
cast(Callable[[Iterable[T]], T], next), iterator
cast(Callable[[Iterable[T]], T], next), iterator_to_queue
)
iterator_and_future_pairs.append((iterator, future))
iterator_and_future_pairs.append((iterator_to_queue, future))
iterator_to_queue = None
if element_to_yield:
yield element_to_yield.pop()
if not iterator_and_future_pairs:
Expand Down

0 comments on commit b46faf2

Please sign in to comment.