From 9ce9dbd241b7ecfcaadf8c4cac8df861ba1dce73 Mon Sep 17 00:00:00 2001 From: jeanluc Date: Thu, 10 Oct 2024 16:42:57 +0200 Subject: [PATCH] Don't block when evaluating requisites of parallel states --- changelog/59959.fixed.md | 1 + salt/modules/state.py | 2 +- salt/state.py | 65 ++++++++++++++----- .../pytests/unit/modules/state/test_state.py | 2 +- .../pytests/unit/state/test_state_compiler.py | 2 +- 5 files changed, 53 insertions(+), 19 deletions(-) create mode 100644 changelog/59959.fixed.md diff --git a/changelog/59959.fixed.md b/changelog/59959.fixed.md new file mode 100644 index 000000000000..4e98e5f7e5de --- /dev/null +++ b/changelog/59959.fixed.md @@ -0,0 +1 @@ +Fixed requisites by parallel states on parallel states being evaluated synchronously (blocking state execution for other parallel states) diff --git a/salt/modules/state.py b/salt/modules/state.py index cf4a592fd2f7..33819f4a6b73 100644 --- a/salt/modules/state.py +++ b/salt/modules/state.py @@ -1926,7 +1926,7 @@ def sls_id(id_, mods, test=None, queue=None, state_events=None, **kwargs): ret = {} for chunk in chunks: if chunk.get("__id__", "") == id_: - ret.update(st_.state.call_chunk(chunk, {}, chunks)) + ret.update(st_.state.call_chunk(chunk, {}, chunks)[0]) _set_retcode(ret, highstate=highstate) # Work around Windows multiprocessing bug, set __opts__['test'] back to diff --git a/salt/state.py b/salt/state.py index 9865e602597f..2ec3cd91a0b5 100644 --- a/salt/state.py +++ b/salt/state.py @@ -2468,6 +2468,20 @@ def call_chunks( """ Iterate over a list of chunks and call them, checking for requires. """ + + def _call_pending( + pending: dict[str, LowChunk], running: dict[str, dict] + ) -> tuple[dict[str, LowChunk], dict[str, dict], bool]: + still_pending = {} + for tag, pend in pending.items(): + if tag not in running: + running, is_pending = self.call_chunk(pend, running, chunks) + if is_pending: + still_pending[tag] = pend + if self.check_failhard(pend, running): + return still_pending, running, True + return still_pending, running, False + if disabled_states is None: # Check for any disabled states disabled = {} @@ -2476,7 +2490,11 @@ def call_chunks( else: disabled = disabled_states running = {} + pending_chunks = {} for low in chunks: + pending_chunks, running, failhard = _call_pending(pending_chunks, running) + if failhard: + return running if "__FAILHARD__" in running: running.pop("__FAILHARD__") return running @@ -2486,9 +2504,15 @@ def call_chunks( action = self.check_pause(low) if action == "kill": break - running = self.call_chunk(low, running, chunks) + running, pending = self.call_chunk(low, running, chunks) + if pending: + pending_chunks[tag] = low if self.check_failhard(low, running): return running + while pending_chunks: + pending_chunks, running, failhard = _call_pending(pending_chunks, running) + if failhard: + return running while True: if self.reconcile_procs(running): break @@ -2599,6 +2623,7 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]): states. """ reqs = {} + pending = False for req_type, chunk in self.dependency_dag.get_dependencies(low): reqs.setdefault(req_type, []).append(chunk) fun_stats = set() @@ -2617,17 +2642,15 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]): if run_dict_chunk: filtered_run_dict[tag] = run_dict_chunk run_dict = filtered_run_dict - - while True: - if self.reconcile_procs(run_dict): - break - time.sleep(0.01) + pending = bool(not self.reconcile_procs(run_dict) and low.get("parallel")) for chunk in chunks: tag = _gen_tag(chunk) if tag not in run_dict: req_stats.add("unmet") continue + if pending: + continue # A state can include a "skip_req" key in the return dict # with a True value to skip triggering onchanges, watch, or # other requisites which would result in a only running on a @@ -2684,6 +2707,8 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]): if "unmet" in fun_stats: status = "unmet" + elif pending: + status = "pending" elif "fail" in fun_stats: status = "fail" elif "skip_req" in fun_stats and (fun_stats & {"onchangesmet", "premet"}): @@ -2765,7 +2790,7 @@ def call_chunk( running: dict[str, dict], chunks: Sequence[LowChunk], depth: int = 0, - ) -> dict[str, dict]: + ) -> tuple[dict[str, dict], bool]: """ Execute the chunk if the requisites did not fail """ @@ -2781,8 +2806,13 @@ def call_chunk( status, reqs = self._check_requisites(low, running) if status == "unmet": - if self._call_unmet_requisites(low, running, chunks, tag, depth): - return running + running_failhard, pending = self._call_unmet_requisites( + low, running, chunks, tag, depth + ) + if running_failhard or pending: + return running, pending + elif status == "pending": + return running, True elif status == "met": if low.get("__prereq__"): self.pre[tag] = self.call(low, chunks, running) @@ -2910,7 +2940,7 @@ def call_chunk( for key in ("__sls__", "__id__", "name"): running[sub_tag][key] = low.get(key) - return running + return running, False def _assign_not_run_result_dict( self, @@ -2944,16 +2974,19 @@ def _call_unmet_requisites( chunks: Sequence[LowChunk], tag: str, depth: int, - ) -> dict[str, dict]: + ) -> tuple[dict[str, dict], bool]: + pending = False for _, chunk in self.dependency_dag.get_dependencies(low): # Check to see if the chunk has been run, only run it if # it has not been run already ctag = _gen_tag(chunk) if ctag not in running: - running = self.call_chunk(chunk, running, chunks) + running, pending = self.call_chunk(chunk, running, chunks) + if pending: + return running, pending if self.check_failhard(chunk, running): running["__FAILHARD__"] = True - return running + return running, pending if low.get("__prereq__"): status, _ = self._check_requisites(low, running) self.pre[tag] = self.call(low, chunks, running) @@ -2975,11 +3008,11 @@ def _call_unmet_requisites( for key in ("__sls__", "__id__", "name"): running[tag][key] = low.get(key) else: - running = self.call_chunk(low, running, chunks, depth) + running, pending = self.call_chunk(low, running, chunks, depth) if self.check_failhard(low, running): running["__FAILHARD__"] = True - return running - return {} + return running, pending + return {}, pending def call_beacons(self, chunks: Iterable[LowChunk], running: dict) -> dict: """ diff --git a/tests/pytests/unit/modules/state/test_state.py b/tests/pytests/unit/modules/state/test_state.py index c8fbafb6c1bb..8fb78d71b8c5 100644 --- a/tests/pytests/unit/modules/state/test_state.py +++ b/tests/pytests/unit/modules/state/test_state.py @@ -107,7 +107,7 @@ def call_chunk(data, data1, data2): """ Mock call_chunk method """ - return {"": "ABC"} + return {"": "ABC"}, False @staticmethod def call_chunks(data): diff --git a/tests/pytests/unit/state/test_state_compiler.py b/tests/pytests/unit/state/test_state_compiler.py index c2a474f1892b..122a12aaa769 100644 --- a/tests/pytests/unit/state/test_state_compiler.py +++ b/tests/pytests/unit/state/test_state_compiler.py @@ -853,7 +853,7 @@ def test_call_chunk_sub_state_run(minion_opts): with patch("salt.state.State.call", return_value=mock_call_return): minion_opts["disabled_requisites"] = ["require"] state_obj = salt.state.State(minion_opts) - ret = state_obj.call_chunk(low_data, {}, []) + ret, _ = state_obj.call_chunk(low_data, {}, []) sub_state = ret.get(expected_sub_state_tag) assert sub_state assert sub_state["__run_num__"] == 1