Merge branch 'master' into document_user_fns

Conflicts:
	rclcpp/include/rclcpp/node.hpp
	rclcpp/include/rclcpp/subscription.hpp
This commit is contained in:
Brian Gerkey 2015-08-28 13:50:31 -07:00
commit 1b631754e5
12 changed files with 440 additions and 97 deletions

View file

@ -14,6 +14,8 @@ if(AMENT_ENABLE_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
find_package(rmw REQUIRED)
if(NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra")
endif()
@ -23,7 +25,9 @@ if(AMENT_ENABLE_TESTING)
ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
if(TARGET test_intra_process_manager)
target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}")
target_include_directories(test_intra_process_manager PUBLIC
"${rcl_interfaces_INCLUDE_DIRS}"
"${rmw_INCLUDE_DIRS}")
endif()
endif()

View file

@ -0,0 +1,127 @@
// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_
#define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_
#include <rclcpp/function_traits.hpp>
#include <functional>
#include <memory>
#include <type_traits>
#include <rmw/types.h>
namespace rclcpp
{
namespace any_subscription_callback
{
template<typename MessageT>
struct AnySubscriptionCallback
{
using SharedPtrCallback = std::function<void(const std::shared_ptr<MessageT> &)>;
using SharedPtrWithInfoCallback =
std::function<void(const std::shared_ptr<MessageT> &, const rmw_message_info_t &)>;
using UniquePtrCallback = std::function<void(std::unique_ptr<MessageT> &)>;
using UniquePtrWithInfoCallback =
std::function<void(std::unique_ptr<MessageT> &, const rmw_message_info_t &)>;
SharedPtrCallback shared_ptr_callback;
SharedPtrWithInfoCallback shared_ptr_with_info_callback;
UniquePtrCallback unique_ptr_callback;
UniquePtrWithInfoCallback unique_ptr_with_info_callback;
AnySubscriptionCallback()
: shared_ptr_callback(nullptr), shared_ptr_with_info_callback(nullptr),
unique_ptr_callback(nullptr), unique_ptr_with_info_callback(nullptr)
{}
AnySubscriptionCallback(const AnySubscriptionCallback &) = default;
template<typename CallbackT,
typename std::enable_if<
function_traits<CallbackT>::arity == 1
>::type * = nullptr,
typename std::enable_if<
std::is_same<
typename function_traits<CallbackT>::template argument_type<0>,
typename std::shared_ptr<MessageT>
>::value
>::type * = nullptr
>
void set(CallbackT callback)
{
shared_ptr_callback = callback;
}
template<typename CallbackT,
typename std::enable_if<
function_traits<CallbackT>::arity == 2
>::type * = nullptr,
typename std::enable_if<
std::is_same<
typename function_traits<CallbackT>::template argument_type<0>,
typename std::shared_ptr<MessageT>
>::value
>::type * = nullptr
>
void set(CallbackT callback)
{
shared_ptr_with_info_callback = callback;
}
/*
template<typename CallbackT,
typename std::enable_if<
function_traits<CallbackT>::arity == 1
>::type * = nullptr,
typename std::enable_if<
std::is_same<
typename function_traits<CallbackT>::template argument_type<0>,
typename std::unique_ptr<MessageT>
>::value
>::type * = nullptr
>
void set(CallbackT callback)
{
static_assert(std::is_same<
typename function_traits<CallbackT>::template argument_type<0>,
typename std::unique_ptr<MessageT>
>::value, "Not a unique pointer");
unique_ptr_callback = callback;
}
template<typename CallbackT,
typename std::enable_if<
function_traits<CallbackT>::arity == 2
>::type * = nullptr,
typename std::enable_if<
std::is_same<
typename function_traits<CallbackT>::template argument_type<0>,
typename std::unique_ptr<MessageT>
>::value
>::type * = nullptr
>
void set(CallbackT callback)
{
unique_ptr_with_info_callback = callback;
}
*/
};
} /* namespace any_subscription_callback */
} /* namespace rclcpp */
#endif /* RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ */

View file

@ -88,7 +88,7 @@ public:
* node was added, it will wake up.
*/
virtual void
add_node(rclcpp::node::Node::SharedPtr & node_ptr, bool notify = true)
add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true)
{
// Check to ensure node not already added
for (auto & weak_node : weak_nodes_) {
@ -248,7 +248,8 @@ protected:
rmw_take_with_info(subscription->subscription_handle_, message.get(), &taken, &message_info);
if (ret == RMW_RET_OK) {
if (taken) {
subscription->handle_message(message, &message_info.publisher_gid);
message_info.from_intra_process = false;
subscription->handle_message(message, message_info);
}
} else {
fprintf(stderr,
@ -264,10 +265,16 @@ protected:
{
rcl_interfaces::msg::IntraProcessMessage ipm;
bool taken = false;
rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
rmw_message_info_t message_info;
rmw_ret_t status = rmw_take_with_info(
subscription->intra_process_subscription_handle_,
&ipm,
&taken,
&message_info);
if (status == RMW_RET_OK) {
if (taken) {
subscription->handle_intra_process_message(ipm);
message_info.from_intra_process = true;
subscription->handle_intra_process_message(ipm, message_info);
}
} else {
fprintf(stderr,

View file

@ -0,0 +1,63 @@
// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_
#define RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_
#include <memory>
namespace rclcpp
{
/* NOTE(esteve):
* We support service callbacks that can optionally take the request id,
* which should be possible with two overloaded create_service methods,
* but unfortunately std::function's constructor on VS2015 is too greedy,
* so we need a mechanism for checking the arity and the type of each argument
* in a callback function.
*/
template<typename FunctionT>
struct function_traits
{
static constexpr std::size_t arity =
function_traits<decltype( & FunctionT::operator())>::arity - 1;
template<std::size_t N>
using argument_type =
typename function_traits<decltype( & FunctionT::operator())>::template argument_type<N + 1>;
};
template<typename ReturnTypeT, typename ... Args>
struct function_traits<ReturnTypeT(Args ...)>
{
static constexpr std::size_t arity = sizeof ... (Args);
template<std::size_t N>
using argument_type = typename std::tuple_element<N, std::tuple<Args ...>>::type;
};
template<typename ReturnTypeT, typename ... Args>
struct function_traits<ReturnTypeT (*)(Args ...)>: public function_traits<ReturnTypeT(Args ...)>
{};
template<typename ClassT, typename ReturnTypeT, typename ... Args>
struct function_traits<ReturnTypeT (ClassT::*)(Args ...) const>
: public function_traits<ReturnTypeT(ClassT &, Args ...)>
{};
} /* namespace rclcpp */
#endif /* RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ */

View file

@ -29,6 +29,7 @@
#include <rclcpp/callback_group.hpp>
#include <rclcpp/client.hpp>
#include <rclcpp/context.hpp>
#include <rclcpp/function_traits.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/message_memory_strategy.hpp>
#include <rclcpp/parameter.hpp>
@ -55,43 +56,6 @@ class Executor;
namespace node
{
/* NOTE(esteve):
* We support service callbacks that can optionally take the request id,
* which should be possible with two overloaded create_service methods,
* but unfortunately std::function's constructor on VS2015 is too greedy,
* so we need a mechanism for checking the arity and the type of each argument
* in a callback function.
*/
template<typename FunctionT>
struct function_traits
{
static constexpr std::size_t arity =
function_traits<decltype( & FunctionT::operator())>::arity - 1;
template<std::size_t N>
using argument_type =
typename function_traits<decltype( & FunctionT::operator())>::template argument_type<N + 1>;
};
template<typename ReturnTypeT, typename ... Args>
struct function_traits<ReturnTypeT(Args ...)>
{
static constexpr std::size_t arity = sizeof ... (Args);
template<std::size_t N>
using argument_type = typename std::tuple_element<N, std::tuple<Args ...>>::type;
};
template<typename ReturnTypeT, typename ... Args>
struct function_traits<ReturnTypeT (*)(Args ...)>: public function_traits<ReturnTypeT(Args ...)>
{};
template<typename ClassT, typename ReturnTypeT, typename ... Args>
struct function_traits<ReturnTypeT (ClassT::*)(Args ...) const>
: public function_traits<ReturnTypeT(ClassT &, Args ...)>
{};
/// Node is the single point of entry for creating publishers and subscribers.
class Node
{
@ -153,12 +117,23 @@ public:
Windows build breaks when static member function passed as default
argument to msg_mem_strat, nullptr is a workaround.
*/
template<typename MessageT>
template<typename MessageT, typename CallbackT>
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
create_subscription(
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
std::function<void(const std::shared_ptr<MessageT> &)> callback,
CallbackT callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
msg_mem_strat = nullptr);
template<typename MessageT>
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
create_subscription_with_unique_ptr_callback(
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
typename rclcpp::subscription::AnySubscriptionCallback<MessageT>::UniquePtrCallback callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
@ -182,11 +157,12 @@ public:
* \param[in] callback User-defined callback function.
* \param[in] group Callback group to execute this timer's callback in.
*/
rclcpp::timer::WallTimer::SharedPtr
create_wall_timer(
std::chrono::duration<long double, std::nano> period,
rclcpp::timer::CallbackType callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
// TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it.
// rclcpp::timer::WallTimer::SharedPtr
// create_wall_timer(
// std::chrono::duration<long double, std::nano> period,
// rclcpp::timer::CallbackType callback,
// rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
using CallbackGroup = rclcpp::callback_group::CallbackGroup;
using CallbackGroupWeakPtr = std::weak_ptr<CallbackGroup>;
@ -261,6 +237,16 @@ private:
publisher::Publisher::SharedPtr events_publisher_;
template<typename MessageT>
typename subscription::Subscription<MessageT>::SharedPtr
create_subscription_internal(
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
rclcpp::subscription::AnySubscriptionCallback<MessageT> callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr msg_mem_strat);
template<
typename ServiceT,
typename FunctorT,

View file

@ -153,7 +153,7 @@ Node::create_publisher(
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
// *INDENT-OFF*
auto shared_publish_callback =
[weak_ipm](uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
[weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t
{
auto ipm = weak_ipm.lock();
if (!ipm) {
@ -161,8 +161,17 @@ Node::create_publisher(
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
auto typed_msg = std::static_pointer_cast<const MessageT>(msg);
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_msg));
if (!msg) {
throw std::runtime_error("cannot publisher msg which is a null pointer");
}
auto & message_type_info = typeid(MessageT);
if (message_type_info != type_info) {
throw std::runtime_error(
std::string("published type '") + type_info.name() +
"' is incompatible from the publisher type '" + message_type_info.name() + "'");
}
MessageT * typed_message_ptr = static_cast<MessageT *>(msg);
std::unique_ptr<MessageT> unique_msg(typed_message_ptr);
uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
return message_seq;
};
@ -188,12 +197,56 @@ Node::group_in_node(callback_group::CallbackGroup::SharedPtr & group)
return group_belongs_to_this_node;
}
template<typename MessageT>
typename subscription::Subscription<MessageT>::SharedPtr
template<typename MessageT, typename CallbackT>
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
Node::create_subscription(
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
std::function<void(const std::shared_ptr<MessageT> &)> callback,
CallbackT callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
msg_mem_strat)
{
rclcpp::subscription::AnySubscriptionCallback<MessageT> any_subscription_callback;
any_subscription_callback.set(callback);
return this->create_subscription_internal(
topic_name,
qos_profile,
any_subscription_callback,
group,
ignore_local_publications,
msg_mem_strat);
}
template<typename MessageT>
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
Node::create_subscription_with_unique_ptr_callback(
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
typename rclcpp::subscription::AnySubscriptionCallback<MessageT>::UniquePtrCallback callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
msg_mem_strat)
{
rclcpp::subscription::AnySubscriptionCallback<MessageT> any_subscription_callback;
any_subscription_callback.unique_ptr_callback = callback;
return this->create_subscription_internal(
topic_name,
qos_profile,
any_subscription_callback,
group,
ignore_local_publications,
msg_mem_strat);
}
template<typename MessageT>
typename subscription::Subscription<MessageT>::SharedPtr
Node::create_subscription_internal(
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
rclcpp::subscription::AnySubscriptionCallback<MessageT> callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr msg_mem_strat)
@ -305,17 +358,18 @@ Node::create_wall_timer(
return timer;
}
rclcpp::timer::WallTimer::SharedPtr
Node::create_wall_timer(
std::chrono::duration<long double, std::nano> period,
rclcpp::timer::CallbackType callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group)
{
return create_wall_timer(
std::chrono::duration_cast<std::chrono::nanoseconds>(period),
callback,
group);
}
// TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it.
// rclcpp::timer::WallTimer::SharedPtr
// Node::create_wall_timer(
// std::chrono::duration<long double, std::nano> period,
// rclcpp::timer::CallbackType callback,
// rclcpp::callback_group::CallbackGroup::SharedPtr group)
// {
// return create_wall_timer(
// std::chrono::duration_cast<std::chrono::nanoseconds>(period),
// callback,
// group);
// }
template<typename ServiceT>
typename client::Client<ServiceT>::SharedPtr

View file

@ -229,7 +229,7 @@ public:
template<typename FunctorT>
typename rclcpp::subscription::Subscription<rcl_interfaces::msg::ParameterEvent>::SharedPtr
on_parameter_event(FunctorT & callback)
on_parameter_event(FunctorT callback)
{
return node_->create_subscription<rcl_interfaces::msg::ParameterEvent>(
"parameter_events", rmw_qos_profile_parameter_events, callback);

View file

@ -19,6 +19,7 @@
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
@ -103,31 +104,73 @@ public:
*/
template<typename MessageT>
void
publish(std::shared_ptr<MessageT> & msg)
publish(std::unique_ptr<MessageT> & msg)
{
rmw_ret_t status;
status = rmw_publish(publisher_handle_, msg.get());
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
this->do_inter_process_publish<MessageT>(msg.get());
if (store_intra_process_message_) {
uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg);
// Take the pointer from the unique_msg, release it and pass as a void *
// to the ipm. The ipm should then capture it again as a unique_ptr of
// the correct type.
// TODO(wjwwood):
// investigate how to transfer the custom deleter (if there is one)
// from the incoming unique_ptr through to the ipm's unique_ptr.
// See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr
MessageT * msg_ptr = msg.get();
msg.release();
uint64_t message_seq;
{
std::lock_guard<std::mutex> lock(intra_process_publish_mutex_);
message_seq =
store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT));
}
rcl_interfaces::msg::IntraProcessMessage ipm;
ipm.publisher_id = intra_process_publisher_id_;
ipm.message_sequence = message_seq;
status = rmw_publish(intra_process_publisher_handle_, &ipm);
auto status = rmw_publish(intra_process_publisher_handle_, &ipm);
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
} else {
// Always destroy the message, even if we don't consume it, for consistency.
msg.reset();
}
}
template<typename MessageT>
void
publish(const std::shared_ptr<MessageT> & msg)
{
// Avoid allocating when not using intra process.
if (!store_intra_process_message_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg.get());
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// TODO(wjwwood):
// The intra process manager should probably also be able to store
// shared_ptr's and do the "smart" thing based on other intra process
// subscriptions. For now call the other publish().
std::unique_ptr<MessageT> unique_msg(new MessageT(*msg.get()));
return this->publish(unique_msg);
}
template<typename MessageT>
void
publish(const MessageT & msg)
{
// Avoid allocating when not using intra process.
if (!store_intra_process_message_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg.get());
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
std::unique_ptr<MessageT> unique_msg(new MessageT(msg));
return this->publish(unique_msg);
}
/// Get the topic that this publisher publishes on.
// \return The topic name.
const std::string &
@ -197,13 +240,27 @@ public:
return result;
}
typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
typedef std::function<uint64_t(uint64_t, void *, const std::type_info &)> StoreMessageCallbackT;
protected:
template<typename MessageT>
void
do_inter_process_publish(MessageT * msg)
{
auto status = rmw_publish(publisher_handle_, msg);
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
}
void
setup_intra_process(
uint64_t intra_process_publisher_id,
StoreSharedMessageCallbackT callback,
StoreMessageCallbackT callback,
rmw_publisher_t * intra_process_publisher_handle)
{
intra_process_publisher_id_ = intra_process_publisher_id;
@ -230,11 +287,13 @@ private:
size_t queue_size_;
uint64_t intra_process_publisher_id_;
StoreSharedMessageCallbackT store_intra_process_message_;
StoreMessageCallbackT store_intra_process_message_;
rmw_gid_t rmw_gid_;
rmw_gid_t intra_process_rmw_gid_;
std::mutex intra_process_publish_mutex_;
};
} /* namespace publisher */

View file

@ -39,6 +39,18 @@ const std::chrono::nanoseconds operator"" _s(long double s)
std::chrono::duration<long double>(s));
}
const std::chrono::nanoseconds
operator"" _ms(unsigned long long ms)
{
return std::chrono::milliseconds(ms);
}
const std::chrono::nanoseconds
operator"" _ms(long double ms)
{
return std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<long double, std::milli>(ms));
}
const std::chrono::nanoseconds
operator"" _ns(unsigned long long ns)
{

View file

@ -27,6 +27,7 @@
#include <rclcpp/macros.hpp>
#include <rclcpp/message_memory_strategy.hpp>
#include <rclcpp/any_subscription_callback.hpp>
namespace rclcpp
{
@ -106,19 +107,21 @@ public:
/// Borrow a new message.
// \return Shared pointer to the fresh message.
virtual std::shared_ptr<void> create_message() = 0;
/// Check if we need to handle the message, and execute the callback if we do.
/**
* \param[in] message Shared pointer to the message to handle.
* \param[in] sender_id Global identifier of the entity that sent this message.
* \param[in] message_info Metadata associated with this message.
*/
virtual void handle_message(std::shared_ptr<void> & message, const rmw_gid_t * sender_id) = 0;
virtual void handle_message(
std::shared_ptr<void> & message,
const rmw_message_info_t & message_info) = 0;
/// Return the message borrowed in create_message.
// \param[in] Shared pointer to the returned message.
virtual void return_message(std::shared_ptr<void> & message) = 0;
virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0;
virtual void handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info) = 0;
protected:
rmw_subscription_t * intra_process_subscription_handle_;
@ -135,6 +138,8 @@ private:
};
using namespace any_subscription_callback;
/// Subscription implementation, templated on the type of message this subscription receives.
template<typename MessageT>
class Subscription : public SubscriptionBase
@ -142,7 +147,6 @@ class Subscription : public SubscriptionBase
friend class rclcpp::node::Node;
public:
using CallbackType = std::function<void(const std::shared_ptr<MessageT> &)>;
RCLCPP_SMART_PTR_DEFINITIONS(Subscription);
/// Default constructor.
@ -160,11 +164,11 @@ public:
rmw_subscription_t * subscription_handle,
const std::string & topic_name,
bool ignore_local_publications,
CallbackType callback,
AnySubscriptionCallback<MessageT> callback,
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr memory_strategy =
message_memory_strategy::MessageMemoryStrategy<MessageT>::create_default())
: SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications),
callback_(callback),
any_callback_(callback),
message_memory_strategy_(memory_strategy),
get_intra_process_message_callback_(nullptr),
matches_any_intra_process_publishers_(nullptr)
@ -189,17 +193,29 @@ public:
return message_memory_strategy_->borrow_message();
}
void handle_message(std::shared_ptr<void> & message, const rmw_gid_t * sender_id)
void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
{
if (matches_any_intra_process_publishers_) {
if (matches_any_intra_process_publishers_(sender_id)) {
if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) {
// In this case, the message will be delivered via intra process and
// we should ignore this copy of the message.
return;
}
}
auto typed_message = std::static_pointer_cast<MessageT>(message);
callback_(typed_message);
if (any_callback_.shared_ptr_callback) {
any_callback_.shared_ptr_callback(typed_message);
} else if (any_callback_.shared_ptr_with_info_callback) {
any_callback_.shared_ptr_with_info_callback(typed_message, message_info);
} else if (any_callback_.unique_ptr_callback) {
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_message));
any_callback_.unique_ptr_callback(unique_msg);
} else if (any_callback_.unique_ptr_with_info_callback) {
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_message));
any_callback_.unique_ptr_with_info_callback(unique_msg, message_info);
} else {
throw std::runtime_error("unexpected message without any callback set");
}
}
void return_message(std::shared_ptr<void> & message)
@ -208,7 +224,9 @@ public:
message_memory_strategy_->return_message(typed_message);
}
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm)
void handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info)
{
if (!get_intra_process_message_callback_) {
// throw std::runtime_error(
@ -229,8 +247,19 @@ public:
// TODO(wjwwood): should we notify someone of this? log error, log warning?
return;
}
typename MessageT::SharedPtr shared_msg = std::move(msg);
callback_(shared_msg);
if (any_callback_.shared_ptr_callback) {
typename MessageT::SharedPtr shared_msg = std::move(msg);
any_callback_.shared_ptr_callback(shared_msg);
} else if (any_callback_.shared_ptr_with_info_callback) {
typename MessageT::SharedPtr shared_msg = std::move(msg);
any_callback_.shared_ptr_with_info_callback(shared_msg, message_info);
} else if (any_callback_.unique_ptr_callback) {
any_callback_.unique_ptr_callback(msg);
} else if (any_callback_.unique_ptr_with_info_callback) {
any_callback_.unique_ptr_with_info_callback(msg, message_info);
} else {
throw std::runtime_error("unexpected message without any callback set");
}
}
private:
@ -254,7 +283,7 @@ private:
RCLCPP_DISABLE_COPY(Subscription);
CallbackType callback_;
AnySubscriptionCallback<MessageT> any_callback_;
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
message_memory_strategy_;

View file

@ -192,7 +192,8 @@ sleep_for(const std::chrono::nanoseconds & nanoseconds)
// TODO: determine if posix's nanosleep(2) is more efficient here
std::unique_lock<std::mutex> lock(::g_interrupt_mutex);
auto cvs = ::g_interrupt_condition_variable.wait_for(lock, nanoseconds);
return cvs == std::cv_status::no_timeout;
// Return true if the timeout elapsed successfully, otherwise false.
return cvs != std::cv_status::no_timeout;
}
} /* namespace utilities */

View file

@ -16,6 +16,7 @@
<test_depend>ament_cmake_gtest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>rmw</test_depend>
<export>
<build_type>ament_cmake</build_type>