Skip to content

Commit

Permalink
Change distributor to be a class instead of just a module. Make it iterable and stop requiring a user instance (instead relying on greenlet id). Add a customizable name to differentiate messages between different Distributor instances.
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberw committed Dec 12, 2023
1 parent d8ef8c1 commit df78122
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 60 deletions.
29 changes: 20 additions & 9 deletions examples/distributor_ex.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@
# this example is a little more complex than it needs to be, but I wanted to highlight
# that it is entirely possible to have multiple distributors at the same time

from typing import Dict, List
from locust_plugins.mongoreader import MongoLRUReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins import distributor
from locust_plugins.csvreader import CSVDictReader, CSVReader
from locust_plugins.distributor import Distributor
from locust import HttpUser, task, run_single_user, events
from locust.runners import WorkerRunner

distributors = {}


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
reader = None
ssn_reader = None
product_reader = None
if not isinstance(environment.runner, WorkerRunner):
product_reader = CSVReader("products.csv")
csv = True
if csv:
reader = CSVDictReader("ssn.tsv", delimiter="\t")
ssn_reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"foo": "bar"}, "last_login")
distributor.register(environment, reader)
ssn_reader = MongoLRUReader({"foo": "bar"}, "last_login")
product_reader = CSVReader("products.csv")
distributors["customers"] = Distributor(environment, ssn_reader, "customers")
distributors["products"] = Distributor(environment, product_reader, "products")


class MyUser(HttpUser):
host = "http://www.example.com"

@task
def my_task(self):
customer = distributor.getdata(self)
self.client.get(f"/?{customer['ssn']}")
def my_task(self) -> None:
customer: Dict = next(distributors["customers"])
product: List[str] = next(distributors["products"])
self.client.get(f"/?customer={customer['ssn']}&product={product[0]}")


if __name__ == "__main__":
Expand Down
101 changes: 50 additions & 51 deletions locust_plugins/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,59 @@
import logging
from gevent.event import AsyncResult
from locust import User

import greenlet
from locust.env import Environment
from locust.runners import WorkerRunner

_results: Dict[int, AsyncResult] = {}
_iterator: Optional[Iterator[Dict]] = None


# received on master
def _distributor_request(environment: Environment, msg, **kwargs):
    """Handle a "distributor_request" message (received on master).

    Takes the next item from the module-level iterator and sends it back as a
    "distributor_response" message addressed to the client named in the request.
    The request's "user_id" is echoed so the receiver can match the reply.
    """
    payload = next(_iterator)
    request = msg.data
    reply = {"payload": payload, "user_id": request["user_id"]}
    environment.runner.send_message("distributor_response", reply, client_id=request["client_id"])


# received on worker
def _distributor_response(environment: Environment, msg, **kwargs):
    """Handle a "distributor_response" message (received on worker).

    Looks up the pending result stored under the reply's "user_id" and sets it
    to the whole reply dict, which releases whoever is waiting on it.
    """
    reply = msg.data
    pending = _results[reply["user_id"]]
    pending.set(reply)


def register(environment: Environment, iterator: Optional[Iterator[dict]]):
    """Install the distributor message handlers and remember the iterator to serve from.

    The iterator is stored in the module-level ``_iterator``. It is not used on
    workers, so None is accepted there; anywhere else it is mandatory. When the
    environment has no runner, no message handlers are registered.
    """
    global _iterator
    _iterator = iterator

    active_runner = environment.runner
    assert iterator or isinstance(
        active_runner, WorkerRunner
    ), "iterator is a mandatory parameter when not on a worker runner"
    if not active_runner:
        return

    handlers = (
        ("distributor_request", _distributor_request),
        ("distributor_response", _distributor_response),
    )
    for message_type, handler in handlers:
        active_runner.register_message(message_type, handler)


def getdata(user: User) -> Dict:
    """Get the next data dict from the iterator.

    Without a runner, the iterator passed to register() is read directly.
    Otherwise a "distributor_request" message is sent through the runner and
    this call waits until the matching response has been stored for this user.

    Args:
        user (User): current user object (the object id of the User is used to
            keep track of who's waiting for which data)

    Returns:
        The next item of the local iterator, or the "payload" entry of the response.
    """
    runner = user.environment.runner
    if not runner:  # no need to do anything clever if there is no runner
        assert _iterator, "Did you forget to call register() before trying to get data?"
        return next(_iterator)

    user_id = id(user)
    if user_id in _results:
        logging.warning("This user was already waiting for data. Strange.")

    pending = AsyncResult()
    _results[user_id] = pending
    try:
        runner.send_message("distributor_request", {"user_id": user_id, "client_id": runner.client_id})
        return pending.get()["payload"]  # this waits for the reply
    finally:
        # Remove the entry even if sending or waiting raised, so a later call
        # with the same id does not find a stale entry and log a bogus warning.
        _results.pop(user_id, None)
class Distributor:
    """Iterator that hands out items from one shared source iterator.

    When the environment has a runner, ``next()`` sends a request message
    through it and waits for the matching response; the handler marked
    "received on master" answers from the real iterator. Without a runner,
    items are read from the iterator directly.
    """

    def __init__(self, environment: Environment, iterator: Optional[Iterator], name="distributor"):
        """Register distributor message handlers and tie them to the iterator that you pass.

        Args:
            environment: Locust environment; its runner (if any) is used for messaging.
            iterator: Source of data. It is not used on workers, so you can
                leave it as None there.
            name: Used to build the message type names ("_<name>_request" and
                "_<name>_response"), which keeps messages of different
                Distributor instances apart.
        """
        self.iterator = iterator
        self.name = name
        self.runner = environment.runner
        # Check for None explicitly so a valid but falsy iterator object is not rejected.
        assert iterator is not None or isinstance(
            self.runner, WorkerRunner
        ), "iterator is a mandatory parameter when not on a worker runner"
        if self.runner:
            # received on master
            def _distributor_request(environment: Environment, msg, **kwargs):
                data = next(self.iterator)
                self.runner.send_message(
                    f"_{name}_response",
                    {"payload": data, "user_id": msg.data["user_id"]},
                    client_id=msg.data["client_id"],
                )

            # received on worker
            def _distributor_response(environment: Environment, msg, **kwargs):
                _results[msg.data["user_id"]].set(msg.data)

            self.runner.register_message(f"_{name}_request", _distributor_request)
            self.runner.register_message(f"_{name}_response", _distributor_response)

    def __iter__(self):
        """Return self so the distributor works with ``for`` loops, ``iter()`` and ``list()``."""
        return self

    def __next__(self):
        """Get the next data item from the iterator.

        No user object is needed: the id of the current greenlet is used to
        keep track of who is waiting for which data.
        """
        if not self.runner:  # no need to do anything clever if there is no runner
            assert self.iterator is not None
            return next(self.iterator)

        gid = greenlet.getcurrent().minimal_ident  # type: ignore

        if gid in _results:
            logging.warning("This user was already waiting for data. Strange.")

        pending = AsyncResult()
        _results[gid] = pending
        try:
            self.runner.send_message(f"_{self.name}_request", {"user_id": gid, "client_id": self.runner.client_id})
            return pending.get()["payload"]  # this waits for the reply
        finally:
            # Remove the entry even if sending or waiting raised, so this
            # greenlet's next call does not find a stale entry.
            _results.pop(gid, None)

0 comments on commit df78122

Please sign in to comment.