Skip to content

Commit

Permalink
Merge pull request #153 from SvenskaSpel/refactor-synchronizer
Browse files Browse the repository at this point in the history
Refactor synchronizer
  • Loading branch information
cyberw authored Dec 12, 2023
2 parents 3b02bee + 5bd10bf commit 7877666
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ attr-rgx=[a-z0-9_]{1,30}$
ignored-argument-names=_.*|^ignored_|^unused_|^kwargs|^environment

[MESSAGES CONTROL]
disable=logging-not-lazy,logging-fstring-interpolation,missing-docstring,wrong-import-position,wrong-import-order,too-few-public-methods,invalid-name,protected-access,logging-format-interpolation,dangerous-default-value,global-statement,too-many-locals,too-many-arguments,too-many-instance-attributes,blacklisted-name,attribute-defined-outside-init,broad-except,bare-except,consider-using-with,too-many-branches,unspecified-encoding,arguments-differ,broad-exception-raised,wildcard-import,unused-import,keyword-arg-before-vararg
disable=logging-not-lazy,logging-fstring-interpolation,missing-docstring,wrong-import-position,wrong-import-order,too-few-public-methods,invalid-name,protected-access,logging-format-interpolation,dangerous-default-value,global-statement,too-many-locals,too-many-arguments,too-many-instance-attributes,blacklisted-name,attribute-defined-outside-init,broad-except,bare-except,consider-using-with,too-many-branches,unspecified-encoding,arguments-differ,broad-exception-raised,wildcard-import,keyword-arg-before-vararg

[MASTER]
ignore=locustio,examples
Expand Down
23 changes: 12 additions & 11 deletions examples/synchronizer_ex.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from typing import Iterator
from locust_plugins.mongoreader import MongoLRUReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins import synchronizer
from locust import HttpUser, task, run_single_user
from locust import HttpUser, task, run_single_user, events
from locust.runners import WorkerRunner


reader: Iterator
csv = True
if csv:
reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"foo": "bar"}, "last_login")
synchronizer.register(reader)
# optionally replace this with lazy initalization of Reader to avoid unnecessarily doing it on workers:
# synchronizer.register(None, MongoLRUReader, {"env": "test", "tb": False, "lb": True}, "last_login")
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
reader = None
if not isinstance(environment.runner, WorkerRunner):
csv = True
if csv:
reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"foo": "bar"}, "last_login")
synchronizer.register(environment, reader)


class MyUser(HttpUser):
Expand Down
94 changes: 46 additions & 48 deletions locust_plugins/synchronizer.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,62 @@
from typing import Dict, Iterator, Type, Optional
from typing import Dict, Iterator, Optional
import logging
from gevent.event import AsyncResult
from locust import User, events
from locust import User

from locust.env import Environment
from locust.runners import MasterRunner, WorkerRunner
from locust.runners import WorkerRunner

test_data: Dict[int, AsyncResult] = {}
iterator: Optional[Iterator[Dict]] = None
_results: Dict[int, AsyncResult] = {}
_iterator: Optional[Iterator[Dict]] = None


def register(i: Optional[Iterator[dict]], reader_class: Optional[Type[Iterator[Dict]]] = None, *args, **kwargs):
"""Register synchronizer methods and tie them to use the iterator that you pass.
# received on master
def _synchronizer_request(environment: Environment, msg, **kwargs):
assert _iterator
data = next(_iterator)
environment.runner.send_message(
"synchronizer_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)


# received on worker
def _synchronizer_response(environment: Environment, msg, **kwargs):
_results[msg.data["user_id"]].set(msg.data)

To avoid unnecessarily instantiating the iterator on workers (where it isnt used),
you can pass an iterator class and initialization parameters instead of an object instance.
"""
global iterator
iterator = i

@events.test_start.add_listener
def test_start(environment, **_kw):
global iterator
runner = environment.runner
if not i and not isinstance(runner, WorkerRunner):
assert reader_class
logging.debug(f"about to initialize reader class {reader_class}")
iterator = reader_class(*args, **kwargs)
if runner:
# called on master
def user_request(environment: Environment, msg, **kwargs):
assert iterator # should have been instantiated by now...
data = next(iterator)
# data["_id"] = str(data["_id"]) # this is an ObjectId, msgpack doesnt know how to serialize it
environment.runner.send_message(
"synchronizer_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)
def register(environment: Environment, iterator: Optional[Iterator[dict]]):
"""Register synchronizer method handlers and tie them to use the iterator that you pass.
# called on worker
def user_response(environment: Environment, msg, **kwargs):
test_data[msg.data["user_id"]].set(msg.data)
iterator is not used on workers, so you can leave it as None there.
"""
global _iterator
_iterator = iterator

runner = environment.runner
if not iterator and not isinstance(runner, WorkerRunner):
raise Exception("iterator is a mandatory parameter when not on a worker runner")
if runner:
runner.register_message("synchronizer_request", _synchronizer_request)
runner.register_message("synchronizer_response", _synchronizer_response)

if not isinstance(runner, WorkerRunner):
runner.register_message("synchronizer_request", user_request)
if not isinstance(runner, MasterRunner):
runner.register_message("synchronizer_response", user_response)

def getdata(user: User) -> Dict:
"""Get the next data dict from iterator
def getdata(u: User) -> Dict:
if not u.environment.runner: # no need to do anything clever if there is no runner
return next(iterator)
Args:
user (User): current user object (we use the object id of the User to keep track of who's waiting for which data)
"""
if not user.environment.runner: # no need to do anything clever if there is no runner
return next(_iterator)

if id(u) in test_data:
logging.warning("This user was already waiting for data. Not sure how to handle this nicely...")
if id(user) in _results:
logging.warning("This user was already waiting for data. Strange.")

test_data[id(u)] = AsyncResult()
runner = u.environment.runner
runner.send_message("synchronizer_request", {"user_id": id(u), "client_id": runner.client_id})
data = test_data[id(u)].get()["payload"]
del test_data[id(u)]
_results[id(user)] = AsyncResult()
runner = user.environment.runner
runner.send_message("synchronizer_request", {"user_id": id(user), "client_id": runner.client_id})
data = _results[id(user)].get()["payload"] # this waits for the reply
del _results[id(user)]
return data

0 comments on commit 7877666

Please sign in to comment.