Skip to content

Commit

Permalink
make flatten resilient to 'generator is already generating' while rem…
Browse files Browse the repository at this point in the history
…aining resilient to 'max recur depth exceeded'
  • Loading branch information
ebonnal committed Oct 17, 2023
1 parent 42a30d6 commit 057f946
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
24 changes: 14 additions & 10 deletions kioss/_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,21 @@ def __next__(self):
except KeyError: # KeyError: 'pop from an empty set'
raise StopIteration()

def f():
try:
elem = next(next_iterator_elem)
except StopIteration:
def f(backoff: int = 0.005):
while True:
try:
self.iterators_pool.remove(next_iterator_elem)
except KeyError:
pass
return ThreadedFlatteningIteratorWrapper._SKIP

return elem
elem = next(next_iterator_elem)
except StopIteration:
try:
self.iterators_pool.remove(next_iterator_elem)
except KeyError:
pass
return ThreadedFlatteningIteratorWrapper._SKIP
except ValueError: # generator anlready executing
time.sleep(backoff)
backoff *= 2
continue
return elem

return f

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='kioss',
version='0.4.0',
version='0.4.1',
packages=['kioss'],
url='http://github.com/bonnal-enzo/kioss',
license='Apache 2.',
Expand Down
10 changes: 10 additions & 0 deletions tests/test_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,13 @@ def test_planning_and_execution_decoupling(self):
self.assertListEqual(a.collect(), list(range(N)))
# test b not affected by a execution
self.assertListEqual(b.collect(), [list(range(N))])

def test_generator_already_generating(self):
self.assertEqual(
Counter(
Pipe(
lambda: [(ten_ms_identity(x) for x in range(N)) for _ in range(3)]
).flatten(n_threads=2)
),
Counter(list(range(N)) + list(range(N)) + list(range(N))),
)

0 comments on commit 057f946

Please sign in to comment.