Merge pull request #94 from ros2/rmw_gid_support

Subscription side filtering using publisher gid's
This commit is contained in:
William Woodall 2015-08-26 14:38:02 -07:00
commit 16afde88a1
6 changed files with 123 additions and 20 deletions

View file

@ -185,10 +185,12 @@ protected:
{
std::shared_ptr<void> message = subscription->create_message();
bool taken = false;
rmw_ret_t status = rmw_take(subscription->subscription_handle_, message.get(), &taken);
if (status == RMW_RET_OK) {
rmw_message_info_t message_info;
auto ret =
rmw_take_with_info(subscription->subscription_handle_, message.get(), &taken, &message_info);
if (ret == RMW_RET_OK) {
if (taken) {
subscription->handle_message(message);
subscription->handle_message(message, &message_info.publisher_gid);
}
} else {
fprintf(stderr,

View file

@ -29,6 +29,8 @@
#include <unordered_map>
#include <set>
#include <rmw/types.h>
namespace rclcpp
{
namespace intra_process_manager
@ -368,6 +370,22 @@ public:
}
}
/// Return true if the given rmw_gid_t matches any stored Publishers.
bool
matches_any_publishers(const rmw_gid_t * id) const
{
for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.publisher.lock();
if (!publisher) {
continue;
}
if (*publisher.get() == id) {
return true;
}
}
return false;
}
private:
static uint64_t get_next_unique_id()
{

View file

@ -259,7 +259,16 @@ Node::create_subscription(
"intra process take called after destruction of intra process manager");
}
ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message);
});
},
[weak_ipm](const rmw_gid_t * sender_gid) -> bool {
auto ipm = weak_ipm.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publisher check called after destruction of intra process manager");
}
return ipm->matches_any_publishers(sender_gid);
}
);
// *INDENT-ON*
}
// Assign to a group.

View file

@ -55,7 +55,15 @@ public:
intra_process_publisher_handle_(nullptr),
topic_(topic), queue_size_(queue_size),
intra_process_publisher_id_(0), store_intra_process_message_(nullptr)
{}
{
// Life time of this object is tied to the publisher handle.
if (rmw_get_gid_for_publisher(publisher_handle_, &rmw_gid_) != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to get publisher gid: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
}
virtual ~Publisher()
{
@ -82,16 +90,12 @@ public:
publish(std::shared_ptr<MessageT> & msg)
{
rmw_ret_t status;
if (!store_intra_process_message_) {
// TODO(wjwwood): for now, make intra process and inter process mutually exclusive.
// Later we'll have them together, when we have a way to filter more efficiently.
status = rmw_publish(publisher_handle_, msg.get());
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
status = rmw_publish(publisher_handle_, msg.get());
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
if (store_intra_process_message_) {
uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg);
@ -120,6 +124,43 @@ public:
return queue_size_;
}
const rmw_gid_t &
get_gid() const
{
return rmw_gid_;
}
const rmw_gid_t &
get_intra_process_gid() const
{
return intra_process_rmw_gid_;
}
bool
operator==(const rmw_gid_t & gid) const
{
return *this == &gid;
}
bool
operator==(const rmw_gid_t * gid) const
{
bool result = false;
auto ret = rmw_compare_gids_equal(gid, &this->get_gid(), &result);
if (ret != RMW_RET_OK) {
throw std::runtime_error(
std::string("failed to compare gids: ") + rmw_get_error_string_safe());
}
if (!result) {
ret = rmw_compare_gids_equal(gid, &this->get_intra_process_gid(), &result);
if (ret != RMW_RET_OK) {
throw std::runtime_error(
std::string("failed to compare gids: ") + rmw_get_error_string_safe());
}
}
return result;
}
typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
protected:
@ -132,6 +173,15 @@ protected:
intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = callback;
intra_process_publisher_handle_ = intra_process_publisher_handle;
// Life time of this object is tied to the publisher handle.
auto ret = rmw_get_gid_for_publisher(intra_process_publisher_handle_, &intra_process_rmw_gid_);
if (ret != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to create intra process publisher gid: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
}
}
private:
@ -146,6 +196,9 @@ private:
uint64_t intra_process_publisher_id_;
StoreSharedMessageCallbackT store_intra_process_message_;
rmw_gid_t rmw_gid_;
rmw_gid_t intra_process_rmw_gid_;
};
} /* namespace publisher */

View file

@ -94,7 +94,7 @@ public:
}
virtual std::shared_ptr<void> create_message() = 0;
virtual void handle_message(std::shared_ptr<void> & message) = 0;
virtual void handle_message(std::shared_ptr<void> & message, const rmw_gid_t * sender_id) = 0;
virtual void return_message(std::shared_ptr<void> & message) = 0;
virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0;
@ -132,7 +132,9 @@ public:
message_memory_strategy::MessageMemoryStrategy<MessageT>::create_default())
: SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications),
callback_(callback),
message_memory_strategy_(memory_strategy)
message_memory_strategy_(memory_strategy),
get_intra_process_message_callback_(nullptr),
matches_any_intra_process_publishers_(nullptr)
{}
void set_message_memory_strategy(
@ -146,8 +148,15 @@ public:
return message_memory_strategy_->borrow_message();
}
void handle_message(std::shared_ptr<void> & message)
void handle_message(std::shared_ptr<void> & message, const rmw_gid_t * sender_id)
{
if (matches_any_intra_process_publishers_) {
if (matches_any_intra_process_publishers_(sender_id)) {
// In this case, the message will be delivered via intra process and
// we should ignore this copy of the message.
return;
}
}
auto typed_message = std::static_pointer_cast<MessageT>(message);
callback_(typed_message);
}
@ -188,15 +197,18 @@ private:
std::function<
void (uint64_t, uint64_t, uint64_t, std::unique_ptr<MessageT> &)
> GetMessageCallbackType;
typedef std::function<bool (const rmw_gid_t *)> MatchesAnyPublishersCallbackType;
void setup_intra_process(
uint64_t intra_process_subscription_id,
rmw_subscription_t * intra_process_subscription,
GetMessageCallbackType callback)
GetMessageCallbackType get_message_callback,
MatchesAnyPublishersCallbackType matches_any_publisher_callback)
{
intra_process_subscription_id_ = intra_process_subscription_id;
intra_process_subscription_handle_ = intra_process_subscription;
get_intra_process_message_callback_ = callback;
get_intra_process_message_callback_ = get_message_callback;
matches_any_intra_process_publishers_ = matches_any_publisher_callback;
}
RCLCPP_DISABLE_COPY(Subscription);
@ -206,6 +218,7 @@ private:
message_memory_strategy_;
GetMessageCallbackType get_intra_process_message_callback_;
MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
uint64_t intra_process_subscription_id_;
};

View file

@ -17,6 +17,7 @@
#include <gtest/gtest.h>
#include <rclcpp/macros.hpp>
#include <rmw/types.h>
// Mock up publisher and subscription base to avoid needing an rmw impl.
namespace rclcpp
@ -43,6 +44,13 @@ public:
return mock_queue_size;
}
bool
operator==(const rmw_gid_t * gid) const
{
(void)gid;
return false;
}
std::string mock_topic_name;
size_t mock_queue_size;
};