Skip to content

Commit

Permalink
WIP: Introduce data_synchronizer (and iterator-based SimpleMongoReader)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberw committed Dec 6, 2023
1 parent 52bb571 commit 0f44c81
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 3 deletions.
36 changes: 36 additions & 0 deletions examples/datareader_ex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from locust_plugins.mongoreader import SimpleMongoReader
from locust_plugins.csvreader import CSVDictReader
from locust_plugins.synchronizer import data_synchronizer, getdata
from locust import HttpUser, task, events, runners, run_single_user
import time


@events.init.add_listener
def on_locust_init(environment, **kwargs):
# ssn_reader = CSVDictReader("ssn.tsv", delimiter="\t")
ssn_reader = SimpleMongoReader({"env": "test", "tb": False, "lb": True}, "last_login")
data_synchronizer(ssn_reader)


class MyUser(HttpUser):
@task
def my_task(self):
start_time = time.time()
start_perf_counter = time.perf_counter()
user = getdata(self)
self.environment.events.request.fire(
request_type="fake",
# name=user["ssn"],
name="fake",
start_time=start_time,
response_time=(time.perf_counter() - start_perf_counter) * 1000,
response_length=0,
context={**self.context()},
exception=None,
)

host = "http://localhost:8089"


if __name__ == "__main__":
run_single_user(MyUser)
5 changes: 3 additions & 2 deletions locust_plugins/csvreader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
from typing import Iterator, Dict


class CSVReader:
Expand All @@ -21,8 +22,8 @@ def __next__(self):
return next(self.reader)


class CSVDictReader:
"Read test data from csv file using an iterator"
class CSVDictReader(Iterator[Dict]):
"Read test data from csv file using an iterator, returns rows as dicts"

def __init__(self, file, **kwargs):
try:
Expand Down
51 changes: 50 additions & 1 deletion locust_plugins/mongoreader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterator
from typing import Iterator, Optional, Dict, List, Tuple
from pymongo import MongoClient
import pymongo.collection
from datetime import datetime, timezone
Expand All @@ -8,10 +8,59 @@
import os
from abc import ABC, abstractmethod
from gevent.lock import Semaphore
from locust.runners import Runner, MasterRunner, WorkerRunner
from locust.env import Environment
from gevent.event import AsyncResult

dblock = Semaphore()


class SimpleMongoReader(Iterator[Dict]):
"Read test data from mongo collection file using an iterator"

def __init__(
self,
query: Optional[Dict] = None,
sort_column=None,
sort: List[Tuple[str, int]] = [],
uri=None,
database=None,
collection=None,
):
self.query = query
self.sort_column = sort_column
if self.sort_column:
if sort:
raise Exception("Dont set both sort column and sort")
self.sort = [(self.sort_column, 1)]
else:
self.sort = sort
self.uri = uri or os.environ["LOCUST_MONGO"]
self.database = database or os.environ["LOCUST_MONGO_DATABASE"]
self.collection = collection or os.environ["LOCUST_MONGO_COLLECTION"]
self.coll = MongoClient(self.uri)[self.database][self.collection]
self.cursor = self.coll.find(self.query, sort=self.sort)

def __next__(self):
try:
with dblock:
doc = next(self.cursor)
if self.sort_column:
self.coll.find_one_and_update(
{"_id": doc["_id"]},
{"$set": {self.sort_column: datetime.now(tz=timezone.utc)}},
)
return doc
except StopIteration:
with dblock:
# there is a tiny chance the last find_one_and_update has not yet completed
# so give it a little extra time so we dont accidentally get data that was just used
if "w=0" in self.uri:
time.sleep(0.5)
self.cursor = self.coll.find(self.query, sort=self.sort)
return next(self.cursor)


class NoUserException(Exception):
pass

Expand Down
58 changes: 58 additions & 0 deletions locust_plugins/synchronizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import json
from typing import Dict, Iterator

from gevent.event import AsyncResult
from locust import User, events
from locust.exception import StopUser
from locust.env import Environment
from locust.runners import MasterRunner, WorkerRunner
import bson.objectid

test_data: Dict[int, AsyncResult] = {}
datasource_iterator: Iterator[Dict]


def data_synchronizer(f: Iterator[dict]):
@events.test_start.add_listener
def test_start(environment, **_kw):
global datasource_iterator
datasource_iterator = f
runner = environment.runner
if runner:
# called on master
def user_request(environment: Environment, msg, **kwargs):
data = next(f)
# data["_id"] = str(data["_id"]) # this is an ObjectId, msgpack doesnt know how to serialize it
print(data)
environment.runner.send_message(
"user_response",
{"payload": data, "user_id": msg.data["user_id"]},
client_id=msg.data["client_id"],
)

# called on worker
def user_response(environment: Environment, msg, **kwargs):
assert test_data
test_data[msg.data["user_id"]].set(msg.data)

if not isinstance(runner, WorkerRunner):
runner.register_message("user_request", user_request)
if not isinstance(runner, MasterRunner):
runner.register_message("user_response", user_response)


def getdata(u: User):
# try:
if not u.environment.runner: # no need to do anything clever if there is no runner
return next(datasource_iterator)

test_data[id(u)] = AsyncResult()
runner = u.environment.runner
runner.send_message("user_request", {"user_id": id(u), "client_id": runner.client_id})
data = test_data[id(u)].get()["payload"]
del test_data[id(u)]
return data
# except KeyboardInterrupt:
# # probably we're just shutting down but lets try to be as graceful as possible
# logging.debug("Caught SIGINT"
# raise StopUser()

0 comments on commit 0f44c81

Please sign in to comment.