Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Initial commit of large event fix #67096

Open
wants to merge 2 commits into
base: 3006.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 75 additions & 30 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1663,20 +1663,28 @@ def _send_req_async(self, load, timeout):
)
raise salt.ext.tornado.gen.Return(ret)

def _fire_master(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
sync=True,
timeout_handler=None,
include_startup_grains=False,
):
@salt.ext.tornado.gen.coroutine
def _send_req_async_main(self, load, timeout):
"""
Fire an event on the master, or drop message if unable to send.
Send a request to the master's request server. To be called from the
top level process in the main thread only. Worker threads and
processess should call _send_req_sync or _send_req_async as nessecery.
"""
if self.opts["minion_sign_messages"]:
log.trace("Signing event to be published onto the bus.")
minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
sig = salt.crypt.sign_message(
minion_privkey_path, salt.serializers.msgpack.serialize(load)
)
load["sig"] = sig
ret = yield self.req_channel.send(
load, timeout=timeout, tries=self.opts["return_retry_tries"]
)
raise salt.ext.tornado.gen.Return(ret)

def _fire_master_prepare(
self, data, tag, events, pretag, include_startup_grains=False
):
load = {
"id": self.opts["id"],
"cmd": "_minion_event",
Expand All @@ -1701,7 +1709,52 @@ def _fire_master(
if k in self.opts["start_event_grains"]
}
load["grains"] = grains_to_add
return load

@salt.ext.tornado.gen.coroutine
def _fire_master_main(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
timeout_handler=None,
include_startup_grains=False,
):
load = self._fire_master_prepare(
data, tag, events, pretag, include_startup_grains
)
if timeout_handler is None:

def handle_timeout(*_):
log.info(
"fire_master failed: master could not be contacted. Request"
" timed out."
)
return True

timeout_handler = handle_timeout

yield self._send_req_async_main(load, timeout)

def _fire_master(
self,
data=None,
tag=None,
events=None,
pretag=None,
timeout=60,
sync=True,
timeout_handler=None,
include_startup_grains=False,
):
"""
Fire an event on the master, or drop message if unable to send.
"""
load = self._fire_master_prepare(
data, tag, events, pretag, include_startup_grains
)
if sync:
try:
self._send_req_sync(load, timeout)
Expand All @@ -1726,10 +1779,8 @@ def handle_timeout(*_):

timeout_handler = handle_timeout

with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
# pylint: disable=unexpected-keyword-arg
self._send_req_async(load, timeout, callback=lambda f: None)
# pylint: enable=unexpected-keyword-arg
# Returning a coroutine, should be awaited
return self._send_req_async(load, timeout)
return True

@salt.ext.tornado.gen.coroutine
Expand Down Expand Up @@ -2306,12 +2357,7 @@ def timeout_handler(*_):
timeout_handler()
return ""
else:
with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
# pylint: disable=unexpected-keyword-arg
ret_val = self._send_req_async(
load, timeout=timeout, callback=lambda f: None
)
# pylint: enable=unexpected-keyword-arg
ret_val = self._send_req_async(load, timeout=timeout)

log.trace("ret_val = %s", ret_val) # pylint: disable=no-member
return ret_val
Expand Down Expand Up @@ -2792,12 +2838,11 @@ def handle_event(self, package):
elif tag.startswith("fire_master"):
if self.connected:
log.debug("Forwarding master event tag=%s", data["tag"])
self._fire_master(
yield self._fire_master_main(
data["data"],
data["tag"],
data["events"],
data["pretag"],
sync=False,
)
elif tag.startswith(master_event(type="disconnected")) or tag.startswith(
master_event(type="failback")
Expand Down Expand Up @@ -2954,11 +2999,11 @@ def handle_event(self, package):
1
],
)
self._return_pub(data, ret_cmd="_return", sync=False)
yield self._return_pub(data, ret_cmd="_return", sync=False)
elif tag.startswith("_salt_error"):
if self.connected:
log.debug("Forwarding salt error event tag=%s", tag)
self._fire_master(data, tag, sync=False)
yield self._fire_master_main(data, tag)
elif tag.startswith("salt/auth/creds"):
key = tuple(data["key"])
log.debug(
Expand All @@ -2971,7 +3016,7 @@ def handle_event(self, package):
elif tag.startswith("__beacons_return"):
if self.connected:
log.debug("Firing beacons to master")
self._fire_master(events=data["beacons"])
yield self._fire_master_main(events=data["beacons"])

def cleanup_subprocesses(self):
"""
Expand Down Expand Up @@ -3373,12 +3418,12 @@ def fire_master_syndic_start(self):
self._fire_master(
"Syndic {} started at {}".format(self.opts["id"], time.asctime()),
"syndic_start",
sync=False,
sync=True, # sync needs to be false unless called from coroutine.
)
self._fire_master(
"Syndic {} started at {}".format(self.opts["id"], time.asctime()),
tagify([self.opts["id"], "start"], "syndic"),
sync=False,
sync=True, # sync needs to be false unless called from coroutine.
)

# TODO: clean up docs
Expand Down Expand Up @@ -3769,7 +3814,7 @@ def _forward_events(self):
"events": events,
"pretag": tagify(self.opts["id"], base="syndic"),
"timeout": self._return_retry_timer(),
"sync": False,
"sync": True, # Sync needs to be true unless being called from a coroutine
},
)
if self.delayed:
Expand Down
2 changes: 1 addition & 1 deletion salt/utils/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
close_methods=None,
loop_kwarg=None,
):
self.io_loop = salt.ext.tornado.ioloop.IOLoop()
self.io_loop = salt.ext.tornado.ioloop.IOLoop(make_current=False)
if args is None:
args = []
if kwargs is None:
Expand Down
107 changes: 107 additions & 0 deletions tests/pytests/integration/minion/test_schedule_large_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import sys

import pytest

import salt.utils.event
import salt.utils.platform
import tests.support.helpers
from tests.conftest import FIPS_TESTRUN


@pytest.fixture
def salt_master_1(request, salt_factories):
config_defaults = {
"open_mode": True,
"transport": request.config.getoption("--transport"),
}
config_overrides = {
"interface": "127.0.0.1",
"fips_mode": FIPS_TESTRUN,
"publish_signing_algorithm": (
"PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1"
),
}

factory = salt_factories.salt_master_daemon(
"master-1",
defaults=config_defaults,
overrides=config_overrides,
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
)
with factory.started(start_timeout=120):
yield factory


@pytest.fixture
def salt_minion_1(salt_master_1):
config_defaults = {
"transport": salt_master_1.config["transport"],
}
master_1_port = salt_master_1.config["ret_port"]
master_1_addr = salt_master_1.config["interface"]
config_overrides = {
"master": [
f"{master_1_addr}:{master_1_port}",
],
"test.foo": "baz",
"fips_mode": FIPS_TESTRUN,
"encryption_algorithm": "OAEP-SHA224" if FIPS_TESTRUN else "OAEP-SHA1",
"signing_algorithm": "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1",
}
factory = salt_master_1.salt_minion_daemon(
"minion-1",
defaults=config_defaults,
overrides=config_overrides,
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
)
with factory.started(start_timeout=120):
yield factory


@pytest.fixture
def script(salt_minion_1, tmp_path):
path = tmp_path / "script.py"
content = f"""
import salt.config
import salt.utils.event

opts = salt.config.minion_config('{salt_minion_1.config_file}')

payload = b'0' * 1048576000

big_event = dict()
for i in range(10000):
big_event[i] = payload = b'0' * 100

with salt.utils.event.get_event("minion", opts=opts) as event:
event.fire_master(big_event, 'bigevent')

"""
path.write_text(tests.support.helpers.dedent(content))
return path


# @pytest.mark.timeout_unless_on_windows(360)
def test_schedule_large_event(salt_master_1, salt_minion_1, script):
cli = salt_master_1.salt_cli(timeout=120)
ret = cli.run(
"schedule.add",
name="myjob",
function="cmd.run",
seconds=5,
job_args=f'["{sys.executable} {script}"]',
minion_tgt=salt_minion_1.id,
)
assert "result" in ret.data
assert ret.data["result"]
with salt.utils.event.get_event(
"master",
salt_master_1.config["sock_dir"],
salt_master_1.config["transport"],
salt_master_1.config,
listen=True,
) as event:
event = event.get_event(tag="bigevent", wait=15)
assert event
assert "data" in event
assert len(event["data"]) == 10000
Loading