Merge pull request #152 from SvenskaSpel/introduce-synchronizer
Introduce synchronizer, a way for workers to easily get data from master in an ordered way (using any iterator)
cyberw authored Dec 11, 2023
2 parents 52bb571 + 06abaf8 commit 3b02bee
Showing 6 changed files with 149 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
@@ -11,7 +11,7 @@ attr-rgx=[a-z0-9_]{1,30}$
ignored-argument-names=_.*|^ignored_|^unused_|^kwargs|^environment

[MESSAGES CONTROL]
disable=logging-not-lazy,logging-fstring-interpolation,missing-docstring,wrong-import-position,wrong-import-order,too-few-public-methods,invalid-name,protected-access,logging-format-interpolation,dangerous-default-value,global-statement,too-many-locals,too-many-arguments,too-many-instance-attributes,blacklisted-name,attribute-defined-outside-init,broad-except,bare-except,consider-using-with,too-many-branches,unspecified-encoding,arguments-differ,broad-exception-raised
disable=logging-not-lazy,logging-fstring-interpolation,missing-docstring,wrong-import-position,wrong-import-order,too-few-public-methods,invalid-name,protected-access,logging-format-interpolation,dangerous-default-value,global-statement,too-many-locals,too-many-arguments,too-many-instance-attributes,blacklisted-name,attribute-defined-outside-init,broad-except,bare-except,consider-using-with,too-many-branches,unspecified-encoding,arguments-differ,broad-exception-raised,wildcard-import,unused-import,keyword-arg-before-vararg

[MASTER]
ignore=locustio,examples
29 changes: 29 additions & 0 deletions examples/synchronizer_ex.py
@@ -0,0 +1,29 @@
from typing import Iterator
from locust_plugins.mongoreader import MongoLRUReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins import synchronizer
from locust import HttpUser, task, run_single_user


reader: Iterator
csv = True
if csv:
reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"foo": "bar"}, "last_login")
synchronizer.register(reader)
# optionally replace this with lazy initialization of the reader to avoid unnecessarily doing it on workers:
# synchronizer.register(None, MongoLRUReader, {"env": "test", "tb": False, "lb": True}, "last_login")


class MyUser(HttpUser):
host = "http://www.example.com"

@task
def my_task(self):
customer = synchronizer.getdata(self)
self.client.get(f"/?{customer['ssn']}")


if __name__ == "__main__":
run_single_user(MyUser)
5 changes: 3 additions & 2 deletions locust_plugins/csvreader.py
@@ -1,4 +1,5 @@
import csv
from typing import Iterator, Dict


class CSVReader:
@@ -21,8 +22,8 @@ def __next__(self):
return next(self.reader)


class CSVDictReader:
"Read test data from csv file using an iterator"
class CSVDictReader(Iterator[Dict]):
"Read test data from csv file using an iterator, returns rows as dicts"

def __init__(self, file, **kwargs):
try:
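For reference, a minimal standalone sketch of iterating the reader, assuming a tab-separated ssn.tsv whose header row includes an ssn column (the same file and column used in the example above):

from locust_plugins.csvreader import CSVDictReader

reader = CSVDictReader("ssn.tsv", delimiter="\t")  # delimiter (and other kwargs) are passed through, as in the example above
row = next(reader)  # each row comes back as a dict keyed by the header row
print(row["ssn"])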
53 changes: 50 additions & 3 deletions locust_plugins/mongoreader.py
@@ -1,23 +1,69 @@
from typing import Iterator
from typing import Iterator, Optional, Dict
from pymongo import MongoClient
import pymongo.collection
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from datetime import datetime, timezone
import logging
import time
from contextlib import contextmanager
import os
from os import environ as env
from abc import ABC, abstractmethod
from gevent.lock import Semaphore

dblock = Semaphore()


class MongoLRUReader(Iterator[Dict]):
def __init__(
self,
filter: Dict, # pylint: disable=redefined-builtin
timestamp_field: str,
coll: Optional[Collection] = None,
):
"""A thread safe iterator to read test data from a mongo collection sorted by least-recently-used.
Order is ensured even if Locust is restarted, because the timestamp_field is updated on every iteration step.
Iteration is quite fast, but the first query can be slow if you dont have an index that covers the filter and sort fields.
Args:
filter (Dict): Query filter statement
sort_field (str): Time stamp field, e.g. "last_used"
collection (pymongo.collection.Collection, optional): By default, we use LOCUST_MONGO, LOCUST_MONGO_DATABASE and LOCUST_MONGO_COLLECTION env vars to get the collection, but you can also pass a pre-existing Collection.
"""
self.timestamp_field = timestamp_field
self.coll: Collection = (
coll or MongoClient(env["LOCUST_MONGO"])[env["LOCUST_MONGO_DATABASE"]][env["LOCUST_MONGO_COLLECTION"]]
)
self.cursor: Cursor = self.coll.find(filter, sort=[(self.timestamp_field, 1)])
self.cursor._refresh() # trigger fetch immediately instead of waiting for the first next()

def __next__(self) -> dict:
try:
with dblock:
doc: dict = next(self.cursor)
self.coll.find_one_and_update(
{"_id": doc["_id"]},
{"$set": {self.timestamp_field: datetime.now(tz=timezone.utc)}},
)
return doc
except StopIteration:
with dblock:
self.cursor.rewind()
return next(self.cursor)


### Legacy


class NoUserException(Exception):
pass


class User(dict):
def __init__(self, coll: pymongo.collection.Collection, query: dict):
def __init__(self, coll: Collection, query: dict):
self.coll = coll
with dblock:
data = self.coll.find_one_and_update(
@@ -55,6 +101,7 @@ def __init__(self, uri=None, database=None, collection=None, filters=[]):
self.reduced_filters = []
self.delay_warning = 0
self.query = {"$and": filters + [{"logged_in": False}]}
logging.warning("MongoReader is deprecated, please switch to MongoReaderLRU")

@contextmanager
def user(self, query: dict = None):
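A minimal sketch of using the new MongoLRUReader on its own, assuming the LOCUST_MONGO, LOCUST_MONGO_DATABASE and LOCUST_MONGO_COLLECTION env vars point at a collection whose documents carry a last_login field (the filter and field name are borrowed from the example above):

from locust_plugins.mongoreader import MongoLRUReader

# pass coll=<pymongo Collection> instead if you don't want to rely on the env vars
reader = MongoLRUReader({"foo": "bar"}, "last_login")
doc = next(reader)  # least recently used matching document; its last_login is bumped so the order survives restarts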
64 changes: 64 additions & 0 deletions locust_plugins/synchronizer.py
@@ -0,0 +1,64 @@
from typing import Dict, Iterator, Type, Optional
import logging
from gevent.event import AsyncResult
from locust import User, events

from locust.env import Environment
from locust.runners import MasterRunner, WorkerRunner

test_data: Dict[int, AsyncResult] = {}
iterator: Optional[Iterator[Dict]] = None


def register(i: Optional[Iterator[dict]], reader_class: Optional[Type[Iterator[Dict]]] = None, *args, **kwargs):
"""Register synchronizer methods and tie them to use the iterator that you pass.
To avoid unnecessarily instantiating the iterator on workers (where it isn't used),
you can pass an iterator class and initialization parameters instead of an object instance.
"""
global iterator
iterator = i

@events.test_start.add_listener
def test_start(environment, **_kw):
global iterator
runner = environment.runner
if not i and not isinstance(runner, WorkerRunner):
assert reader_class
logging.debug(f"about to initialize reader class {reader_class}")
iterator = reader_class(*args, **kwargs)
if runner:
# called on master
def user_request(environment: Environment, msg, **kwargs):
assert iterator # should have been instantiated by now...
data = next(iterator)
# data["_id"] = str(data["_id"]) # this is an ObjectId, msgpack doesnt know how to serialize it
environment.runner.send_message(
"synchronizer_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)

# called on worker
def user_response(environment: Environment, msg, **kwargs):
test_data[msg.data["user_id"]].set(msg.data)

if not isinstance(runner, WorkerRunner):
runner.register_message("synchronizer_request", user_request)
if not isinstance(runner, MasterRunner):
runner.register_message("synchronizer_response", user_response)


def getdata(u: User) -> Dict:
if not u.environment.runner: # no need to do anything clever if there is no runner
return next(iterator)

if id(u) in test_data:
logging.warning("This user was already waiting for data. Not sure how to handle this nicely...")

test_data[id(u)] = AsyncResult()
runner = u.environment.runner
runner.send_message("synchronizer_request", {"user_id": id(u), "client_id": runner.client_id})
data = test_data[id(u)].get()["payload"]
del test_data[id(u)]
return data
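Since register() accepts any Iterator[Dict], the data source does not have to be file- or database-backed. A minimal sketch with a plain generator (number_stream and its payload are made-up names for illustration):

from locust import HttpUser, task
from locust_plugins import synchronizer


def number_stream():
    n = 0
    while True:
        n += 1
        yield {"number": n}  # any dict-shaped payload works


synchronizer.register(number_stream())  # workers never call next() on it themselves, they ask the master


class SequenceUser(HttpUser):
    host = "http://www.example.com"

    @task
    def my_task(self):
        data = synchronizer.getdata(self)  # blocks until the master answers with the next item
        self.client.get(f"/?{data['number']}")

For the lazy form, synchronizer.register(None, CSVDictReader, "ssn.tsv", delimiter="\t") would defer construction to test_start on the master, mirroring the commented-out line in synchronizer_ex.py above.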
2 changes: 2 additions & 0 deletions tox.ini
@@ -32,6 +32,8 @@ commands =
grep -m 1 'Manual' output.txt
grep -m 1 'LCP' output.txt
bash -ec "! grep 'object has no attribute' output.txt"
bash -ec "(cd examples && PYTHONUNBUFFERED=1 locust -f synchronizer_ex.py --headless -t 5 -u 4 --processes 4) |& tee output.txt || true"
grep -m 1 '2099010101-1111' output.txt
locust -f examples/jmeter_listener_example.py --headless -t 1
locust-compose up -d
sleep 15
