Skip to content

Commit

Permalink
Merge pull request locustio#2583 from locustio/locustfile-distribution
Browse files Browse the repository at this point in the history
Locustfile distribution from master to worker
  • Loading branch information
DennisKrone authored Feb 8, 2024
2 parents 6146a70 + 589f42f commit bf02feb
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 12 deletions.
9 changes: 6 additions & 3 deletions docs/running-distributed.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@ Start locust in master mode on one machine::

locust -f my_locustfile.py --master

And then on each worker machine (make sure they also have a copy of the locustfile):
And then on each worker machine:

.. code-block:: bash
locust -f my_locustfile.py --worker --master-host <your master's address> --processes 4
locust -f - --worker --master-host <your master's address> --processes 4
.. note::
The ``-f -`` argument tells Locust to get the locustfile from master instead of from its local filesystem. This feature was introduced in Locust 2.23.0.
Multiple machines, using locust-swarm
=====================================
Both worker and master need access to the locustfile, and when you make changes to it you'll need to restart all Locust processes. `locust-swarm <https://github.com/SvenskaSpel/locust-swarm>`_ automates this for you. It also solves the issue of firewall/network access from workers to master using SSH tunnels (this is often a problem if the master is running on your workstation and workers are running in some datacenter).
When you make changes to the locustfile you'll need to restart all Locust processes. `locust-swarm <https://github.com/SvenskaSpel/locust-swarm>`_ automates this for you. It also solves the issue of firewall/network access from workers to master using SSH tunnels (this is often a problem if the master is running on your workstation and workers are running in some datacenter).
.. code-block:: bash
Expand Down
80 changes: 79 additions & 1 deletion locust/argument_parser.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from __future__ import annotations

import locust
from locust import runners
from locust.rpc import Message, zmqrpc

import os
import platform
import socket
import sys
import textwrap
from typing import Any, NamedTuple
from uuid import uuid4

import configargparse
import gevent
from gevent.event import Event

version = locust.__version__

Expand Down Expand Up @@ -175,6 +181,46 @@ def get_empty_argument_parser(add_help=True, default_config_files=DEFAULT_CONFIG
return parser


def download_locustfile_from_master(master_host: str, master_port: int) -> str:
client_id = socket.gethostname() + "_download_locustfile_" + uuid4().hex
tempclient = zmqrpc.Client(master_host, master_port, client_id)
got_reply = False

def ask_for_locustfile():
while not got_reply:
tempclient.send(Message("locustfile", None, client_id))
gevent.sleep(1)

def wait_for_reply():
return tempclient.recv()

gevent.spawn(ask_for_locustfile)
try:
# wait same time as for client_ready ack. not that it is really relevant...
msg = gevent.spawn(wait_for_reply).get(timeout=runners.CONNECT_TIMEOUT * runners.CONNECT_RETRY_COUNT)
got_reply = True
except gevent.Timeout:
sys.stderr.write(
f"Got no locustfile response from master, gave up after {runners.CONNECT_TIMEOUT * runners.CONNECT_RETRY_COUNT}s\n"
)
sys.exit(1)

if msg.type != "locustfile":
sys.stderr.write(f"Got wrong message type from master {msg.type}\n")
sys.exit(1)

if "error" in msg.data:
sys.stderr.write(f"Got error from master: {msg.data['error']}\n")
sys.exit(1)

filename = msg.data["filename"]
with open(filename, "w") as local_file:
local_file.write(msg.data["contents"])

tempclient.close()
return filename


def parse_locustfile_option(args=None) -> list[str]:
"""
Construct a command line parser that is only used to parse the -f argument so that we can
Expand All @@ -197,9 +243,41 @@ def parse_locustfile_option(args=None) -> list[str]:
action="store_true",
default=False,
)
# the following arguments are only used for downloading the locustfile from master
parser.add_argument(
"--worker",
action="store_true",
env_var="LOCUST_MODE_WORKER",
)
parser.add_argument(
"--master", # this is just here to prevent argparse from giving the dreaded "ambiguous option: --master could match --master-host, --master-port"
action="store_true",
env_var="LOCUST_MODE_MASTER",
)
parser.add_argument(
"--master-host",
default="127.0.0.1",
env_var="LOCUST_MASTER_NODE_HOST",
)
parser.add_argument(
"--master-port",
type=int,
default=5557,
env_var="LOCUST_MASTER_NODE_PORT",
)

options, _ = parser.parse_known_args(args=args)

if options.locustfile == "-":
if not options.worker:
sys.stderr.write(
"locustfile was set to '-' (meaning to download from master) but --worker was not specified.\n"
)
sys.exit(1)
# having this in argument_parser module is a bit weird, but it needs to be done early
filename = download_locustfile_from_master(options.master_host, options.master_port)
return [filename]

# Comma separated string to list
locustfile_as_list = [locustfile.strip() for locustfile in options.locustfile.split(",")]

Expand Down Expand Up @@ -457,7 +535,7 @@ def setup_parser_arguments(parser):
worker_group.add_argument(
"--worker",
action="store_true",
help="Set locust to run in distributed mode with this process as worker",
help="Set locust to run in distributed mode with this process as worker. Can be combined with setting --locustfile to '-' to download it from master.",
env_var="LOCUST_MODE_WORKER",
)
worker_group.add_argument(
Expand Down
34 changes: 34 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,40 @@ def client_listener(self) -> NoReturn:
# emit a warning if the worker's clock seem to be out of sync with our clock
# if abs(time() - msg.data["time"]) > 5.0:
# warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
elif msg.type == "locustfile":
logging.debug("Worker requested locust file")
assert self.environment.parsed_options
filename = (
"locustfile.py"
if self.environment.parsed_options.locustfile == "locustfile"
else self.environment.parsed_options.locustfile
)
try:
with open(filename) as f:
file_contents = f.read()
except Exception as e:
logger.error(
f"--locustfile must be a plain filename (not a module name) for file distribution to work {e}"
)
self.send_message(
"locustfile",
client_id=client_id,
data={
"error": f"locustfile parameter on master must be a plain filename (not a module name) (was '{filename}')"
},
)
else:
if getattr(self, "_old_file_contents", file_contents) != file_contents:
logger.warning(
"Locustfile contents changed on disk after first worker requested locustfile, sending new content. If you make any major changes (like changing User class names) you need to restart master."
)
self._old_file_contents = file_contents
self.send_message(
"locustfile",
client_id=client_id,
data={"filename": filename, "contents": file_contents},
)
continue
elif msg.type == "client_stopped":
if msg.node_id not in self.clients:
logger.warning(f"Received {msg.type} message from an unknown worker: {msg.node_id}.")
Expand Down
Loading

0 comments on commit bf02feb

Please sign in to comment.