Skip to content

Commit

Permalink
Don't block when evaluating requisites of parallel states
Browse files Browse the repository at this point in the history
  • Loading branch information
lkubb committed Oct 10, 2024
1 parent 537f1bb commit b65d8a1
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelog/59959.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed requisites by parallel states on parallel states being evaluated synchronously (blocking state execution for other parallel states)
2 changes: 1 addition & 1 deletion salt/modules/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,7 @@ def sls_id(id_, mods, test=None, queue=None, state_events=None, **kwargs):
ret = {}
for chunk in chunks:
if chunk.get("__id__", "") == id_:
ret.update(st_.state.call_chunk(chunk, {}, chunks))
ret.update(st_.state.call_chunk(chunk, {}, chunks)[0])

_set_retcode(ret, highstate=highstate)
# Work around Windows multiprocessing bug, set __opts__['test'] back to
Expand Down
68 changes: 51 additions & 17 deletions salt/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2468,6 +2468,20 @@ def call_chunks(
"""
Iterate over a list of chunks and call them, checking for requires.
"""

def _call_pending(
pending: dict[str, LowChunk], running: dict[str, dict]
) -> tuple[dict[str, LowChunk], dict[str, dict], bool]:
still_pending = {}
for tag, pend in pending.items():
if tag not in running:
running, is_pending = self.call_chunk(pend, running, chunks)
if is_pending:
still_pending[tag] = pend
if self.check_failhard(pend, running):
return still_pending, running, True
return still_pending, running, False

if disabled_states is None:
# Check for any disabled states
disabled = {}
Expand All @@ -2476,7 +2490,11 @@ def call_chunks(
else:
disabled = disabled_states
running = {}
pending_chunks = {}
for low in chunks:
pending_chunks, running, failhard = _call_pending(pending_chunks, running)
if failhard:
return running
if "__FAILHARD__" in running:
running.pop("__FAILHARD__")
return running
Expand All @@ -2486,14 +2504,21 @@ def call_chunks(
action = self.check_pause(low)
if action == "kill":
break
running = self.call_chunk(low, running, chunks)
running, pending = self.call_chunk(low, running, chunks)
if pending:
pending_chunks[tag] = low
if self.check_failhard(low, running):
return running
while pending_chunks:
pending_chunks, running, failhard = _call_pending(pending_chunks, running)
if failhard:
return running
time.sleep(0.01)
while True:
if self.reconcile_procs(running):
break
time.sleep(0.01)
ret = dict(list(disabled.items()) + list(running.items()))
ret = {**disabled, **running}
return ret

def check_failhard(self, low: LowChunk, running: dict[str, dict]):
Expand Down Expand Up @@ -2599,6 +2624,7 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):
states.
"""
reqs = {}
pending = False
for req_type, chunk in self.dependency_dag.get_dependencies(low):
reqs.setdefault(req_type, []).append(chunk)
fun_stats = set()
Expand All @@ -2617,17 +2643,15 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):
if run_dict_chunk:
filtered_run_dict[tag] = run_dict_chunk
run_dict = filtered_run_dict

while True:
if self.reconcile_procs(run_dict):
break
time.sleep(0.01)
pending = bool(not self.reconcile_procs(run_dict) and low.get("parallel"))

for chunk in chunks:
tag = _gen_tag(chunk)
if tag not in run_dict:
req_stats.add("unmet")
continue
if pending:
continue
# A state can include a "skip_req" key in the return dict
# with a True value to skip triggering onchanges, watch, or
# other requisites which would result in a only running on a
Expand Down Expand Up @@ -2684,6 +2708,8 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):

if "unmet" in fun_stats:
status = "unmet"
elif pending:
status = "pending"
elif "fail" in fun_stats:
status = "fail"
elif "skip_req" in fun_stats and (fun_stats & {"onchangesmet", "premet"}):
Expand Down Expand Up @@ -2765,7 +2791,7 @@ def call_chunk(
running: dict[str, dict],
chunks: Sequence[LowChunk],
depth: int = 0,
) -> dict[str, dict]:
) -> tuple[dict[str, dict], bool]:
"""
Execute the chunk if the requisites did not fail
"""
Expand All @@ -2781,8 +2807,13 @@ def call_chunk(

status, reqs = self._check_requisites(low, running)
if status == "unmet":
if self._call_unmet_requisites(low, running, chunks, tag, depth):
return running
running_failhard, pending = self._call_unmet_requisites(
low, running, chunks, tag, depth
)
if running_failhard or pending:
return running, pending
elif status == "pending":
return running, True
elif status == "met":
if low.get("__prereq__"):
self.pre[tag] = self.call(low, chunks, running)
Expand Down Expand Up @@ -2910,7 +2941,7 @@ def call_chunk(
for key in ("__sls__", "__id__", "name"):
running[sub_tag][key] = low.get(key)

return running
return running, False

def _assign_not_run_result_dict(
self,
Expand Down Expand Up @@ -2944,16 +2975,19 @@ def _call_unmet_requisites(
chunks: Sequence[LowChunk],
tag: str,
depth: int,
) -> dict[str, dict]:
) -> tuple[dict[str, dict], bool]:
pending = False
for _, chunk in self.dependency_dag.get_dependencies(low):
# Check to see if the chunk has been run, only run it if
# it has not been run already
ctag = _gen_tag(chunk)
if ctag not in running:
running = self.call_chunk(chunk, running, chunks)
running, pending = self.call_chunk(chunk, running, chunks)
if pending:
return running, pending
if self.check_failhard(chunk, running):
running["__FAILHARD__"] = True
return running
return running, pending
if low.get("__prereq__"):
status, _ = self._check_requisites(low, running)
self.pre[tag] = self.call(low, chunks, running)
Expand All @@ -2975,11 +3009,11 @@ def _call_unmet_requisites(
for key in ("__sls__", "__id__", "name"):
running[tag][key] = low.get(key)
else:
running = self.call_chunk(low, running, chunks, depth)
running, pending = self.call_chunk(low, running, chunks, depth)
if self.check_failhard(low, running):
running["__FAILHARD__"] = True
return running
return {}
return running, pending
return {}, pending

def call_beacons(self, chunks: Iterable[LowChunk], running: dict) -> dict:
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/pytests/unit/modules/state/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def call_chunk(data, data1, data2):
"""
Mock call_chunk method
"""
return {"": "ABC"}
return {"": "ABC"}, False

@staticmethod
def call_chunks(data):
Expand Down
2 changes: 1 addition & 1 deletion tests/pytests/unit/state/test_state_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ def test_call_chunk_sub_state_run(minion_opts):
with patch("salt.state.State.call", return_value=mock_call_return):
minion_opts["disabled_requisites"] = ["require"]
state_obj = salt.state.State(minion_opts)
ret = state_obj.call_chunk(low_data, {}, [])
ret, _ = state_obj.call_chunk(low_data, {}, [])
sub_state = ret.get(expected_sub_state_tag)
assert sub_state
assert sub_state["__run_num__"] == 1
Expand Down

0 comments on commit b65d8a1

Please sign in to comment.