diff --git a/examples/distributor_ex.py b/examples/distributor_ex.py index c04ded7..ea69da8 100644 --- a/examples/distributor_ex.py +++ b/examples/distributor_ex.py @@ -1,29 +1,40 @@ +# this example is a little more complex than it needs to be, but I wanted to highlight +# that it is entirely possible to have multiple distributors at the same time + +from typing import Dict, List from locust_plugins.mongoreader import MongoLRUReader -from locust_plugins.csvreader import CSVDictReader -from locust_plugins import distributor +from locust_plugins.csvreader import CSVDictReader, CSVReader +from locust_plugins.distributor import Distributor from locust import HttpUser, task, run_single_user, events from locust.runners import WorkerRunner +distributors = {} + @events.init.add_listener def on_locust_init(environment, **_kwargs): - reader = None + ssn_reader = None + product_reader = None if not isinstance(environment.runner, WorkerRunner): + product_reader = CSVReader("products.csv") csv = True if csv: - reader = CSVDictReader("ssn.tsv", delimiter="\t") + ssn_reader = CSVDictReader("ssn.tsv", delimiter="\t") else: - reader = MongoLRUReader({"foo": "bar"}, "last_login") - distributor.register(environment, reader) + ssn_reader = MongoLRUReader({"foo": "bar"}, "last_login") + product_reader = CSVReader("products.csv") + distributors["customers"] = Distributor(environment, ssn_reader, "customers") + distributors["products"] = Distributor(environment, product_reader, "products") class MyUser(HttpUser): host = "http://www.example.com" @task - def my_task(self): - customer = distributor.getdata(self) - self.client.get(f"/?{customer['ssn']}") + def my_task(self) -> None: + customer: Dict = next(distributors["customers"]) + product: List[str] = next(distributors["products"]) + self.client.get(f"/?customer={customer['ssn']}&product={product[0]}") if __name__ == "__main__": diff --git a/examples/products.csv b/examples/products.csv new file mode 100644 index 0000000..ffa04cb --- /dev/null +++ b/examples/products.csv @@ -0,0 +1,2 @@ +asdf1 +qwerty2 \ No newline at end of file diff --git a/locust_plugins/csvreader.py b/locust_plugins/csvreader.py index 81a49a6..5064c5f 100644 --- a/locust_plugins/csvreader.py +++ b/locust_plugins/csvreader.py @@ -2,7 +2,7 @@ from typing import Iterator, Dict -class CSVReader: +class CSVReader(Iterator): "Read test data from csv file using an iterator" def __init__(self, file, **kwargs): diff --git a/locust_plugins/distributor.py b/locust_plugins/distributor.py index 774f33f..ef0b5ff 100644 --- a/locust_plugins/distributor.py +++ b/locust_plugins/distributor.py @@ -1,61 +1,59 @@ from typing import Dict, Iterator, Optional import logging from gevent.event import AsyncResult -from locust import User - +import greenlet from locust.env import Environment from locust.runners import WorkerRunner _results: Dict[int, AsyncResult] = {} -_iterator: Optional[Iterator[Dict]] = None - - -# received on master -def _distributor_request(environment: Environment, msg, **kwargs): - data = next(_iterator) - environment.runner.send_message( - "distributor_response", - {"payload": data, "user_id": msg.data["user_id"]}, - client_id=msg.data["client_id"], - ) - - -# received on worker -def _distributor_response(environment: Environment, msg, **kwargs): - _results[msg.data["user_id"]].set(msg.data) - - -def register(environment: Environment, iterator: Optional[Iterator[dict]]): - """Register distributor method handlers and tie them to use the iterator that you pass. - - iterator is not used on workers, so you can leave it as None there. - """ - global _iterator - _iterator = iterator - - runner = environment.runner - assert iterator or isinstance(runner, WorkerRunner), "iterator is a mandatory parameter when not on a worker runner" - if runner: - runner.register_message("distributor_request", _distributor_request) - runner.register_message("distributor_response", _distributor_response) - - -def getdata(user: User) -> Dict: - """Get the next data dict from iterator - - Args: - user (User): current user object (we use the object id of the User to keep track of who's waiting for which data) - """ - if not user.environment.runner: # no need to do anything clever if there is no runner - assert _iterator, "Did you forget to call register() before trying to get data?" - return next(_iterator) - if id(user) in _results: - logging.warning("This user was already waiting for data. Strange.") - _results[id(user)] = AsyncResult() - runner = user.environment.runner - runner.send_message("distributor_request", {"user_id": id(user), "client_id": runner.client_id}) - data = _results[id(user)].get()["payload"] # this waits for the reply - del _results[id(user)] - return data +class Distributor: + def __init__(self, environment: Environment, iterator: Optional[Iterator], name="distributor"): + """Register distributor method handlers and tie them to use the iterator that you pass. + + iterator is not used on workers, so you can leave it as None there. + """ + self.iterator = iterator + self.name = name + self.runner = environment.runner + assert iterator or isinstance( + self.runner, WorkerRunner + ), "iterator is a mandatory parameter when not on a worker runner" + if self.runner: + # received on master + def _distributor_request(environment: Environment, msg, **kwargs): + data = next(self.iterator) + self.runner.send_message( + f"_{name}_response", + {"payload": data, "user_id": msg.data["user_id"]}, + client_id=msg.data["client_id"], + ) + + # received on worker + def _distributor_response(environment: Environment, msg, **kwargs): + _results[msg.data["user_id"]].set(msg.data) + + self.runner.register_message(f"_{name}_request", _distributor_request) + self.runner.register_message(f"_{name}_response", _distributor_response) + + def __next__(self): + """Get the next data dict from iterator + + Args: + user (User): current user object (we use the object id of the User to keep track of who's waiting for which data) + """ + if not self.runner: # no need to do anything clever if there is no runner + assert self.iterator + return next(self.iterator) + + gid = greenlet.getcurrent().minimal_ident # type: ignore + + if gid in _results: + logging.warning("This user was already waiting for data. Strange.") + + _results[gid] = AsyncResult() + self.runner.send_message(f"_{self.name}_request", {"user_id": gid, "client_id": self.runner.client_id}) + data = _results[gid].get()["payload"] # this waits for the reply + del _results[gid] + return data diff --git a/tox.ini b/tox.ini index 0b7b44a..2754fbe 100644 --- a/tox.ini +++ b/tox.ini @@ -33,7 +33,9 @@ commands = grep -m 1 'LCP' output.txt bash -ec "! grep 'object has no attribute' output.txt" bash -ec "(cd examples && PYTHONUNBUFFERED=1 locust -f distributor_ex.py --headless -t 5 -u 4 --processes 4) |& tee output.txt || true" - grep -m 1 '2099010101-1111' output.txt + grep -m 1 'customer=2099010101-1111' output.txt + grep -m 1 'product=asdf1' output.txt + bash -ec "! grep -q Traceback output.txt" locust -f examples/jmeter_listener_example.py --headless -t 1 locust-compose up -d sleep 15