From 57a26d6cfd91261ae160a6aa9b570477430e72d3 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Thu, 20 Jun 2024 16:37:28 -0400 Subject: [PATCH 1/9] Allow worker to receive files or urls from master --- locust/argument_parser.py | 45 +++++++++++++++++++-------------------- locust/runners.py | 43 +++++++++++++++++++++++++------------ locust/util/url.py | 15 +++++++++++++ 3 files changed, 66 insertions(+), 37 deletions(-) create mode 100644 locust/util/url.py diff --git a/locust/argument_parser.py b/locust/argument_parser.py index c4ee468fea..82546dd480 100644 --- a/locust/argument_parser.py +++ b/locust/argument_parser.py @@ -15,7 +15,6 @@ import textwrap from collections import OrderedDict from typing import Any, NamedTuple -from urllib.parse import urlparse from uuid import uuid4 if sys.version_info >= (3, 11): @@ -27,6 +26,8 @@ import gevent import requests +from .util.url import is_url + version = locust.__version__ @@ -148,20 +149,6 @@ def _parse_locustfile_path(path: str) -> list[str]: return parsed_paths -def is_url(url: str) -> bool: - """ - Check if path is an url - """ - try: - result = urlparse(url) - if result.scheme == "https" or result.scheme == "http": - return True - else: - return False - except ValueError: - return False - - def download_locustfile_from_url(url: str) -> str: """ Attempt to download and save locustfile from url. @@ -271,14 +258,27 @@ def wait_for_reply(): sys.stderr.write(f"Got error from master: {msg.data['error']}\n") sys.exit(1) - filename = msg.data["filename"] - with open(os.path.join(tempfile.gettempdir(), filename), "w", encoding="utf-8") as locustfile: - locustfile.write(msg.data["contents"]) + tempclient.close() + return msg.data + - atexit.register(exit_handler, locustfile.name) +def parse_locustfile_paths_from_master(master_host, master_port): + locustfiles = download_locustfile_from_master(master_host, master_port).get("locustfiles", []) - tempclient.close() - return locustfile.name + def create_locustfile(file_to_create): + filename = file_to_create["filename"] + file_contents = file_to_create["contents"] + + with open(os.path.join(tempfile.gettempdir(), filename), "w", encoding="utf-8") as locustfile: + locustfile.write(file_contents) + + return locustfile.name + + locustfiles = [ + create_locustfile(locustfile) if "contents" in locustfile else locustfile for locustfile in locustfiles + ] + + return parse_locustfile_paths(locustfiles) def parse_locustfile_option(args=None) -> list[str]: @@ -339,8 +339,7 @@ def parse_locustfile_option(args=None) -> list[str]: ) sys.exit(1) # having this in argument_parser module is a bit weird, but it needs to be done early - filename = download_locustfile_from_master(options.master_host, options.master_port) - return [filename] + return parse_locustfile_paths_from_master(options.master_host, options.master_port) locustfile_list = [f.strip() for f in options.locustfile.split(",")] parsed_paths = parse_locustfile_paths(locustfile_list) diff --git a/locust/runners.py b/locust/runners.py index 31d741217e..cb6eaf4207 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -30,8 +30,16 @@ from .dispatch import UsersDispatcher from .exception import RPCError, RPCReceiveError, RPCSendError from .log import get_logs, greenlet_exception_logger -from .rpc import Message, rpc -from .stats import RequestStats, StatsError, setup_distributed_stats_event_listeners +from .rpc import ( + Message, + rpc, +) +from .stats import ( + RequestStats, + StatsError, + setup_distributed_stats_event_listeners, +) +from .util.url import is_url if TYPE_CHECKING: from . import User @@ -1026,31 +1034,38 @@ def client_listener(self) -> NoReturn: elif msg.type == "locustfile": logging.debug("Worker requested locust file") assert self.environment.parsed_options - filename = self.environment.parsed_options.locustfile + locustfile_list = [f.strip() for f in self.environment.parsed_options.locustfile.split(",")] + try: - with open(filename) as f: - file_contents = f.read() + locustfiles = [] + + for filename in locustfile_list: + if is_url(filename): + locustfiles.append(filename) + else: + with open(filename) as f: + filename = os.path.basename(filename) + file_contents = f.read() + + locustfiles.append({"filename": filename, "contents": file_contents}) except Exception as e: - logger.error( - f"--locustfile must be a full path to a single locustfile for file distribution to work {e}" - ) + error_message = "locustfile must be a full path to a single locustfile, a comma-separated list of .py files, or a URL for file distribution to work" + logger.error(f"{error_message} {e}") self.send_message( "locustfile", client_id=client_id, - data={ - "error": f"locustfile must be a full path to a single locustfile for file distribution to work (was '{filename}')" - }, + data={"error": f"{error_message} (was '{filename}')"}, ) else: - if getattr(self, "_old_file_contents", file_contents) != file_contents: + if hasattr(self, "_has_sent_locustfiles"): logger.warning( "Locustfile contents changed on disk after first worker requested locustfile, sending new content. If you make any major changes (like changing User class names) you need to restart master." ) - self._old_file_contents = file_contents + self._has_sent_locustfiles = True self.send_message( "locustfile", client_id=client_id, - data={"filename": os.path.basename(filename), "contents": file_contents}, + data={"locustfiles": locustfiles}, ) continue elif msg.type == "client_stopped": diff --git a/locust/util/url.py b/locust/util/url.py new file mode 100644 index 0000000000..5b2cf64a9b --- /dev/null +++ b/locust/util/url.py @@ -0,0 +1,15 @@ +from urllib.parse import urlparse + + +def is_url(url: str) -> bool: + """ + Check if path is an url + """ + try: + result = urlparse(url) + if result.scheme == "https" or result.scheme == "http": + return True + else: + return False + except ValueError: + return False From 751acb48417ede826f0fdd0d6374114a10dcd714 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Thu, 20 Jun 2024 16:58:35 -0400 Subject: [PATCH 2/9] Add directory as file distribution option --- locust/argument_parser.py | 10 ++-------- locust/runners.py | 8 +++++++- locust/util/directory.py | 12 ++++++++++++ 3 files changed, 21 insertions(+), 9 deletions(-) create mode 100644 locust/util/directory.py diff --git a/locust/argument_parser.py b/locust/argument_parser.py index 82546dd480..a00bbe1f6e 100644 --- a/locust/argument_parser.py +++ b/locust/argument_parser.py @@ -26,6 +26,7 @@ import gevent import requests +from .util.directory import get_abspaths_in from .util.url import is_url version = locust.__version__ @@ -126,14 +127,7 @@ def _parse_locustfile_path(path: str) -> list[str]: parsed_paths.append(download_locustfile_from_url(path)) elif os.path.isdir(path): # Find all .py files in directory tree - for root, _dirs, fs in os.walk(path): - parsed_paths.extend( - [ - os.path.abspath(os.path.join(root, f)) - for f in fs - if os.path.isfile(os.path.join(root, f)) and f.endswith(".py") and not f.startswith("_") - ] - ) + parsed_paths.extend(get_abspaths_in(path, extension=".py")) if not parsed_paths: sys.stderr.write(f"Could not find any locustfiles in directory '{path}'") sys.exit(1) diff --git a/locust/runners.py b/locust/runners.py index cb6eaf4207..4c42324ff8 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -39,6 +39,7 @@ StatsError, setup_distributed_stats_event_listeners, ) +from .util.directory import get_abspaths_in from .util.url import is_url if TYPE_CHECKING: @@ -1034,7 +1035,12 @@ def client_listener(self) -> NoReturn: elif msg.type == "locustfile": logging.debug("Worker requested locust file") assert self.environment.parsed_options - locustfile_list = [f.strip() for f in self.environment.parsed_options.locustfile.split(",")] + locustfile_options = self.environment.parsed_options.locustfile.split(",") + locustfile_list = [f.strip() for f in locustfile_options if not os.path.isdir(f)] + + for f in locustfile_options: + if os.path.isdir(f): + locustfile_list.extend(get_abspaths_in(f, extension=".py")) try: locustfiles = [] diff --git a/locust/util/directory.py b/locust/util/directory.py new file mode 100644 index 0000000000..853eb77b38 --- /dev/null +++ b/locust/util/directory.py @@ -0,0 +1,12 @@ +import os + + +def get_abspaths_in(path, extension=None): + return [ + os.path.abspath(os.path.join(root, f)) + for root, _dirs, fs in os.walk(path) + for f in fs + if os.path.isfile(os.path.join(root, f)) + and (f.endswith(extension) or extension is None) + and not f.startswith("_") + ] From 2f3e48ef7705905638d7601cf8fa44cd9e6799be Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Thu, 4 Jul 2024 13:36:49 -0400 Subject: [PATCH 3/9] Add parsed_locustfiles to environment --- locust/argument_parser.py | 26 +++++++++++++------------- locust/env.py | 3 +++ locust/main.py | 3 +++ locust/runners.py | 28 ++++++++-------------------- 4 files changed, 27 insertions(+), 33 deletions(-) diff --git a/locust/argument_parser.py b/locust/argument_parser.py index a00bbe1f6e..31aa97949e 100644 --- a/locust/argument_parser.py +++ b/locust/argument_parser.py @@ -256,21 +256,21 @@ def wait_for_reply(): return msg.data -def parse_locustfile_paths_from_master(master_host, master_port): - locustfiles = download_locustfile_from_master(master_host, master_port).get("locustfiles", []) +def parse_locustfiles_from_master(master_host, master_port): + locustfile_sources = download_locustfile_from_master(master_host, master_port).get("locustfiles", []) + locustfiles = [] - def create_locustfile(file_to_create): - filename = file_to_create["filename"] - file_contents = file_to_create["contents"] + for source in locustfile_sources: + if "contents" in source: + filename = source["filename"] + file_contents = source["contents"] - with open(os.path.join(tempfile.gettempdir(), filename), "w", encoding="utf-8") as locustfile: - locustfile.write(file_contents) + with open(os.path.join(tempfile.gettempdir(), filename), "w", encoding="utf-8") as locustfile: + locustfile.write(file_contents) - return locustfile.name - - locustfiles = [ - create_locustfile(locustfile) if "contents" in locustfile else locustfile for locustfile in locustfiles - ] + locustfiles.append(locustfile.name) + else: + locustfiles.append(source) return parse_locustfile_paths(locustfiles) @@ -333,7 +333,7 @@ def parse_locustfile_option(args=None) -> list[str]: ) sys.exit(1) # having this in argument_parser module is a bit weird, but it needs to be done early - return parse_locustfile_paths_from_master(options.master_host, options.master_port) + return parse_locustfiles_from_master(options.master_host, options.master_port) locustfile_list = [f.strip() for f in options.locustfile.split(",")] parsed_paths = parse_locustfile_paths(locustfile_list) diff --git a/locust/env.py b/locust/env.py index 911aba5514..fc8a4639dd 100644 --- a/locust/env.py +++ b/locust/env.py @@ -33,6 +33,7 @@ def __init__( stop_timeout: float | None = None, catch_exceptions=True, parsed_options: Namespace | None = None, + parsed_locustfiles: list[str] | None = None, available_user_classes: dict[str, User] | None = None, available_shape_classes: dict[str, LoadTestShape] | None = None, available_user_tasks: dict[str, list[TaskSet | Callable]] | None = None, @@ -91,6 +92,8 @@ def __init__( """ self.parsed_options = parsed_options """Reference to the parsed command line options (used to pre-populate fields in Web UI). When using Locust as a library, this should either be `None` or an object created by `argument_parser.parse_args()`""" + self.parsed_locustfiles = parsed_locustfiles + """A list of all locustfiles for the test""" self.available_user_classes = available_user_classes """List of the available User Classes to pick from in the UserClass Picker""" self.available_shape_classes = available_shape_classes diff --git a/locust/main.py b/locust/main.py index 9eb06d5aaf..33495af43a 100644 --- a/locust/main.py +++ b/locust/main.py @@ -59,6 +59,7 @@ def create_environment( events=None, shape_class=None, locustfile=None, + parsed_locustfiles=None, available_user_classes=None, available_shape_classes=None, available_user_tasks=None, @@ -74,6 +75,7 @@ def create_environment( host=options.host, reset_stats=options.reset_stats, parsed_options=options, + parsed_locustfiles=parsed_locustfiles, available_user_classes=available_user_classes, available_shape_classes=available_shape_classes, available_user_tasks=available_user_tasks, @@ -349,6 +351,7 @@ def kill_workers(children): events=locust.events, shape_class=shape_class, locustfile=locustfile_path, + parsed_locustfiles=locustfiles, available_user_classes=available_user_classes, available_shape_classes=available_shape_classes, available_user_tasks=available_user_tasks, diff --git a/locust/runners.py b/locust/runners.py index 4c42324ff8..89fd5e86f8 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -30,15 +30,8 @@ from .dispatch import UsersDispatcher from .exception import RPCError, RPCReceiveError, RPCSendError from .log import get_logs, greenlet_exception_logger -from .rpc import ( - Message, - rpc, -) -from .stats import ( - RequestStats, - StatsError, - setup_distributed_stats_event_listeners, -) +from .rpc import Message, rpc +from .stats import RequestStats, StatsError, setup_distributed_stats_event_listeners from .util.directory import get_abspaths_in from .util.url import is_url @@ -1034,16 +1027,16 @@ def client_listener(self) -> NoReturn: # warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.") elif msg.type == "locustfile": logging.debug("Worker requested locust file") - assert self.environment.parsed_options - locustfile_options = self.environment.parsed_options.locustfile.split(",") + assert self.environment.parsed_locustfiles + locustfile_options = self.environment.parsed_locustfiles locustfile_list = [f.strip() for f in locustfile_options if not os.path.isdir(f)] - for f in locustfile_options: - if os.path.isdir(f): - locustfile_list.extend(get_abspaths_in(f, extension=".py")) + for locustfile_option in locustfile_options: + if os.path.isdir(locustfile_option): + locustfile_list.extend(get_abspaths_in(locustfile_option, extension=".py")) try: - locustfiles = [] + locustfiles: list[str | dict[str, str]] = [] for filename in locustfile_list: if is_url(filename): @@ -1063,11 +1056,6 @@ def client_listener(self) -> NoReturn: data={"error": f"{error_message} (was '{filename}')"}, ) else: - if hasattr(self, "_has_sent_locustfiles"): - logger.warning( - "Locustfile contents changed on disk after first worker requested locustfile, sending new content. If you make any major changes (like changing User class names) you need to restart master." - ) - self._has_sent_locustfiles = True self.send_message( "locustfile", client_id=client_id, From d7eed41fe4748b78642cbe7fe8b21f83ceeaf5f0 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Thu, 4 Jul 2024 13:55:48 -0400 Subject: [PATCH 4/9] Add worker version mismatch warning --- locust/argument_parser.py | 2 +- locust/runners.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/locust/argument_parser.py b/locust/argument_parser.py index 31aa97949e..8b10b4d9e5 100644 --- a/locust/argument_parser.py +++ b/locust/argument_parser.py @@ -219,7 +219,7 @@ def download_locustfile_from_master(master_host: str, master_port: int) -> str: def ask_for_locustfile(): while not got_reply: - tempclient.send(Message("locustfile", None, client_id)) + tempclient.send(Message("locustfile", {"version": version}, client_id)) gevent.sleep(1) def log_warning(): diff --git a/locust/runners.py b/locust/runners.py index 89fd5e86f8..ae30bda97c 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -1026,6 +1026,11 @@ def client_listener(self) -> NoReturn: # if abs(time() - msg.data["time"]) > 5.0: # warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.") elif msg.type == "locustfile": + if msg.data["version"][0:4] == __version__[0:4]: + logger.debug( + f"A worker ({msg.node_id}) running a different patch version ({msg.data['version']}) connected, master version is {__version__}" + ) + logging.debug("Worker requested locust file") assert self.environment.parsed_locustfiles locustfile_options = self.environment.parsed_locustfiles From 901cef62294bdf2f7554d13a03e2dd97c7e67ce4 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Thu, 4 Jul 2024 16:50:15 -0400 Subject: [PATCH 5/9] Refactor argument parser to share parsed_paths --- locust/argument_parser.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/locust/argument_parser.py b/locust/argument_parser.py index 8b10b4d9e5..287f76b900 100644 --- a/locust/argument_parser.py +++ b/locust/argument_parser.py @@ -253,11 +253,10 @@ def wait_for_reply(): sys.exit(1) tempclient.close() - return msg.data + return msg.data.get("locustfiles", []) -def parse_locustfiles_from_master(master_host, master_port): - locustfile_sources = download_locustfile_from_master(master_host, master_port).get("locustfiles", []) +def parse_locustfiles_from_master(locustfile_sources): locustfiles = [] for source in locustfile_sources: @@ -272,7 +271,7 @@ def parse_locustfiles_from_master(master_host, master_port): else: locustfiles.append(source) - return parse_locustfile_paths(locustfiles) + return locustfiles def parse_locustfile_option(args=None) -> list[str]: @@ -333,9 +332,11 @@ def parse_locustfile_option(args=None) -> list[str]: ) sys.exit(1) # having this in argument_parser module is a bit weird, but it needs to be done early - return parse_locustfiles_from_master(options.master_host, options.master_port) + locustfile_sources = download_locustfile_from_master(options.master_host, options.master_port) + locustfile_list = parse_locustfiles_from_master(locustfile_sources) + else: + locustfile_list = [f.strip() for f in options.locustfile.split(",")] - locustfile_list = [f.strip() for f in options.locustfile.split(",")] parsed_paths = parse_locustfile_paths(locustfile_list) if not parsed_paths: From b02474c8222d83e96862c743c021b04682fad222 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Thu, 4 Jul 2024 17:46:11 -0400 Subject: [PATCH 6/9] Increase test timeout --- locust/test/test_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locust/test/test_main.py b/locust/test/test_main.py index f50625257a..bfd38403d9 100644 --- a/locust/test/test_main.py +++ b/locust/test/test_main.py @@ -1846,7 +1846,7 @@ def t(self): ) try: - stdout = proc_worker.communicate(timeout=5)[0] + stdout = proc_worker.communicate(timeout=15)[0] self.assertIn( "Got error from master: locustfile must be a full path to a single locustfile for file distribution to work", stdout, From 62001189533bf29fb8ba5313ec2b431a757a8c89 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Fri, 5 Jul 2024 10:15:38 -0400 Subject: [PATCH 7/9] Fix tests --- locust/test/test_main.py | 85 +++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 48 deletions(-) diff --git a/locust/test/test_main.py b/locust/test/test_main.py index bfd38403d9..765249e457 100644 --- a/locust/test/test_main.py +++ b/locust/test/test_main.py @@ -1721,11 +1721,6 @@ def t(self): text=True, ) gevent.sleep(2) - # modify the locustfile to trigger warning about file change when the second worker connects - with open(mocked.file_path, "w") as locustfile: - locustfile.write(LOCUSTFILE_CONTENT) - locustfile.write("\n# New comment\n") - gevent.sleep(2) proc_worker2 = subprocess.Popen( [ "locust", @@ -1742,7 +1737,6 @@ def t(self): stdout_worker2 = proc_worker2.communicate()[0] self.assertIn('All users spawned: {"User1": 1} (1 total users)', stdout) - self.assertIn("Locustfile contents changed on disk after first worker requested locustfile", stdout) self.assertIn("Shutting down (exit code 0)", stdout) self.assertNotIn("Traceback", stdout) self.assertNotIn("Traceback", stdout_worker) @@ -1818,49 +1812,44 @@ def t(self): """ ) with mock_locustfile(content=LOCUSTFILE_CONTENT) as mocked: - with mock_locustfile() as mocked2: - proc = subprocess.Popen( - [ - "locust", - "-f", - f"{mocked.file_path}, {mocked2.file_path}", - "--headless", - "--master", - "-L", - "debug", - ], - stderr=STDOUT, - stdout=PIPE, - text=True, - ) - proc_worker = subprocess.Popen( - [ - "locust", - "-f", - "-", - "--worker", - ], - stderr=STDOUT, - stdout=PIPE, - text=True, - ) + proc = subprocess.Popen( + [ + "locust", + "-f", + f"{mocked.file_path},{mocked.file_path}", + "--headless", + "--master", + "--expect-workers", + "1", + "-t", + "1s", + ], + stderr=STDOUT, + stdout=PIPE, + text=True, + ) + proc_worker = subprocess.Popen( + [ + "locust", + "-f", + "-", + "--worker", + ], + stderr=STDOUT, + stdout=PIPE, + text=True, + ) - try: - stdout = proc_worker.communicate(timeout=15)[0] - self.assertIn( - "Got error from master: locustfile must be a full path to a single locustfile for file distribution to work", - stdout, - ) - proc.kill() - master_stdout = proc.communicate()[0] - self.assertIn( - "--locustfile must be a full path to a single locustfile for file distribution", master_stdout - ) - except Exception: - proc.kill() - proc_worker.kill() - stdout, worker_stderr = proc_worker.communicate() - assert False, f"worker never finished: {stdout}" + stdout = proc.communicate()[0] + stdout_worker = proc_worker.communicate()[0] + + self.assertIn('All users spawned: {"User1": 1} (1 total users)', stdout) + self.assertIn("Shutting down (exit code 0)", stdout) + self.assertNotIn("Traceback", stdout) + self.assertNotIn("Traceback", stdout_worker) + + self.assertEqual(0, proc.returncode) + self.assertEqual(0, proc_worker.returncode) def test_json_schema(self): LOCUSTFILE_CONTENT = textwrap.dedent( From 25922e8fd4227c4ee5087b7106241b43cf9598b4 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Fri, 5 Jul 2024 10:41:06 -0400 Subject: [PATCH 8/9] Add type hint --- locust/argument_parser.py | 2 +- locust/test/test_main.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/locust/argument_parser.py b/locust/argument_parser.py index 287f76b900..2f202bd1e6 100644 --- a/locust/argument_parser.py +++ b/locust/argument_parser.py @@ -256,7 +256,7 @@ def wait_for_reply(): return msg.data.get("locustfiles", []) -def parse_locustfiles_from_master(locustfile_sources): +def parse_locustfiles_from_master(locustfile_sources) -> list[str]: locustfiles = [] for source in locustfile_sources: diff --git a/locust/test/test_main.py b/locust/test/test_main.py index 765249e457..8a84a74437 100644 --- a/locust/test/test_main.py +++ b/locust/test/test_main.py @@ -1828,6 +1828,7 @@ def t(self): stdout=PIPE, text=True, ) + gevent.sleep(2) proc_worker = subprocess.Popen( [ "locust", From 1a3a9dc5bb38e9e039e4520f6707658cf6485ae1 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Mon, 8 Jul 2024 07:49:17 -0400 Subject: [PATCH 9/9] Remove version --- package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/package.json b/package.json index 033f9bd91e..d4c0299fa3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,5 @@ { "name": "locust", - "version": "2.16.1", "private": true, "license": "MIT", "scripts": {