Implement callbacks for liveliness and deadline QoS events (#695)

* implement deadline and liveliness qos callbacks

Signed-off-by: Miaofei <miaofei@amazon.com>

* fix windows build

Signed-off-by: Miaofei <miaofei@amazon.com>

* address feedback from pull request

Signed-off-by: Miaofei <miaofei@amazon.com>

* update formatting to be compatible with ros2 coding style and ament_uncrustify

Signed-off-by: Miaofei <miaofei@amazon.com>

* make QOSEventHandlerBase::add_to_wait_set() throw

Signed-off-by: Miaofei <miaofei@amazon.com>

* mark throw_from_rcl_error as [[noreturn]]

Signed-off-by: Miaofei <miaofei@amazon.com>

* fix windows compilation error

Signed-off-by: Miaofei <miaofei@amazon.com>

* Ignore uncrustify for single [[noreturn]] syntax instance

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
This commit is contained in:
M. M 2019-05-03 10:16:39 -07:00 committed by William Woodall
parent ecf35114b6
commit d399fef9c6
37 changed files with 446 additions and 28 deletions

View file

@ -65,6 +65,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/parameter_map.cpp src/rclcpp/parameter_map.cpp
src/rclcpp/parameter_service.cpp src/rclcpp/parameter_service.cpp
src/rclcpp/publisher_base.cpp src/rclcpp/publisher_base.cpp
src/rclcpp/qos_event.cpp
src/rclcpp/service.cpp src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp src/rclcpp/subscription_base.cpp

View file

@ -21,6 +21,7 @@
#include <vector> #include <vector>
#include "rclcpp/client.hpp" #include "rclcpp/client.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/service.hpp" #include "rclcpp/service.hpp"
#include "rclcpp/subscription.hpp" #include "rclcpp/subscription.hpp"
#include "rclcpp/timer.hpp" #include "rclcpp/timer.hpp"
@ -92,6 +93,10 @@ public:
protected: protected:
RCLCPP_DISABLE_COPY(CallbackGroup) RCLCPP_DISABLE_COPY(CallbackGroup)
RCLCPP_PUBLIC
void
add_publisher(const rclcpp::PublisherBase::SharedPtr publisher_ptr);
RCLCPP_PUBLIC RCLCPP_PUBLIC
void void
add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr); add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr);

View file

@ -31,6 +31,8 @@ create_publisher(
rclcpp::node_interfaces::NodeTopicsInterface * node_topics, rclcpp::node_interfaces::NodeTopicsInterface * node_topics,
const std::string & topic_name, const std::string & topic_name,
const rmw_qos_profile_t & qos_profile, const rmw_qos_profile_t & qos_profile,
const PublisherEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool use_intra_process_comms, bool use_intra_process_comms,
std::shared_ptr<AllocatorT> allocator) std::shared_ptr<AllocatorT> allocator)
{ {
@ -39,10 +41,12 @@ create_publisher(
auto pub = node_topics->create_publisher( auto pub = node_topics->create_publisher(
topic_name, topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(allocator), rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(event_callbacks, allocator),
publisher_options, publisher_options,
use_intra_process_comms); use_intra_process_comms);
node_topics->add_publisher(pub);
node_topics->add_publisher(pub, group);
return std::dynamic_pointer_cast<PublisherT>(pub); return std::dynamic_pointer_cast<PublisherT>(pub);
} }

View file

@ -38,6 +38,7 @@ create_subscription(
const std::string & topic_name, const std::string & topic_name,
CallbackT && callback, CallbackT && callback,
const rmw_qos_profile_t & qos_profile, const rmw_qos_profile_t & qos_profile,
const SubscriptionEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group, rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications, bool ignore_local_publications,
bool use_intra_process_comms, bool use_intra_process_comms,
@ -52,7 +53,10 @@ create_subscription(
auto factory = rclcpp::create_subscription_factory auto factory = rclcpp::create_subscription_factory
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>( <MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
std::forward<CallbackT>(callback), msg_mem_strat, allocator); std::forward<CallbackT>(callback),
event_callbacks,
msg_mem_strat,
allocator);
auto sub = node_topics->create_subscription( auto sub = node_topics->create_subscription(
topic_name, topic_name,

View file

@ -110,13 +110,15 @@ public:
* \throws std::runtime_error if the rcl_get_error_state returns 0 * \throws std::runtime_error if the rcl_get_error_state returns 0
* \throws RCLErrorBase some child class exception based on ret * \throws RCLErrorBase some child class exception based on ret
*/ */
/* *INDENT-OFF* */ // Uncrustify cannot yet understand [[noreturn]] properly
RCLCPP_PUBLIC RCLCPP_PUBLIC
void void
throw_from_rcl_error( throw_from_rcl_error [[noreturn]] (
rcl_ret_t ret, rcl_ret_t ret,
const std::string & prefix = "", const std::string & prefix = "",
const rcl_error_state_t * error_state = nullptr, const rcl_error_state_t * error_state = nullptr,
void (* reset_error)() = rcl_reset_error); void (* reset_error)() = rcl_reset_error);
/* *INDENT-ON* */
class RCLErrorBase class RCLErrorBase
{ {

View file

@ -51,6 +51,7 @@ public:
virtual size_t number_of_ready_subscriptions() const = 0; virtual size_t number_of_ready_subscriptions() const = 0;
virtual size_t number_of_ready_services() const = 0; virtual size_t number_of_ready_services() const = 0;
virtual size_t number_of_ready_clients() const = 0; virtual size_t number_of_ready_clients() const = 0;
virtual size_t number_of_ready_events() const = 0;
virtual size_t number_of_ready_timers() const = 0; virtual size_t number_of_ready_timers() const = 0;
virtual size_t number_of_guard_conditions() const = 0; virtual size_t number_of_guard_conditions() const = 0;
virtual size_t number_of_waitables() const = 0; virtual size_t number_of_waitables() const = 0;

View file

@ -1126,6 +1126,19 @@ public:
const NodeOptions & const NodeOptions &
get_node_options() const; get_node_options() const;
/// Manually assert that this Node is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE).
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE, the creator
* of this node may manually call `assert_liveliness` at some point in time to signal to the rest
* of the system that this Node is still alive.
*
* \return `true` if the liveliness was asserted successfully, otherwise `false`
*/
RCLCPP_PUBLIC
RCUTILS_WARN_UNUSED
bool
assert_liveliness() const;
protected: protected:
/// Construct a sub-node, which will extend the namespace of all entities created with it. /// Construct a sub-node, which will extend the namespace of all entities created with it.
/** /**

View file

@ -97,6 +97,8 @@ Node::create_publisher(
this->node_topics_.get(), this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
qos_profile, qos_profile,
options.event_callbacks,
options.callback_group,
use_intra_process, use_intra_process,
allocator); allocator);
} }
@ -181,6 +183,7 @@ Node::create_subscription(
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
std::forward<CallbackT>(callback), std::forward<CallbackT>(callback),
qos_profile, qos_profile,
options.event_callbacks,
options.callback_group, options.callback_group,
options.ignore_local_publications, options.ignore_local_publications,
use_intra_process, use_intra_process,

View file

@ -86,6 +86,11 @@ public:
std::shared_ptr<const rcl_node_t> std::shared_ptr<const rcl_node_t>
get_shared_rcl_node_handle() const; get_shared_rcl_node_handle() const;
RCLCPP_PUBLIC
virtual
bool
assert_liveliness() const;
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual virtual
rclcpp::callback_group::CallbackGroup::SharedPtr rclcpp::callback_group::CallbackGroup::SharedPtr

View file

@ -102,6 +102,12 @@ public:
std::shared_ptr<const rcl_node_t> std::shared_ptr<const rcl_node_t>
get_shared_rcl_node_handle() const = 0; get_shared_rcl_node_handle() const = 0;
/// Manually assert that this Node is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE).
RCLCPP_PUBLIC
virtual
bool
assert_liveliness() const = 0;
/// Create and return a callback group. /// Create and return a callback group.
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual virtual

View file

@ -58,7 +58,8 @@ public:
virtual virtual
void void
add_publisher( add_publisher(
rclcpp::PublisherBase::SharedPtr publisher); rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group);
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual virtual

View file

@ -57,7 +57,8 @@ public:
virtual virtual
void void
add_publisher( add_publisher(
rclcpp::PublisherBase::SharedPtr publisher) = 0; rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0;
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual virtual

View file

@ -127,6 +127,7 @@ public:
"parameter_events", "parameter_events",
std::forward<CallbackT>(callback), std::forward<CallbackT>(callback),
rmw_qos_profile_default, rmw_qos_profile_default,
SubscriptionEventCallbacks(),
nullptr, // group, nullptr, // group,
false, // ignore_local_publications, false, // ignore_local_publications,
false, // use_intra_process_comms_, false, // use_intra_process_comms_,

View file

@ -34,7 +34,6 @@
#include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager.hpp" #include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp" #include "rclcpp/publisher_base.hpp"
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
@ -59,6 +58,7 @@ public:
rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic, const std::string & topic,
const rcl_publisher_options_t & publisher_options, const rcl_publisher_options_t & publisher_options,
const PublisherEventCallbacks & event_callbacks,
const std::shared_ptr<MessageAlloc> & allocator) const std::shared_ptr<MessageAlloc> & allocator)
: PublisherBase( : PublisherBase(
node_base, node_base,
@ -68,6 +68,15 @@ public:
message_allocator_(allocator) message_allocator_(allocator)
{ {
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
if (event_callbacks.deadline_callback) {
this->add_event_handler(event_callbacks.deadline_callback,
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
}
if (event_callbacks.liveliness_callback) {
this->add_event_handler(event_callbacks.liveliness_callback,
RCL_PUBLISHER_LIVELINESS_LOST);
}
} }
virtual ~Publisher() virtual ~Publisher()

View file

@ -23,11 +23,13 @@
#include <memory> #include <memory>
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <vector>
#include "rcl/publisher.h" #include "rcl/publisher.h"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/mapped_ring_buffer.hpp" #include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
@ -112,6 +114,12 @@ public:
const rcl_publisher_t * const rcl_publisher_t *
get_publisher_handle() const; get_publisher_handle() const;
/// Get all the QoS event handlers associated with this publisher.
/** \return The vector of QoS event handlers. */
RCLCPP_PUBLIC
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
get_event_handlers() const;
/// Get subscription count /// Get subscription count
/** \return The number of subscriptions. */ /** \return The number of subscriptions. */
RCLCPP_PUBLIC RCLCPP_PUBLIC
@ -124,6 +132,19 @@ public:
size_t size_t
get_intra_process_subscription_count() const; get_intra_process_subscription_count() const;
/// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC, the creator
* of this publisher may manually call `assert_liveliness` at some point in time to signal to the
* rest of the system that this Node is still alive.
*
* \return `true` if the liveliness was asserted successfully, otherwise `false`
*/
RCLCPP_PUBLIC
RCUTILS_WARN_UNUSED
bool
assert_liveliness() const;
/// Get the actual QoS settings, after the defaults have been determined. /// Get the actual QoS settings, after the defaults have been determined.
/** /**
* The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT * The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT
@ -175,11 +196,27 @@ public:
const rcl_publisher_options_t & intra_process_options); const rcl_publisher_options_t & intra_process_options);
protected: protected:
template<typename EventCallbackT>
void
add_event_handler(
const EventCallbackT & callback,
const rcl_publisher_event_type_t event_type)
{
auto handler = std::make_shared<QOSEventHandler<EventCallbackT>>(
callback,
rcl_publisher_event_init,
&publisher_handle_,
event_type);
event_handlers_.emplace_back(handler);
}
std::shared_ptr<rcl_node_t> rcl_node_handle_; std::shared_ptr<rcl_node_t> rcl_node_handle_;
rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher(); rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();
std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
using IntraProcessManagerWeakPtr = using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>; std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool intra_process_is_enabled_; bool intra_process_is_enabled_;

View file

@ -57,13 +57,15 @@ struct PublisherFactory
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, Alloc>. /// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, Alloc>.
template<typename MessageT, typename Alloc, typename PublisherT> template<typename MessageT, typename Alloc, typename PublisherT>
PublisherFactory PublisherFactory
create_publisher_factory(std::shared_ptr<Alloc> allocator) create_publisher_factory(
const PublisherEventCallbacks & event_callbacks,
std::shared_ptr<Alloc> allocator)
{ {
PublisherFactory factory; PublisherFactory factory;
// factory function that creates a MessageT specific PublisherT // factory function that creates a MessageT specific PublisherT
factory.create_typed_publisher = factory.create_typed_publisher =
[allocator]( [event_callbacks, allocator](
rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name, const std::string & topic_name,
rcl_publisher_options_t & publisher_options) -> std::shared_ptr<PublisherT> rcl_publisher_options_t & publisher_options) -> std::shared_ptr<PublisherT>
@ -71,7 +73,12 @@ create_publisher_factory(std::shared_ptr<Alloc> allocator)
auto message_alloc = std::make_shared<typename PublisherT::MessageAlloc>(*allocator.get()); auto message_alloc = std::make_shared<typename PublisherT::MessageAlloc>(*allocator.get());
publisher_options.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc.get()); publisher_options.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc.get());
return std::make_shared<PublisherT>(node_base, topic_name, publisher_options, message_alloc); return std::make_shared<PublisherT>(
node_base,
topic_name,
publisher_options,
event_callbacks,
message_alloc);
}; };
// return the factory now that it is populated // return the factory now that it is populated

View file

@ -19,7 +19,9 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "rclcpp/callback_group.hpp"
#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/intra_process_setting.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
namespace rclcpp namespace rclcpp
@ -29,10 +31,12 @@ namespace rclcpp
template<typename Allocator> template<typename Allocator>
struct PublisherOptionsWithAllocator struct PublisherOptionsWithAllocator
{ {
PublisherEventCallbacks event_callbacks;
/// The quality of service profile to pass on to the rmw implementation. /// The quality of service profile to pass on to the rmw implementation.
rmw_qos_profile_t qos_profile = rmw_qos_profile_default; rmw_qos_profile_t qos_profile = rmw_qos_profile_default;
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
/// Optional custom allocator. /// Optional custom allocator.
std::shared_ptr<Allocator> allocator = nullptr; std::shared_ptr<Allocator> allocator;
/// Setting to explicitly set intraprocess communications. /// Setting to explicitly set intraprocess communications.
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault; IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault;
}; };

View file

@ -0,0 +1,126 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__QOS_EVENT_HPP_
#define RCLCPP__QOS_EVENT_HPP_
#include <functional>
#include "rcl/error_handling.h"
#include "rcutils/logging_macros.h"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/waitable.hpp"
namespace rclcpp
{
using QOSDeadlineRequestedInfo = rmw_requested_deadline_missed_status_t;
using QOSDeadlineOfferedInfo = rmw_offered_deadline_missed_status_t;
using QOSLivelinessChangedInfo = rmw_liveliness_changed_status_t;
using QOSLivelinessLostInfo = rmw_liveliness_lost_status_t;
using QOSDeadlineRequestedCallbackType = std::function<void (QOSDeadlineRequestedInfo &)>;
using QOSDeadlineOfferedCallbackType = std::function<void (QOSDeadlineOfferedInfo &)>;
using QOSLivelinessChangedCallbackType = std::function<void (QOSLivelinessChangedInfo &)>;
using QOSLivelinessLostCallbackType = std::function<void (QOSLivelinessLostInfo &)>;
/// Contains callbacks for various types of events a Publisher can receive from the middleware.
struct PublisherEventCallbacks
{
QOSDeadlineOfferedCallbackType deadline_callback;
QOSLivelinessLostCallbackType liveliness_callback;
};
/// Contains callbacks for non-message events that a Subscription can receive from the middleware.
struct SubscriptionEventCallbacks
{
QOSDeadlineRequestedCallbackType deadline_callback;
QOSLivelinessChangedCallbackType liveliness_callback;
};
class QOSEventHandlerBase : public Waitable
{
public:
RCLCPP_PUBLIC
virtual ~QOSEventHandlerBase();
/// Get the number of ready events
RCLCPP_PUBLIC
size_t
get_number_of_ready_events() override;
/// Add the Waitable to a wait set.
RCLCPP_PUBLIC
bool
add_to_wait_set(rcl_wait_set_t * wait_set) override;
/// Check if the Waitable is ready.
RCLCPP_PUBLIC
bool
is_ready(rcl_wait_set_t * wait_set) override;
protected:
rcl_event_t event_handle_;
size_t wait_set_event_index_;
};
template<typename EventCallbackT>
class QOSEventHandler : public QOSEventHandlerBase
{
public:
template<typename InitFuncT, typename ParentHandleT, typename EventTypeEnum>
QOSEventHandler(
const EventCallbackT & callback,
InitFuncT init_func,
ParentHandleT parent_handle,
EventTypeEnum event_type)
: event_callback_(callback)
{
event_handle_ = rcl_get_zero_initialized_event();
rcl_ret_t ret = init_func(&event_handle_, parent_handle, event_type);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create event");
}
}
/// Execute any entities of the Waitable that are ready.
void
execute() override
{
EventCallbackInfoT callback_info;
rcl_ret_t ret = rcl_take_event(&event_handle_, &callback_info);
if (ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't take event info: %s", rcl_get_error_string().str);
return;
}
event_callback_(callback_info);
}
private:
using EventCallbackInfoT = typename std::remove_reference<typename
rclcpp::function_traits::function_traits<EventCallbackT>::template argument_type<0>>::type;
EventCallbackT event_callback_;
};
} // namespace rclcpp
#endif // RCLCPP__QOS_EVENT_HPP_

View file

@ -430,6 +430,15 @@ public:
return number_of_services; return number_of_services;
} }
size_t number_of_ready_events() const
{
size_t number_of_events = 0;
for (auto waitable : waitable_handles_) {
number_of_events += waitable->get_number_of_ready_events();
}
return number_of_events;
}
size_t number_of_ready_clients() const size_t number_of_ready_clients() const
{ {
size_t number_of_clients = client_handles_.size(); size_t number_of_clients = client_handles_.size();

View file

@ -42,6 +42,7 @@
#include "rclcpp/subscription_traits.hpp" #include "rclcpp/subscription_traits.hpp"
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "rclcpp/waitable.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -85,6 +86,7 @@ public:
const std::string & topic_name, const std::string & topic_name,
const rcl_subscription_options_t & subscription_options, const rcl_subscription_options_t & subscription_options,
AnySubscriptionCallback<CallbackMessageT, Alloc> callback, AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
const SubscriptionEventCallbacks & event_callbacks,
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
memory_strategy = message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, memory_strategy = message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
Alloc>::create_default()) Alloc>::create_default())
@ -96,7 +98,16 @@ public:
rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value), rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
any_callback_(callback), any_callback_(callback),
message_memory_strategy_(memory_strategy) message_memory_strategy_(memory_strategy)
{} {
if (event_callbacks.deadline_callback) {
this->add_event_handler(event_callbacks.deadline_callback,
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED);
}
if (event_callbacks.liveliness_callback) {
this->add_event_handler(event_callbacks.liveliness_callback,
RCL_SUBSCRIPTION_LIVELINESS_CHANGED);
}
}
/// Support dynamically setting the message memory strategy. /// Support dynamically setting the message memory strategy.
/** /**

View file

@ -17,6 +17,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector>
#include "rcl/subscription.h" #include "rcl/subscription.h"
@ -26,6 +27,7 @@
#include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
@ -90,6 +92,12 @@ public:
virtual const std::shared_ptr<rcl_subscription_t> virtual const std::shared_ptr<rcl_subscription_t>
get_intra_process_subscription_handle() const; get_intra_process_subscription_handle() const;
/// Get all the QoS event handlers associated with this subscription.
/** \return The vector of QoS event handlers. */
RCLCPP_PUBLIC
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
get_event_handlers() const;
/// Borrow a new message. /// Borrow a new message.
/** \return Shared pointer to the fresh message. */ /** \return Shared pointer to the fresh message. */
virtual std::shared_ptr<void> virtual std::shared_ptr<void>
@ -145,9 +153,25 @@ public:
const rcl_subscription_options_t & intra_process_options); const rcl_subscription_options_t & intra_process_options);
protected: protected:
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_; template<typename EventCallbackT>
std::shared_ptr<rcl_subscription_t> subscription_handle_; void
add_event_handler(
const EventCallbackT & callback,
const rcl_subscription_event_type_t event_type)
{
auto handler = std::make_shared<QOSEventHandler<EventCallbackT>>(
callback,
rcl_subscription_event_init,
get_subscription_handle().get(),
event_type);
event_handlers_.emplace_back(handler);
}
std::shared_ptr<rcl_node_t> node_handle_; std::shared_ptr<rcl_node_t> node_handle_;
std::shared_ptr<rcl_subscription_t> subscription_handle_;
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
bool use_intra_process_; bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_; IntraProcessManagerWeakPtr weak_ipm_;

View file

@ -75,6 +75,7 @@ template<
SubscriptionFactory SubscriptionFactory
create_subscription_factory( create_subscription_factory(
CallbackT && callback, CallbackT && callback,
const SubscriptionEventCallbacks & event_callbacks,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT, Alloc>::SharedPtr CallbackMessageT, Alloc>::SharedPtr
msg_mem_strat, msg_mem_strat,
@ -91,7 +92,7 @@ create_subscription_factory(
// factory function that creates a MessageT specific SubscriptionT // factory function that creates a MessageT specific SubscriptionT
factory.create_typed_subscription = factory.create_typed_subscription =
[allocator, msg_mem_strat, any_subscription_callback, message_alloc]( [allocator, msg_mem_strat, any_subscription_callback, event_callbacks, message_alloc](
rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name, const std::string & topic_name,
rcl_subscription_options_t & subscription_options rcl_subscription_options_t & subscription_options
@ -109,6 +110,7 @@ create_subscription_factory(
topic_name, topic_name,
subscription_options, subscription_options,
any_subscription_callback, any_subscription_callback,
event_callbacks,
msg_mem_strat); msg_mem_strat);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub); auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
return sub_base_ptr; return sub_base_ptr;

View file

@ -19,7 +19,9 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "rclcpp/callback_group.hpp"
#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/intra_process_setting.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
namespace rclcpp namespace rclcpp
@ -29,14 +31,15 @@ namespace rclcpp
template<typename Allocator> template<typename Allocator>
struct SubscriptionOptionsWithAllocator struct SubscriptionOptionsWithAllocator
{ {
SubscriptionEventCallbacks event_callbacks;
/// The quality of service profile to pass on to the rmw implementation. /// The quality of service profile to pass on to the rmw implementation.
rmw_qos_profile_t qos_profile = rmw_qos_profile_default; rmw_qos_profile_t qos_profile = rmw_qos_profile_default;
/// True to ignore local publications. /// True to ignore local publications.
bool ignore_local_publications = false; bool ignore_local_publications = false;
/// The callback group for this subscription. NULL to use the default callback group. /// The callback group for this subscription. NULL to use the default callback group.
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group = nullptr; rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
/// Optional custom allocator. /// Optional custom allocator.
std::shared_ptr<Allocator> allocator = nullptr; std::shared_ptr<Allocator> allocator;
/// Setting to explicitly set intraprocess communications. /// Setting to explicitly set intraprocess communications.
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault; IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault;
}; };

View file

@ -61,6 +61,17 @@ public:
size_t size_t
get_number_of_ready_clients(); get_number_of_ready_clients();
/// Get the number of ready events
/**
* Returns a value of 0 by default.
* This should be overridden if the Waitable contains one or more events.
* \return The number of events associated with the Waitable.
*/
RCLCPP_PUBLIC
virtual
size_t
get_number_of_ready_events();
/// Get the number of ready services /// Get the number of ready services
/** /**
* Returns a value of 0 by default. * Returns a value of 0 by default.
@ -88,6 +99,7 @@ public:
/** /**
* \param[in] wait_set A handle to the wait set to add the Waitable to. * \param[in] wait_set A handle to the wait set to add the Waitable to.
* \return `true` if the Waitable is added successfully, `false` otherwise. * \return `true` if the Waitable is added successfully, `false` otherwise.
* \throws rclcpp::execptions::RCLError from rcl_wait_set_add_*()
*/ */
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual virtual

View file

@ -60,7 +60,11 @@ Executor::Executor(const ExecutorArgs & args)
// Store the context for later use. // Store the context for later use.
context_ = args.context; context_ = args.context;
ret = rcl_wait_set_init(&wait_set_, 0, 2, 0, 0, 0, context_->get_rcl_context().get(), allocator); ret = rcl_wait_set_init(
&wait_set_,
0, 2, 0, 0, 0, 0,
context_->get_rcl_context().get(),
allocator);
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
RCUTILS_LOG_ERROR_NAMED( RCUTILS_LOG_ERROR_NAMED(
"rclcpp", "rclcpp",
@ -435,7 +439,8 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
rcl_ret_t ret = rcl_wait_set_resize( rcl_ret_t ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(), &wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services()); memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
throw std::runtime_error( throw std::runtime_error(
std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str);

View file

@ -79,6 +79,7 @@ GraphListener::start_if_not_started()
0, // number_of_timers 0, // number_of_timers
0, // number_of_clients 0, // number_of_clients
0, // number_of_services 0, // number_of_services
0, // number_of_events
this->parent_context_->get_rcl_context().get(), this->parent_context_->get_rcl_context().get(),
rcl_get_default_allocator()); rcl_get_default_allocator());
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
@ -145,7 +146,7 @@ GraphListener::run_loop()
const size_t node_graph_interfaces_size = node_graph_interfaces_.size(); const size_t node_graph_interfaces_size = node_graph_interfaces_.size();
// Add 2 for the interrupt and shutdown guard conditions // Add 2 for the interrupt and shutdown guard conditions
if (wait_set_.size_of_guard_conditions < (node_graph_interfaces_size + 2)) { if (wait_set_.size_of_guard_conditions < (node_graph_interfaces_size + 2)) {
ret = rcl_wait_set_resize(&wait_set_, 0, node_graph_interfaces_size + 2, 0, 0, 0); ret = rcl_wait_set_resize(&wait_set_, 0, node_graph_interfaces_size + 2, 0, 0, 0, 0);
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to resize wait set"); throw_from_rcl_error(ret, "failed to resize wait set");
} }

View file

@ -470,3 +470,9 @@ Node::get_node_options() const
{ {
return this->node_options_; return this->node_options_;
} }
bool
Node::assert_liveliness() const
{
return node_base_->assert_liveliness();
}

View file

@ -197,6 +197,12 @@ NodeBase::get_shared_rcl_node_handle() const
return node_handle_; return node_handle_;
} }
bool
NodeBase::assert_liveliness() const
{
return RCL_RET_OK == rcl_node_assert_liveliness(get_rcl_node_handle());
}
rclcpp::callback_group::CallbackGroup::SharedPtr rclcpp::callback_group::CallbackGroup::SharedPtr
NodeBase::create_callback_group(rclcpp::callback_group::CallbackGroupType group_type) NodeBase::create_callback_group(rclcpp::callback_group::CallbackGroupType group_type)
{ {

View file

@ -75,6 +75,8 @@ NodeParameters::NodeParameters(
node_topics.get(), node_topics.get(),
"parameter_events", "parameter_events",
parameter_event_qos_profile, parameter_event_qos_profile,
PublisherEventCallbacks(),
nullptr,
use_intra_process, use_intra_process,
allocator); allocator);
} }

View file

@ -60,11 +60,22 @@ NodeTopics::create_publisher(
void void
NodeTopics::add_publisher( NodeTopics::add_publisher(
rclcpp::PublisherBase::SharedPtr publisher) rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group)
{ {
// The publisher is not added to a callback group or anthing like that for now. // Assign to a group.
// It may be stored within the NodeTopics class or the NodeBase class in the future. if (callback_group) {
(void)publisher; if (!node_base_->callback_group_in_node(callback_group)) {
throw std::runtime_error("Cannot create publisher, callback group not in node.");
}
} else {
callback_group = node_base_->get_default_callback_group();
}
for (auto & publisher_event : publisher->get_event_handlers()) {
callback_group->add_waitable(publisher_event);
}
// Notify the executor that a new publisher was created using the parent Node. // Notify the executor that a new publisher was created using the parent Node.
{ {
auto notify_guard_condition_lock = node_base_->acquire_notify_guard_condition_lock(); auto notify_guard_condition_lock = node_base_->acquire_notify_guard_condition_lock();
@ -111,9 +122,13 @@ NodeTopics::add_subscription(
// TODO(jacquelinekay): use custom exception // TODO(jacquelinekay): use custom exception
throw std::runtime_error("Cannot create subscription, callback group not in node."); throw std::runtime_error("Cannot create subscription, callback group not in node.");
} }
callback_group->add_subscription(subscription);
} else { } else {
node_base_->get_default_callback_group()->add_subscription(subscription); callback_group = node_base_->get_default_callback_group();
}
callback_group->add_subscription(subscription);
for (auto & subscription_event : subscription->get_event_handlers()) {
callback_group->add_waitable(subscription_event);
} }
// Notify the executor that a new subscription was created using the parent Node. // Notify the executor that a new subscription was created using the parent Node.

View file

@ -22,6 +22,7 @@
#include <mutex> #include <mutex>
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <vector>
#include "rcl_interfaces/msg/intra_process_message.hpp" #include "rcl_interfaces/msg/intra_process_message.hpp"
#include "rcutils/logging_macros.h" #include "rcutils/logging_macros.h"
@ -30,10 +31,10 @@
#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/exceptions.hpp" #include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/intra_process_manager.hpp" #include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/node.hpp" #include "rclcpp/node.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
using rclcpp::PublisherBase; using rclcpp::PublisherBase;
@ -80,6 +81,9 @@ PublisherBase::PublisherBase(
PublisherBase::~PublisherBase() PublisherBase::~PublisherBase()
{ {
// must fini the events before fini-ing the publisher
event_handlers_.clear();
if (rcl_publisher_fini(&intra_process_publisher_handle_, rcl_node_handle_.get()) != RCL_RET_OK) { if (rcl_publisher_fini(&intra_process_publisher_handle_, rcl_node_handle_.get()) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED( RCUTILS_LOG_ERROR_NAMED(
"rclcpp", "rclcpp",
@ -153,6 +157,12 @@ PublisherBase::get_publisher_handle() const
return &publisher_handle_; return &publisher_handle_;
} }
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
PublisherBase::get_event_handlers() const
{
return event_handlers_;
}
size_t size_t
PublisherBase::get_subscription_count() const PublisherBase::get_subscription_count() const
{ {
@ -207,6 +217,12 @@ PublisherBase::get_actual_qos() const
return *qos; return *qos;
} }
bool
PublisherBase::assert_liveliness() const
{
return RCL_RET_OK == rcl_publisher_assert_liveliness(&publisher_handle_);
}
bool bool
PublisherBase::operator==(const rmw_gid_t & gid) const PublisherBase::operator==(const rmw_gid_t & gid) const
{ {

View file

@ -0,0 +1,55 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/qos_event.hpp"
namespace rclcpp
{
QOSEventHandlerBase::~QOSEventHandlerBase()
{
if (rcl_event_fini(&event_handle_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Error in destruction of rcl event handle: %s", rcl_get_error_string().str);
rcl_reset_error();
}
}
/// Get the number of ready events.
size_t
QOSEventHandlerBase::get_number_of_ready_events()
{
return 1;
}
/// Add the Waitable to a wait set.
bool
QOSEventHandlerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
{
rcl_ret_t ret = rcl_wait_set_add_event(wait_set, &event_handle_, &wait_set_event_index_);
if (RCL_RET_OK != ret) {
exceptions::throw_from_rcl_error(ret, "Couldn't add event to wait set");
}
return true;
}
/// Check if the Waitable is ready.
bool
QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set)
{
return wait_set->events[wait_set_event_index_] == &event_handle_;
}
} // namespace rclcpp

View file

@ -17,6 +17,7 @@
#include <cstdio> #include <cstdio>
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector>
#include "rclcpp/exceptions.hpp" #include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/expand_topic_or_service_name.hpp"
@ -121,6 +122,12 @@ SubscriptionBase::get_intra_process_subscription_handle() const
return intra_process_subscription_handle_; return intra_process_subscription_handle_;
} }
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
SubscriptionBase::get_event_handlers() const
{
return event_handlers_;
}
const rosidl_message_type_support_t & const rosidl_message_type_support_t &
SubscriptionBase::get_message_type_support_handle() const SubscriptionBase::get_message_type_support_handle() const
{ {

View file

@ -223,6 +223,7 @@ void TimeSource::create_clock_sub()
topic_name, topic_name,
std::move(cb), std::move(cb),
rmw_qos_profile_default, rmw_qos_profile_default,
rclcpp::SubscriptionEventCallbacks(),
group, group,
false, false,
false, false,

View file

@ -34,6 +34,12 @@ Waitable::get_number_of_ready_clients()
return 0u; return 0u;
} }
size_t
Waitable::get_number_of_ready_events()
{
return 0u;
}
size_t size_t
Waitable::get_number_of_ready_services() Waitable::get_number_of_ready_services()
{ {

View file

@ -23,10 +23,12 @@
#include "rclcpp/contexts/default_context.hpp" #include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/intra_process_manager.hpp" #include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/event.hpp"
#include "rclcpp/parameter.hpp" #include "rclcpp/parameter.hpp"
#include "rclcpp/create_publisher.hpp" #include "rclcpp/create_publisher.hpp"
#include "rclcpp/create_service.hpp" #include "rclcpp/create_service.hpp"
#include "rclcpp/create_subscription.hpp" #include "rclcpp/create_subscription.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "lifecycle_publisher.hpp" #include "lifecycle_publisher.hpp"
@ -65,6 +67,8 @@ LifecycleNode::create_publisher(
this->node_topics_.get(), this->node_topics_.get(),
topic_name, topic_name,
qos_profile, qos_profile,
rclcpp::PublisherEventCallbacks(),
nullptr,
use_intra_process_comms_, use_intra_process_comms_,
allocator); allocator);
} }
@ -99,6 +103,7 @@ LifecycleNode::create_subscription(
topic_name, topic_name,
std::forward<CallbackT>(callback), std::forward<CallbackT>(callback),
qos_profile, qos_profile,
rclcpp::SubscriptionEventCallbacks(),
group, group,
ignore_local_publications, ignore_local_publications,
use_intra_process_comms_, use_intra_process_comms_,
@ -128,6 +133,7 @@ LifecycleNode::create_subscription(
topic_name, topic_name,
std::forward<CallbackT>(callback), std::forward<CallbackT>(callback),
qos, qos,
rclcpp::SubscriptionEventCallbacks(),
group, group,
ignore_local_publications, ignore_local_publications,
msg_mem_strat, msg_mem_strat,

View file

@ -61,9 +61,10 @@ public:
rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic, const std::string & topic,
const rcl_publisher_options_t & publisher_options, const rcl_publisher_options_t & publisher_options,
const rclcpp::PublisherEventCallbacks event_callbacks,
std::shared_ptr<MessageAlloc> allocator) std::shared_ptr<MessageAlloc> allocator)
: rclcpp::Publisher<MessageT, Alloc>( : rclcpp::Publisher<MessageT, Alloc>(
node_base, topic, publisher_options, allocator), node_base, topic, publisher_options, event_callbacks, allocator),
enabled_(false), enabled_(false),
logger_(rclcpp::get_logger("LifecyclePublisher")) logger_(rclcpp::get_logger("LifecyclePublisher"))
{ {