diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index e7f3503..e62f216 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -88,6 +88,21 @@ public: this->add_event_handler( options_.event_callbacks.incompatible_qos_callback, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); + } else if (options_.use_default_callbacks) { + // Register default callback when not specified + try { + this->add_event_handler( + [this](QOSOfferedIncompatibleQoSInfo & info) { + this->default_incompatible_qos_callback(info); + }, + RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); + } catch (UnsupportedEventTypeException & /*exc*/) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())), + "This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS " + "events, you will not be notified when Publishers offer an incompatible " + "QoS profile to Subscriptions on the same topic."); + } } // Setup continues in the post construction method, post_init_setup(). } diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 4fbcf46..9627aca 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -208,6 +208,9 @@ protected: event_handlers_.emplace_back(handler); } + RCLCPP_PUBLIC + void default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & info) const; + std::shared_ptr rcl_node_handle_; rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); diff --git a/rclcpp/include/rclcpp/publisher_options.hpp b/rclcpp/include/rclcpp/publisher_options.hpp index cd78d51..75d0e69 100644 --- a/rclcpp/include/rclcpp/publisher_options.hpp +++ b/rclcpp/include/rclcpp/publisher_options.hpp @@ -44,6 +44,9 @@ struct PublisherOptionsBase /// Callbacks for various events related to publishers. PublisherEventCallbacks event_callbacks; + /// Whether or not to use default callbacks when user doesn't supply any in event_callbacks + bool use_default_callbacks = true; + /// Callback group in which the waitable items from the publisher should be placed. std::shared_ptr callback_group; diff --git a/rclcpp/include/rclcpp/qos.hpp b/rclcpp/include/rclcpp/qos.hpp index 04e3666..244ac69 100644 --- a/rclcpp/include/rclcpp/qos.hpp +++ b/rclcpp/include/rclcpp/qos.hpp @@ -15,15 +15,19 @@ #ifndef RCLCPP__QOS_HPP_ #define RCLCPP__QOS_HPP_ -#include -#include +#include #include "rclcpp/duration.hpp" #include "rclcpp/visibility_control.hpp" +#include "rmw/incompatible_qos_events_statuses.h" +#include "rmw/qos_profiles.h" +#include "rmw/types.h" namespace rclcpp { +std::string qos_policy_name_from_kind(rmw_qos_policy_kind_t policy_kind); + /// QoS initialization values, cannot be created directly, use KeepAll or KeepLast instead. struct RCLCPP_PUBLIC QoSInitialization { diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 190a437..4e602b2 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -123,6 +123,21 @@ public: this->add_event_handler( options.event_callbacks.incompatible_qos_callback, RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS); + } else if (options_.use_default_callbacks) { + // Register default callback when not specified + try { + this->add_event_handler( + [this](QOSRequestedIncompatibleQoSInfo & info) { + this->default_incompatible_qos_callback(info); + }, + RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS); + } catch (UnsupportedEventTypeException & /*exc*/) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger(rcl_node_get_logger_name(node_handle_.get())), + "This rmw implementation does not support ON_REQUESTED_INCOMPATIBLE_QOS " + "events, you will not be notified when Subscriptions request an incompatible " + "QoS profile from Publishers on the same topic."); + } } // Setup intra process publishing if requested. diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 6275c69..7ee8c62 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -269,6 +269,9 @@ protected: event_handlers_.emplace_back(handler); } + RCLCPP_PUBLIC + void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info) const; + RCLCPP_PUBLIC bool matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const; diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index 64fcac5..2e27de6 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -36,6 +36,9 @@ struct SubscriptionOptionsBase /// Callbacks for events related to this subscription. SubscriptionEventCallbacks event_callbacks; + /// Whether or not to use default callbacks when user doesn't supply any in event_callbacks + bool use_default_callbacks = true; + /// True to ignore local publications. bool ignore_local_publications = false; diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index bcef23a..c982a95 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -244,3 +244,14 @@ PublisherBase::setup_intra_process( weak_ipm_ = ipm; intra_process_is_enabled_ = true; } + +void +PublisherBase::default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & event) const +{ + std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind); + RCLCPP_WARN( + rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())), + "New subscription discovered on this topic, requesting incompatible QoS. " + "No messages will be sent to it. " + "Last incompatible policy: %s", policy_name.c_str()); +} diff --git a/rclcpp/src/rclcpp/qos.cpp b/rclcpp/src/rclcpp/qos.cpp index 4063fed..b0b3987 100644 --- a/rclcpp/src/rclcpp/qos.cpp +++ b/rclcpp/src/rclcpp/qos.cpp @@ -14,11 +14,33 @@ #include "rclcpp/qos.hpp" -#include +#include + +#include "rmw/types.h" namespace rclcpp { +std::string qos_policy_name_from_kind(rmw_qos_policy_kind_t policy_kind) +{ + switch (policy_kind) { + case RMW_QOS_POLICY_DURABILITY: + return "DURABILITY_QOS_POLICY"; + case RMW_QOS_POLICY_DEADLINE: + return "DEADLINE_QOS_POLICY"; + case RMW_QOS_POLICY_LIVELINESS: + return "LIVELINESS_QOS_POLICY"; + case RMW_QOS_POLICY_RELIABILITY: + return "RELIABILITY_QOS_POLICY"; + case RMW_QOS_POLICY_HISTORY: + return "HISTORY_QOS_POLICY"; + case RMW_QOS_POLICY_LIFESPAN: + return "LIFESPAN_QOS_POLICY"; + default: + return "INVALID_QOS_POLICY"; + } +} + QoSInitialization::QoSInitialization(rmw_qos_history_policy_t history_policy_arg, size_t depth_arg) : history_policy(history_policy_arg), depth(depth_arg) {} diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index d7f0da1..2af068e 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -237,6 +237,17 @@ SubscriptionBase::get_intra_process_waitable() const return ipm->get_subscription_intra_process(intra_process_subscription_id_); } +void +SubscriptionBase::default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & event) const +{ + std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind); + RCLCPP_WARN( + rclcpp::get_logger(rcl_node_get_logger_name(node_handle_.get())), + "New publisher discovered on this topic, offering incompatible QoS. " + "No messages will be sent to it. " + "Last incompatible policy: %s", policy_name.c_str()); +} + bool SubscriptionBase::matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const { diff --git a/rclcpp/test/test_qos_event.cpp b/rclcpp/test/test_qos_event.cpp index 2de87d5..c0f6076 100644 --- a/rclcpp/test/test_qos_event.cpp +++ b/rclcpp/test/test_qos_event.cpp @@ -14,10 +14,14 @@ #include +#include +#include +#include #include #include #include "rclcpp/rclcpp.hpp" +#include "rcutils/logging.h" #include "rmw/rmw.h" #include "test_msgs/msg/empty.hpp" @@ -31,9 +35,14 @@ protected: void SetUp() { - node = std::make_shared("test_qos_event", "/ns"); is_fastrtps = std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") != std::string::npos; + + node = std::make_shared("test_qos_event", "/ns"); + + message_callback = [node = node.get()](const test_msgs::msg::Empty::SharedPtr /*msg*/) { + RCLCPP_INFO(node->get_logger(), "Message received"); + }; } void TearDown() @@ -44,6 +53,7 @@ protected: static constexpr char topic_name[] = "test_topic"; rclcpp::Node::SharedPtr node; bool is_fastrtps; + std::function message_callback; }; constexpr char TestQosEvent::topic_name[]; @@ -104,10 +114,6 @@ TEST_F(TestQosEvent, test_subscription_constructor) { rclcpp::SubscriptionOptions options; - auto message_callback = [node = node.get()](const test_msgs::msg::Empty::SharedPtr /*msg*/) { - RCLCPP_INFO(node->get_logger(), "Message received"); - }; - // options arg with no callbacks auto subscription = node->create_subscription( topic_name, 10, message_callback, options); @@ -150,3 +156,69 @@ TEST_F(TestQosEvent, test_subscription_constructor) EXPECT_TRUE(is_fastrtps); } } + +/* + Testing construction of a subscriptions with QoS event callback functions. + */ +std::string * g_pub_log_msg; +std::string * g_sub_log_msg; +std::promise * g_log_msgs_promise; +TEST_F(TestQosEvent, test_default_incompatible_qos_callbacks) +{ + rcutils_logging_output_handler_t original_output_handler = rcutils_logging_get_output_handler(); + + std::string pub_log_msg; + std::string sub_log_msg; + std::promise log_msgs_promise; + g_pub_log_msg = &pub_log_msg; + g_sub_log_msg = &sub_log_msg; + g_log_msgs_promise = &log_msgs_promise; + auto logger_callback = []( + const rcutils_log_location_t * /*location*/, + int /*level*/, const char * /*name*/, rcutils_time_point_value_t /*timestamp*/, + const char * format, va_list * args) -> void { + char buffer[1024]; + vsnprintf(buffer, sizeof(buffer), format, *args); + const std::string msg = buffer; + if (msg.rfind("New subscription discovered", 0) == 0) { + *g_pub_log_msg = buffer; + } else if (msg.rfind("New publisher discovered", 0) == 0) { + *g_sub_log_msg = buffer; + } + + if (!g_pub_log_msg->empty() && !g_sub_log_msg->empty()) { + g_log_msgs_promise->set_value(); + } + }; + rcutils_logging_set_output_handler(logger_callback); + + std::shared_future log_msgs_future = log_msgs_promise.get_future(); + + rclcpp::QoS qos_profile_publisher(10); + qos_profile_publisher.durability(RMW_QOS_POLICY_DURABILITY_VOLATILE); + auto publisher = node->create_publisher( + topic_name, qos_profile_publisher); + + rclcpp::QoS qos_profile_subscription(10); + qos_profile_subscription.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL); + auto subscription = node->create_subscription( + topic_name, qos_profile_subscription, message_callback); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(node->get_node_base_interface()); + + ex.spin_until_future_complete(log_msgs_future, std::chrono::seconds(10)); + + if (!is_fastrtps) { + EXPECT_EQ( + "New subscription discovered on this topic, requesting incompatible QoS. " + "No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY", + pub_log_msg); + EXPECT_EQ( + "New publisher discovered on this topic, offering incompatible QoS. " + "No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY", + sub_log_msg); + } + + rcutils_logging_set_output_handler(original_output_handler); +}