Create a default warning for qos incompatibility (#1051)

Signed-off-by: Miaofei <miaofei@amazon.com>
This commit is contained in:
Miaofei Mei 2020-04-14 11:16:39 -07:00 committed by GitHub
parent aaf8b3828c
commit 44fa4fe019
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 170 additions and 8 deletions

View file

@ -88,6 +88,21 @@ public:
this->add_event_handler( this->add_event_handler(
options_.event_callbacks.incompatible_qos_callback, options_.event_callbacks.incompatible_qos_callback,
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} else if (options_.use_default_callbacks) {
// Register default callback when not specified
try {
this->add_event_handler(
[this](QOSOfferedIncompatibleQoSInfo & info) {
this->default_incompatible_qos_callback(info);
},
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} catch (UnsupportedEventTypeException & /*exc*/) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
"This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS "
"events, you will not be notified when Publishers offer an incompatible "
"QoS profile to Subscriptions on the same topic.");
}
} }
// Setup continues in the post construction method, post_init_setup(). // Setup continues in the post construction method, post_init_setup().
} }

View file

@ -208,6 +208,9 @@ protected:
event_handlers_.emplace_back(handler); event_handlers_.emplace_back(handler);
} }
RCLCPP_PUBLIC
void default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & info) const;
std::shared_ptr<rcl_node_t> rcl_node_handle_; std::shared_ptr<rcl_node_t> rcl_node_handle_;
rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();

View file

@ -44,6 +44,9 @@ struct PublisherOptionsBase
/// Callbacks for various events related to publishers. /// Callbacks for various events related to publishers.
PublisherEventCallbacks event_callbacks; PublisherEventCallbacks event_callbacks;
/// Whether or not to use default callbacks when user doesn't supply any in event_callbacks
bool use_default_callbacks = true;
/// Callback group in which the waitable items from the publisher should be placed. /// Callback group in which the waitable items from the publisher should be placed.
std::shared_ptr<rclcpp::callback_group::CallbackGroup> callback_group; std::shared_ptr<rclcpp::callback_group::CallbackGroup> callback_group;

View file

@ -15,15 +15,19 @@
#ifndef RCLCPP__QOS_HPP_ #ifndef RCLCPP__QOS_HPP_
#define RCLCPP__QOS_HPP_ #define RCLCPP__QOS_HPP_
#include <rmw/qos_profiles.h> #include <string>
#include <rmw/types.h>
#include "rclcpp/duration.hpp" #include "rclcpp/duration.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "rmw/incompatible_qos_events_statuses.h"
#include "rmw/qos_profiles.h"
#include "rmw/types.h"
namespace rclcpp namespace rclcpp
{ {
std::string qos_policy_name_from_kind(rmw_qos_policy_kind_t policy_kind);
/// QoS initialization values, cannot be created directly, use KeepAll or KeepLast instead. /// QoS initialization values, cannot be created directly, use KeepAll or KeepLast instead.
struct RCLCPP_PUBLIC QoSInitialization struct RCLCPP_PUBLIC QoSInitialization
{ {

View file

@ -123,6 +123,21 @@ public:
this->add_event_handler( this->add_event_handler(
options.event_callbacks.incompatible_qos_callback, options.event_callbacks.incompatible_qos_callback,
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS); RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
} else if (options_.use_default_callbacks) {
// Register default callback when not specified
try {
this->add_event_handler(
[this](QOSRequestedIncompatibleQoSInfo & info) {
this->default_incompatible_qos_callback(info);
},
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
} catch (UnsupportedEventTypeException & /*exc*/) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger(rcl_node_get_logger_name(node_handle_.get())),
"This rmw implementation does not support ON_REQUESTED_INCOMPATIBLE_QOS "
"events, you will not be notified when Subscriptions request an incompatible "
"QoS profile from Publishers on the same topic.");
}
} }
// Setup intra process publishing if requested. // Setup intra process publishing if requested.

View file

@ -269,6 +269,9 @@ protected:
event_handlers_.emplace_back(handler); event_handlers_.emplace_back(handler);
} }
RCLCPP_PUBLIC
void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info) const;
RCLCPP_PUBLIC RCLCPP_PUBLIC
bool bool
matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const; matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const;

View file

@ -36,6 +36,9 @@ struct SubscriptionOptionsBase
/// Callbacks for events related to this subscription. /// Callbacks for events related to this subscription.
SubscriptionEventCallbacks event_callbacks; SubscriptionEventCallbacks event_callbacks;
/// Whether or not to use default callbacks when user doesn't supply any in event_callbacks
bool use_default_callbacks = true;
/// True to ignore local publications. /// True to ignore local publications.
bool ignore_local_publications = false; bool ignore_local_publications = false;

View file

@ -244,3 +244,14 @@ PublisherBase::setup_intra_process(
weak_ipm_ = ipm; weak_ipm_ = ipm;
intra_process_is_enabled_ = true; intra_process_is_enabled_ = true;
} }
void
PublisherBase::default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & event) const
{
std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
RCLCPP_WARN(
rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
"New subscription discovered on this topic, requesting incompatible QoS. "
"No messages will be sent to it. "
"Last incompatible policy: %s", policy_name.c_str());
}

View file

@ -14,11 +14,33 @@
#include "rclcpp/qos.hpp" #include "rclcpp/qos.hpp"
#include <rmw/types.h> #include <string>
#include "rmw/types.h"
namespace rclcpp namespace rclcpp
{ {
std::string qos_policy_name_from_kind(rmw_qos_policy_kind_t policy_kind)
{
switch (policy_kind) {
case RMW_QOS_POLICY_DURABILITY:
return "DURABILITY_QOS_POLICY";
case RMW_QOS_POLICY_DEADLINE:
return "DEADLINE_QOS_POLICY";
case RMW_QOS_POLICY_LIVELINESS:
return "LIVELINESS_QOS_POLICY";
case RMW_QOS_POLICY_RELIABILITY:
return "RELIABILITY_QOS_POLICY";
case RMW_QOS_POLICY_HISTORY:
return "HISTORY_QOS_POLICY";
case RMW_QOS_POLICY_LIFESPAN:
return "LIFESPAN_QOS_POLICY";
default:
return "INVALID_QOS_POLICY";
}
}
QoSInitialization::QoSInitialization(rmw_qos_history_policy_t history_policy_arg, size_t depth_arg) QoSInitialization::QoSInitialization(rmw_qos_history_policy_t history_policy_arg, size_t depth_arg)
: history_policy(history_policy_arg), depth(depth_arg) : history_policy(history_policy_arg), depth(depth_arg)
{} {}

View file

@ -237,6 +237,17 @@ SubscriptionBase::get_intra_process_waitable() const
return ipm->get_subscription_intra_process(intra_process_subscription_id_); return ipm->get_subscription_intra_process(intra_process_subscription_id_);
} }
void
SubscriptionBase::default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & event) const
{
std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
RCLCPP_WARN(
rclcpp::get_logger(rcl_node_get_logger_name(node_handle_.get())),
"New publisher discovered on this topic, offering incompatible QoS. "
"No messages will be sent to it. "
"Last incompatible policy: %s", policy_name.c_str());
}
bool bool
SubscriptionBase::matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const SubscriptionBase::matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const
{ {

View file

@ -14,10 +14,14 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <chrono>
#include <functional>
#include <future>
#include <memory> #include <memory>
#include <string> #include <string>
#include "rclcpp/rclcpp.hpp" #include "rclcpp/rclcpp.hpp"
#include "rcutils/logging.h"
#include "rmw/rmw.h" #include "rmw/rmw.h"
#include "test_msgs/msg/empty.hpp" #include "test_msgs/msg/empty.hpp"
@ -31,9 +35,14 @@ protected:
void SetUp() void SetUp()
{ {
node = std::make_shared<rclcpp::Node>("test_qos_event", "/ns");
is_fastrtps = is_fastrtps =
std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") != std::string::npos; std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") != std::string::npos;
node = std::make_shared<rclcpp::Node>("test_qos_event", "/ns");
message_callback = [node = node.get()](const test_msgs::msg::Empty::SharedPtr /*msg*/) {
RCLCPP_INFO(node->get_logger(), "Message received");
};
} }
void TearDown() void TearDown()
@ -44,6 +53,7 @@ protected:
static constexpr char topic_name[] = "test_topic"; static constexpr char topic_name[] = "test_topic";
rclcpp::Node::SharedPtr node; rclcpp::Node::SharedPtr node;
bool is_fastrtps; bool is_fastrtps;
std::function<void(const test_msgs::msg::Empty::SharedPtr)> message_callback;
}; };
constexpr char TestQosEvent::topic_name[]; constexpr char TestQosEvent::topic_name[];
@ -104,10 +114,6 @@ TEST_F(TestQosEvent, test_subscription_constructor)
{ {
rclcpp::SubscriptionOptions options; rclcpp::SubscriptionOptions options;
auto message_callback = [node = node.get()](const test_msgs::msg::Empty::SharedPtr /*msg*/) {
RCLCPP_INFO(node->get_logger(), "Message received");
};
// options arg with no callbacks // options arg with no callbacks
auto subscription = node->create_subscription<test_msgs::msg::Empty>( auto subscription = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback, options); topic_name, 10, message_callback, options);
@ -150,3 +156,69 @@ TEST_F(TestQosEvent, test_subscription_constructor)
EXPECT_TRUE(is_fastrtps); EXPECT_TRUE(is_fastrtps);
} }
} }
/*
Testing construction of a subscriptions with QoS event callback functions.
*/
std::string * g_pub_log_msg;
std::string * g_sub_log_msg;
std::promise<void> * g_log_msgs_promise;
TEST_F(TestQosEvent, test_default_incompatible_qos_callbacks)
{
rcutils_logging_output_handler_t original_output_handler = rcutils_logging_get_output_handler();
std::string pub_log_msg;
std::string sub_log_msg;
std::promise<void> log_msgs_promise;
g_pub_log_msg = &pub_log_msg;
g_sub_log_msg = &sub_log_msg;
g_log_msgs_promise = &log_msgs_promise;
auto logger_callback = [](
const rcutils_log_location_t * /*location*/,
int /*level*/, const char * /*name*/, rcutils_time_point_value_t /*timestamp*/,
const char * format, va_list * args) -> void {
char buffer[1024];
vsnprintf(buffer, sizeof(buffer), format, *args);
const std::string msg = buffer;
if (msg.rfind("New subscription discovered", 0) == 0) {
*g_pub_log_msg = buffer;
} else if (msg.rfind("New publisher discovered", 0) == 0) {
*g_sub_log_msg = buffer;
}
if (!g_pub_log_msg->empty() && !g_sub_log_msg->empty()) {
g_log_msgs_promise->set_value();
}
};
rcutils_logging_set_output_handler(logger_callback);
std::shared_future<void> log_msgs_future = log_msgs_promise.get_future();
rclcpp::QoS qos_profile_publisher(10);
qos_profile_publisher.durability(RMW_QOS_POLICY_DURABILITY_VOLATILE);
auto publisher = node->create_publisher<test_msgs::msg::Empty>(
topic_name, qos_profile_publisher);
rclcpp::QoS qos_profile_subscription(10);
qos_profile_subscription.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);
auto subscription = node->create_subscription<test_msgs::msg::Empty>(
topic_name, qos_profile_subscription, message_callback);
rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());
ex.spin_until_future_complete(log_msgs_future, std::chrono::seconds(10));
if (!is_fastrtps) {
EXPECT_EQ(
"New subscription discovered on this topic, requesting incompatible QoS. "
"No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY",
pub_log_msg);
EXPECT_EQ(
"New publisher discovered on this topic, offering incompatible QoS. "
"No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY",
sub_log_msg);
}
rcutils_logging_set_output_handler(original_output_handler);
}