Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce synchronizer, a way for workers to easily get data from master in an ordered way (using any iterator) #152

Merged
merged 16 commits into from
Dec 11, 2023
Merged
29 changes: 29 additions & 0 deletions examples/synchronizer_ex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Iterator
from locust_plugins.mongoreader import MongoLRUReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins import synchronizer
from locust import HttpUser, task, run_single_user


reader: Iterator
csv = True
if csv:
reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"env": "test", "tb": False, "lb": True}, "last_login")
cyberw marked this conversation as resolved.
Show resolved Hide resolved
synchronizer.register(reader)
# optionally replace this with lazy initalization of Reader to avoid unnecessarily doing it on workers:
# synchronizer.register(None, MongoLRUReader, {"env": "test", "tb": False, "lb": True}, "last_login")


class MyUser(HttpUser):
host = "http://www.example.com"

@task
def my_task(self):
customer = synchronizer.getdata(self)
self.client.get(f"/?{customer['ssn']}")


if __name__ == "__main__":
run_single_user(MyUser)
5 changes: 3 additions & 2 deletions locust_plugins/csvreader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
from typing import Iterator, Dict


class CSVReader:
Expand All @@ -21,8 +22,8 @@ def __next__(self):
return next(self.reader)


class CSVDictReader:
"Read test data from csv file using an iterator"
class CSVDictReader(Iterator[Dict]):
"Read test data from csv file using an iterator, returns rows as dicts"

def __init__(self, file, **kwargs):
try:
Expand Down
53 changes: 50 additions & 3 deletions locust_plugins/mongoreader.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,69 @@
from typing import Iterator
from typing import Iterator, Optional, Dict
from pymongo import MongoClient
import pymongo.collection
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from datetime import datetime, timezone
import logging
import time
from contextlib import contextmanager
import os
from os import environ as env
from abc import ABC, abstractmethod
from gevent.lock import Semaphore

dblock = Semaphore()


class MongoLRUReader(Iterator[Dict]):
def __init__(
self,
filter: Dict, # pylint: disable=redefined-builtin
timestamp_field: str,
coll: Optional[Collection] = None,
):
"""A thread safe iterator to read test data from a mongo collection sorted by least-recently-used.

Order is ensured even if Locust is restarted, because the timestamp_field is updated on every iteration step.

Iteration is quite fast, but the first query can be slow if you dont have an index that covers the filter and sort fields.

Args:
filter (Dict): Query filter statement
sort_field (str): Time stamp field, e.g. "last_used"
collection (pymongo.collection.Collection, optional): By default, we use LOCUST_MONGO, LOCUST_MONGO_DATABASE and LOCUST_MONGO_COLLECTION env vars to get the collection, but you can also pass a pre-existing Collection.

"""
self.timestamp_field = timestamp_field
self.coll: Collection = (
coll or MongoClient(env["LOCUST_MONGO"])[env["LOCUST_MONGO_DATABASE"]][env["LOCUST_MONGO_COLLECTION"]]
)
self.cursor: Cursor = self.coll.find(filter, sort=[(self.timestamp_field, 1)])
self.cursor._refresh() # trigger fetch immediately instead of waiting for the first next()

def __next__(self) -> dict:
try:
with dblock:
doc: dict = next(self.cursor)
self.coll.find_one_and_update(
{"_id": doc["_id"]},
{"$set": {self.timestamp_field: datetime.now(tz=timezone.utc)}},
)
return doc
except StopIteration:
with dblock:
self.cursor.rewind()
return next(self.cursor)


### Legacy


class NoUserException(Exception):
pass


class User(dict):
def __init__(self, coll: pymongo.collection.Collection, query: dict):
def __init__(self, coll: Collection, query: dict):
self.coll = coll
with dblock:
data = self.coll.find_one_and_update(
Expand Down Expand Up @@ -55,6 +101,7 @@ def __init__(self, uri=None, database=None, collection=None, filters=[]):
self.reduced_filters = []
self.delay_warning = 0
self.query = {"$and": filters + [{"logged_in": False}]}
logging.warning("MongoReader is deprecated, please switch to MongoReaderLRU")

@contextmanager
def user(self, query: dict = None):
Expand Down
70 changes: 70 additions & 0 deletions locust_plugins/synchronizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Dict, Iterator, Type, Optional
import logging
from gevent.event import AsyncResult
from locust import User, events

# from locust.exception import StopUser
cyberw marked this conversation as resolved.
Show resolved Hide resolved
from locust.env import Environment
from locust.runners import MasterRunner, WorkerRunner

test_data: Dict[int, AsyncResult] = {}
iterator: Optional[Iterator[Dict]] = None


def register(i: Optional[Iterator[dict]], reader_class: Optional[Type[Iterator[Dict]]] = None, *args, **kwargs):
"""Register synchronizer methods and tie them to use the iterator that you pass.

To avoid unnecessarily instantiating the iterator on workers (where it isnt used),
you can pass an iterator class and initialization parameters instead of an object instance.
"""
global iterator
iterator = i

@events.test_start.add_listener
def test_start(environment, **_kw):
global iterator
runner = environment.runner
if not i and not isinstance(runner, WorkerRunner):
assert reader_class
logging.debug(f"about to initialize reader class {reader_class}")
iterator = reader_class(*args, **kwargs)
if runner:
# called on master
def user_request(environment: Environment, msg, **kwargs):
assert iterator # should have been instantiated by now...
data = next(iterator)
# data["_id"] = str(data["_id"]) # this is an ObjectId, msgpack doesnt know how to serialize it
cyberw marked this conversation as resolved.
Show resolved Hide resolved
environment.runner.send_message(
"synchronizer_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)

# called on worker
def user_response(environment: Environment, msg, **kwargs):
test_data[msg.data["user_id"]].set(msg.data)

if not isinstance(runner, WorkerRunner):
runner.register_message("synchronizer_request", user_request)
if not isinstance(runner, MasterRunner):
runner.register_message("synchronizer_response", user_response)


def getdata(u: User):
# try:
cyberw marked this conversation as resolved.
Show resolved Hide resolved
if not u.environment.runner: # no need to do anything clever if there is no runner
return next(iterator)

if id(u) in test_data:
logging.warning("This user was already waiting for data. Not sure how to handle this nicely...")

test_data[id(u)] = AsyncResult()
runner = u.environment.runner
runner.send_message("synchronizer_request", {"user_id": id(u), "client_id": runner.client_id})
data = test_data[id(u)].get()["payload"]
del test_data[id(u)]
return data
# except KeyboardInterrupt:
# # probably we're just shutting down but lets try to be as graceful as possible
# logging.debug("Caught SIGINT"
# raise StopUser()
2 changes: 2 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ commands =
grep -m 1 'Manual' output.txt
grep -m 1 'LCP' output.txt
bash -ec "! grep 'object has no attribute' output.txt"
bash -ec "(cd examples && PYTHONUNBUFFERED=1 locust -f synchronizer_ex.py --headless -t 5 -u 4 --processes 4) |& tee output.txt || true"
grep -m 1 '2099010101-1111' output.txt
locust -f examples/jmeter_listener_example.py --headless -t 1
locust-compose up -d
sleep 15
Expand Down
Loading