Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow concurrent distributors #155

Merged
merged 5 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions examples/distributor_ex.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@
# this example is a little more complex than it needs to be, but I wanted to highlight
# that it is entirely possible to have multiple distributors at the same time

from typing import Dict, List
from locust_plugins.mongoreader import MongoLRUReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins import distributor
from locust_plugins.csvreader import CSVDictReader, CSVReader
from locust_plugins.distributor import Distributor
from locust import HttpUser, task, run_single_user, events
from locust.runners import WorkerRunner

distributors = {}


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
reader = None
ssn_reader = None
product_reader = None
if not isinstance(environment.runner, WorkerRunner):
product_reader = CSVReader("products.csv")
csv = True
if csv:
reader = CSVDictReader("ssn.tsv", delimiter="\t")
ssn_reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"foo": "bar"}, "last_login")
distributor.register(environment, reader)
ssn_reader = MongoLRUReader({"foo": "bar"}, "last_login")
product_reader = CSVReader("products.csv")
distributors["customers"] = Distributor(environment, ssn_reader, "customers")
distributors["products"] = Distributor(environment, product_reader, "products")


class MyUser(HttpUser):
host = "http://www.example.com"

@task
def my_task(self):
customer = distributor.getdata(self)
self.client.get(f"/?{customer['ssn']}")
def my_task(self) -> None:
customer: Dict = next(distributors["customers"])
product: List[str] = next(distributors["products"])
self.client.get(f"/?customer={customer['ssn']}&product={product[0]}")


if __name__ == "__main__":
Expand Down
2 changes: 2 additions & 0 deletions examples/products.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
asdf1
qwerty2
2 changes: 1 addition & 1 deletion locust_plugins/csvreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Iterator, Dict


class CSVReader:
class CSVReader(Iterator):
"Read test data from csv file using an iterator"

def __init__(self, file, **kwargs):
Expand Down
102 changes: 50 additions & 52 deletions locust_plugins/distributor.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,59 @@
from typing import Dict, Iterator, Optional
import logging
from gevent.event import AsyncResult
from locust import User

import greenlet
from locust.env import Environment
from locust.runners import WorkerRunner

_results: Dict[int, AsyncResult] = {}
_iterator: Optional[Iterator[Dict]] = None


# received on master
def _distributor_request(environment: Environment, msg, **kwargs):
data = next(_iterator)
environment.runner.send_message(
"distributor_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)


# received on worker
def _distributor_response(environment: Environment, msg, **kwargs):
_results[msg.data["user_id"]].set(msg.data)


def register(environment: Environment, iterator: Optional[Iterator[dict]]):
"""Register distributor method handlers and tie them to use the iterator that you pass.

iterator is not used on workers, so you can leave it as None there.
"""
global _iterator
_iterator = iterator

runner = environment.runner
assert iterator or isinstance(runner, WorkerRunner), "iterator is a mandatory parameter when not on a worker runner"
if runner:
runner.register_message("distributor_request", _distributor_request)
runner.register_message("distributor_response", _distributor_response)


def getdata(user: User) -> Dict:
"""Get the next data dict from iterator

Args:
user (User): current user object (we use the object id of the User to keep track of who's waiting for which data)
"""
if not user.environment.runner: # no need to do anything clever if there is no runner
assert _iterator, "Did you forget to call register() before trying to get data?"
return next(_iterator)

if id(user) in _results:
logging.warning("This user was already waiting for data. Strange.")

_results[id(user)] = AsyncResult()
runner = user.environment.runner
runner.send_message("distributor_request", {"user_id": id(user), "client_id": runner.client_id})
data = _results[id(user)].get()["payload"] # this waits for the reply
del _results[id(user)]
return data
class Distributor:
def __init__(self, environment: Environment, iterator: Optional[Iterator], name="distributor"):
"""Register distributor method handlers and tie them to use the iterator that you pass.

iterator is not used on workers, so you can leave it as None there.
"""
self.iterator = iterator
self.name = name
self.runner = environment.runner
assert iterator or isinstance(
self.runner, WorkerRunner
), "iterator is a mandatory parameter when not on a worker runner"
if self.runner:
# received on master
def _distributor_request(environment: Environment, msg, **kwargs):
data = next(self.iterator)
self.runner.send_message(
f"_{name}_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)

# received on worker
def _distributor_response(environment: Environment, msg, **kwargs):
_results[msg.data["user_id"]].set(msg.data)

self.runner.register_message(f"_{name}_request", _distributor_request)
self.runner.register_message(f"_{name}_response", _distributor_response)

def __next__(self):
"""Get the next data dict from iterator

Args:
user (User): current user object (we use the object id of the User to keep track of who's waiting for which data)
"""
if not self.runner: # no need to do anything clever if there is no runner
assert self.iterator
return next(self.iterator)

gid = greenlet.getcurrent().minimal_ident # type: ignore

if gid in _results:
logging.warning("This user was already waiting for data. Strange.")

_results[gid] = AsyncResult()
self.runner.send_message(f"_{self.name}_request", {"user_id": gid, "client_id": self.runner.client_id})
data = _results[gid].get()["payload"] # this waits for the reply
del _results[gid]
return data
4 changes: 3 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ commands =
grep -m 1 'LCP' output.txt
bash -ec "! grep 'object has no attribute' output.txt"
bash -ec "(cd examples && PYTHONUNBUFFERED=1 locust -f distributor_ex.py --headless -t 5 -u 4 --processes 4) |& tee output.txt || true"
grep -m 1 '2099010101-1111' output.txt
grep -m 1 'customer=2099010101-1111' output.txt
grep -m 1 'product=asdf1' output.txt
bash -ec "! grep -q Traceback output.txt"
locust -f examples/jmeter_listener_example.py --headless -t 1
locust-compose up -d
sleep 15
Expand Down
Loading