Skip to content

Commit

Permalink
allowing more extensibility to customize the mqtt events logged in th…
Browse files Browse the repository at this point in the history
…e database (#173)

* allowing more extensibility in order to be able to customize the mqtt events logged in the database

* fixed syntax issue

* added example with simple custom Mqtt client implementation

* removed unrequired override of initializer

* removed unrequired dependencies
  • Loading branch information
ionutab authored Feb 20, 2024
1 parent 5bcbdb0 commit c7d79ca
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
33 changes: 33 additions & 0 deletions examples/mqtt_custom_client_ex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import time
import typing

from locust import task, TaskSet
from locust_plugins.users.mqtt import MqttUser
from locust_plugins.users.mqtt import MqttClient


# extend the MqttClient class with your own custom implementation
class MyMqttClient(MqttClient):

# you can override the event name with your custom implementation
def _generate_event_name(self, event_type: str, qos: int, topic: str):
return f"mqtt:{event_type}:{qos}"


class MyUser(MqttUser):
# override the client_cls with your custom MqttClient implementation
client_cls: typing.Type[MyMqttClient] = MyMqttClient

@task
class MyTasks(TaskSet):
# Sleep for a while to allow the client time to connect.
# This is probably not the most "correct" way to do this: a better method
# might be to add a gevent.event.Event to the MqttClient's on_connect
# callback and wait for that (with a timeout) here.
# However, this works well enough for the sake of an example.
def on_start(self):
time.sleep(5)

@task
def say_hello(self):
self.client.publish("hello/locust", b"hello world")
13 changes: 8 additions & 5 deletions locust_plugins/users/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def __init__(
self._publish_requests: dict[int, PublishedMessageContext] = {}
self._subscribe_requests: dict[int, SubscribeContext] = {}

def _generate_event_name(self, event_type: str, qos: int, topic: str):
return _generate_mqtt_event_name(event_type, qos, topic)

def _on_publish_cb(
self,
client: mqtt.Client,
Expand All @@ -143,7 +146,7 @@ def _on_publish_cb(
# fire successful publish event
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("publish", request_context.qos, request_context.topic),
name=self._generate_event_name("publish", request_context.qos, request_context.topic),
response_time=(cb_time - request_context.start_time) * 1000,
response_length=request_context.payload_size,
exception=None,
Expand Down Expand Up @@ -180,7 +183,7 @@ def _on_subscribe_cb(
if SUBACK_FAILURE in granted_qos:
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic),
name=self._generate_event_name("subscribe", request_context.qos, request_context.topic),
response_time=(cb_time - request_context.start_time) * 1000,
response_length=0,
exception=AssertionError(f"Broker returned an error response during subscription: {granted_qos}"),
Expand All @@ -193,7 +196,7 @@ def _on_subscribe_cb(
# fire successful subscribe event
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic),
name=self._generate_event_name("subscribe", request_context.qos, request_context.topic),
response_time=(cb_time - request_context.start_time) * 1000,
response_length=0,
exception=None,
Expand Down Expand Up @@ -287,7 +290,7 @@ def publish(
if publish_info.rc != mqtt.MQTT_ERR_SUCCESS:
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("publish", request_context.qos, request_context.topic),
name=self._generate_event_name("publish", request_context.qos, request_context.topic),
response_time=0,
response_length=0,
exception=publish_info.rc,
Expand Down Expand Up @@ -325,7 +328,7 @@ def subscribe(
if result != mqtt.MQTT_ERR_SUCCESS:
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic),
name=self._generate_event_name("subscribe", request_context.qos, request_context.topic),
response_time=0,
response_length=0,
exception=result,
Expand Down

0 comments on commit c7d79ca

Please sign in to comment.