Skip to content

Commit

Permalink
feat(load_testing): Enable some automatic reconnection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
KaylaBrady committed Jun 25, 2024
1 parent 557b36e commit 870c94f
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 208 deletions.
6 changes: 5 additions & 1 deletion load_testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@ With `mix phx.server` up in another terminal:
$ cd load_testing/
$ asdf install
$ poetry install
$ poetry run locust --host http://localhost:4000
$ poetry run locust --host http://localhost:4000 --processes -1
# `processes -1` Launches a worker for each logical core.
# Can also run a specified number of workers.
# See https://docs.locust.io/en/stable/running-distributed.html#single-machine for more options
# Keyboard interrupt is only working if running multiple workers
```
3 changes: 2 additions & 1 deletion load_testing/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MobileAppUser(HttpUser, PhoenixChannelUser):
wait_time = between(1, 5)
socket_path = "/socket"

prob_reset_map_data = 0.3
prob_reset_map_data = 0.02
prob_reset_location = 0.3
prob_reset_nearby_stops = 0.3

Expand Down Expand Up @@ -48,6 +48,7 @@ def nearby_transit(self):
self.stops_channel is not None
and random.random() < self.prob_reset_nearby_stops
):
print("Leaving")
self.stops_channel.leave()
self.stops_channel = None
if self.stops_channel is None:
Expand Down
72 changes: 57 additions & 15 deletions load_testing/phoenix_channel.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# derived from https://github.com/SvenskaSpel/locust-plugins/blob/4.4.3/locust_plugins/users/socketio.py
import json
import logging
import threading
import time
import urllib
import urllib.parse
from typing import Any

import gevent
import rel
import websocket
from gevent.event import AsyncResult
from locust import User
Expand All @@ -27,13 +29,29 @@ def __init__(
self, environment: Environment, url: str, headers: dict | list
) -> None:
self.environment = environment
self.ws = websocket.create_connection(
f"{url}/websocket?vsn=2.0.0", header=headers
)
self.ws_greenlet = gevent.spawn(self.receive_loop)
# websocket.enableTrace(True)
self.ws = websocket.WebSocketApp( f"{url}/websocket?vsn=2.0.0",
on_open=self.on_open,
on_message=self.on_socket_message,
on_error=self.on_error,
on_close=self.on_close)

target_fun = self.run_forever_target

daemon = threading.Thread(target=target_fun)

daemon.daemon = True

daemon.start()

self._next_ref = 0
self.open_pushes: dict[str, PhoenixPush] = dict()

def run_forever_target(self):
self.ws.run_forever(dispatcher=rel, reconnect=2, ping_interval=60)
rel.signal(2, rel.abort) # Keyboard Interrupt
rel.dispatch()

def channel(self, topic: str, payload: dict[str, Any] | None = None):
if payload is None:
payload = dict()
Expand All @@ -43,16 +61,25 @@ def disconnect(self):
self.closing = True
self.ws.close()

def receive_loop(self):
while self.ws.connected:
try:
message = self.ws.recv()
logging.debug(ellipsize_string(f"WSR: {message}", 256))
if message != "":
self.on_message(message)
except Exception:
if not self.closing:
raise
def on_socket_message(self, ws, message):
try:

logging.debug(ellipsize_string(f"WSR: {message}", 256))
if message != "":
self.on_message(message)
except Exception:
if not self.closing:
raise

def on_error(self, ws, error):
print(f"Socket error: {error}")

def on_close(self, ws, close_status_code, close_msg):
print(f"Socket closed: {close_status_code} {close_msg}")

def on_open(self, ws):
print("Socket opened")


def on_message(self, message):
[join_ref, ref, topic, event, payload] = json.loads(message)
Expand Down Expand Up @@ -100,22 +127,37 @@ def __init__(
if payload is None:
payload = dict()
self.join_ref = socket.next_ref()
self.sleep_ref = 1


self.topic = topic
self.join_push = PhoenixPush(
socket, self.join_ref, self.join_ref, topic, "phx_join", payload
)


def join(self):
print("Joining topic")
self.join_push.send()
self.sleep_with_heartbeat(60)
return self.join_push.get_reply()

def leave(self):
leave_push = PhoenixPush(
self.socket, self.join_ref, self.socket.next_ref(), self.topic, "phx_leave"
self.socket, self.join_ref, self.socket.next_ref(), self.topic, "phx_leave", {}
)
leave_push.send()
return leave_push.get_reply()

def sleep_with_heartbeat(self, seconds):
while seconds >= 0:
gevent.sleep(min(15, seconds))
seconds -= 15
self.sleep_ref += 1
# [null,"2","phoenix","heartbeat",{}]
heartbeat_push = PhoenixPush(self.socket, None, self.sleep_ref, "phoenix", "heartbeat", {})
heartbeat_push.send()


class PhoenixPush:
def __init__(
Expand Down
Loading

0 comments on commit 870c94f

Please sign in to comment.