Skip to content

Commit

Permalink
for_all_nodes, handle exception/exit
Browse files Browse the repository at this point in the history
Fix #164
  • Loading branch information
albertz committed Dec 16, 2024
1 parent 0f7aeab commit e05bbcd
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions sisyphus/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ def for_all_nodes(self, f, nodes=None, bottom_up=False, *, pool: Optional[Thread

pool_lock = threading.Lock()
finished_lock = threading.Lock()
stopped_event = threading.Event()
if not pool:
pool = self.pool

Expand All @@ -555,6 +556,8 @@ def runner(job):
"""
sis_id = job._sis_id()
with pool_lock:
if stopped_event.is_set():
return
if sis_id not in visited:
visited[sis_id] = pool.apply_async(
tools.default_handle_exception_interrupt_main_thread(runner_helper), (job,)
Expand All @@ -564,6 +567,8 @@ def runner_helper(job):
"""
:param Job job:
"""
if stopped_event.is_set():
return
# make sure all inputs are updated
job._sis_runnable()
nonlocal finished
Expand All @@ -583,12 +588,17 @@ def runner_helper(job):
with finished_lock:
finished += 1

for node in nodes:
runner(node)
try:
for node in nodes:
runner(node)

# Check if all jobs are finished
while len(visited) != finished:
time.sleep(0.1)
# Check if all jobs are finished
while len(visited) != finished:
time.sleep(0.1)
except BaseException:
with pool_lock:
stopped_event.set()
raise

# Check again and create output set
out = set()
Expand Down

0 comments on commit e05bbcd

Please sign in to comment.