Skip to content

Commit

Permalink
Various changes to synchronizer
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberw committed Dec 11, 2023
1 parent c6fa65b commit 153ca51
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 69 deletions.
40 changes: 15 additions & 25 deletions examples/datareader_ex.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,26 @@
from locust_plugins.mongoreader import SimpleMongoReader

# from locust_plugins.csvreader import CSVDictReader
from locust_plugins.synchronizer import data_synchronizer, getdata
from typing import Iterator
from locust_plugins.mongoreader import MongoLRUReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins import synchronizer
from locust import HttpUser, task, events, run_single_user
import time


@events.init.add_listener
def on_locust_init(environment, **kwargs):
# ssn_reader = CSVDictReader("ssn.tsv", delimiter="\t")
ssn_reader = SimpleMongoReader({"env": "test", "tb": False, "lb": True}, "last_login")
data_synchronizer(ssn_reader)
reader: Iterator
csv = True
if csv:
reader = CSVDictReader("ssn.tsv", delimiter="\t")
else:
reader = MongoLRUReader({"env": "test", "tb": False, "lb": True}, "last_login")
synchronizer.register(reader)


class MyUser(HttpUser):
host = "http://www.example.com"

@task
def my_task(self):
start_time = time.time()
start_perf_counter = time.perf_counter()
user = getdata(self)
self.environment.events.request.fire(
request_type="fake",
name=user["ssn"],
# name="fake",
start_time=start_time,
response_time=(time.perf_counter() - start_perf_counter) * 1000,
response_length=0,
context={**self.context()},
exception=None,
)

host = "http://localhost:8089"
customer = synchronizer.getdata(self)
self.client.get(f"/{customer['ssn']}")


if __name__ == "__main__":
Expand Down
71 changes: 38 additions & 33 deletions locust_plugins/mongoreader.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,69 @@
from typing import Iterator, Optional, Dict, List, Tuple
from typing import Iterator, Optional, Dict
from pymongo import MongoClient
import pymongo.collection
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from datetime import datetime, timezone
import logging
import time
from contextlib import contextmanager
import os
from os import environ as env
from abc import ABC, abstractmethod
from gevent.lock import Semaphore

dblock = Semaphore()


class SimpleMongoReader(Iterator[Dict]):
"Read test data from mongo collection file using an iterator"

class MongoLRUReader(Iterator[Dict]):
def __init__(
self,
query: Optional[Dict] = None,
sort_column=None,
sort: List[Tuple[str, int]] = [],
uri=None,
database=None,
collection=None,
filter: Dict, # pylint: disable=redefined-builtin
timestamp_field: str,
coll: Optional[Collection] = None,
):
self.query = query
self.sort_column = sort_column
if self.sort_column:
if sort:
raise Exception("Dont set both sort column and sort")
self.sort = [(self.sort_column, 1)]
else:
self.sort = sort
self.uri = uri or os.environ["LOCUST_MONGO"]
self.database = database or os.environ["LOCUST_MONGO_DATABASE"]
self.collection = collection or os.environ["LOCUST_MONGO_COLLECTION"]
self.coll = MongoClient(self.uri)[self.database][self.collection]
self.cursor = self.coll.find(self.query, sort=self.sort)

def __next__(self):
"""A thread safe iterator to read test data from a mongo collection sorted by least-recently-used.
Order is ensured even if Locust is restarted, because the timestamp_field is updated on every iteration step.
Iteration is quite fast, but the first query can be slow if you dont have an index that covers the filter and sort fields.
Args:
filter (Dict): Query filter statement
sort_field (str): Time stamp field, e.g. "last_used"
collection (pymongo.collection.Collection, optional): By default, we use LOCUST_MONGO, LOCUST_MONGO_DATABASE and LOCUST_MONGO_COLLECTION env vars to get the collection, but you can also pass a pre-existing Collection.
"""
self.timestamp_field = timestamp_field
self.coll: Collection = (
coll or MongoClient(env["LOCUST_MONGO"])[env["LOCUST_MONGO_DATABASE"]][env["LOCUST_MONGO_COLLECTION"]]
)
self.cursor: Cursor = self.coll.find(filter, sort=[(self.timestamp_field, 1)])
self.cursor._refresh() # trigger fetch immediately instead of waiting for the first next()

def __next__(self) -> dict:
try:
with dblock:
doc = next(self.cursor)
if self.sort_column:
self.coll.find_one_and_update(
{"_id": doc["_id"]},
{"$set": {self.sort_column: datetime.now(tz=timezone.utc)}},
)
doc: dict = next(self.cursor)
self.coll.find_one_and_update(
{"_id": doc["_id"]},
{"$set": {self.timestamp_field: datetime.now(tz=timezone.utc)}},
)
return doc
except StopIteration:
with dblock:
self.cursor.rewind()
return next(self.cursor)


### Legacy


class NoUserException(Exception):
    """Raised when no user document matching the query is available."""


class User(dict):
def __init__(self, coll: pymongo.collection.Collection, query: dict):
def __init__(self, coll: Collection, query: dict):
self.coll = coll
with dblock:
data = self.coll.find_one_and_update(
Expand Down Expand Up @@ -97,6 +101,7 @@ def __init__(self, uri=None, database=None, collection=None, filters=[]):
self.reduced_filters = []
self.delay_warning = 0
self.query = {"$and": filters + [{"logged_in": False}]}
logging.warning("MongoReader is deprecated, please switch to MongoReaderLRU")

@contextmanager
def user(self, query: dict = None):
Expand Down
26 changes: 15 additions & 11 deletions locust_plugins/synchronizer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Dict, Iterator

import logging
from gevent.event import AsyncResult
from locust import User, events

Expand All @@ -8,22 +8,23 @@
from locust.runners import MasterRunner, WorkerRunner

test_data: Dict[int, AsyncResult] = {}
datasource_iterator: Iterator[Dict]
iterator: Iterator[Dict] = None


def register(i: Iterator[dict]):
    """Install *i* as the module-wide data source consumed by getdata().

    Must be called (typically from an init/test_start listener) before any
    user requests data from the synchronizer.
    """
    # Rebind the module-level iterator so every later lookup sees it.
    global iterator
    iterator = i

def data_synchronizer(f: Iterator[dict]):
@events.test_start.add_listener
def test_start(environment, **_kw):
global datasource_iterator
datasource_iterator = f
runner = environment.runner
if runner:
# called on master
def user_request(environment: Environment, msg, **kwargs):
data = next(f)
data = next(iterator)
# data["_id"] = str(data["_id"]) # this is an ObjectId, msgpack doesnt know how to serialize it
environment.runner.send_message(
"user_response",
"synchronizer_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)
Expand All @@ -34,19 +35,22 @@ def user_response(environment: Environment, msg, **kwargs):
test_data[msg.data["user_id"]].set(msg.data)

if not isinstance(runner, WorkerRunner):
runner.register_message("user_request", user_request)
runner.register_message("synchronizer_request", user_request)
if not isinstance(runner, MasterRunner):
runner.register_message("user_response", user_response)
runner.register_message("synchronizer_response", user_response)


def getdata(u: User):
# try:
if not u.environment.runner: # no need to do anything clever if there is no runner
return next(datasource_iterator)
return next(iterator)

if id(u) in test_data:
logging.warning("This user was already waiting for data. Not sure how to handle this nicely...")

test_data[id(u)] = AsyncResult()
runner = u.environment.runner
runner.send_message("user_request", {"user_id": id(u), "client_id": runner.client_id})
runner.send_message("synchronizer_request", {"user_id": id(u), "client_id": runner.client_id})
data = test_data[id(u)].get()["payload"]
del test_data[id(u)]
return data
Expand Down

0 comments on commit 153ca51

Please sign in to comment.