Skip to content

Commit

Permalink
rename test_data to results
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberw committed Dec 11, 2023
1 parent 3cd4483 commit 899df2f
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions locust_plugins/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from locust.env import Environment
from locust.runners import MasterRunner, WorkerRunner

test_data: Dict[int, AsyncResult] = {}
results: Dict[int, AsyncResult] = {}
iterator: Optional[Iterator[Dict]] = None


Expand All @@ -24,9 +24,8 @@ def register(environment: Environment, reader: Optional[Iterator[dict]]):
if runner:
# called on master
def user_request(environment: Environment, msg, **kwargs):
assert iterator # should have been instantiated by now...
assert iterator
data = next(iterator)
# data["_id"] = str(data["_id"]) # this is an ObjectId, msgpack doesnt know how to serialize it
environment.runner.send_message(
"synchronizer_response",
{"payload": data, "user_id": msg.data["user_id"]},
Expand All @@ -35,24 +34,29 @@ def user_request(environment: Environment, msg, **kwargs):

# called on worker
def user_response(environment: Environment, msg, **kwargs):
test_data[msg.data["user_id"]].set(msg.data)
results[msg.data["user_id"]].set(msg.data)

if not isinstance(runner, WorkerRunner):
runner.register_message("synchronizer_request", user_request)
if not isinstance(runner, MasterRunner):
runner.register_message("synchronizer_response", user_response)


def getdata(u: User) -> Dict:
if not u.environment.runner: # no need to do anything clever if there is no runner
def getdata(user: User) -> Dict:
"""Get the next data dict from reader
Args:
u (User): current user object (we use the object id of the user to keep track of who's waiting for which data)
"""
if not user.environment.runner: # no need to do anything clever if there is no runner
return next(iterator)

if id(u) in test_data:
logging.warning("This user was already waiting for data. Not sure how to handle this nicely...")
if id(user) in results:
logging.warning("This user was already waiting for data. Strange.")

test_data[id(u)] = AsyncResult()
runner = u.environment.runner
runner.send_message("synchronizer_request", {"user_id": id(u), "client_id": runner.client_id})
data = test_data[id(u)].get()["payload"]
del test_data[id(u)]
results[id(user)] = AsyncResult()
runner = user.environment.runner
runner.send_message("synchronizer_request", {"user_id": id(user), "client_id": runner.client_id})
data = results[id(user)].get()["payload"] # this waits for the reply
del results[id(user)]
return data

0 comments on commit 899df2f

Please sign in to comment.