diff --git a/ros2topic/package.xml b/ros2topic/package.xml
index 6bdd4701d..d570fbc01 100644
--- a/ros2topic/package.xml
+++ b/ros2topic/package.xml
@@ -20,6 +20,8 @@
ros2cli
python3-numpy
+ python3-prompt-toolkit
+ python3-pygments
python3-yaml
rclpy
rosidl_runtime_py
diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py
index cfb865959..3b2026385 100644
--- a/ros2topic/ros2topic/verb/pub.py
+++ b/ros2topic/ros2topic/verb/pub.py
@@ -13,9 +13,13 @@
# limitations under the License.
import time
-from typing import Optional
-from typing import TypeVar
+from typing import List, Optional, Tuple, TypeVar
+from prompt_toolkit import prompt
+from prompt_toolkit.formatted_text import HTML
+from prompt_toolkit.key_binding import KeyBindings
+from prompt_toolkit.lexers import PygmentsLexer
+from pygments.lexers.data import YamlLexer
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
@@ -29,6 +33,7 @@
from ros2topic.api import TopicTypeCompleter
from ros2topic.verb import VerbExtension
from rosidl_runtime_py import set_message_fields
+from rosidl_runtime_py.convert import message_to_yaml
from rosidl_runtime_py.utilities import get_message
import yaml
@@ -85,6 +90,9 @@ def add_arguments(self, parser, cli_name):
parser.add_argument(
'-p', '--print', metavar='N', type=int, default=1,
help='Only print every N-th published message (default: 1)')
+ parser.add_argument(
+ '-i', '--interactive', action='store_true',
+ help='Interactively edit and send the message')
group = parser.add_mutually_exclusive_group()
group.add_argument(
'-1', '--once', action='store_true',
@@ -137,7 +145,7 @@ def main(self, *, args):
return main(args)
-def main(args):
+def main(args) -> Optional[str]:
qos_profile = get_pub_qos_profile()
qos_profile_name = args.qos_profile
@@ -151,12 +159,23 @@ def main(args):
if args.once:
times = 1
+ if args.interactive:
+ print('Interactive mode...')
+ # Show the tui
+ default_msg, default_timestamp_fields = parse_msg(args.message_type, args.values)
+ content = show_interactive_tui(message_to_yaml(default_msg))
+ else:
+ content = args.values
+
+ # Parse the yaml string and get a message object of the desired type
+ msg, timestamp_fields = parse_msg(args.message_type, content)
+
with DirectNode(args, node_name=args.node_name) as node:
return publisher(
node.node,
- args.message_type,
args.topic_name,
- args.values,
+ msg,
+ timestamp_fields,
1. / args.rate,
args.print,
times,
@@ -166,44 +185,105 @@ def main(args):
args.keep_alive)
+def show_interactive_tui(default_msg_str: str) -> str:
+ """
+ Show a tui to edit a given message yaml.
+
+ :param msg_str: Message yaml string which is initially presented to the user
+ :param default_msg_str: Message yaml string with default values for the given message
+ :return: The message yaml string edited by the user
+ """
+ # Create the bottom bar to pressent the options to the user
+ def bottom_toolbar():
+ return HTML(' Continue: alt+enter | Exit: ctrl+c | Reset: ctrl+r')
+
+ # Create key bindings for the prompt
+ bindings = KeyBindings()
+ @bindings.add('c-r')
+ def _(event):
+ """Reset the promt to the default message."""
+ event.app.current_buffer.text = default_msg_str
+
+ # Show prompt to edit the message before sending it
+ return prompt(
+ "> ",
+ multiline=True,
+ default=default_msg_str,
+ lexer=PygmentsLexer(YamlLexer),
+ mouse_support=True,
+ bottom_toolbar=bottom_toolbar,
+ key_bindings=bindings)
+
+
+def parse_msg(msg_type: str, yaml_values: Optional[str] = None) -> Tuple[MsgType, List]:
+ """
+ Parse the name and contents of a given message.
+
+ :param msg_type: Name of the message as a string (e.g. std_msgs/msg/Header)
+ :param yaml_values: Contents of the message as a string in YAML layout
+ :returns: An constructed instance of the message type
+ """
+ # Get the message type from the name string
+ try:
+ msg_module = get_message(msg_type)
+ except (AttributeError, ModuleNotFoundError, ValueError):
+ raise RuntimeError('The passed message type is invalid')
+ # Create a default instance of the message with the given name
+ msg = msg_module()
+ timestamp_fields = []
+ # Check if we want to add values to the message
+ if yaml_values is not None:
+ # Load the user provided fields of the message
+ values_dictionary = yaml.safe_load(yaml_values)
+ if not isinstance(values_dictionary, dict):
+ raise RuntimeError('The passed value needs to be a dictionary in YAML format')
+ # Set all fields in the message to the provided values
+ try:
+ # Unfortunately, if you specifi
+ timestamp_fields = set_message_fields(
+ msg, values_dictionary, expand_header_auto=True, expand_time_now=True)
+ except Exception as e:
+ raise RuntimeError('Failed to populate field: {0}'.format(e))
+ return msg, timestamp_fields
+
+
def publisher(
node: Node,
- message_type: MsgType,
topic_name: str,
- values: dict,
+ msg: MsgType,
+ timestamp_fields: list,
period: float,
print_nth: int,
times: int,
wait_matching_subscriptions: int,
qos_profile: QoSProfile,
keep_alive: float,
-) -> Optional[str]:
- """Initialize a node with a single publisher and run its publish loop (maybe only once)."""
- try:
- msg_module = get_message(message_type)
- except (AttributeError, ModuleNotFoundError, ValueError):
- raise RuntimeError('The passed message type is invalid')
- values_dictionary = yaml.safe_load(values)
- if not isinstance(values_dictionary, dict):
- return 'The passed value needs to be a dictionary in YAML format'
+) -> None:
+ """
+ Initialize a node with a single publisher and run its publish loop (maybe only once).
- pub = node.create_publisher(msg_module, topic_name, qos_profile)
+ :param node: The node used for publishing the given message
+ :param topic_name: The topic on which the the message is published
+ :param msg: The message to be published
+ :param timestamp_fields: Any timestamp fields that need to be populated
+ :param period: Period after which the msg is published again
+ :param print_nth: Interval in which the message is printed
+ :param times: Number of times the message is published
+ :param wait_matching_subscriptions: Wait until there is a certain number of subscribtions
+ :param qos_profile: QoS profile
+ :param keep_alive: Time the node is kept alive after the message was sent
+ """
+ pub = node.create_publisher(type(msg), topic_name, qos_profile)
times_since_last_log = 0
while pub.get_subscription_count() < wait_matching_subscriptions:
- # Print a message reporting we're waiting each 1s, check condition each 100ms.
+ # Print a message reporting we're waiting 1s, but check the condition every 100ms.
if not times_since_last_log:
print(
f'Waiting for at least {wait_matching_subscriptions} matching subscription(s)...')
times_since_last_log = (times_since_last_log + 1) % 10
time.sleep(0.1)
- msg = msg_module()
- try:
- timestamp_fields = set_message_fields(
- msg, values_dictionary, expand_header_auto=True, expand_time_now=True)
- except Exception as e:
- return 'Failed to populate field: {0}'.format(e)
print('publisher: beginning loop')
count = 0
@@ -222,9 +302,7 @@ def timer_callback():
timer = node.create_timer(period, timer_callback)
while times == 0 or count < times:
rclpy.spin_once(node)
- # give some time for the messages to reach the wire before exiting
- time.sleep(keep_alive)
node.destroy_timer(timer)
- else:
- # give some time for the messages to reach the wire before exiting
- time.sleep(keep_alive)
+
+ # give some time for the messages to reach the wire before exiting
+ time.sleep(keep_alive)