subscriber: added implementation using ros_middleware_interface

This commit is contained in:
William Woodall 2014-08-12 19:22:20 -07:00
parent c6c04749ca
commit 2b8e19c88d
3 changed files with 166 additions and 2 deletions

View file

@ -1,7 +1,11 @@
#ifndef __rclcpp__Node__h__ #ifndef __rclcpp__Node__h__
#define __rclcpp__Node__h__ #define __rclcpp__Node__h__
#include <memory>
#include <vector>
#include "Publisher.h" #include "Publisher.h"
#include "Subscriber.h"
#include "ros_middleware_interface/functions.h" #include "ros_middleware_interface/functions.h"
#include "ros_middleware_interface/handles.h" #include "ros_middleware_interface/handles.h"
@ -12,8 +16,14 @@
namespace rclcpp namespace rclcpp
{ {
namespace executor
{
class SingleThreadExecutor;
}
class Node class Node
{ {
friend class rclcpp::executor::SingleThreadExecutor;
public: public:
Node() Node()
{ {
@ -28,16 +38,28 @@ public:
return new Publisher<ROSMessage>(publisher_handle); return new Publisher<ROSMessage>(publisher_handle);
} }
template<typename ROSMessage>
Subscriber<ROSMessage>* create_subscriber(const char * topic_name)
{
const rosidl_generator_cpp::MessageTypeSupportHandle & type_support_handle = ::ros_middleware_interface::get_type_support_handle<ROSMessage>();
ros_middleware_interface::SubscriberHandle subscriber_handle = ::ros_middleware_interface::create_subscriber(node_handle_, type_support_handle, topic_name);
SubscriberInterface *sub = new Subscriber<ROSMessage>(subscriber_handle);
this->subscribers_.push_back(sub);
return static_cast<Subscriber<ROSMessage> *>(sub);
}
private: private:
ros_middleware_interface::NodeHandle node_handle_; ros_middleware_interface::NodeHandle node_handle_;
}; std::vector<SubscriberInterface *> subscribers_;
} };
rclcpp::Node* create_node() rclcpp::Node* create_node()
{ {
return new rclcpp::Node(); return new rclcpp::Node();
} }
} /* namespace rclcpp */
#endif // __rclcpp__Node__h__ #endif // __rclcpp__Node__h__

View file

@ -0,0 +1,44 @@
#ifndef __rclcpp__Subscriber__h__
#define __rclcpp__Subscriber__h__
#include <functional>
#include "ros_middleware_interface/functions.h"
#include "ros_middleware_interface/handles.h"
namespace rclcpp
{
class Node;
namespace executor
{
class SingleThreadExecutor;
}
class SubscriberInterface
{
friend class rclcpp::executor::SingleThreadExecutor;
public:
SubscriberInterface(const ros_middleware_interface::SubscriberHandle &subscriber_handle)
: subscriber_handle_(subscriber_handle)
{}
private:
ros_middleware_interface::SubscriberHandle subscriber_handle_;
};
template<typename ROSMessage>
class Subscriber : public SubscriberInterface
{
friend class rclcpp::Node;
public:
typedef std::function<void(ROSMessage *)> CallbackType;
Subscriber(const ros_middleware_interface::SubscriberHandle &subscriber_handle)
: SubscriberInterface(subscriber_handle)
{}
};
}
#endif // __rclcpp__Subscriber__h__

View file

@ -0,0 +1,98 @@
#ifndef __rclcpp__executor__SingleThreadExecutor__h__
#define __rclcpp__executor__SingleThreadExecutor__h__
#include <algorithm>
#include <cassert>
#include <vector>
#include "rclcpp/Node.h"
#include "ros_middleware_interface/functions.h"
#include "ros_middleware_interface/handles.h"
namespace rclcpp
{
namespace executor
{
class SingleThreadExecutor
{
public:
SingleThreadExecutor() {}
void register_node(rclcpp::Node *node)
{
this->nodes_.push_back(node);
}
void unregister_node(rclcpp::Node *node)
{
auto it = std::find(this->nodes_.begin(), this->nodes_.end(), node);
if (it != this->nodes_.end())
{
this->nodes_.erase(it);
}
}
void exec()
{
while (1)
{
size_t total_subscribers_size = 0;
for (auto node : this->nodes_)
{
total_subscribers_size += node->subscribers_.size();
}
ros_middleware_interface::SubscriberHandles subscriber_handles;
subscriber_handles.subscriber_count_ = total_subscribers_size;
subscriber_handles.subscribers_ = static_cast<void **>(malloc(sizeof(void *) * total_subscribers_size));
size_t handles_index = 0;
for (auto node : this->nodes_) {
for (auto subscriber : node->subscribers_)
{
assert(handles_index < total_subscribers_size);
subscriber_handles.subscribers_[handles_index] = subscriber->subscriber_handle_.data_;
handles_index += 1;
}
}
ros_middleware_interface::GuardConditionHandles guard_condition_handles;
guard_condition_handles.guard_condition_count_ = 0;
guard_condition_handles.guard_conditions_ = 0;
ros_middleware_interface::wait(subscriber_handles, guard_condition_handles);
for (size_t index = 0; index < total_subscribers_size; ++index)
{
void *handle = subscriber_handles.subscribers_[index];
if (!handle)
{
continue;
}
for (auto node : this->nodes_)
{
for (auto subscriber : node->subscribers_)
{
if (subscriber->subscriber_handle_.data_ == handle)
{
// Do callback
}
}
}
}
}
}
private:
SingleThreadExecutor(const SingleThreadExecutor&);
std::vector<rclcpp::Node *> nodes_;
};
} /* namespace executor */
} /* namespace rclcpp */
#endif // __rclcpp__executor__SingleThreadExecutor__h__