Skip to content

Commit

Permalink
use timeout instead of heartbeats (#18)
Browse files Browse the repository at this point in the history
* WIP tcp server

* Add EtherCATMaster and driver to server

* Test client-server communication

* Formatting

* Integrate changes from main

* Use AsyncIO to run ethercat and 0MQ concurrently

* Variable naming

* Stop server message

* Use threading for server, since it's I/O bound, so GIL is not an issue

* Fix formatting

* Fix formatting

* Fix formatting

* Change log level to trace, otherwise we can't read logs properly

* Change log level to trace, otherwise we can't read logs properly

* Start expecting heartbeat messages after first request

* Use property

* Move logging call about sleep above sleep statement

* Multithreading improvements, including daemon threads

* Client update

* use timeout instead of heartbeat

* fix issues in implementation

* print message when velocity timeout reached

* fix typo in client test

* rename Client to KELORobile

* docstrings in google format

* join thread instead of empty while loop

---------

Co-authored-by: Mathieu De Coster <[email protected]>
  • Loading branch information
bitscuity and m-decoster authored Jul 8, 2024
1 parent 449fd2a commit 8c2c7a5
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 105 deletions.
20 changes: 17 additions & 3 deletions airo-tulip/airo_tulip/platform_driver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import math
import time
from enum import Enum
from typing import List

Expand Down Expand Up @@ -32,6 +33,8 @@ def __init__(self, master: pysoem.Master, wheel_configs: List[WheelConfig]):
self._process_data = []
self._wheel_enabled = [True] * self._num_wheels
self._step_count = 0
self._timeout = 0
self._timeout_message_printed = True

# Constants taken directly from KELO: https://github.com/kelo-robotics/kelo_tulip/blob/1a8db0626b3d399b62b65b31c004e7b1831756d7/src/PlatformDriver.cpp
self._wheel_distance = 0.0775
Expand All @@ -49,12 +52,16 @@ def __init__(self, master: pysoem.Master, wheel_configs: List[WheelConfig]):
self._vpc = VelocityPlatformController()
self._vpc.initialise(self._wheel_configs)

def set_platform_velocity_target(self, vel_x: float, vel_y: float, vel_a: float) -> None:
if math.sqrt(vel_x ** 2 + vel_y ** 2) > 1.0:
def set_platform_velocity_target(self, vel_x: float, vel_y: float, vel_a: float, timeout: float) -> None:
if math.sqrt(vel_x**2 + vel_y**2) > 1.0:
raise ValueError("Cannot set target linear velocity higher than 1.0 m/s")
if abs(vel_a) > math.pi / 8:
raise ValueError("Cannot set target angular velocity higher than pi/8 rad/s")
if timeout < 0.0:
raise ValueError("Cannot set negative timeout")
self._vpc.set_platform_velocity_target(vel_x, vel_y, vel_a)
self._timeout = time.time() + timeout
self._timeout_message_printed = False

def step(self) -> bool:
self._step_count += 1
Expand All @@ -69,6 +76,12 @@ def step(self) -> bool:

self._update_encoders()

if self._timeout < time.time():
self._vpc.set_platform_velocity_target(0.0, 0.0, 0.0)
if not self._timeout_message_printed:
logger.info("platform stopped early due to velocity target timeout")
self._timeout_message_printed = True

if self._state == PlatformDriverState.INIT:
return self._step_init()
if self._state == PlatformDriverState.READY:
Expand Down Expand Up @@ -189,7 +202,8 @@ def _do_control(self) -> None:
data.setpoint2 = setpoint2

logger.trace(
f"wheel {i} enabled {self._wheel_enabled[i]} sp1 {setpoint1} sp2 {setpoint2} enc {self._process_data[i].encoder_pivot}")
f"wheel {i} enabled {self._wheel_enabled[i]} sp1 {setpoint1} sp2 {setpoint2} enc {self._process_data[i].encoder_pivot}"
)

self._set_process_data(i, data)

Expand Down
48 changes: 48 additions & 0 deletions airo-tulip/airo_tulip/server/kelo_robile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import zmq
from airo_tulip.server.messages import (
RequestMessage,
ResponseMessage,
SetPlatformVelocityTargetMessage,
StopServerMessage,
)
from loguru import logger


class KELORobile:
def __init__(self, robot_ip: str, robot_port: int):
address = f"tcp://{robot_ip}:{robot_port}"
logger.info(f"Connecting to {address}...")
self._zmq_ctx = zmq.Context()
self._zmq_socket = self._zmq_ctx.socket(zmq.REQ)
self._zmq_socket.connect(address)
logger.info(f"Connected to {address}.")

def set_platform_velocity_target(
self, vel_x: float, vel_y: float, vel_a: float, timeout: float = 1.0
) -> ResponseMessage:
"""Set the x, y and angular velocity of the complete mobile platform.
Args:
vel_x: Linear velocity of platform in x (forward) direction in m/s.
vel_y: Linear velocity of platform in y (left) direction in m/s.
vel_a: Linear velocity of platform in angular direction in rad/s.
timeout: Duration in seconds after which the movement is automatically stopped (default 1.0).
Returns:
A ResponseMessage object indicating the response status of the request.
"""
msg = SetPlatformVelocityTargetMessage(vel_x, vel_y, vel_a, timeout)
return self._transceive_message(msg)

def stop_server(self) -> ResponseMessage:
"""Stops the remote server.
Returns:
A ResponseMessage object indicating the response status of the request.
"""
msg = StopServerMessage()
return self._transceive_message(msg)

def _transceive_message(self, req: RequestMessage) -> ResponseMessage:
self._zmq_socket.send_pyobj(req)
return self._zmq_socket.recv_pyobj()
23 changes: 15 additions & 8 deletions airo-tulip/airo_tulip/server/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,34 @@


@dataclass
class SetPlatformVelocityTargetMessage:
vel_x: float
vel_y: float
vel_a: float
class RequestMessage:
pass


@dataclass
class StopServerMessage:
class ResponseMessage:
pass


@dataclass
class HeartbeatMessage:
class SetPlatformVelocityTargetMessage(RequestMessage):
vel_x: float
vel_y: float
vel_a: float
timeout: float


@dataclass
class StopServerMessage(RequestMessage):
pass


@dataclass
class ErrorResponse:
class ErrorResponse(ResponseMessage):
message: str
cause: str


@dataclass
class OkResponse:
class OkResponse(ResponseMessage):
pass
39 changes: 15 additions & 24 deletions airo-tulip/airo_tulip/server/server.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import time
from threading import Thread, Event
from threading import Event, Thread
from typing import List

import zmq
import zmq.asyncio
from airo_tulip.robile_platform import RobilePlatform
from airo_tulip.server.messages import SetPlatformVelocityTargetMessage, ErrorResponse, OkResponse, StopServerMessage, \
HeartbeatMessage
from airo_tulip.server.messages import ErrorResponse, OkResponse, SetPlatformVelocityTargetMessage, StopServerMessage
from airo_tulip.structs import WheelConfig
from loguru import logger

Expand All @@ -32,16 +31,21 @@ class TulipServer:
to communicate with the TulipServer over a TCP socket, connecting with 0MQ. We use the REQ/REP
message pattern (https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/patterns/client_server.html)."""

def __init__(self, robot_ip: str, robot_port: int, robot_configuration: RobotConfiguration,
loop_frequency: float = 20, max_time_between_heartbeats: float = 1):
def __init__(
self,
robot_ip: str,
robot_port: int,
robot_configuration: RobotConfiguration,
loop_frequency: float = 20,
):
"""Initialize the server.
Args:
robot_ip: The IP address of the robot.
robot_port: The port on which to run this server.
robot_configuration: The robot configuration.
loop_frequency: The frequency (Hz) with which EtherCAT messages are received and sent.
max_time_between_heartbeats: The maximum time in seconds between heartbeats. If no heartbeat message was received from the client for this much time, stop everything."""
"""
# ZMQ socket.
address = f"tcp://{robot_ip}:{robot_port}"
logger.info(f"Binding to {address}...")
Expand All @@ -57,7 +61,6 @@ def __init__(self, robot_ip: str, robot_port: int, robot_configuration: RobotCon
self._request_handlers = {
SetPlatformVelocityTargetMessage.__name__: self._handle_set_platform_velocity_target_request,
StopServerMessage.__name__: self._handle_stop_server_request,
HeartbeatMessage.__name__: self._handle_heartbeat_request,
}

# Robot platform.
Expand All @@ -66,15 +69,9 @@ def __init__(self, robot_ip: str, robot_port: int, robot_configuration: RobotCon

self._loop_frequency = loop_frequency

self._last_heartbeat = None
self._max_time_between_heartbeats = max_time_between_heartbeats

def _request_loop(self):
while not self._should_stop.is_set():
request = self._zmq_socket.recv_pyobj()
# As soon as we've received a request, the client should start sending out heartbeats.
# Technically, this request could also be something else, but we should start expecting heartbeat messages.
self._last_heartbeat = time.time()
logger.info("Handling client request.")
response = self._handle_request(request)
# Send response.
Expand Down Expand Up @@ -105,11 +102,8 @@ def run(self):
thread_requests = Thread(target=self._request_loop, daemon=True)
thread_requests.start()

while not self._should_stop.is_set():
# Stop if we haven't received a heartbeat in a while. Safety first.
if self._last_heartbeat is not None and time.time() - self._last_heartbeat > self._max_time_between_heartbeats:
logger.warning("No heartbeat message received in time. Stopping platform for safety reasons!")
self._platform.driver.set_platform_velocity_target(0.0, 0.0, 0.0)
# Run until stop flag set by joining EtherCAT thread
thread_ethercat.join()

self._zmq_socket.close()
self._zmq_ctx.term()
Expand All @@ -122,7 +116,9 @@ def _handle_request(self, request):

def _handle_set_platform_velocity_target_request(self, request: SetPlatformVelocityTargetMessage):
try:
self._platform.driver.set_platform_velocity_target(request.vel_x, request.vel_y, request.vel_a)
self._platform.driver.set_platform_velocity_target(
request.vel_x, request.vel_y, request.vel_a, request.timeout
)
logger.info("Request handled successfully.")
return OkResponse()
except ValueError as e:
Expand All @@ -133,8 +129,3 @@ def _handle_stop_server_request(self, _request: StopServerMessage):
logger.info("Received stop request.")
self._should_stop.set()
return OkResponse()

def _handle_heartbeat_request(self, request: HeartbeatMessage):
logger.info(f"Received heartbeat from client. Seconds since epoch (client): {request.client_time}.")
self._last_heartbeat = time.time()
return OkResponse()
70 changes: 0 additions & 70 deletions airo-tulip/scripts/test_client.py

This file was deleted.

42 changes: 42 additions & 0 deletions airo-tulip/scripts/test_kelo_robile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import math
import time

from airo_tulip.server.kelo_robile import KELORobile


def test():
mobi = KELORobile("localhost", 49789)

mobi.set_platform_velocity_target(0.5, 0.0, 0.0)
time.sleep(3) # movement should timeout

mobi.set_platform_velocity_target(0.0, 0.0, math.pi / 8, timeout=2.0)
time.sleep(2)

mobi.set_platform_velocity_target(0.2, 0.0, 0.0)
time.sleep(1)

mobi.set_platform_velocity_target(0.0, 0.2, 0.0)
time.sleep(1)

mobi.set_platform_velocity_target(-0.2, 0.0, 0.0)
time.sleep(1)

mobi.set_platform_velocity_target(0.0, -0.2, 0.0)
time.sleep(1)

mobi.set_platform_velocity_target(0.0, 0.0, -math.pi / 8, timeout=2.0)
time.sleep(2)

mobi.set_platform_velocity_target(-0.5, 0.0, 0.0)
time.sleep(3) # movement should timeout

mobi.set_platform_velocity_target(0.0, 0.0, 0.0)
time.sleep(0.5)

mobi.stop_server()
time.sleep(0.5)


if __name__ == "__main__":
test()

0 comments on commit 8c2c7a5

Please sign in to comment.