diff --git a/pyrcf/components/ctrl_loop_debuggers/__init__.py b/pyrcf/components/ctrl_loop_debuggers/__init__.py index 04ce36f..7600ade 100644 --- a/pyrcf/components/ctrl_loop_debuggers/__init__.py +++ b/pyrcf/components/ctrl_loop_debuggers/__init__.py @@ -3,3 +3,6 @@ from .ctrl_loop_debugger_base import CtrlLoopDebuggerBase, DummyDebugger from .data_recorder_debugger import ComponentDataRecorderDebugger +from .data_publish_debuggers import ComponentDataPublisherDebugger, PlotjugglerLoopDebugger +from .pybullet_robot_plan_debugger import PybulletRobotPlanDebugger +from .pybullet_robot_state_debugger import PybulletRobotStateDebugger diff --git a/pyrcf/components/ctrl_loop_debuggers/ctrl_loop_data_publisher_base.py b/pyrcf/components/ctrl_loop_debuggers/ctrl_loop_data_publisher_base.py new file mode 100644 index 0000000..eedc053 --- /dev/null +++ b/pyrcf/components/ctrl_loop_debuggers/ctrl_loop_data_publisher_base.py @@ -0,0 +1,232 @@ +from typing import List, Tuple, Callable, Any +from dataclasses import dataclass + +from .ctrl_loop_debugger_base import CtrlLoopDebuggerBase +from ...utils.time_utils import ClockBase, PythonPerfClock +from ...core.types import GlobalMotionPlan, LocalMotionPlan, RobotState, RobotCmd +from ...core.types.debug_types import JointStatesCompareType, Pose3DCompareType +from ...utils.data_io_utils.pyrcf_publisher import PyRCFPublisherBase, PyRCFPublisherZMQ + + +@dataclass +class CtrlLoopDataStreamConfig: + """Configurations for data streamer in the simple managed control loop. + + Get the configuration from the `SimpleManagedCtrlLoop` object using + `control_loop.data_streamer_config`. Modify the parameters (see below) in this config as + required before calling the `control_loop.run()` method. + + NOTE: Try to keep the rate of the publisher (stream_freq) a factor of the rate of the control + loop. Otherwise, the RateTrigger will not be able to keep the required rate (because + RateLimiter enforces a sleep). + + NOTE: Because the publisher uses a custom encoder for encoding PyRCF data types to serialisable + data, this debugger can affect the speed of the control loop. + + Parameters: + publish_robot_states (bool): Defaults to True. + publish_agent_outputs (bool): Publish output from each agent in the control loop. (Local + plan (if available) and RobotCmd from each agent). Defaults to True. + publish_global_plan (bool): Defaults to True. + publish_robot_cmd (bool): Defaults to True. + """ + + publish_robot_states: bool = True + publish_global_plan: bool = True + publish_agent_outputs: bool = True + """Publish output from each agent in the control loop. (Local plan (if available) + and RobotCmd from each agent). Defaults to True.""" + publish_robot_cmd: bool = True + publish_joint_states_comparison: bool = True + """If set to True, will publish joint states from different sources (robot state, + agent plan outputs, agent control command output, final robot command) in a + JointStatesCompareType object, to make it easier to compare values. (see + `examples/utils_demo/demo_plotjuggler_loop_debugger.py` file)""" + publish_ee_pose_comparison: bool = False + """If set to True, will publish End-effector pose data from different sources + (robot state (state estimator), agent plan outputs (if ee_reference is present)) + using Pose3DCompareType object, to make it easier to compare values. Only publishes + poses for end-effectors available in `robot_state.state_estimates.ee_states.ee_names`. + (See `examples/utils_demo/demo_plotjuggler_loop_debugger.py` file for details on + how Pose3DCompareType can be used for other pose objects.)""" + publish_debug_msg: bool = True + """If set to True, will publish custom debug data from provided function handles.""" + prefix_name: str = "ctrl_loop" + + +class CtrlLoopDataPublisherBase(CtrlLoopDebuggerBase): + """Base class for debuggers that stream the data from all the components in the control loop + using some publisher that can stream strings/bytes. At the moment, supports only zmq and ros + publishing. + """ + + def __init__( + self, + publisher: PyRCFPublisherBase = None, + rate: float = None, + clock: ClockBase = PythonPerfClock(), + debug_publish_callables: Callable[[], Any] | List[Callable[[], Any]] = None, + ): + """Base class for debuggers that stream the data from all the components in the control + loop to a specified tcp port using zmq, or via ros2 publisher topics. + + Args: + port (int): tcp port to publish data to. + rate (float, optional): Rate at which this should be triggered. Defaults to None + (i.e. use control loop rate). + clock (ClockBase, optional): The clock to use for timer. Defaults to PythonPerfClock(). + debug_publish_callables (Callable[[], Any] | List[Callable[[], Any]]): Function handle + (or list of) that return publishable data (json encodable) for additional streaming + to plotjuggler. Use this for debugging components. These method(s)/functions(s) + will be called in the control loop and their return value added to the data being + published (if publishing is enabled in self.data_streamer_config). Defaults to None. + publisher_type (Literal["zmq", "ros"], optional): Type of publisher to use. Defaults to + "zmq". + """ + super().__init__(rate=rate, clock=clock) + + self._publish_data: bool = False + self._publisher = publisher if publisher is not None else PyRCFPublisherZMQ() + if rate is None or rate >= 0.0: + self._publish_data = True + + self._additional_debug_handles = [] + self.add_debug_handle_to_publish(debug_publish_callables=debug_publish_callables) + self._data_streamer_config = CtrlLoopDataStreamConfig() + + @property + def data_streamer_config(self) -> CtrlLoopDataStreamConfig: + """Config for the data stream published using this CtrlLoopDataPublisher debugger. + + Change its values before calling the run() method, if you want to modify them. + + Returns: + CtrlLoopDataStreamConfig: Modifyable config for the data stream published + using this CtrlLoopDataPublisher debugger. + """ + return self._data_streamer_config + + def add_debug_handle_to_publish( + self, debug_publish_callables: Callable[[], Any] | List[Callable[[], Any]] + ): + """Add additional function handles to publish extra debug data. + + See example 03. + + Args: + debug_publish_callables (Callable[[], Any] | List[Callable[[], Any]]): Function handle + (or list of) that return publishable data (json encodable) for additional streaming + to plotjuggler. Use this for debugging components. These method(s)/functions(s) + will be called in the control loop and their return value added to the data being + published (if publishing is enabled in self.data_streamer_config). + + Raises: + ValueError: If invalid value provided as argument. + """ + if debug_publish_callables is not None: + if callable(debug_publish_callables): + debug_publish_callables = [debug_publish_callables] + if not isinstance(debug_publish_callables, list): + raise ValueError( + f"{self.__class__.__name__}: Argument to this function should" + " be a callable or a list of callables." + ) + assert all( + callable(ele) for ele in debug_publish_callables + ), "All elements in `debug_publish_callables` should be a function handle." + for handle in debug_publish_callables: + if handle not in self._additional_debug_handles: + self._additional_debug_handles.append(handle) + + def _run_once_impl( + self, + t: float, + dt: float, + robot_state: RobotState, + global_plan: GlobalMotionPlan, + agent_outputs: List[Tuple[LocalMotionPlan, RobotCmd]], + robot_cmd: RobotCmd, + ): + if not self._publish_data: + return + + data = {"t": t, "dt": dt} + if self._data_streamer_config.publish_robot_states: + data["robot_states"] = robot_state + if self._data_streamer_config.publish_global_plan: + data["global_plan"] = global_plan + if self._data_streamer_config.publish_agent_outputs: + for n, agent_output in enumerate(agent_outputs): + data[f"agent_{n+1}"] = { + "local_plan": agent_output[0], + "ctrl_cmd": agent_output[1], + } + if self._data_streamer_config.publish_robot_cmd: + data["robot_cmd"] = robot_cmd + if self._data_streamer_config.publish_joint_states_comparison: + joint_states_compare = JointStatesCompareType(joint_states_list=[], row_names=[]) + # add joint state data from robot interface to compare + joint_states_compare.joint_states_list.append(robot_state.joint_states) + joint_states_compare.row_names.append("robot_state.joint_states") + # add joint state reference from global planner + joint_states_compare.joint_states_list.append(global_plan.joint_references) + joint_states_compare.row_names.append("global_plan.joint_references") + # add final robot joint commands to compare + joint_states_compare.joint_states_list.append(robot_cmd.joint_commands) + joint_states_compare.row_names.append("robot_cmd.joint_commands") + # add joint commands from each agent separately to compare + for n, agent_output in enumerate(agent_outputs): + if agent_output[0] is not None: + joint_states_compare.joint_states_list.append(agent_output[0].joint_references) + joint_states_compare.row_names.append( + f"agent_{n+1}.local_plan.joint_references" + ) + joint_states_compare.joint_states_list.append(agent_output[1].joint_commands) + joint_states_compare.row_names.append(f"agent_{n+1}.ctrl_cmd.joint_commands") + data["joint_states_comparison"] = joint_states_compare + if self._data_streamer_config.publish_ee_pose_comparison: + ee_pose_compare = {} # Pose3DCompareType(pose_list=[], row_names=[]) + if robot_state.state_estimates.end_effector_states.ee_poses is not None: + ee_names = robot_state.state_estimates.end_effector_states.ee_names + if ee_names is not None: + for ee_name in ee_names: + # ee pose from state estimate + ee_pose_compare[ee_name] = Pose3DCompareType(pose_list=[], row_names=[]) + p, _, _, _ = robot_state.state_estimates.end_effector_states.get_state_of( + ee_name=ee_name + ) + ee_pose_compare[ee_name].pose_list.append(p) + ee_pose_compare[ee_name].row_names.append("state_estimate") + # ee pose reference from global plan + try: + gpp, _, _, _ = global_plan.end_effector_references.get_state_of( + ee_name=ee_name + ) + except (KeyError, AttributeError): + pass + else: + ee_pose_compare[ee_name].pose_list.append(gpp) + ee_pose_compare[ee_name].row_names.append( + "global_plan.end_effector_references" + ) + # ee pose reference from local planner(s)/agents + for n, agent_output in enumerate(agent_outputs): + if agent_output[0] is not None: + try: + pp, _, _, _ = agent_output[ + 0 + ].end_effector_references.get_state_of(ee_name=ee_name) + except (KeyError, AttributeError): + pass + else: + ee_pose_compare[ee_name].pose_list.append(pp) + ee_pose_compare[ee_name].row_names.append( + f"agent_{n+1}.local_plan.end_effector_references" + ) + data["ee_pose_comparison"] = ee_pose_compare + if self._data_streamer_config.publish_debug_msg: + data["debug_data"] = [_get_data() for _get_data in self._additional_debug_handles] + self._publisher.publish(data={self._data_streamer_config.prefix_name: data}) + + def shutdown(self): + self._publisher.close() diff --git a/pyrcf/components/ctrl_loop_debuggers/data_publish_debuggers.py b/pyrcf/components/ctrl_loop_debuggers/data_publish_debuggers.py new file mode 100644 index 0000000..87b5869 --- /dev/null +++ b/pyrcf/components/ctrl_loop_debuggers/data_publish_debuggers.py @@ -0,0 +1,87 @@ +from typing import Any, Callable, List +from ...utils.time_utils import ClockBase, PythonPerfClock +from .ctrl_loop_data_publisher_base import CtrlLoopDataPublisherBase +from ...utils.data_io_utils.pyrcf_publisher import ( + DEFAULT_ZMQ_PUBLISH_PORT, + DEFAULT_PLOTJUGGLER_PUBLISH_PORT, + PyRCFPublisherZMQ, +) + + +class ComponentDataPublisherDebugger(CtrlLoopDataPublisherBase): + """This control loop debugger will stream the data from all the components in the control loop + so that it can be subscribed to by a zmq subscriber (see `PyRCFSubscriberZMQ`).""" + + def __init__( + self, + rate: float = None, + clock: ClockBase = PythonPerfClock(), + debug_publish_callables: Callable[[], Any] | List[Callable[[], Any]] = None, + port: int = DEFAULT_ZMQ_PUBLISH_PORT, + ): + """ + This control loop debugger will stream the data from all the components in the control loop + so that it can be subscribed to by a zmq subscriber (see `PyRCFSubscriberZMQ`). + + NOTE: Because the publisher uses a custom encoder for encoding PyRCF data types to + serialisable data, this debugger can affect the speed of the control loop (because lossless + data is prioritised over loop rate). It is better to use the `ComponentDataRecorderDebugger` + as it is faster if your computer is powerful enough. + + Args: + rate (float, optional): Rate at which this should be triggered. Defaults to None + (i.e. use control loop rate). + clock (ClockBase, optional): The clock to use for timer. Defaults to PythonPerfClock(). + debug_publish_callables (Callable[[], Any] | List[Callable[[], Any]]): Function handle + (or list of) that return publishable data (json encodable) for additional streaming + to plotjuggler. Use this for debugging components. These method(s)/functions(s) + will be called in the control loop and their return value added to the data being + published (if publishing is enabled in self.data_streamer_config). Defaults to None. + port (int, optional): port to publish in. Defaults to 5001. + """ + super().__init__( + publisher=PyRCFPublisherZMQ(port=port), + rate=rate, + clock=clock, + debug_publish_callables=debug_publish_callables, + ) + + +class PlotjugglerLoopDebugger(ComponentDataPublisherDebugger): + """This control loop debugger will stream the data from all the components in the control loop + so that it can be visualised in plotjuggler.""" + + def __init__( + self, + rate: float = None, + clock: ClockBase = PythonPerfClock(), + debug_publish_callables: Callable[[], Any] | List[Callable[[], Any]] = None, + port: int = DEFAULT_PLOTJUGGLER_PUBLISH_PORT, + ): + """This control loop debugger will stream the data from all the components in the control + loop so that it can be visualised in plotjuggler. + + NOTE: publishes to port 9872 (default for plotjuggler) + + NOTE: Because the publisher uses a custom encoder for encoding PyRCF data types to + serialisable data, this debugger can affect the speed of the control loop. + + Args: + rate (float, optional): Rate at which this should be triggered. Defaults to None + (i.e. use control loop rate). + clock (ClockBase, optional): The clock to use for timer. Defaults to PythonPerfClock(). + debug_publish_callables (Callable[[], Any] | List[Callable[[], Any]]): Function handle + (or list of) that return publishable data (json encodable) for additional streaming + to plotjuggler. Use this for debugging components. These method(s)/functions(s) + will be called in the control loop and their return value added to the data being + published (if publishing is enabled in self.data_streamer_config). Defaults to + None. + port (int, optional): port to publish in. Defaults to 9872 (plotjuggler zmq subscriber + default). + """ + super().__init__( + rate=rate, + clock=clock, + debug_publish_callables=debug_publish_callables, + port=port, + ) diff --git a/pyrcf/components/ctrl_loop_debuggers/pybullet_robot_plan_debugger.py b/pyrcf/components/ctrl_loop_debuggers/pybullet_robot_plan_debugger.py new file mode 100644 index 0000000..ccc2977 --- /dev/null +++ b/pyrcf/components/ctrl_loop_debuggers/pybullet_robot_plan_debugger.py @@ -0,0 +1,213 @@ +"""This control loop debugger will create a dummy visual robot in pybullet which +will try to follow position/pose plan in the control loop. This can be used to +visualise ideal robot poses/configurations and to compare performance of +controllers in tracking them. +""" + +from typing import List, Tuple, Mapping + +from .ctrl_loop_debugger_base import CtrlLoopDebuggerBase +from ...core.types import ( + GlobalMotionPlan, + LocalMotionPlan, + PlannerMode, + ControlMode, + RobotCmd, + RobotState, +) +from ...utils.time_utils import ClockBase, PythonPerfClock +from ...utils.sim_utils.pybullet_debug_robot import PybulletDebugRobot +from ...components.callback_handlers.pb_gui_utils import PybulletGUIButton, PybulletDebugFrameViz +from ...utils.frame_transforms import PoseTrasfrom +from ...utils.math_utils import rpy2quat +from ...components.robot_interfaces.simulation.pybullet_robot import PybulletRobot + +# pylint: disable=W0212 + + +class PybulletRobotPlanDebugger(CtrlLoopDebuggerBase): + """This control loop debugger will create a dummy visual robot in pybullet which + will try to follow position/pose plan in the control loop. This can be used to + visualise ideal robot poses/configurations and to compare performance of + controllers in tracking them. + """ + + def __init__( + self, + urdf_path: str, + rate: float = None, + clock: ClockBase = PythonPerfClock(), + cid: int = 0, + enable_toggle_button: bool = True, + show_ee_targets: bool = True, + use_curr_pose_if_plan_unavailable: bool = True, + **pybullet_debug_robot_kwargs, + ): + """This control loop debugger will create a dummy visual robot in pybullet which + will try to follow position/pose plan in the control loop. This can be used to + visualise ideal robot poses/configurations and to compare performance of + controllers in tracking them. + + Args: + urdf_path (str): Path to urdf for the debug robot. + rate (float, optional): Rate at which this should be triggered. Defaults to None + (i.e. use control loop rate). + clock (ClockBase, optional): The clock to use for timer. Defaults to PythonPerfClock(). + cid (int, optional): The pybullet physics client ID to connect to. Defaults to 0. + enable_toggle_button (bool, optional): If set to True, this will create a button in + the pybullet GUI to enable/disable the VISUALISATION of this debugger. Defaults to + True. + NOTE: This only affects the visualisation! Does not disable the actual debugger. + NOTE: There can be an overhead when the visualisation is toggled on and off. + + **pybullet_debug_robot_kwargs: Additional keyword arguments can be passed here. All + keyword arguments to `PybulletDebugRobot` is allowed here (e.g. use `rgba=[,,,]` + for setting the visualisation color of this debug robot) + """ + super().__init__(rate=rate, clock=clock) + + self._debugger_robot = PybulletDebugRobot( + urdf_path=urdf_path, cid=cid, **pybullet_debug_robot_kwargs + ) + self._toggler_button = None + if enable_toggle_button: + self._toggler_button = PybulletGUIButton(name="Toggle plan debugger viz", cid=cid) + + self._show_ee_targets = show_ee_targets + self._ee_frame_viz: Mapping[str, PybulletDebugFrameViz] = {} + self._use_curr_pose = use_curr_pose_if_plan_unavailable + + def _run_once_impl( + self, + t: float, + dt: float, + robot_state: RobotState, + global_plan: GlobalMotionPlan, + agent_outputs: List[Tuple[LocalMotionPlan, RobotCmd]], + robot_cmd: RobotCmd, + ): + if global_plan.planner_mode == PlannerMode.IDLE: + self._debugger_robot.set_base_pose( + position=robot_state.state_estimates.pose.position, + orientation=robot_state.state_estimates.pose.orientation, + ) + return + if self._toggler_button is not None and self._toggler_button.was_pressed(): + self._debugger_robot.toggle_visualisation() + target_base_pose = ( + ( + robot_state.state_estimates.pose.position, + robot_state.state_estimates.pose.orientation, + ) + if self._use_curr_pose + else None + ) + joint_targets, joint_names = None, None + for output in agent_outputs: + # for each agent + local_plan = output[0] + control_cmd = output[1] + if local_plan is not None and local_plan.control_mode == ControlMode.IDLE: + # local plan can be None (e.g. for RL agent), but if there is a local + # plan and it is setting the control mode to IDLE, we don't look any further + continue + + # first check joint position from the control command and store this to + # use as the debug visualisers value, but will be overwritten if there is a + # reference in the local plan + if control_cmd.joint_commands.joint_positions is not None: + joint_targets = control_cmd.joint_commands.joint_positions + joint_names = control_cmd.joint_commands.joint_names + + if local_plan is not None: + if local_plan.relative_pose is not None: + # store target base pose for the debug robot if available in the local plan msg + target_base_pose = PoseTrasfrom.teleop2world( + p_in_teleop=local_plan.relative_pose.position, + q_in_teleop=rpy2quat(local_plan.relative_pose.rpy), + base_pos_in_world=robot_state.state_estimates.pose.position, + base_ori_in_world=robot_state.state_estimates.pose.orientation, + ) + if local_plan.joint_references.joint_positions is not None: + # overwrite any previous joint reference value for the debugger robot if this + # local plan message has joint references + joint_targets = local_plan.joint_references.joint_positions + joint_names = local_plan.joint_references.joint_names + + if ( + self._show_ee_targets + and local_plan.end_effector_references.ee_names is not None + and local_plan.end_effector_references.ee_poses is not None + ): + # update ee target frame visualisation if there is end-effector references pose + # in the local plan message + for n, ee_name in enumerate(local_plan.end_effector_references.ee_names): + pos = local_plan.end_effector_references.ee_poses[n].position + ori = local_plan.end_effector_references.ee_poses[n].orientation + if ee_name not in self._ee_frame_viz: + self._ee_frame_viz[ee_name] = PybulletDebugFrameViz( + cid=self._debugger_robot._cid, + position=pos, + orientation=ori, + ) + else: + self._ee_frame_viz[ee_name].update_frame_pose(position=pos, orientation=ori) + if target_base_pose is not None: + self._debugger_robot.set_base_pose( + position=target_base_pose[0], orientation=target_base_pose[1] + ) + if joint_targets is not None: + self._debugger_robot.set_joint_positions( + joint_positions=joint_targets, joint_names=joint_names + ) + + def shutdown(self): + self._debugger_robot.close() + + @classmethod + def fromBulletRobotInstance( + cls: "PybulletRobotPlanDebugger", + robot: PybulletRobot, + rate: float = None, + clock: ClockBase = PythonPerfClock(), + enable_toggle_button: bool = True, + show_ee_targets: bool = True, + use_curr_pose_if_plan_unavailable: bool = True, + **pybullet_debug_robot_kwargs, + ) -> "PybulletRobotPlanDebugger": + """Creates an instance of PybulletRobotPlanDebugger using the provided PybulletRobot + child class object. + + Args: + robot (PybulletRobot): The robot instance (will not be modified). + rate (float, optional): Rate at which this should be triggered. Defaults to None + (i.e. use control loop rate). + clock (ClockBase, optional): The clock to use for timer. Defaults to PythonPerfClock(). + cid (int, optional): The pybullet physics client ID to connect to. Defaults to 0. + enable_toggle_button (bool, optional): If set to True, this will create a button in + the pybullet GUI to enable/disable the VISUALISATION of this debugger. Defaults to + True. + NOTE: This only affects the visualisation! Does not disable the actual debugger. + NOTE: There can be an overhead when the visualisation is toggled on and off. + **pybullet_debug_robot_kwargs: Additional keyword arguments can be passed here. All + keyword arguments to `PybulletDebugRobot` is allowed here (e.g. use `rgba=[,,,]` + for setting the visualisation color of this debug robot) + + Returns: + PybulletRobotPlanDebugger: PybulletRobotPlanDebugger instance created using the urdf and + cid from the passed `PybulletRobot` object. + """ + assert isinstance( + robot, PybulletRobot + ), "The robot instance has to be derived from PybulletRobot base class." + + return cls( + urdf_path=robot._sim_robot.urdf_path, + rate=rate, + clock=clock, + cid=robot._sim_robot.cid, + enable_toggle_button=enable_toggle_button, + show_ee_targets=show_ee_targets, + use_curr_pose_if_plan_unavailable=use_curr_pose_if_plan_unavailable, + **pybullet_debug_robot_kwargs, + ) diff --git a/pyrcf/components/ctrl_loop_debuggers/pybullet_robot_state_debugger.py b/pyrcf/components/ctrl_loop_debuggers/pybullet_robot_state_debugger.py new file mode 100644 index 0000000..93515dc --- /dev/null +++ b/pyrcf/components/ctrl_loop_debuggers/pybullet_robot_state_debugger.py @@ -0,0 +1,121 @@ +from typing import List, Tuple + +from .ctrl_loop_debugger_base import CtrlLoopDebuggerBase +from ...core.types import GlobalMotionPlan, LocalMotionPlan, RobotCmd, RobotState +from ...utils.time_utils import ClockBase, PythonPerfClock +from ...utils.sim_utils.pybullet_debug_robot import PybulletDebugRobot +from ...components.callback_handlers.pb_gui_utils import PybulletGUIButton +from ...components.robot_interfaces.simulation.pybullet_robot import PybulletRobot + + +class PybulletRobotStateDebugger(CtrlLoopDebuggerBase): + """This control loop debugger will create a dummy visual robot in pybullet which + will simply mimic the joint states and base pose read from the `robot_state` (after + passing through the state estimator update). + This class can be used to visualise the robot states being read when the robot is + running outside pybullet (e.g. real robot or remote). + """ + + def __init__( + self, + urdf_path: str, + rate: float = None, + clock: ClockBase = PythonPerfClock(), + cid: int = 0, + enable_toggle_button: bool = True, + **bullet_debug_robot_kwargs, + ): + """This control loop debugger will create a dummy visual robot in pybullet which + will simply mimic the joint states and base pose read from the `robot_state` (after + passing through the state estimator update). + This class can be used to visualise the robot states being read when the robot is + running outside pybullet (e.g. real robot or remote). + + Args: + urdf_path (str): Path to urdf for the debug robot. + rate (float, optional): Rate at which this should be triggered. Defaults to None + (i.e. use control loop rate). + clock (ClockBase, optional): The clock to use for timer. Defaults to PythonPerfClock(). + cid (int, optional): The pybullet physics client ID to connect to. Defaults to 0. + enable_toggle_button (bool, optional): If set to True, this will create a button in + the pybullet GUI to enable/disable the VISUALISATION of this debugger. Defaults to + True. + NOTE: This only affects the visualisation! Does not disable the actual debugger. + NOTE: There can be an overhead when the visualisation is toggled on and off. + **bullet_debug_robot_kwargs: Additional keyword arguments can be passed here. All + keyword arguments to `PybulletDebugRobot` is allowed here (e.g. use `rgba=[,,,]` for + setting the visualisation color of this debug robot) + """ + super().__init__(rate=rate, clock=clock) + + self._debugger_robot = PybulletDebugRobot( + urdf_path=urdf_path, cid=cid, **bullet_debug_robot_kwargs + ) + self._toggler_button = None + if enable_toggle_button: + self._toggler_button = PybulletGUIButton(name="Toggle robot state viz", cid=cid) + + def _run_once_impl( + self, + t: float, + dt: float, + robot_state: RobotState, + global_plan: GlobalMotionPlan, + agent_outputs: List[Tuple[LocalMotionPlan, RobotCmd]], + robot_cmd: RobotCmd, + ): + self._debugger_robot.set_base_pose( + position=robot_state.state_estimates.pose.position, + orientation=robot_state.state_estimates.pose.orientation, + ) + self._debugger_robot.set_joint_positions( + joint_positions=robot_state.joint_states.joint_positions, + joint_names=robot_state.joint_states.joint_names, + ) + + def shutdown(self): + self._debugger_robot.close() + + @classmethod + def fromBulletRobotInstance( + cls: "PybulletRobotStateDebugger", + robot: PybulletRobot, + rate: float = None, + clock: ClockBase = PythonPerfClock(), + enable_toggle_button: bool = True, + **bullet_debug_robot_kwargs, + ) -> "PybulletRobotStateDebugger": + """Creates an instance of PybulletRobotStateDebugger using an existing PybulletRobot + child class object. + + Args: + robot (PybulletRobot): The robot instance (will not be modified). + rate (float, optional): Rate at which this should be triggered. Defaults to None + (i.e. use control loop rate). + clock (ClockBase, optional): The clock to use for timer. Defaults to PythonPerfClock(). + cid (int, optional): The pybullet physics client ID to connect to. Defaults to 0. + enable_toggle_button (bool, optional): If set to True, this will create a button in + the pybullet GUI to enable/disable the VISUALISATION of this debugger. Defaults to + True. + NOTE: This only affects the visualisation! Does not disable the actual debugger. + NOTE: There can be an overhead when the visualisation is toggled on and off. + **bullet_debug_robot_kwargs: Additional keyword arguments can be passed here. All + keyword arguments to `PybulletDebugRobot` is allowed here (e.g. use `rgba=[,,,]` for + setting the visualisation color of this debug robot) + + Returns: + PybulletRobotStateDebugger: PybulletRobotStateDebugger instance created using the urdf and + cid from the passed `robot` object. + """ + assert isinstance( + robot, PybulletRobot + ), "The robot instance has to be derived from PybulletRobot base class." + + return cls( + urdf_path=robot._sim_robot.urdf_path, # pylint: disable=W0212 + rate=rate, + clock=clock, + cid=robot._sim_robot.cid, # pylint: disable=W0212 + enable_toggle_button=enable_toggle_button, + **bullet_debug_robot_kwargs, + ) diff --git a/pyrcf/utils/frame_transforms.py b/pyrcf/utils/frame_transforms.py new file mode 100644 index 0000000..9641264 --- /dev/null +++ b/pyrcf/utils/frame_transforms.py @@ -0,0 +1,182 @@ +"""Utility functions to help with coordinate frame transforms.""" + +from typing import Tuple, TypeAlias +import numpy as np +from pybullet import multiplyTransforms, invertTransform + +from .math_utils import quat2rpy, rpy2quat, vec2skew + +QuatType: TypeAlias = np.ndarray +"""Numpy array representating quaternion in format [x,y,z,w]""" + +Vector3D: TypeAlias = np.ndarray +"""Numpy array representating 3D cartesian vector in format [x,y,z]""" + + +def transform_pose_to_frame( + pos_in_frame_1: Vector3D, + quat_in_frame_1: QuatType, + frame_2_pos_in_frame_1: Vector3D, + frame_2_quat_in_frame_1: QuatType, +) -> Tuple[Vector3D, QuatType]: + """Transform a pose from a global frame to a local frame. + + Both the input pose and the local frame's pose should be described in a + common global frame. + + Args: + pos_in_frame_1 (Vector3D): The position of the input pose object to be transformed in + a common global frame. + quat_in_frame_1 (QuatType): The orientation of the input pose object to be transformed in + a common global frame (quaternion: [x,y,z,w]). + frame_2_pos_in_frame_1 (Vector3D): Position of the origin of the target coordinate frame + with respect to the common global frame. + frame_2_quat_in_frame_1 (QuatType): The orientation of the target frame in the global + frame. + + Returns: + Tuple[Vector3D, QuatType]: Position and orientation of the input pose in the new target + frame. + """ + p, q = multiplyTransforms( + *invertTransform(frame_2_pos_in_frame_1, frame_2_quat_in_frame_1), + pos_in_frame_1, + quat_in_frame_1, + ) + return np.array(p), np.array(q) + + +def twist_transform(twist_a: np.ndarray, frame_b_in_a: np.ndarray) -> np.ndarray: + """Transform a twist screw (linear velocity & angular velocity) to another coordinate frame. + + Args: + twist_a (np.ndarray): 6D twist screw (linear x, linear y, linear z, angular x, angular y, + angular z) + frame_b_in_a (np.ndarray): Transformation matrix of the pose of the new frame in the old + frame. + + Returns: + np.ndarray: Twist screw in the new frame (linear x, linear y, linear z, angular x, + angular y, angular z). + """ + rot_mat = frame_b_in_a[0:3, 0:3] + translation = frame_b_in_a[0:3, 3] + skew_translation = vec2skew(translation) + velocity_transform_mat = np.block( + [ + [rot_mat, np.matmul(skew_translation, rot_mat)], + [np.zeros((3, 3)), rot_mat], + ] + ) + output_twist = np.matmul(velocity_transform_mat, twist_a) + return output_twist + + +def get_relative_pose_between_vectors( + pos1: Vector3D, quat1: QuatType, pos2: Vector3D, quat2: QuatType +) -> Tuple[Vector3D, QuatType]: + """Given two frame poses in the same coordinate frame, return the pose of + the second frame with respect to the first. + + Args: + pos1 (Vector3D): Position of the first frame in a global frame. + quat1 (QuatType): Orientation quaternion of the first frame. + pos2 (Vector3D): Position of the second frame in the same global frame. + quat2 (QuatType): Orientation of the second frame. + + Returns: + Tuple[Vector3D, QuatType]: Position and orientation of the second frame + with respect to the first. + """ + return transform_pose_to_frame( + pos_in_frame_1=pos2, + quat_in_frame_1=quat2, + frame_2_pos_in_frame_1=pos1, + frame_2_quat_in_frame_1=quat1, + ) + + +class PoseTrasfrom: + """Transform a pose (point and orientation) between LMCF coordinate frames.""" + + @staticmethod + def TELEOP_FRAME_POSE_IN_WORLD( + base_pos_in_world: Vector3D, base_ori_in_world: QuatType + ) -> Tuple[Vector3D, QuatType]: + """This is the definition of the teleop frame.""" + return np.array([base_pos_in_world[0], base_pos_in_world[1], 0.0]), rpy2quat( + rpy=[0, 0, quat2rpy(base_ori_in_world)[2]] + ) + + @staticmethod + def BASE_FRAME_POSE_IN_TELEOP( + base_pos_in_world: Vector3D, base_ori_in_world: QuatType + ) -> Tuple[Vector3D, QuatType]: + """This is the definition of the teleop frame.""" + rpy = quat2rpy(base_ori_in_world) + return np.array([0.0, 0.0, base_pos_in_world[2]]), rpy2quat(rpy=[rpy[0], rpy[1], 0.0]) + + @staticmethod + def teleop2base( + p_in_teleop: Vector3D, + q_in_teleop: QuatType, + base_pos_in_world: Vector3D, + base_ori_in_world: QuatType, + ) -> Tuple[Vector3D, QuatType]: + """Transform a pose in teleop frame to base frame.""" + return transform_pose_to_frame( + p_in_teleop, + q_in_teleop, + *PoseTrasfrom.BASE_FRAME_POSE_IN_TELEOP(base_pos_in_world, base_ori_in_world), + ) + + @staticmethod + def base2teleop( + p_in_base: Vector3D, + q_in_base: QuatType, + base_pos_in_world: Vector3D, + base_ori_in_world: QuatType, + ) -> Tuple[Vector3D, QuatType]: + """Transform a pose in base frame to teleop frame.""" + return transform_pose_to_frame( + p_in_base, + q_in_base, + *invertTransform( + *( + PoseTrasfrom.BASE_FRAME_POSE_IN_TELEOP( + base_pos_in_world, + base_ori_in_world, + ) + ) + ), + ) + + @staticmethod + def teleop2world( + p_in_teleop: Vector3D, + q_in_teleop: QuatType, + base_pos_in_world: Vector3D, + base_ori_in_world: QuatType, + ) -> Tuple[Vector3D, QuatType]: + """Transform a pose in teleop frame to world frame.""" + return transform_pose_to_frame( + p_in_teleop, + q_in_teleop, + *invertTransform( + *PoseTrasfrom.TELEOP_FRAME_POSE_IN_WORLD(base_pos_in_world, base_ori_in_world) + ), + ) + + @staticmethod + def world2teleop( + p_in_world: Vector3D, + q_in_world: QuatType, + base_pos_in_world: Vector3D, + base_ori_in_world: QuatType, + ) -> Tuple[Vector3D, QuatType]: + """Transform a pose in world frame to teleop frame.""" + return transform_pose_to_frame( + p_in_world, + q_in_world, + *PoseTrasfrom.TELEOP_FRAME_POSE_IN_WORLD(base_pos_in_world, base_ori_in_world), + ) diff --git a/test_scripts/joint_interpolator_ctrl_loop.py b/test_scripts/joint_interpolator_ctrl_loop.py index bb576c8..8454c29 100644 --- a/test_scripts/joint_interpolator_ctrl_loop.py +++ b/test_scripts/joint_interpolator_ctrl_loop.py @@ -17,6 +17,9 @@ 5. Extension of this controller that also adds gravity compensation torques to the robot command, making the joint tracking much better. + +6. There is also a debug robot in the simulation (using `PybulletRobotPlanDebugger`) +that shows the actual output from the planner (useful for controller tuning/debugging). """ from pyrcf.components.robot_interfaces.simulation import PybulletRobot @@ -26,14 +29,21 @@ PybulletGUIGlobalPlannerInterface, ) from pyrcf.components.controllers import JointPDController, GravityCompensatedPDController +from pyrcf.components.ctrl_loop_debuggers import ( + PlotjugglerLoopDebugger, + PybulletRobotPlanDebugger, +) # pylint: disable=W0212 -USE_GRAVITY_COMP_CONTROLLER: bool = True +USE_GRAVITY_COMP_CONTROLLER: bool = False """Set this flag to True to use a PD controller with gravity compensation. If set to False, will use a regular PD joint position tracking controller. Try enabling and disabling to see difference in tracking.""" +SHOW_PLANNED_PATH: bool = True +"""Flag to enable/disable debug robot in pybullet that shows the local plan output.""" + if __name__ == "__main__": # load a manipulator robot from AwesomeRobotDescriptions (can load any robot # from the awesome robot lists using the `fromAwesomeRobotDescriptions` class @@ -81,11 +91,20 @@ ) ctrl_loop_rate: float = 240 # 240hz control loop - + debuggers = [PlotjugglerLoopDebugger(rate=None)] + if SHOW_PLANNED_PATH: + debuggers.append( + # this debugger can be used to visualise the planner output that the + # controller is trying to track + PybulletRobotPlanDebugger(urdf_path=robot._sim_robot.urdf_path, rate=30) + ) try: control_loop.run( loop_rate=ctrl_loop_rate, clock=robot.get_sim_clock(), + # PlotjugglerLoopDebugger will publish all data from control loop + # to plotjuggler + debuggers=debuggers, ) except KeyboardInterrupt: print("Closing control loop")