diff --git a/panther_power_control/package.xml b/panther_power_control/package.xml
index 09a2d88e6..4f2e89941 100644
--- a/panther_power_control/package.xml
+++ b/panther_power_control/package.xml
@@ -24,6 +24,6 @@
std_srvs
- python-rpi.gpio-pip
+ python3-libgpiod
\ No newline at end of file
diff --git a/panther_power_control/src/power_board_node.py b/panther_power_control/src/power_board_node.py
index 53651c19f..aab84d98c 100755
--- a/panther_power_control/src/power_board_node.py
+++ b/panther_power_control/src/power_board_node.py
@@ -1,7 +1,7 @@
#!/usr/bin/python3
-from dataclasses import dataclass
-import RPi.GPIO as GPIO
+import gpiod
+import math
from threading import Lock
import rospy
@@ -14,36 +14,16 @@
from panther_msgs.msg import DriverState, IOState
-@dataclass
-class PatherGPIO:
- AUX_PW_EN = 18 # Enable auxiliary power, eg. supply to robotic arms etc.
- CHRG_DISABLE = 19 # Disable charger
- CHRG_SENSE = 7 # Charger sensor (1 - charger plugged in)
- DRIVER_EN = 23 # Enable motor drivers (1 - on)
- E_STOP_RESET = 27 # Works as IN/OUT, IN - gives info if E-stop in on (1 - off),
- # OUT - send 1 to reset estop
- FAN_SW = 15 # Turn on the fan (1 - on)
- SHDN_INIT = 16 # Shutdown Init managed by systemd service
- VDIG_OFF = 21 # Turn the digital power off eg. NUC, Router etc. (1 - off)
- VMOT_ON = 6 # Enable mamin power supply to motors (1 - on)
- WATCHDOG = 14 # Watchdog pin, if PWM is on this pin Panther will work
-
- # define inverse logic pins here to be used by _read_pin() method
- inverse_logic_pins = [VDIG_OFF, E_STOP_RESET, CHRG_SENSE]
-
- def __setattr__(self, name: str, value: int) -> None:
- raise AttributeError(f'Can\'t reassign constant {name}')
-
-
class Watchdog:
- def __init__(self, pin: int) -> None:
- self._pin = pin
+ def __init__(self, line: gpiod.Line) -> None:
self._last_state = False
self._enabled = True
+ self._watchdog_pin = line
+
def __call__(self) -> None:
if self._enabled:
- GPIO.output(self._pin, self._last_state)
+ self._watchdog_pin.set_value(self._last_state)
self._last_state = not self._last_state
def turn_on(self) -> None:
@@ -52,26 +32,66 @@ def turn_on(self) -> None:
def turn_off(self) -> None:
self._enabled = False
self._last_state = False
- GPIO.output(self._pin, self._last_state)
+ self._watchdog_pin.set_value(self._last_state)
class PowerBoardNode:
def __init__(self, name: str) -> None:
- rospy.init_node(name, anonymous=False)
+ self._node_name = name
+ rospy.init_node(self._node_name, anonymous=False)
self._pins_lock = Lock()
self._e_stop_lock = Lock()
+ self._watchdog_lock = Lock()
+ self._io_state_lock = Lock()
self._clearing_e_stop = False
- self._pins = PatherGPIO()
- self._setup_gpio()
+ out_line_names = {
+ 'AUX_PW_EN': False, # Enable auxiliary power, eg. supply to robotic arms etc.
+ 'CHRG_DISABLE': True, # Disable charger
+ 'DRIVER_EN': False, # Enable motor drivers (1 - on)
+ 'FAN_SW': False, # Turn on the fan (1 - on)
+ 'VDIG_OFF': False, # Turn the digital power off eg. NUC, Router etc. (1 - off)
+ 'VMOT_ON': False, # Enable main power supply to motors (1 - on)
+ 'WATCHDOG': False, # Watchdog pin, if PWM is on this pin Panther will work
+ }
+ in_line_names = {
+ 'CHRG_SENSE': True, # Charger sensor (0 - charger connected, 1 - not connected)
+ 'E_STOP_RESET': False, # Works as IN/OUT,
+ # IN - gives info if E-stop is on (1 - off), OUT - send 1 to reset estop
+ 'SHDN_INIT': False, # Shutdown Init managed by systemd service
+ }
+
+ self._chip = gpiod.Chip('gpiochip0', gpiod.Chip.OPEN_BY_NAME)
+ self._lines = {
+ name: self._chip.find_line(name)
+ for name in list(out_line_names.keys()) + list(in_line_names.keys())
+ }
+ not_matched_pins = [name for name, line in self._lines.items() if line is None]
+ if not_matched_pins:
+ for pin in not_matched_pins:
+ rospy.logerr(f'[{rospy.get_name()}] Failed to find pin: \'{pin}\'')
+ rospy.signal_shutdown('Failed to find GPIO lines')
+ return
+
+ for name, line in self._lines.items():
+ line.request(
+ self._node_name,
+ gpiod.LINE_REQ_DIR_OUT if name in out_line_names.keys() else gpiod.LINE_REQ_DIR_IN,
+ default_val=out_line_names[name]
+ if name in out_line_names.keys()
+ else in_line_names[name],
+ )
+
+ self._watchdog = Watchdog(self._lines['WATCHDOG'])
self._motor_start_sequence()
- self._watchdog = Watchdog(self._pins.WATCHDOG)
self._gpio_wait = 0.05 # seconds
- self._e_stop_interrupt_time = float('inf')
- self._chrg_sense_interrupt_time = float('inf')
+ self._last_e_stop_state = not self._lines['E_STOP_RESET'].get_value()
+ self._last_shdn_init_state = self._lines['SHDN_INIT'].get_value()
+ self._e_stop_pressed_time = float('inf')
+ self._shdn_init_detect_time = float('inf')
self._cmd_vel_msg_time = rospy.get_time()
self._can_net_err = True
@@ -83,17 +103,17 @@ def __init__(self, name: str) -> None:
self._e_stop_state_pub = rospy.Publisher('hardware/e_stop', Bool, queue_size=1, latch=True)
self._io_state_pub = rospy.Publisher('hardware/io_state', IOState, queue_size=1, latch=True)
- msg = Bool(self._read_pin(self._pins.E_STOP_RESET))
+ msg = Bool(not self._lines['E_STOP_RESET'].get_value())
self._e_stop_state_pub.publish(msg)
io_state = IOState()
- io_state.aux_power = self._read_pin(self._pins.AUX_PW_EN)
- io_state.charger_connected = self._read_pin(self._pins.CHRG_SENSE)
- io_state.fan = self._read_pin(self._pins.FAN_SW)
+ io_state.aux_power = self._lines['AUX_PW_EN'].get_value()
+ io_state.charger_connected = not self._lines['CHRG_SENSE'].get_value()
+ io_state.fan = self._lines['FAN_SW'].get_value()
io_state.power_button = False
- io_state.digital_power = self._read_pin(self._pins.VDIG_OFF)
- io_state.charger_enabled = not self._read_pin(self._pins.CHRG_DISABLE)
- io_state.motor_on = self._read_pin(self._pins.DRIVER_EN)
+ io_state.digital_power = not self._lines['VDIG_OFF'].get_value()
+ io_state.charger_enabled = not self._lines['CHRG_DISABLE'].get_value()
+ io_state.motor_on = self._lines['DRIVER_EN'].get_value()
self._io_state_pub.publish(io_state)
# -------------------------------
@@ -141,26 +161,23 @@ def __init__(self, name: str) -> None:
# Timers
# -------------------------------
- # 5 Hz publish non asynch pin state
- self._publish_pin_state_timer = rospy.Timer(rospy.Duration(0.2), self._publish_pin_state_cb)
+ # 20 Hz publish non asynch pin state
+ self._publish_pin_state_timer = rospy.Timer(
+ rospy.Duration(0.05), self._publish_pin_state_cb
+ )
# 50 Hz of software PWM. Timer running at 100 Hz for raising and falling edges
self._watchdog_timer = rospy.Timer(rospy.Duration(0.01), self._watchdog_cb)
- # -------------------------------
- # GPIO callbacks
- # -------------------------------
-
- # register e-stop and power button pin change imminently
- GPIO.add_event_detect(
- self._pins.E_STOP_RESET, GPIO.BOTH, callback=self._gpio_interrupt_cb, bouncetime=200
- )
-
- GPIO.add_event_detect(
- self._pins.SHDN_INIT, GPIO.RISING, callback=self._gpio_interrupt_cb, bouncetime=200
- )
-
rospy.loginfo(f'[{rospy.get_name()}] Node started')
+ def __del__(self):
+ with self._pins_lock:
+ for line in self._lines.values():
+ if line:
+ line.release()
+ if self._chip:
+ self._chip.close()
+
def _cmd_vel_cb(self, *args) -> None:
with self._e_stop_lock:
self._cmd_vel_msg_time = rospy.get_time()
@@ -171,54 +188,62 @@ def _motor_controllers_state_cb(self, msg: DriverState) -> None:
{msg.rear.fault_flag.can_net_err, msg.front.fault_flag.can_net_err}
)
- def _gpio_interrupt_cb(self, pin: int) -> None:
- with self._pins_lock:
- if pin == self._pins.SHDN_INIT:
- self._chrg_sense_interrupt_time = rospy.get_time()
-
- if pin == self._pins.E_STOP_RESET:
- self._e_stop_interrupt_time = rospy.get_time()
-
def _publish_pin_state_cb(self, *args) -> None:
+ pin_state_time = rospy.get_time()
with self._pins_lock:
- charger_state = self._read_pin(self._pins.CHRG_SENSE)
- self._publish_io_state('charger_connected', charger_state)
+ charger_state = not self._lines['CHRG_SENSE'].get_value()
+ shdn_init_val = self._lines['SHDN_INIT'].get_value()
+ estop_state = not self._lines['E_STOP_RESET'].get_value()
+
+ self._publish_io_state('charger_connected', charger_state)
+
+ # filter short spikes of voltage on GPIO
+ if math.isinf(self._shdn_init_detect_time) and shdn_init_val:
+ self._shdn_init_detect_time = pin_state_time
+ elif pin_state_time - self._shdn_init_detect_time > self._gpio_wait:
+ if shdn_init_val and not self._last_shdn_init_state:
+ self._last_shdn_init_state = shdn_init_val
+ rospy.loginfo(f'[{rospy.get_name()}] Shutdown button pressed.')
+ self._publish_io_state('power_button', True)
+ self._shdn_init_detect_time = float('inf')
- # filter short spikes of voltage on GPIO
- if rospy.get_time() - self._chrg_sense_interrupt_time > self._gpio_wait:
- if self._read_pin(self._pins.SHDN_INIT):
- rospy.loginfo(f'[{rospy.get_name()}] Shutdown button pressed.')
- self._publish_io_state('power_button', True)
- self._chrg_sense_interrupt_time = float('inf')
+ with self._e_stop_lock:
+ last_e_stop_state = self._last_e_stop_state
- if rospy.get_time() - self._e_stop_interrupt_time > self._gpio_wait:
- self._e_stop_event()
- self._e_stop_interrupt_time = float('inf')
+ if math.isinf(self._e_stop_pressed_time) and estop_state != last_e_stop_state:
+ self._e_stop_pressed_time = pin_state_time
+ elif pin_state_time - self._e_stop_pressed_time > self._gpio_wait:
+ self._e_stop_event()
+ self._e_stop_pressed_time = float('inf')
def _watchdog_cb(self, *args) -> None:
- self._watchdog()
+ with self._watchdog_lock:
+ self._watchdog()
def _aux_power_enable_cb(self, req: SetBoolRequest) -> SetBoolResponse:
- res = self._set_bool_srv_handle(req.data, self._pins.AUX_PW_EN, 'Aux power enable')
+ res = self._set_bool_srv_handle(req.data, 'AUX_PW_EN', 'Aux power enable')
if res.success:
self._publish_io_state('aux_power', req.data)
return res
def _charger_enable_cb(self, req: SetBoolRequest) -> SetBoolResponse:
- res = self._set_bool_srv_handle(not req.data, self._pins.CHRG_DISABLE, 'Charger disable')
+ res = self._set_bool_srv_handle(not req.data, 'CHRG_DISABLE', 'Charger disable')
if res.success:
self._publish_io_state('charger_enabled', req.data)
return res
def _digital_power_enable_cb(self, req: SetBoolRequest) -> SetBoolResponse:
- res = self._set_bool_srv_handle(req.data, self._pins.VDIG_OFF, 'Digital power enable')
+ res = self._set_bool_srv_handle(not req.data, 'VDIG_OFF', 'Digital power enable')
if res.success:
self._publish_io_state('digital_power', req.data)
return res
def _e_stop_reset_cb(self, req: TriggerRequest) -> TriggerResponse:
+ with self._pins_lock:
+ estop_state = not self._lines['E_STOP_RESET'].get_value()
+
with self._e_stop_lock:
- if self._validate_gpio_pin(self._pins.E_STOP_RESET, False):
+ if not estop_state:
return TriggerResponse(True, 'E-STOP is not active, reset is not needed')
elif rospy.get_time() - self._cmd_vel_msg_time <= 2.0:
return TriggerResponse(
@@ -228,13 +253,18 @@ def _e_stop_reset_cb(self, req: TriggerRequest) -> TriggerResponse:
elif self._can_net_err:
return TriggerResponse(
False,
- 'E-STOP reset failed, unable to communicate with motor controllers! Please check connection with motor controllers.',
+ 'E-STOP reset failed, unable to communicate with motor controllers! '
+ 'Please check connection with motor controllers.',
)
self._reset_e_stop()
- if self._validate_gpio_pin(self._pins.E_STOP_RESET, True):
- self._watchdog.turn_off()
+ with self._pins_lock:
+ estop_state = not self._lines['E_STOP_RESET'].get_value()
+
+ if estop_state:
+ with self._watchdog_lock:
+ self._watchdog.turn_off()
return TriggerResponse(
False,
'E-STOP reset failed, check for pressed E-STOP buttons or other triggers',
@@ -243,20 +273,22 @@ def _e_stop_reset_cb(self, req: TriggerRequest) -> TriggerResponse:
return TriggerResponse(True, 'E-STOP reset successful')
def _e_stop_trigger_cb(self, req: TriggerRequest) -> TriggerResponse:
- self._watchdog.turn_off()
+ with self._watchdog_lock:
+ self._watchdog.turn_off()
return TriggerResponse(True, f'E-STOP triggered, watchdog turned off')
def _fan_enable_cb(self, req: SetBoolRequest) -> SetBoolResponse:
- res = self._set_bool_srv_handle(req.data, self._pins.FAN_SW, 'Fan enable')
+ res = self._set_bool_srv_handle(req.data, 'FAN_SW', 'Fan enable')
if res.success:
self._publish_io_state('fan', req.data)
return res
def _motor_enable_cb(self, req: SetBoolRequest) -> SetBoolResponse:
- if self._validate_gpio_pin(self._pins.DRIVER_EN, req.data):
- return SetBoolResponse(True, f'Motor state already set to: {req.data}')
+ with self._pins_lock:
+ if self._lines['DRIVER_EN'].get_value() == req.data:
+ return SetBoolResponse(True, f'Motor state already set to: {req.data}')
- res = self._set_bool_srv_handle(req.data, self._pins.DRIVER_EN, 'Motor drivers enable')
+ res = self._set_bool_srv_handle(req.data, 'DRIVER_EN', 'Motor drivers enable')
if not res.success:
return res
@@ -268,22 +300,23 @@ def _motor_enable_cb(self, req: SetBoolRequest) -> SetBoolResponse:
try:
reset_script_res = self._reset_roboteq_script_client.call()
if not reset_script_res.success:
- res = self._set_bool_srv_handle(False, self._pins.DRIVER_EN, 'Motor drivers enable')
+ res = self._set_bool_srv_handle(False, 'DRIVER_EN', 'Motor drivers enable')
if res.success:
self._publish_io_state('motor_on', False)
return SetBoolResponse(reset_script_res.success, reset_script_res.message)
except rospy.ServiceException as e:
- res = self._set_bool_srv_handle(False, self._pins.DRIVER_EN, 'Motor drivers enable')
+ res = self._set_bool_srv_handle(False, 'DRIVER_EN', 'Motor drivers enable')
if res.success:
self._publish_io_state('motor_on', False)
return SetBoolResponse(False, f'Failed to reset roboteq script: {e}')
return res
- def _set_bool_srv_handle(self, value: bool, pin: int, name: str) -> SetBoolResponse:
+ def _set_bool_srv_handle(self, value: bool, pin_name: str, name: str) -> SetBoolResponse:
rospy.logdebug(f'[{rospy.get_name()}] Requested {name} = {value}')
- self._write_to_pin(pin, value)
- success = self._validate_gpio_pin(pin, value)
+ with self._pins_lock:
+ self._lines[pin_name].set_value(value)
+ success = self._lines[pin_name].get_value() == value
msg = f'{name} write {value} failed'
if success:
msg = f'{name} write {value} successful'
@@ -291,63 +324,53 @@ def _set_bool_srv_handle(self, value: bool, pin: int, name: str) -> SetBoolRespo
return SetBoolResponse(success, msg)
def _reset_e_stop(self) -> None:
- self._clearing_e_stop = True
- GPIO.setup(self._pins.E_STOP_RESET, GPIO.OUT)
- self._watchdog.turn_on()
+ with self._e_stop_lock:
+ self._clearing_e_stop = True
+
+ with self._pins_lock:
+ req_type = gpiod.LINE_REQ_DIR_OUT
+ self._lines['E_STOP_RESET'].release()
+ self._lines['E_STOP_RESET'].request(self._node_name, type=req_type)
+
+ with self._watchdog_lock:
+ self._watchdog.turn_on()
- self._write_to_pin(self._pins.E_STOP_RESET, False)
+ with self._pins_lock:
+ self._lines['E_STOP_RESET'].set_value(True)
rospy.sleep(0.1)
- GPIO.setup(self._pins.E_STOP_RESET, GPIO.IN)
+ with self._pins_lock:
+ req_type = gpiod.LINE_REQ_DIR_IN
+ self._lines['E_STOP_RESET'].release()
+ self._lines['E_STOP_RESET'].request(self._node_name, type=req_type)
rospy.sleep(0.1)
- self._clearing_e_stop = False
+
+ with self._e_stop_lock:
+ self._clearing_e_stop = False
self._e_stop_event()
def _e_stop_event(self) -> None:
- e_stop_state = self._read_pin(self._pins.E_STOP_RESET)
- if e_stop_state != self._e_stop_state_pub.impl.latch.data and not self._clearing_e_stop:
- self._e_stop_state_pub.publish(e_stop_state)
+ with self._pins_lock:
+ e_stop_state = not self._lines['E_STOP_RESET'].get_value()
+
+ with self._e_stop_lock:
+ if e_stop_state != self._last_e_stop_state and not self._clearing_e_stop:
+ self._last_e_stop_state = e_stop_state
+ self._e_stop_state_pub.publish(e_stop_state)
def _publish_io_state(self, attribute: str, val: bool) -> None:
- last_msg = self._io_state_pub.impl.latch
- if getattr(last_msg, attribute) != val:
- setattr(last_msg, attribute, val)
- self._io_state_pub.publish(last_msg)
+ with self._io_state_lock:
+ last_msg = self._io_state_pub.impl.latch
+ if getattr(last_msg, attribute) != val:
+ setattr(last_msg, attribute, val)
+ self._io_state_pub.publish(last_msg)
def _motor_start_sequence(self) -> None:
- self._write_to_pin(self._pins.VMOT_ON, 1)
+ self._lines['VMOT_ON'].set_value(True)
rospy.sleep(0.5)
- self._write_to_pin(self._pins.DRIVER_EN, 1)
+ self._lines['DRIVER_EN'].set_value(True)
rospy.sleep(0.2)
- def _setup_gpio(self) -> None:
- GPIO.setwarnings(False)
- GPIO.setmode(GPIO.BCM)
- GPIO.setup(self._pins.AUX_PW_EN, GPIO.OUT, initial=0)
- GPIO.setup(self._pins.CHRG_DISABLE, GPIO.OUT, initial=1)
- GPIO.setup(self._pins.CHRG_SENSE, GPIO.IN)
- GPIO.setup(self._pins.DRIVER_EN, GPIO.OUT, initial=0)
- GPIO.setup(self._pins.E_STOP_RESET, GPIO.IN) # USED AS I/O
- GPIO.setup(self._pins.FAN_SW, GPIO.OUT, initial=0)
- GPIO.setup(self._pins.SHDN_INIT, GPIO.IN)
- GPIO.setup(self._pins.VDIG_OFF, GPIO.OUT, initial=0)
- GPIO.setup(self._pins.VMOT_ON, GPIO.OUT, initial=0)
- GPIO.setup(self._pins.WATCHDOG, GPIO.OUT, initial=0)
-
- def _read_pin(self, pin: int) -> bool:
- if pin in self._pins.inverse_logic_pins:
- return not GPIO.input(pin)
- return GPIO.input(pin)
-
- def _write_to_pin(self, pin: int, value: bool) -> None:
- if pin in self._pins.inverse_logic_pins:
- GPIO.output(pin, not value)
- return
- GPIO.output(pin, value)
-
- def _validate_gpio_pin(self, pin: int, value: bool) -> bool:
- return self._read_pin(pin) == value
-
def main():
power_board_node = PowerBoardNode('power_board_node')
@@ -359,5 +382,3 @@ def main():
main()
except rospy.ROSInterruptException:
pass
- finally:
- GPIO.cleanup()
diff --git a/panther_power_control/src/relays_node.py b/panther_power_control/src/relays_node.py
index 650a14501..edae6a767 100755
--- a/panther_power_control/src/relays_node.py
+++ b/panther_power_control/src/relays_node.py
@@ -1,7 +1,6 @@
#!/usr/bin/python3
-from dataclasses import dataclass
-import RPi.GPIO as GPIO
+import gpiod
from threading import Lock
import rospy
@@ -13,25 +12,35 @@
from panther_msgs.msg import DriverState, IOState
-@dataclass
-class PatherGPIO:
- MOTOR_ON = 6 # Pin to enable motor controllers
- STAGE2_INPUT = 22 # Check if power can be forwarded to motor controllers
-
- def __setattr__(self, name: str, value: int) -> None:
- raise AttributeError(f'Can\'t reassign constant {name}')
-
-
class RelaysNode:
def __init__(self, name: str) -> None:
- rospy.init_node(name, anonymous=False)
+ self._node_name = name
+ rospy.init_node(self._node_name, anonymous=False)
self._lock = Lock()
- self._pins = PatherGPIO()
- self._setup_gpio()
+ line_names = {
+ 'MOTOR_ON': False, # Used to enable motors
+ 'STAGE2_INPUT': False, # Input from 2nd stage of rotary power switch
+ }
+
+ self._chip = gpiod.Chip('gpiochip0', gpiod.Chip.OPEN_BY_NAME)
+ self._lines = {name: self._chip.find_line(name) for name in list(line_names.keys())}
+ not_matched_pins = [name for name, line in self._lines.items() if line is None]
+ if not_matched_pins:
+ for pin in not_matched_pins:
+ rospy.logerr(f'[{rospy.get_name()}] Failed to find pin: \'{pin}\'')
+ rospy.signal_shutdown('Failed to find GPIO lines')
+ return
+
+ self._lines['MOTOR_ON'].request(
+ self._node_name, type=gpiod.LINE_REQ_DIR_OUT, default_val=line_names['MOTOR_ON']
+ )
+ self._lines['STAGE2_INPUT'].request(
+ self._node_name, type=gpiod.LINE_REQ_DIR_IN, default_val=line_names['STAGE2_INPUT']
+ )
- self._e_stop_state = not GPIO.input(self._pins.STAGE2_INPUT)
+ self._e_stop_state = not self._lines['STAGE2_INPUT'].get_value()
self._cmd_vel_msg_time = rospy.get_time()
self._can_net_err = True
@@ -46,7 +55,7 @@ def __init__(self, name: str) -> None:
self._e_stop_state_pub.publish(self._e_stop_state)
self._io_state = IOState()
- self._io_state.motor_on = GPIO.input(self._pins.STAGE2_INPUT)
+ self._io_state.motor_on = self._lines['STAGE2_INPUT'].get_value()
self._io_state.aux_power = False
self._io_state.charger_connected = False
self._io_state.fan = False
@@ -86,6 +95,13 @@ def __init__(self, name: str) -> None:
rospy.loginfo(f'[{rospy.get_name()}] Node started')
+ def __del__(self):
+ for line in self._lines.values():
+ if line:
+ line.release()
+ if self._chip:
+ self._chip.close()
+
def _cmd_vel_cb(self, *args) -> None:
with self._lock:
self._cmd_vel_msg_time = rospy.get_time()
@@ -98,6 +114,9 @@ def _motor_controllers_state_cb(self, msg: DriverState) -> None:
def _e_stop_reset_cb(self, req: TriggerRequest) -> TriggerResponse:
with self._lock:
+ if not self._e_stop_state:
+ return TriggerResponse(True, 'E-STOP is not active, reset is not needed')
+
if rospy.get_time() - self._cmd_vel_msg_time <= 2.0:
return TriggerResponse(
False,
@@ -106,7 +125,8 @@ def _e_stop_reset_cb(self, req: TriggerRequest) -> TriggerResponse:
elif self._can_net_err:
return TriggerResponse(
False,
- 'E-STOP reset failed, unable to communicate with motor controllers! Please check connection with motor controllers.',
+ 'E-STOP reset failed, unable to communicate with motor controllers! '
+ 'Please check connection with motor controllers.',
)
self._e_stop_state = False
@@ -115,23 +135,20 @@ def _e_stop_reset_cb(self, req: TriggerRequest) -> TriggerResponse:
def _e_stop_trigger_cb(self, req: TriggerRequest) -> TriggerResponse:
with self._lock:
+ if self._e_stop_state:
+ return TriggerResponse(True, 'E-SROP already triggered')
+
self._e_stop_state = True
self._e_stop_state_pub.publish(self._e_stop_state)
return TriggerResponse(True, 'E-SROP triggered successful')
def _set_motor_state_timer_cb(self, *args) -> None:
- motor_state = GPIO.input(self._pins.STAGE2_INPUT)
- GPIO.output(self._pins.MOTOR_ON, motor_state)
+ motor_state = self._lines['STAGE2_INPUT'].get_value()
+ self._lines['MOTOR_ON'].set_value(motor_state)
if self._io_state.motor_on != motor_state:
self._io_state.motor_on = motor_state
self._io_state_pub.publish(self._io_state)
- def _setup_gpio(self) -> None:
- GPIO.setwarnings(False)
- GPIO.setmode(GPIO.BCM)
- GPIO.setup(self._pins.MOTOR_ON, GPIO.OUT)
- GPIO.setup(self._pins.STAGE2_INPUT, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
-
def main():
relays_node = RelaysNode('relays_node')
@@ -143,5 +160,3 @@ def main():
main()
except rospy.ROSInterruptException:
pass
- finally:
- GPIO.cleanup()