Skip to content

Commit

Permalink
Merge pull request #155 from SvenskaSpel/allow-concurrent-distributors
Browse files Browse the repository at this point in the history
Allow concurrent distributors
  • Loading branch information
cyberw authored Dec 12, 2023
2 parents 6d977b8 + c096205 commit 7c4bffb
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 63 deletions.
29 changes: 20 additions & 9 deletions examples/distributor_ex.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@
# this example is a little more complex than it needs to be, but I wanted to highlight
# that it is entirely possible to have multiple distributors at the same time

from typing import Dict, List
from locust_plugins.mongoreader import MongoLRUReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins import distributor
from locust_plugins.csvreader import CSVDictReader, CSVReader
from locust_plugins.distributor import Distributor
from locust import HttpUser, task, run_single_user, events
from locust.runners import WorkerRunner

distributors = {}


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
reader = None
ssn_reader = None
product_reader = None
if not isinstance(environment.runner, WorkerRunner):
product_reader = CSVReader("products.csv")
csv = True
if csv:
reader = CSVDictReader("ssn.tsv", delimiter="\t")
ssn_reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"foo": "bar"}, "last_login")
distributor.register(environment, reader)
ssn_reader = MongoLRUReader({"foo": "bar"}, "last_login")
product_reader = CSVReader("products.csv")
distributors["customers"] = Distributor(environment, ssn_reader, "customers")
distributors["products"] = Distributor(environment, product_reader, "products")


class MyUser(HttpUser):
host = "http://www.example.com"

@task
def my_task(self):
customer = distributor.getdata(self)
self.client.get(f"/?{customer['ssn']}")
def my_task(self) -> None:
customer: Dict = next(distributors["customers"])
product: List[str] = next(distributors["products"])
self.client.get(f"/?customer={customer['ssn']}&product={product[0]}")


if __name__ == "__main__":
Expand Down
2 changes: 2 additions & 0 deletions examples/products.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
asdf1
qwerty2
2 changes: 1 addition & 1 deletion locust_plugins/csvreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Iterator, Dict


class CSVReader:
class CSVReader(Iterator):
"Read test data from csv file using an iterator"

def __init__(self, file, **kwargs):
Expand Down
102 changes: 50 additions & 52 deletions locust_plugins/distributor.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,59 @@
from typing import Dict, Iterator, Optional
import logging
from gevent.event import AsyncResult
from locust import User

import greenlet
from locust.env import Environment
from locust.runners import WorkerRunner

_results: Dict[int, AsyncResult] = {}
_iterator: Optional[Iterator[Dict]] = None


# received on master
def _distributor_request(environment: Environment, msg, **kwargs):
data = next(_iterator)
environment.runner.send_message(
"distributor_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)


# received on worker
def _distributor_response(environment: Environment, msg, **kwargs):
_results[msg.data["user_id"]].set(msg.data)


def register(environment: Environment, iterator: Optional[Iterator[dict]]):
"""Register distributor method handlers and tie them to use the iterator that you pass.
iterator is not used on workers, so you can leave it as None there.
"""
global _iterator
_iterator = iterator

runner = environment.runner
assert iterator or isinstance(runner, WorkerRunner), "iterator is a mandatory parameter when not on a worker runner"
if runner:
runner.register_message("distributor_request", _distributor_request)
runner.register_message("distributor_response", _distributor_response)


def getdata(user: User) -> Dict:
"""Get the next data dict from iterator
Args:
user (User): current user object (we use the object id of the User to keep track of who's waiting for which data)
"""
if not user.environment.runner: # no need to do anything clever if there is no runner
assert _iterator, "Did you forget to call register() before trying to get data?"
return next(_iterator)

if id(user) in _results:
logging.warning("This user was already waiting for data. Strange.")

_results[id(user)] = AsyncResult()
runner = user.environment.runner
runner.send_message("distributor_request", {"user_id": id(user), "client_id": runner.client_id})
data = _results[id(user)].get()["payload"] # this waits for the reply
del _results[id(user)]
return data
class Distributor:
def __init__(self, environment: Environment, iterator: Optional[Iterator], name="distributor"):
"""Register distributor method handlers and tie them to use the iterator that you pass.
iterator is not used on workers, so you can leave it as None there.
"""
self.iterator = iterator
self.name = name
self.runner = environment.runner
assert iterator or isinstance(
self.runner, WorkerRunner
), "iterator is a mandatory parameter when not on a worker runner"
if self.runner:
# received on master
def _distributor_request(environment: Environment, msg, **kwargs):
data = next(self.iterator)
self.runner.send_message(
f"_{name}_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)

# received on worker
def _distributor_response(environment: Environment, msg, **kwargs):
_results[msg.data["user_id"]].set(msg.data)

self.runner.register_message(f"_{name}_request", _distributor_request)
self.runner.register_message(f"_{name}_response", _distributor_response)

def __next__(self):
"""Get the next data dict from iterator
Args:
user (User): current user object (we use the object id of the User to keep track of who's waiting for which data)
"""
if not self.runner: # no need to do anything clever if there is no runner
assert self.iterator
return next(self.iterator)

gid = greenlet.getcurrent().minimal_ident # type: ignore

if gid in _results:
logging.warning("This user was already waiting for data. Strange.")

_results[gid] = AsyncResult()
self.runner.send_message(f"_{self.name}_request", {"user_id": gid, "client_id": self.runner.client_id})
data = _results[gid].get()["payload"] # this waits for the reply
del _results[gid]
return data
4 changes: 3 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ commands =
grep -m 1 'LCP' output.txt
bash -ec "! grep 'object has no attribute' output.txt"
bash -ec "(cd examples && PYTHONUNBUFFERED=1 locust -f distributor_ex.py --headless -t 5 -u 4 --processes 4) |& tee output.txt || true"
grep -m 1 '2099010101-1111' output.txt
grep -m 1 'customer=2099010101-1111' output.txt
grep -m 1 'product=asdf1' output.txt
bash -ec "! grep -q Traceback output.txt"
locust -f examples/jmeter_listener_example.py --headless -t 1
locust-compose up -d
sleep 15
Expand Down

0 comments on commit 7c4bffb

Please sign in to comment.