adding callbacks and publish for unique_ptrs
This commit is contained in:
parent
c98907c1f5
commit
51ecd3f386
10 changed files with 433 additions and 92 deletions
127
rclcpp/include/rclcpp/any_subscription_callback.hpp
Normal file
127
rclcpp/include/rclcpp/any_subscription_callback.hpp
Normal file
|
@ -0,0 +1,127 @@
|
|||
// Copyright 2014 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_
|
||||
#define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_
|
||||
|
||||
#include <rclcpp/function_traits.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
|
||||
#include <rmw/types.h>
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
namespace any_subscription_callback
|
||||
{
|
||||
|
||||
template<typename MessageT>
|
||||
struct AnySubscriptionCallback
|
||||
{
|
||||
using SharedPtrCallback = std::function<void(const std::shared_ptr<MessageT> &)>;
|
||||
using SharedPtrWithInfoCallback =
|
||||
std::function<void(const std::shared_ptr<MessageT> &, const rmw_message_info_t &)>;
|
||||
using UniquePtrCallback = std::function<void(std::unique_ptr<MessageT> &)>;
|
||||
using UniquePtrWithInfoCallback =
|
||||
std::function<void(std::unique_ptr<MessageT> &, const rmw_message_info_t &)>;
|
||||
|
||||
SharedPtrCallback shared_ptr_callback;
|
||||
SharedPtrWithInfoCallback shared_ptr_with_info_callback;
|
||||
UniquePtrCallback unique_ptr_callback;
|
||||
UniquePtrWithInfoCallback unique_ptr_with_info_callback;
|
||||
|
||||
AnySubscriptionCallback()
|
||||
: shared_ptr_callback(nullptr), shared_ptr_with_info_callback(nullptr),
|
||||
unique_ptr_callback(nullptr), unique_ptr_with_info_callback(nullptr)
|
||||
{}
|
||||
|
||||
AnySubscriptionCallback(const AnySubscriptionCallback &) = default;
|
||||
|
||||
template<typename CallbackT,
|
||||
typename std::enable_if<
|
||||
function_traits<CallbackT>::arity == 1
|
||||
>::type * = nullptr,
|
||||
typename std::enable_if<
|
||||
std::is_same<
|
||||
typename function_traits<CallbackT>::template argument_type<0>,
|
||||
typename std::shared_ptr<MessageT>
|
||||
>::value
|
||||
>::type * = nullptr
|
||||
>
|
||||
void set(CallbackT callback)
|
||||
{
|
||||
shared_ptr_callback = callback;
|
||||
}
|
||||
|
||||
template<typename CallbackT,
|
||||
typename std::enable_if<
|
||||
function_traits<CallbackT>::arity == 2
|
||||
>::type * = nullptr,
|
||||
typename std::enable_if<
|
||||
std::is_same<
|
||||
typename function_traits<CallbackT>::template argument_type<0>,
|
||||
typename std::shared_ptr<MessageT>
|
||||
>::value
|
||||
>::type * = nullptr
|
||||
>
|
||||
void set(CallbackT callback)
|
||||
{
|
||||
shared_ptr_with_info_callback = callback;
|
||||
}
|
||||
/*
|
||||
template<typename CallbackT,
|
||||
typename std::enable_if<
|
||||
function_traits<CallbackT>::arity == 1
|
||||
>::type * = nullptr,
|
||||
typename std::enable_if<
|
||||
std::is_same<
|
||||
typename function_traits<CallbackT>::template argument_type<0>,
|
||||
typename std::unique_ptr<MessageT>
|
||||
>::value
|
||||
>::type * = nullptr
|
||||
>
|
||||
void set(CallbackT callback)
|
||||
{
|
||||
static_assert(std::is_same<
|
||||
typename function_traits<CallbackT>::template argument_type<0>,
|
||||
typename std::unique_ptr<MessageT>
|
||||
>::value, "Not a unique pointer");
|
||||
unique_ptr_callback = callback;
|
||||
}
|
||||
|
||||
template<typename CallbackT,
|
||||
typename std::enable_if<
|
||||
function_traits<CallbackT>::arity == 2
|
||||
>::type * = nullptr,
|
||||
typename std::enable_if<
|
||||
std::is_same<
|
||||
typename function_traits<CallbackT>::template argument_type<0>,
|
||||
typename std::unique_ptr<MessageT>
|
||||
>::value
|
||||
>::type * = nullptr
|
||||
>
|
||||
void set(CallbackT callback)
|
||||
{
|
||||
unique_ptr_with_info_callback = callback;
|
||||
}
|
||||
*/
|
||||
};
|
||||
|
||||
} /* namespace any_subscription_callback */
|
||||
} /* namespace rclcpp */
|
||||
|
||||
#endif /* RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ */
|
|
@ -64,7 +64,7 @@ public:
|
|||
virtual void spin() = 0;
|
||||
|
||||
virtual void
|
||||
add_node(rclcpp::node::Node::SharedPtr & node_ptr, bool notify = true)
|
||||
add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true)
|
||||
{
|
||||
// Check to ensure node not already added
|
||||
for (auto & weak_node : weak_nodes_) {
|
||||
|
@ -190,7 +190,8 @@ protected:
|
|||
rmw_take_with_info(subscription->subscription_handle_, message.get(), &taken, &message_info);
|
||||
if (ret == RMW_RET_OK) {
|
||||
if (taken) {
|
||||
subscription->handle_message(message, &message_info.publisher_gid);
|
||||
message_info.from_intra_process = false;
|
||||
subscription->handle_message(message, message_info);
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,
|
||||
|
@ -206,10 +207,16 @@ protected:
|
|||
{
|
||||
rcl_interfaces::msg::IntraProcessMessage ipm;
|
||||
bool taken = false;
|
||||
rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
|
||||
rmw_message_info_t message_info;
|
||||
rmw_ret_t status = rmw_take_with_info(
|
||||
subscription->intra_process_subscription_handle_,
|
||||
&ipm,
|
||||
&taken,
|
||||
&message_info);
|
||||
if (status == RMW_RET_OK) {
|
||||
if (taken) {
|
||||
subscription->handle_intra_process_message(ipm);
|
||||
message_info.from_intra_process = true;
|
||||
subscription->handle_intra_process_message(ipm, message_info);
|
||||
}
|
||||
} else {
|
||||
fprintf(stderr,
|
||||
|
|
63
rclcpp/include/rclcpp/function_traits.hpp
Normal file
63
rclcpp/include/rclcpp/function_traits.hpp
Normal file
|
@ -0,0 +1,63 @@
|
|||
// Copyright 2014 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_
|
||||
#define RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_
|
||||
#include <memory>
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
/* NOTE(esteve):
|
||||
* We support service callbacks that can optionally take the request id,
|
||||
* which should be possible with two overloaded create_service methods,
|
||||
* but unfortunately std::function's constructor on VS2015 is too greedy,
|
||||
* so we need a mechanism for checking the arity and the type of each argument
|
||||
* in a callback function.
|
||||
*/
|
||||
|
||||
|
||||
template<typename FunctionT>
|
||||
struct function_traits
|
||||
{
|
||||
static constexpr std::size_t arity =
|
||||
function_traits<decltype( & FunctionT::operator())>::arity - 1;
|
||||
|
||||
|
||||
template<std::size_t N>
|
||||
using argument_type =
|
||||
typename function_traits<decltype( & FunctionT::operator())>::template argument_type<N + 1>;
|
||||
};
|
||||
|
||||
template<typename ReturnTypeT, typename ... Args>
|
||||
struct function_traits<ReturnTypeT(Args ...)>
|
||||
{
|
||||
static constexpr std::size_t arity = sizeof ... (Args);
|
||||
|
||||
template<std::size_t N>
|
||||
using argument_type = typename std::tuple_element<N, std::tuple<Args ...>>::type;
|
||||
};
|
||||
|
||||
template<typename ReturnTypeT, typename ... Args>
|
||||
struct function_traits<ReturnTypeT (*)(Args ...)>: public function_traits<ReturnTypeT(Args ...)>
|
||||
{};
|
||||
|
||||
template<typename ClassT, typename ReturnTypeT, typename ... Args>
|
||||
struct function_traits<ReturnTypeT (ClassT::*)(Args ...) const>
|
||||
: public function_traits<ReturnTypeT(ClassT &, Args ...)>
|
||||
{};
|
||||
|
||||
} /* namespace rclcpp */
|
||||
|
||||
#endif /* RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ */
|
|
@ -29,6 +29,7 @@
|
|||
#include <rclcpp/callback_group.hpp>
|
||||
#include <rclcpp/client.hpp>
|
||||
#include <rclcpp/context.hpp>
|
||||
#include <rclcpp/function_traits.hpp>
|
||||
#include <rclcpp/macros.hpp>
|
||||
#include <rclcpp/message_memory_strategy.hpp>
|
||||
#include <rclcpp/parameter.hpp>
|
||||
|
@ -56,42 +57,6 @@ class Executor;
|
|||
namespace node
|
||||
{
|
||||
|
||||
/* NOTE(esteve):
|
||||
* We support service callbacks that can optionally take the request id,
|
||||
* which should be possible with two overloaded create_service methods,
|
||||
* but unfortunately std::function's constructor on VS2015 is too greedy,
|
||||
* so we need a mechanism for checking the arity and the type of each argument
|
||||
* in a callback function.
|
||||
*/
|
||||
template<typename FunctionT>
|
||||
struct function_traits
|
||||
{
|
||||
static constexpr std::size_t arity =
|
||||
function_traits<decltype( & FunctionT::operator())>::arity - 1;
|
||||
|
||||
template<std::size_t N>
|
||||
using argument_type =
|
||||
typename function_traits<decltype( & FunctionT::operator())>::template argument_type<N + 1>;
|
||||
};
|
||||
|
||||
template<typename ReturnTypeT, typename ... Args>
|
||||
struct function_traits<ReturnTypeT(Args ...)>
|
||||
{
|
||||
static constexpr std::size_t arity = sizeof ... (Args);
|
||||
|
||||
template<std::size_t N>
|
||||
using argument_type = typename std::tuple_element<N, std::tuple<Args ...>>::type;
|
||||
};
|
||||
|
||||
template<typename ReturnTypeT, typename ... Args>
|
||||
struct function_traits<ReturnTypeT (*)(Args ...)>: public function_traits<ReturnTypeT(Args ...)>
|
||||
{};
|
||||
|
||||
template<typename ClassT, typename ReturnTypeT, typename ... Args>
|
||||
struct function_traits<ReturnTypeT (ClassT::*)(Args ...) const>
|
||||
: public function_traits<ReturnTypeT(ClassT &, Args ...)>
|
||||
{};
|
||||
|
||||
/* ROS Node Interface.
|
||||
*
|
||||
* This is the single point of entry for creating publishers and subscribers.
|
||||
|
@ -130,12 +95,23 @@ public:
|
|||
Windows build breaks when static member function passed as default
|
||||
argument to msg_mem_strat, nullptr is a workaround.
|
||||
*/
|
||||
template<typename MessageT>
|
||||
template<typename MessageT, typename CallbackT>
|
||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
|
||||
create_subscription(
|
||||
const std::string & topic_name,
|
||||
const rmw_qos_profile_t & qos_profile,
|
||||
std::function<void(const std::shared_ptr<MessageT> &)> callback,
|
||||
CallbackT callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
|
||||
bool ignore_local_publications = false,
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
msg_mem_strat = nullptr);
|
||||
|
||||
template<typename MessageT>
|
||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
|
||||
create_subscription_with_unique_ptr_callback(
|
||||
const std::string & topic_name,
|
||||
const rmw_qos_profile_t & qos_profile,
|
||||
typename rclcpp::subscription::AnySubscriptionCallback<MessageT>::UniquePtrCallback callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
|
||||
bool ignore_local_publications = false,
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
|
@ -148,11 +124,12 @@ public:
|
|||
rclcpp::timer::CallbackType callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
|
||||
|
||||
rclcpp::timer::WallTimer::SharedPtr
|
||||
create_wall_timer(
|
||||
std::chrono::duration<long double, std::nano> period,
|
||||
rclcpp::timer::CallbackType callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
|
||||
// TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it.
|
||||
// rclcpp::timer::WallTimer::SharedPtr
|
||||
// create_wall_timer(
|
||||
// std::chrono::duration<long double, std::nano> period,
|
||||
// rclcpp::timer::CallbackType callback,
|
||||
// rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
|
||||
|
||||
using CallbackGroup = rclcpp::callback_group::CallbackGroup;
|
||||
using CallbackGroupWeakPtr = std::weak_ptr<CallbackGroup>;
|
||||
|
@ -227,6 +204,16 @@ private:
|
|||
|
||||
publisher::Publisher::SharedPtr events_publisher_;
|
||||
|
||||
template<typename MessageT>
|
||||
typename subscription::Subscription<MessageT>::SharedPtr
|
||||
create_subscription_internal(
|
||||
const std::string & topic_name,
|
||||
const rmw_qos_profile_t & qos_profile,
|
||||
rclcpp::subscription::AnySubscriptionCallback<MessageT> callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group,
|
||||
bool ignore_local_publications,
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr msg_mem_strat);
|
||||
|
||||
template<
|
||||
typename ServiceT,
|
||||
typename FunctorT,
|
||||
|
|
|
@ -153,7 +153,7 @@ Node::create_publisher(
|
|||
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
|
||||
// *INDENT-OFF*
|
||||
auto shared_publish_callback =
|
||||
[weak_ipm](uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
|
||||
[weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t
|
||||
{
|
||||
auto ipm = weak_ipm.lock();
|
||||
if (!ipm) {
|
||||
|
@ -161,8 +161,17 @@ Node::create_publisher(
|
|||
throw std::runtime_error(
|
||||
"intra process publish called after destruction of intra process manager");
|
||||
}
|
||||
auto typed_msg = std::static_pointer_cast<const MessageT>(msg);
|
||||
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_msg));
|
||||
if (!msg) {
|
||||
throw std::runtime_error("cannot publisher msg which is a null pointer");
|
||||
}
|
||||
auto & message_type_info = typeid(MessageT);
|
||||
if (message_type_info != type_info) {
|
||||
throw std::runtime_error(
|
||||
std::string("published type '") + type_info.name() +
|
||||
"' is incompatible from the publisher type '" + message_type_info.name() + "'");
|
||||
}
|
||||
MessageT * typed_message_ptr = static_cast<MessageT *>(msg);
|
||||
std::unique_ptr<MessageT> unique_msg(typed_message_ptr);
|
||||
uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
|
||||
return message_seq;
|
||||
};
|
||||
|
@ -188,12 +197,56 @@ Node::group_in_node(callback_group::CallbackGroup::SharedPtr & group)
|
|||
return group_belongs_to_this_node;
|
||||
}
|
||||
|
||||
template<typename MessageT>
|
||||
typename subscription::Subscription<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename CallbackT>
|
||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
|
||||
Node::create_subscription(
|
||||
const std::string & topic_name,
|
||||
const rmw_qos_profile_t & qos_profile,
|
||||
std::function<void(const std::shared_ptr<MessageT> &)> callback,
|
||||
CallbackT callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group,
|
||||
bool ignore_local_publications,
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
msg_mem_strat)
|
||||
{
|
||||
rclcpp::subscription::AnySubscriptionCallback<MessageT> any_subscription_callback;
|
||||
any_subscription_callback.set(callback);
|
||||
return this->create_subscription_internal(
|
||||
topic_name,
|
||||
qos_profile,
|
||||
any_subscription_callback,
|
||||
group,
|
||||
ignore_local_publications,
|
||||
msg_mem_strat);
|
||||
}
|
||||
|
||||
template<typename MessageT>
|
||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
|
||||
Node::create_subscription_with_unique_ptr_callback(
|
||||
const std::string & topic_name,
|
||||
const rmw_qos_profile_t & qos_profile,
|
||||
typename rclcpp::subscription::AnySubscriptionCallback<MessageT>::UniquePtrCallback callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group,
|
||||
bool ignore_local_publications,
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
msg_mem_strat)
|
||||
{
|
||||
rclcpp::subscription::AnySubscriptionCallback<MessageT> any_subscription_callback;
|
||||
any_subscription_callback.unique_ptr_callback = callback;
|
||||
return this->create_subscription_internal(
|
||||
topic_name,
|
||||
qos_profile,
|
||||
any_subscription_callback,
|
||||
group,
|
||||
ignore_local_publications,
|
||||
msg_mem_strat);
|
||||
}
|
||||
|
||||
template<typename MessageT>
|
||||
typename subscription::Subscription<MessageT>::SharedPtr
|
||||
Node::create_subscription_internal(
|
||||
const std::string & topic_name,
|
||||
const rmw_qos_profile_t & qos_profile,
|
||||
rclcpp::subscription::AnySubscriptionCallback<MessageT> callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group,
|
||||
bool ignore_local_publications,
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr msg_mem_strat)
|
||||
|
@ -305,17 +358,18 @@ Node::create_wall_timer(
|
|||
return timer;
|
||||
}
|
||||
|
||||
rclcpp::timer::WallTimer::SharedPtr
|
||||
Node::create_wall_timer(
|
||||
std::chrono::duration<long double, std::nano> period,
|
||||
rclcpp::timer::CallbackType callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group)
|
||||
{
|
||||
return create_wall_timer(
|
||||
std::chrono::duration_cast<std::chrono::nanoseconds>(period),
|
||||
callback,
|
||||
group);
|
||||
}
|
||||
// TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it.
|
||||
// rclcpp::timer::WallTimer::SharedPtr
|
||||
// Node::create_wall_timer(
|
||||
// std::chrono::duration<long double, std::nano> period,
|
||||
// rclcpp::timer::CallbackType callback,
|
||||
// rclcpp::callback_group::CallbackGroup::SharedPtr group)
|
||||
// {
|
||||
// return create_wall_timer(
|
||||
// std::chrono::duration_cast<std::chrono::nanoseconds>(period),
|
||||
// callback,
|
||||
// group);
|
||||
// }
|
||||
|
||||
template<typename ServiceT>
|
||||
typename client::Client<ServiceT>::SharedPtr
|
||||
|
|
|
@ -229,7 +229,7 @@ public:
|
|||
|
||||
template<typename FunctorT>
|
||||
typename rclcpp::subscription::Subscription<rcl_interfaces::msg::ParameterEvent>::SharedPtr
|
||||
on_parameter_event(FunctorT & callback)
|
||||
on_parameter_event(FunctorT callback)
|
||||
{
|
||||
return node_->create_subscription<rcl_interfaces::msg::ParameterEvent>(
|
||||
"parameter_events", rmw_qos_profile_parameter_events, callback);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
|
@ -87,31 +88,73 @@ public:
|
|||
|
||||
template<typename MessageT>
|
||||
void
|
||||
publish(std::shared_ptr<MessageT> & msg)
|
||||
publish(std::unique_ptr<MessageT> & msg)
|
||||
{
|
||||
rmw_ret_t status;
|
||||
status = rmw_publish(publisher_handle_, msg.get());
|
||||
if (status != RMW_RET_OK) {
|
||||
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
|
||||
throw std::runtime_error(
|
||||
std::string("failed to publish message: ") + rmw_get_error_string_safe());
|
||||
// *INDENT-ON*
|
||||
}
|
||||
this->do_inter_process_publish<MessageT>(msg.get());
|
||||
if (store_intra_process_message_) {
|
||||
uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg);
|
||||
// Take the pointer from the unique_msg, release it and pass as a void *
|
||||
// to the ipm. The ipm should then capture it again as a unique_ptr of
|
||||
// the correct type.
|
||||
// TODO(wjwwood):
|
||||
// investigate how to transfer the custom deleter (if there is one)
|
||||
// from the incoming unique_ptr through to the ipm's unique_ptr.
|
||||
// See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr
|
||||
MessageT * msg_ptr = msg.get();
|
||||
msg.release();
|
||||
uint64_t message_seq;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(intra_process_publish_mutex_);
|
||||
message_seq =
|
||||
store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT));
|
||||
}
|
||||
rcl_interfaces::msg::IntraProcessMessage ipm;
|
||||
ipm.publisher_id = intra_process_publisher_id_;
|
||||
ipm.message_sequence = message_seq;
|
||||
status = rmw_publish(intra_process_publisher_handle_, &ipm);
|
||||
auto status = rmw_publish(intra_process_publisher_handle_, &ipm);
|
||||
if (status != RMW_RET_OK) {
|
||||
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
|
||||
throw std::runtime_error(
|
||||
std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
|
||||
// *INDENT-ON*
|
||||
}
|
||||
} else {
|
||||
// Always destroy the message, even if we don't consume it, for consistency.
|
||||
msg.reset();
|
||||
}
|
||||
}
|
||||
|
||||
template<typename MessageT>
|
||||
void
|
||||
publish(const std::shared_ptr<MessageT> & msg)
|
||||
{
|
||||
// Avoid allocating when not using intra process.
|
||||
if (!store_intra_process_message_) {
|
||||
// In this case we're not using intra process.
|
||||
return this->do_inter_process_publish(msg.get());
|
||||
}
|
||||
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
|
||||
// TODO(wjwwood):
|
||||
// The intra process manager should probably also be able to store
|
||||
// shared_ptr's and do the "smart" thing based on other intra process
|
||||
// subscriptions. For now call the other publish().
|
||||
std::unique_ptr<MessageT> unique_msg(new MessageT(*msg.get()));
|
||||
return this->publish(unique_msg);
|
||||
}
|
||||
|
||||
template<typename MessageT>
|
||||
void
|
||||
publish(const MessageT & msg)
|
||||
{
|
||||
// Avoid allocating when not using intra process.
|
||||
if (!store_intra_process_message_) {
|
||||
// In this case we're not using intra process.
|
||||
return this->do_inter_process_publish(msg.get());
|
||||
}
|
||||
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
|
||||
std::unique_ptr<MessageT> unique_msg(new MessageT(msg));
|
||||
return this->publish(unique_msg);
|
||||
}
|
||||
|
||||
const std::string &
|
||||
get_topic_name() const
|
||||
{
|
||||
|
@ -161,13 +204,27 @@ public:
|
|||
return result;
|
||||
}
|
||||
|
||||
typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
|
||||
typedef std::function<uint64_t(uint64_t, void *, const std::type_info &)> StoreMessageCallbackT;
|
||||
|
||||
protected:
|
||||
template<typename MessageT>
|
||||
void
|
||||
do_inter_process_publish(MessageT * msg)
|
||||
{
|
||||
auto status = rmw_publish(publisher_handle_, msg);
|
||||
if (status != RMW_RET_OK) {
|
||||
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
|
||||
throw std::runtime_error(
|
||||
std::string("failed to publish message: ") + rmw_get_error_string_safe());
|
||||
// *INDENT-ON*
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
setup_intra_process(
|
||||
uint64_t intra_process_publisher_id,
|
||||
StoreSharedMessageCallbackT callback,
|
||||
StoreMessageCallbackT callback,
|
||||
rmw_publisher_t * intra_process_publisher_handle)
|
||||
{
|
||||
intra_process_publisher_id_ = intra_process_publisher_id;
|
||||
|
@ -194,11 +251,13 @@ private:
|
|||
size_t queue_size_;
|
||||
|
||||
uint64_t intra_process_publisher_id_;
|
||||
StoreSharedMessageCallbackT store_intra_process_message_;
|
||||
StoreMessageCallbackT store_intra_process_message_;
|
||||
|
||||
rmw_gid_t rmw_gid_;
|
||||
rmw_gid_t intra_process_rmw_gid_;
|
||||
|
||||
std::mutex intra_process_publish_mutex_;
|
||||
|
||||
};
|
||||
|
||||
} /* namespace publisher */
|
||||
|
|
|
@ -39,6 +39,18 @@ const std::chrono::nanoseconds operator"" _s(long double s)
|
|||
std::chrono::duration<long double>(s));
|
||||
}
|
||||
|
||||
const std::chrono::nanoseconds
|
||||
operator"" _ms(unsigned long long ms)
|
||||
{
|
||||
return std::chrono::milliseconds(ms);
|
||||
}
|
||||
const std::chrono::nanoseconds
|
||||
operator"" _ms(long double ms)
|
||||
{
|
||||
return std::chrono::duration_cast<std::chrono::nanoseconds>(
|
||||
std::chrono::duration<long double, std::milli>(ms));
|
||||
}
|
||||
|
||||
const std::chrono::nanoseconds
|
||||
operator"" _ns(unsigned long long ns)
|
||||
{
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
|
||||
#include <rclcpp/macros.hpp>
|
||||
#include <rclcpp/message_memory_strategy.hpp>
|
||||
#include <rclcpp/any_subscription_callback.hpp>
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
@ -94,9 +95,13 @@ public:
|
|||
}
|
||||
|
||||
virtual std::shared_ptr<void> create_message() = 0;
|
||||
virtual void handle_message(std::shared_ptr<void> & message, const rmw_gid_t * sender_id) = 0;
|
||||
virtual void handle_message(
|
||||
std::shared_ptr<void> & message,
|
||||
const rmw_message_info_t & message_info) = 0;
|
||||
virtual void return_message(std::shared_ptr<void> & message) = 0;
|
||||
virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0;
|
||||
virtual void handle_intra_process_message(
|
||||
rcl_interfaces::msg::IntraProcessMessage & ipm,
|
||||
const rmw_message_info_t & message_info) = 0;
|
||||
|
||||
protected:
|
||||
rmw_subscription_t * intra_process_subscription_handle_;
|
||||
|
@ -113,13 +118,14 @@ private:
|
|||
|
||||
};
|
||||
|
||||
using namespace any_subscription_callback;
|
||||
|
||||
template<typename MessageT>
|
||||
class Subscription : public SubscriptionBase
|
||||
{
|
||||
friend class rclcpp::node::Node;
|
||||
|
||||
public:
|
||||
using CallbackType = std::function<void(const std::shared_ptr<MessageT> &)>;
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(Subscription);
|
||||
|
||||
Subscription(
|
||||
|
@ -127,11 +133,11 @@ public:
|
|||
rmw_subscription_t * subscription_handle,
|
||||
const std::string & topic_name,
|
||||
bool ignore_local_publications,
|
||||
CallbackType callback,
|
||||
AnySubscriptionCallback<MessageT> callback,
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr memory_strategy =
|
||||
message_memory_strategy::MessageMemoryStrategy<MessageT>::create_default())
|
||||
: SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications),
|
||||
callback_(callback),
|
||||
any_callback_(callback),
|
||||
message_memory_strategy_(memory_strategy),
|
||||
get_intra_process_message_callback_(nullptr),
|
||||
matches_any_intra_process_publishers_(nullptr)
|
||||
|
@ -148,17 +154,29 @@ public:
|
|||
return message_memory_strategy_->borrow_message();
|
||||
}
|
||||
|
||||
void handle_message(std::shared_ptr<void> & message, const rmw_gid_t * sender_id)
|
||||
void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
|
||||
{
|
||||
if (matches_any_intra_process_publishers_) {
|
||||
if (matches_any_intra_process_publishers_(sender_id)) {
|
||||
if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) {
|
||||
// In this case, the message will be delivered via intra process and
|
||||
// we should ignore this copy of the message.
|
||||
return;
|
||||
}
|
||||
}
|
||||
auto typed_message = std::static_pointer_cast<MessageT>(message);
|
||||
callback_(typed_message);
|
||||
if (any_callback_.shared_ptr_callback) {
|
||||
any_callback_.shared_ptr_callback(typed_message);
|
||||
} else if (any_callback_.shared_ptr_with_info_callback) {
|
||||
any_callback_.shared_ptr_with_info_callback(typed_message, message_info);
|
||||
} else if (any_callback_.unique_ptr_callback) {
|
||||
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_message));
|
||||
any_callback_.unique_ptr_callback(unique_msg);
|
||||
} else if (any_callback_.unique_ptr_with_info_callback) {
|
||||
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_message));
|
||||
any_callback_.unique_ptr_with_info_callback(unique_msg, message_info);
|
||||
} else {
|
||||
throw std::runtime_error("unexpected message without any callback set");
|
||||
}
|
||||
}
|
||||
|
||||
void return_message(std::shared_ptr<void> & message)
|
||||
|
@ -167,7 +185,9 @@ public:
|
|||
message_memory_strategy_->return_message(typed_message);
|
||||
}
|
||||
|
||||
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm)
|
||||
void handle_intra_process_message(
|
||||
rcl_interfaces::msg::IntraProcessMessage & ipm,
|
||||
const rmw_message_info_t & message_info)
|
||||
{
|
||||
if (!get_intra_process_message_callback_) {
|
||||
// throw std::runtime_error(
|
||||
|
@ -188,8 +208,19 @@ public:
|
|||
// TODO(wjwwood): should we notify someone of this? log error, log warning?
|
||||
return;
|
||||
}
|
||||
typename MessageT::SharedPtr shared_msg = std::move(msg);
|
||||
callback_(shared_msg);
|
||||
if (any_callback_.shared_ptr_callback) {
|
||||
typename MessageT::SharedPtr shared_msg = std::move(msg);
|
||||
any_callback_.shared_ptr_callback(shared_msg);
|
||||
} else if (any_callback_.shared_ptr_with_info_callback) {
|
||||
typename MessageT::SharedPtr shared_msg = std::move(msg);
|
||||
any_callback_.shared_ptr_with_info_callback(shared_msg, message_info);
|
||||
} else if (any_callback_.unique_ptr_callback) {
|
||||
any_callback_.unique_ptr_callback(msg);
|
||||
} else if (any_callback_.unique_ptr_with_info_callback) {
|
||||
any_callback_.unique_ptr_with_info_callback(msg, message_info);
|
||||
} else {
|
||||
throw std::runtime_error("unexpected message without any callback set");
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -213,7 +244,7 @@ private:
|
|||
|
||||
RCLCPP_DISABLE_COPY(Subscription);
|
||||
|
||||
CallbackType callback_;
|
||||
AnySubscriptionCallback<MessageT> any_callback_;
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
message_memory_strategy_;
|
||||
|
||||
|
|
|
@ -169,7 +169,8 @@ sleep_for(const std::chrono::nanoseconds & nanoseconds)
|
|||
// TODO: determine if posix's nanosleep(2) is more efficient here
|
||||
std::unique_lock<std::mutex> lock(::g_interrupt_mutex);
|
||||
auto cvs = ::g_interrupt_condition_variable.wait_for(lock, nanoseconds);
|
||||
return cvs == std::cv_status::no_timeout;
|
||||
// Return true if the timeout elapsed successfully, otherwise false.
|
||||
return cvs != std::cv_status::no_timeout;
|
||||
}
|
||||
|
||||
} /* namespace utilities */
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue