From 78a775e998685c9cd7c12f38bbe1790547273dbc Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Fri, 13 Dec 2024 03:07:46 -0700 Subject: [PATCH 1/2] Add regression test for #66562 --- .../minion/test_schedule_large_event.py | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 tests/pytests/integration/minion/test_schedule_large_event.py diff --git a/tests/pytests/integration/minion/test_schedule_large_event.py b/tests/pytests/integration/minion/test_schedule_large_event.py new file mode 100644 index 000000000000..3162cbfe5d6b --- /dev/null +++ b/tests/pytests/integration/minion/test_schedule_large_event.py @@ -0,0 +1,107 @@ +import sys + +import pytest + +import salt.utils.event +import salt.utils.platform +import tests.support.helpers +from tests.conftest import FIPS_TESTRUN + + +@pytest.fixture +def salt_master_1(request, salt_factories): + config_defaults = { + "open_mode": True, + "transport": request.config.getoption("--transport"), + } + config_overrides = { + "interface": "127.0.0.1", + "fips_mode": FIPS_TESTRUN, + "publish_signing_algorithm": ( + "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1" + ), + } + + factory = salt_factories.salt_master_daemon( + "master-1", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +@pytest.fixture +def salt_minion_1(salt_master_1): + config_defaults = { + "transport": salt_master_1.config["transport"], + } + master_1_port = salt_master_1.config["ret_port"] + master_1_addr = salt_master_1.config["interface"] + config_overrides = { + "master": [ + f"{master_1_addr}:{master_1_port}", + ], + "test.foo": "baz", + "fips_mode": FIPS_TESTRUN, + "encryption_algorithm": "OAEP-SHA224" if FIPS_TESTRUN else "OAEP-SHA1", + "signing_algorithm": "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1", + } + factory = salt_master_1.salt_minion_daemon( + "minion-1", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +@pytest.fixture +def script(salt_minion_1, tmp_path): + path = tmp_path / "script.py" + content = f""" + import salt.config + import salt.utils.event + + opts = salt.config.minion_config('{salt_minion_1.config_file}') + + payload = b'0' * 1048576000 + + big_event = dict() + for i in range(10000): + big_event[i] = payload = b'0' * 100 + + with salt.utils.event.get_event("minion", opts=opts) as event: + event.fire_master(big_event, 'bigevent') + + """ + path.write_text(tests.support.helpers.dedent(content)) + return path + + +# @pytest.mark.timeout_unless_on_windows(360) +def test_schedule_large_event(salt_master_1, salt_minion_1, script): + cli = salt_master_1.salt_cli(timeout=120) + ret = cli.run( + "schedule.add", + name="myjob", + function="cmd.run", + seconds=5, + job_args=f'["{sys.executable} {script}"]', + minion_tgt=salt_minion_1.id, + ) + assert "result" in ret.data + assert ret.data["result"] + with salt.utils.event.get_event( + "master", + salt_master_1.config["sock_dir"], + salt_master_1.config["transport"], + salt_master_1.config, + listen=True, + ) as event: + event = event.get_event(tag="bigevent", wait=15) + assert event + assert "data" in event + assert len(event["data"]) == 10000 From 80662811d193de4512edfd26e833e57420153f9b Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Thu, 12 Dec 2024 14:13:18 -0700 Subject: [PATCH 2/2] Initial commit of large event fix Clean up un-yielded coroutines --- salt/minion.py | 105 ++++++++++++++++++++++++++----------- salt/utils/asynchronous.py | 2 +- 2 files changed, 76 insertions(+), 31 deletions(-) diff --git a/salt/minion.py b/salt/minion.py index 2c2585637420..c2afbbf1f594 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1663,20 +1663,28 @@ def _send_req_async(self, load, timeout): ) raise salt.ext.tornado.gen.Return(ret) - def _fire_master( - self, - data=None, - tag=None, - events=None, - pretag=None, - timeout=60, - sync=True, - timeout_handler=None, - include_startup_grains=False, - ): + @salt.ext.tornado.gen.coroutine + def _send_req_async_main(self, load, timeout): """ - Fire an event on the master, or drop message if unable to send. + Send a request to the master's request server. To be called from the + top level process in the main thread only. Worker threads and + processess should call _send_req_sync or _send_req_async as nessecery. """ + if self.opts["minion_sign_messages"]: + log.trace("Signing event to be published onto the bus.") + minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem") + sig = salt.crypt.sign_message( + minion_privkey_path, salt.serializers.msgpack.serialize(load) + ) + load["sig"] = sig + ret = yield self.req_channel.send( + load, timeout=timeout, tries=self.opts["return_retry_tries"] + ) + raise salt.ext.tornado.gen.Return(ret) + + def _fire_master_prepare( + self, data, tag, events, pretag, include_startup_grains=False + ): load = { "id": self.opts["id"], "cmd": "_minion_event", @@ -1701,7 +1709,52 @@ def _fire_master( if k in self.opts["start_event_grains"] } load["grains"] = grains_to_add + return load + + @salt.ext.tornado.gen.coroutine + def _fire_master_main( + self, + data=None, + tag=None, + events=None, + pretag=None, + timeout=60, + timeout_handler=None, + include_startup_grains=False, + ): + load = self._fire_master_prepare( + data, tag, events, pretag, include_startup_grains + ) + if timeout_handler is None: + + def handle_timeout(*_): + log.info( + "fire_master failed: master could not be contacted. Request" + " timed out." + ) + return True + + timeout_handler = handle_timeout + + yield self._send_req_async_main(load, timeout) + def _fire_master( + self, + data=None, + tag=None, + events=None, + pretag=None, + timeout=60, + sync=True, + timeout_handler=None, + include_startup_grains=False, + ): + """ + Fire an event on the master, or drop message if unable to send. + """ + load = self._fire_master_prepare( + data, tag, events, pretag, include_startup_grains + ) if sync: try: self._send_req_sync(load, timeout) @@ -1726,10 +1779,8 @@ def handle_timeout(*_): timeout_handler = handle_timeout - with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler): - # pylint: disable=unexpected-keyword-arg - self._send_req_async(load, timeout, callback=lambda f: None) - # pylint: enable=unexpected-keyword-arg + # Returning a coroutine, should be awaited + return self._send_req_async(load, timeout) return True @salt.ext.tornado.gen.coroutine @@ -2306,12 +2357,7 @@ def timeout_handler(*_): timeout_handler() return "" else: - with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler): - # pylint: disable=unexpected-keyword-arg - ret_val = self._send_req_async( - load, timeout=timeout, callback=lambda f: None - ) - # pylint: enable=unexpected-keyword-arg + ret_val = self._send_req_async(load, timeout=timeout) log.trace("ret_val = %s", ret_val) # pylint: disable=no-member return ret_val @@ -2792,12 +2838,11 @@ def handle_event(self, package): elif tag.startswith("fire_master"): if self.connected: log.debug("Forwarding master event tag=%s", data["tag"]) - self._fire_master( + yield self._fire_master_main( data["data"], data["tag"], data["events"], data["pretag"], - sync=False, ) elif tag.startswith(master_event(type="disconnected")) or tag.startswith( master_event(type="failback") @@ -2954,11 +2999,11 @@ def handle_event(self, package): 1 ], ) - self._return_pub(data, ret_cmd="_return", sync=False) + yield self._return_pub(data, ret_cmd="_return", sync=False) elif tag.startswith("_salt_error"): if self.connected: log.debug("Forwarding salt error event tag=%s", tag) - self._fire_master(data, tag, sync=False) + yield self._fire_master_main(data, tag) elif tag.startswith("salt/auth/creds"): key = tuple(data["key"]) log.debug( @@ -2971,7 +3016,7 @@ def handle_event(self, package): elif tag.startswith("__beacons_return"): if self.connected: log.debug("Firing beacons to master") - self._fire_master(events=data["beacons"]) + yield self._fire_master_main(events=data["beacons"]) def cleanup_subprocesses(self): """ @@ -3373,12 +3418,12 @@ def fire_master_syndic_start(self): self._fire_master( "Syndic {} started at {}".format(self.opts["id"], time.asctime()), "syndic_start", - sync=False, + sync=True, # sync needs to be false unless called from coroutine. ) self._fire_master( "Syndic {} started at {}".format(self.opts["id"], time.asctime()), tagify([self.opts["id"], "start"], "syndic"), - sync=False, + sync=True, # sync needs to be false unless called from coroutine. ) # TODO: clean up docs @@ -3769,7 +3814,7 @@ def _forward_events(self): "events": events, "pretag": tagify(self.opts["id"], base="syndic"), "timeout": self._return_retry_timer(), - "sync": False, + "sync": True, # Sync needs to be true unless being called from a coroutine }, ) if self.delayed: diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py index f0048ff19102..911088a3c29b 100644 --- a/salt/utils/asynchronous.py +++ b/salt/utils/asynchronous.py @@ -50,7 +50,7 @@ def __init__( close_methods=None, loop_kwarg=None, ): - self.io_loop = salt.ext.tornado.ioloop.IOLoop() + self.io_loop = salt.ext.tornado.ioloop.IOLoop(make_current=False) if args is None: args = [] if kwargs is None: