Cpplint and cleanup

This commit is contained in:
Jackie Kay 2015-10-27 19:08:21 -07:00
parent fc48cf5fa2
commit 24fb204192
19 changed files with 368 additions and 376 deletions

View file

@ -12,22 +12,22 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP__ALLOCATOR_COMMON__HPP_ #ifndef RCLCPP__ALLOCATOR__ALLOCATOR_COMMON_HPP_
#define RCLCPP_RCLCPP__ALLOCATOR_COMMON__HPP_ #define RCLCPP__ALLOCATOR__ALLOCATOR_COMMON_HPP_
#include <memory> #include <memory>
#include <rclcpp/allocator/allocator_deleter.hpp> #include "rclcpp/allocator/allocator_deleter.hpp"
namespace rclcpp namespace rclcpp
{ {
namespace allocator namespace allocator
{ {
template<typename U, typename Alloc> template<typename T, typename Alloc>
using AllocRebind = typename std::allocator_traits<Alloc>::template rebind_traits<U>; using AllocRebind = typename std::allocator_traits<Alloc>::template rebind_traits<T>;
} } // namespace allocator
} } // namespace rclcpp
#endif #endif // RCLCPP__ALLOCATOR__ALLOCATOR_COMMON_HPP_

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_ALLOCATOR_DELETER_HPP_ #ifndef RCLCPP__ALLOCATOR__ALLOCATOR_DELETER_HPP_
#define RCLCPP_RCLCPP_ALLOCATOR_DELETER_HPP_ #define RCLCPP__ALLOCATOR__ALLOCATOR_DELETER_HPP_
#include <memory> #include <memory>
@ -26,33 +26,32 @@ namespace allocator
template<typename Allocator> template<typename Allocator>
class AllocatorDeleter class AllocatorDeleter
{ {
template<typename U> template<typename T>
using AllocRebind = typename std::allocator_traits<Allocator>::template rebind_alloc<U>; using AllocRebind = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
public: public:
AllocatorDeleter() AllocatorDeleter()
: allocator_(NULL) : allocator_(nullptr)
{ {
} }
AllocatorDeleter(Allocator * a) explicit AllocatorDeleter(Allocator * a)
: allocator_(a) : allocator_(a)
{ {
} }
template<typename U> template<typename T>
AllocatorDeleter(const AllocatorDeleter<U> & a) AllocatorDeleter(const AllocatorDeleter<T> & a)
{ {
allocator_ = a.get_allocator(); allocator_ = a.get_allocator();
} }
template<typename U> template<typename T>
void operator()(U * ptr) void operator()(T * ptr)
{ {
std::allocator_traits<AllocRebind<U>>::destroy(*allocator_, ptr); std::allocator_traits<AllocRebind<T>>::destroy(*allocator_, ptr);
// TODO: figure out if we're guaranteed to be destroying only 1 item here std::allocator_traits<AllocRebind<T>>::deallocate(*allocator_, ptr, 1);
std::allocator_traits<AllocRebind<U>>::deallocate(*allocator_, ptr, 1); ptr = nullptr;
ptr = NULL;
} }
Allocator * get_allocator() const Allocator * get_allocator() const
@ -100,7 +99,7 @@ using Deleter = typename std::conditional<
std::default_delete<T>, std::default_delete<T>,
AllocatorDeleter<Alloc> AllocatorDeleter<Alloc>
>::type; >::type;
} } // namespace allocator
} } // namespace rclcpp
#endif #endif // RCLCPP__ALLOCATOR__ALLOCATOR_DELETER_HPP_

View file

@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ #ifndef RCLCPP__ANY_SUBSCRIPTION_CALLBACK_HPP_
#define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ #define RCLCPP__ANY_SUBSCRIPTION_CALLBACK_HPP_
#include <rclcpp/allocator/allocator_common.hpp> #include <rmw/types.h>
#include <rclcpp/function_traits.hpp>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <type_traits> #include <type_traits>
#include <rmw/types.h> #include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/function_traits.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -56,7 +56,7 @@ class AnySubscriptionCallback
UniquePtrWithInfoCallback unique_ptr_with_info_callback_; UniquePtrWithInfoCallback unique_ptr_with_info_callback_;
public: public:
AnySubscriptionCallback(std::shared_ptr<Alloc> allocator) explicit AnySubscriptionCallback(std::shared_ptr<Alloc> allocator)
: shared_ptr_callback_(nullptr), shared_ptr_with_info_callback_(nullptr), : shared_ptr_callback_(nullptr), shared_ptr_with_info_callback_(nullptr),
const_shared_ptr_callback_(nullptr), const_shared_ptr_with_info_callback_(nullptr), const_shared_ptr_callback_(nullptr), const_shared_ptr_with_info_callback_(nullptr),
unique_ptr_callback_(nullptr), unique_ptr_with_info_callback_(nullptr) unique_ptr_callback_(nullptr), unique_ptr_with_info_callback_(nullptr)
@ -206,7 +206,7 @@ private:
MessageDeleter message_deleter_; MessageDeleter message_deleter_;
}; };
} /* namespace any_subscription_callback */ } // namespace any_subscription_callback
} /* namespace rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ */ #endif // RCLCPP__ANY_SUBSCRIPTION_CALLBACK_HPP_

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_ #ifndef RCLCPP__EXECUTOR_HPP_
#define RCLCPP_RCLCPP_EXECUTOR_HPP_ #define RCLCPP__EXECUTOR_HPP_
#include <algorithm> #include <algorithm>
#include <cassert> #include <cassert>
@ -23,14 +23,14 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <rcl_interfaces/msg/intra_process_message.hpp> #include "rcl_interfaces/msg/intra_process_message.hpp"
#include <rclcpp/any_executable.hpp> #include "rclcpp/any_executable.hpp"
#include <rclcpp/macros.hpp> #include "rclcpp/macros.hpp"
#include <rclcpp/memory_strategy.hpp> #include "rclcpp/memory_strategies.hpp"
#include <rclcpp/memory_strategies.hpp> #include "rclcpp/memory_strategy.hpp"
#include <rclcpp/node.hpp> #include "rclcpp/node.hpp"
#include <rclcpp/utilities.hpp> #include "rclcpp/utilities.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -49,7 +49,6 @@ namespace executor
*/ */
class Executor class Executor
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor); RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor);
@ -94,7 +93,7 @@ public:
for (auto & weak_node : weak_nodes_) { for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock(); auto node = weak_node.lock();
if (node == node_ptr) { if (node == node_ptr) {
// TODO: Use a different error here? // TODO(jacquelinekay): Use a different error here?
throw std::runtime_error("Cannot add node to executor, node already added."); throw std::runtime_error("Cannot add node to executor, node already added.");
} }
} }
@ -358,7 +357,6 @@ protected:
})); }));
} }
// Use the number of subscriptions to allocate memory in the handles // Use the number of subscriptions to allocate memory in the handles
rmw_subscriptions_t subscriber_handles; rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count = subscriber_handles.subscriber_count =
@ -431,9 +429,8 @@ protected:
memory_strategy_->clear_handles(); memory_strategy_->clear_handles();
return; return;
} }
// Add the new work to the class's list of things waiting to be executed
// Starting with the subscribers memory_strategy_->revalidate_handles();
memory_strategy_->clear_active_entities();
} }
/******************************/ /******************************/
@ -565,7 +562,7 @@ protected:
return any_exec; return any_exec;
} }
// If there is no ready executable, return a null ptr // If there is no ready executable, return a null ptr
return std::shared_ptr<AnyExecutable>(); return nullptr;
} }
template<typename T = std::milli> template<typename T = std::milli>
@ -613,7 +610,7 @@ private:
std::array<void *, 2> guard_cond_handles_; std::array<void *, 2> guard_cond_handles_;
}; };
} /* executor */ } // namespace executor
} /* rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_EXECUTOR_HPP_ */ #endif // RCLCPP__EXECUTOR_HPP_

View file

@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_EXECUTORS_MULTI_THREADED_EXECUTOR_HPP_ #ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
#define RCLCPP_RCLCPP_EXECUTORS_MULTI_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
#include <rmw/rmw.h>
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
@ -21,12 +23,10 @@
#include <mutex> #include <mutex>
#include <vector> #include <vector>
#include <rmw/rmw.h> #include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include <rclcpp/executor.hpp> #include "rclcpp/node.hpp"
#include <rclcpp/macros.hpp> #include "rclcpp/utilities.hpp"
#include <rclcpp/node.hpp>
#include <rclcpp/utilities.hpp>
namespace rclcpp namespace rclcpp
{ {
@ -97,11 +97,10 @@ private:
std::mutex wait_mutex_; std::mutex wait_mutex_;
size_t number_of_threads_; size_t number_of_threads_;
}; };
} /* namespace multi_threaded_executor */ } // namespace multi_threaded_executor
} /* namespace executors */ } // namespace executors
} /* namespace rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_EXECUTORS_MULTI_THREADED_EXECUTOR_HPP_ */ #endif // RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_

View file

@ -12,22 +12,22 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_EXECUTORS_SINGLE_THREADED_EXECUTOR_HPP_ #ifndef RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_
#define RCLCPP_RCLCPP_EXECUTORS_SINGLE_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_
#include <rmw/rmw.h>
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <rmw/rmw.h> #include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include <rclcpp/executor.hpp> #include "rclcpp/memory_strategies.hpp"
#include <rclcpp/macros.hpp> #include "rclcpp/node.hpp"
#include <rclcpp/memory_strategies.hpp> #include "rclcpp/utilities.hpp"
#include <rclcpp/node.hpp> #include "rclcpp/rate.hpp"
#include <rclcpp/utilities.hpp>
#include <rclcpp/rate.hpp>
namespace rclcpp namespace rclcpp
{ {
@ -64,11 +64,10 @@ public:
private: private:
RCLCPP_DISABLE_COPY(SingleThreadedExecutor); RCLCPP_DISABLE_COPY(SingleThreadedExecutor);
}; };
} /* namespace single_threaded_executor */ } // namespace single_threaded_executor
} /* namespace executors */ } // namespace executors
} /* namespace rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_EXECUTORS_SINGLE_THREADED_EXECUTOR_HPP_ */ #endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_

View file

@ -12,24 +12,26 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ #ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_
#define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ #define RCLCPP__INTRA_PROCESS_MANAGER_HPP_
#include <rclcpp/intra_process_manager_state.hpp> #include <rmw/types.h>
#include <rclcpp/mapped_ring_buffer.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/subscription.hpp>
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <cstdint> #include <cstdint>
#include <exception> #include <exception>
#include <map> #include <map>
#include <memory>
#include <unordered_map> #include <unordered_map>
#include <set> #include <set>
#include <rmw/types.h> #include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager_state.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/subscription.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -122,8 +124,9 @@ private:
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager); RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager);
//IntraProcessManager() = default; explicit IntraProcessManager(
IntraProcessManager(IntraProcessManagerStateBase::SharedPtr state = create_default_state()) IntraProcessManagerStateBase::SharedPtr state = create_default_state()
)
: state_(state) : state_(state)
{ {
} }
@ -236,19 +239,18 @@ public:
* \param message the message that is being stored. * \param message the message that is being stored.
* \return the message sequence number. * \return the message sequence number.
*/ */
template<typename MessageT, typename Alloc = std::allocator<void>> template<typename MessageT, typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
uint64_t uint64_t
store_intra_process_message( store_intra_process_message(
uint64_t intra_process_publisher_id, uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, std::unique_ptr<MessageT, Deleter> & message)
typename allocator::Deleter<typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>,
MessageT>> & message)
{ {
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>; using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc> TypedMRB; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
uint64_t message_seq = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->get_publisher_info_for_id(
auto message_seq = state_->get_publisher_info_for_id(intra_process_publisher_id, buffer); intra_process_publisher_id, message_seq);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer); typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) { if (!typed_buffer) {
throw std::runtime_error("Typecast failed due to incorrect message type"); throw std::runtime_error("Typecast failed due to incorrect message type");
@ -299,25 +301,25 @@ public:
* \param requesting_subscriptions_intra_process_id the subscription's id. * \param requesting_subscriptions_intra_process_id the subscription's id.
* \param message the message typed unique_ptr used to return the message. * \param message the message typed unique_ptr used to return the message.
*/ */
template<typename MessageT, typename Alloc = std::allocator<void>> template<typename MessageT, typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
void void
take_intra_process_message( take_intra_process_message(
uint64_t intra_process_publisher_id, uint64_t intra_process_publisher_id,
uint64_t message_sequence_number, uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id, uint64_t requesting_subscriptions_intra_process_id,
std::unique_ptr<MessageT, std::unique_ptr<MessageT, Deleter> & message)
typename allocator::Deleter<typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>,
MessageT>> & message)
{ {
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>; using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc> TypedMRB; using TypedMRB = mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
message = nullptr;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; size_t target_subs_size = 0;
size_t target_subs_size = state_->take_intra_process_message( mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->take_intra_process_message(
intra_process_publisher_id, intra_process_publisher_id,
message_sequence_number, message_sequence_number,
requesting_subscriptions_intra_process_id, requesting_subscriptions_intra_process_id,
buffer target_subs_size
); );
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer); typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) { if (!typed_buffer) {
@ -365,12 +367,11 @@ private:
static std::atomic<uint64_t> next_unique_id_; static std::atomic<uint64_t> next_unique_id_;
IntraProcessManagerStateBase::SharedPtr state_; IntraProcessManagerStateBase::SharedPtr state_;
}; };
std::atomic<uint64_t> IntraProcessManager::next_unique_id_ {1}; std::atomic<uint64_t> IntraProcessManager::next_unique_id_ {1};
} /* namespace intra_process_manager */ } // namespace intra_process_manager
} /* namespace rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ */ #endif // RCLCPP__INTRA_PROCESS_MANAGER_HPP_

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP__RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ #ifndef RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#define RCLCPP__RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ #define RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#include <rclcpp/mapped_ring_buffer.hpp> #include <rclcpp/mapped_ring_buffer.hpp>
#include <rclcpp/publisher.hpp> #include <rclcpp/publisher.hpp>
@ -21,11 +21,14 @@
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <functional>
#include <limits> #include <limits>
#include <memory> #include <memory>
#include <map> #include <map>
#include <unordered_map> #include <unordered_map>
#include <set> #include <set>
#include <string>
#include <utility>
namespace rclcpp namespace rclcpp
{ {
@ -35,8 +38,7 @@ namespace intra_process_manager
class IntraProcessManagerStateBase class IntraProcessManagerStateBase
{ {
public: public:
//RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManagerStateBase); RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerStateBase);
using SharedPtr = std::shared_ptr<IntraProcessManagerStateBase>;
virtual void virtual void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0; add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
@ -52,19 +54,19 @@ public:
virtual void virtual void
remove_publisher(uint64_t intra_process_publisher_id) = 0; remove_publisher(uint64_t intra_process_publisher_id) = 0;
virtual uint64_t virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
get_publisher_info_for_id( get_publisher_info_for_id(
uint64_t intra_process_publisher_id, uint64_t intra_process_publisher_id,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb) = 0; uint64_t & message_seq) = 0;
virtual void virtual void
store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0; store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0;
virtual size_t virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
take_intra_process_message(uint64_t intra_process_publisher_id, take_intra_process_message(uint64_t intra_process_publisher_id,
uint64_t message_sequence_number, uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id, uint64_t requesting_subscriptions_intra_process_id,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb) = 0; size_t & size) = 0;
virtual bool virtual bool
matches_any_publishers(const rmw_gid_t * id) const = 0; matches_any_publishers(const rmw_gid_t * id) const = 0;
@ -95,7 +97,6 @@ public:
sub_pair.second.erase(intra_process_subscription_id); sub_pair.second.erase(intra_process_subscription_id);
} }
} }
} }
void add_publisher(uint64_t id, void add_publisher(uint64_t id,
@ -103,7 +104,6 @@ public:
mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
size_t size) size_t size)
{ {
publishers_[id].publisher = publisher; publishers_[id].publisher = publisher;
// As long as the size of the ring buffer is less than the max sequence number, we're safe. // As long as the size of the ring buffer is less than the max sequence number, we're safe.
if (size > std::numeric_limits<uint64_t>::max()) { if (size > std::numeric_limits<uint64_t>::max()) {
@ -121,23 +121,21 @@ public:
publishers_.erase(intra_process_publisher_id); publishers_.erase(intra_process_publisher_id);
} }
// TODO
// return message_seq and mrb // return message_seq and mrb
uint64_t mapped_ring_buffer::MappedRingBufferBase::SharedPtr
get_publisher_info_for_id( get_publisher_info_for_id(
uint64_t intra_process_publisher_id, uint64_t intra_process_publisher_id,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb) uint64_t & message_seq)
{ {
auto it = publishers_.find(intra_process_publisher_id); auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) { if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id"); throw std::runtime_error("store_intra_process_message called with invalid publisher id");
} }
PublisherInfo & info = it->second; PublisherInfo & info = it->second;
mrb = info.buffer;
// Calculate the next message sequence number. // Calculate the next message sequence number.
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
return message_seq; return info.buffer;
} }
void void
@ -166,17 +164,15 @@ public:
info.target_subscriptions_by_message_sequence[message_seq].end() info.target_subscriptions_by_message_sequence[message_seq].end()
) )
); );
} }
size_t mapped_ring_buffer::MappedRingBufferBase::SharedPtr
take_intra_process_message(uint64_t intra_process_publisher_id, take_intra_process_message(uint64_t intra_process_publisher_id,
uint64_t message_sequence_number, uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id, uint64_t requesting_subscriptions_intra_process_id,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb size_t & size
) )
{ {
mrb = nullptr;
PublisherInfo * info; PublisherInfo * info;
{ {
auto it = publishers_.find(intra_process_publisher_id); auto it = publishers_.find(intra_process_publisher_id);
@ -206,8 +202,8 @@ public:
} }
target_subs->erase(it); target_subs->erase(it);
} }
mrb = info->buffer; size = target_subs->size();
return target_subs->size(); return info->buffer;
} }
bool bool
@ -229,14 +225,12 @@ private:
template<typename T> template<typename T>
using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>; using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
//using AllocString = std::basic_string<char, std::char_traits<char>, RebindAlloc<char>>;
using AllocString = std::string;
using AllocSet = std::set<uint64_t, std::less<uint64_t>, RebindAlloc<uint64_t>>; using AllocSet = std::set<uint64_t, std::less<uint64_t>, RebindAlloc<uint64_t>>;
using SubscriptionMap = std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr, using SubscriptionMap = std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr,
std::hash<uint64_t>, std::equal_to<uint64_t>, std::hash<uint64_t>, std::equal_to<uint64_t>,
RebindAlloc<std::pair<const uint64_t, subscription::SubscriptionBase::WeakPtr>>>; RebindAlloc<std::pair<const uint64_t, subscription::SubscriptionBase::WeakPtr>>>;
using IDTopicMap = std::map<AllocString, AllocSet, using IDTopicMap = std::map<std::string, AllocSet,
std::less<AllocString>, RebindAlloc<std::pair<AllocString, AllocSet>>>; std::less<std::string>, RebindAlloc<std::pair<std::string, AllocSet>>>;
SubscriptionMap subscriptions_; SubscriptionMap subscriptions_;
@ -263,7 +257,6 @@ private:
RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>; RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
PublisherMap publishers_; PublisherMap publishers_;
}; };
static IntraProcessManagerStateBase::SharedPtr create_default_state() static IntraProcessManagerStateBase::SharedPtr create_default_state()
@ -271,7 +264,7 @@ static IntraProcessManagerStateBase::SharedPtr create_default_state()
return std::make_shared<IntraProcessManagerState<>>(); return std::make_shared<IntraProcessManagerState<>>();
} }
} } // namespace intra_process_manager
} } // namespace rclcpp
#endif #endif // RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_

View file

@ -12,11 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_RING_BUFFER_HPP_ #ifndef RCLCPP__MAPPED_RING_BUFFER_HPP_
#define RCLCPP_RCLCPP_RING_BUFFER_HPP_ #define RCLCPP__MAPPED_RING_BUFFER_HPP_
#include <rclcpp/allocator/allocator_common.hpp>
#include <rclcpp/macros.hpp>
#include <algorithm> #include <algorithm>
#include <cstddef> #include <cstddef>
@ -24,6 +21,9 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/macros.hpp"
namespace rclcpp namespace rclcpp
{ {
namespace mapped_ring_buffer namespace mapped_ring_buffer
@ -67,7 +67,7 @@ public:
* *
* \param size size of the ring buffer; must be positive and non-zero. * \param size size of the ring buffer; must be positive and non-zero.
*/ */
MappedRingBuffer(size_t size, std::shared_ptr<Alloc> allocator = nullptr) explicit MappedRingBuffer(size_t size, std::shared_ptr<Alloc> allocator = nullptr)
: elements_(size), head_(0) : elements_(size), head_(0)
{ {
if (size == 0) { if (size == 0) {
@ -224,10 +224,9 @@ private:
std::vector<element, VectorAlloc> elements_; std::vector<element, VectorAlloc> elements_;
size_t head_; size_t head_;
std::shared_ptr<ElemAlloc> allocator_; std::shared_ptr<ElemAlloc> allocator_;
}; };
} /* namespace ring_buffer */ } // namespace mapped_ring_buffer
} /* namespace rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_RING_BUFFER_HPP_ */ #endif // RCLCPP__MAPPED_RING_BUFFER_HPP_

View file

@ -12,11 +12,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_ #ifndef RCLCPP__MEMORY_STRATEGIES_HPP_
#define RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_ #define RCLCPP__MEMORY_STRATEGIES_HPP_
//#include <rclcpp/strategies/heap_pool_memory_strategy.hpp>
//#include <rclcpp/strategies/stack_pool_memory_strategy.hpp>
#include <rclcpp/memory_strategy.hpp> #include <rclcpp/memory_strategy.hpp>
#include <rclcpp/strategies/allocator_memory_strategy.hpp> #include <rclcpp/strategies/allocator_memory_strategy.hpp>
@ -26,15 +24,13 @@ namespace memory_strategies
{ {
using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy; using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy;
//using rclcpp::memory_strategies::heap_pool_memory_strategy::HeapPoolMemoryStrategy;
//using rclcpp::memory_strategies::stack_pool_memory_strategy::StackPoolMemoryStrategy;
static memory_strategy::MemoryStrategy::SharedPtr create_default_strategy() static memory_strategy::MemoryStrategy::SharedPtr create_default_strategy()
{ {
return std::make_shared<memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); return std::make_shared<AllocatorMemoryStrategy<>>();
} }
} /* memory_strategies */ } // namespace memory_strategies
} /* rclcpp */ } // namespace rclcpp
#endif #endif // RCLCPP__MEMORY_STRATEGIES_HPP_

View file

@ -12,22 +12,21 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_ #ifndef RCLCPP__MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_ #define RCLCPP__MEMORY_STRATEGY_HPP_
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <rclcpp/any_executable.hpp> #include "rclcpp/any_executable.hpp"
#include <rclcpp/macros.hpp> #include "rclcpp/macros.hpp"
#include <rclcpp/node.hpp> #include "rclcpp/node.hpp"
namespace rclcpp namespace rclcpp
{ {
namespace memory_strategy namespace memory_strategy
{ {
/// Delegate for handling memory allocations while the Executor is executing. /// Delegate for handling memory allocations while the Executor is executing.
/** /**
* By default, the memory strategy dynamically allocates memory for structures that come in from * By default, the memory strategy dynamically allocates memory for structures that come in from
@ -52,6 +51,7 @@ public:
virtual void clear_active_entities() = 0; virtual void clear_active_entities() = 0;
virtual void clear_handles() = 0; virtual void clear_handles() = 0;
virtual void revalidate_handles() = 0;
virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0; virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0;
/// Provide a newly initialized AnyExecutable object. /// Provide a newly initialized AnyExecutable object.
@ -103,8 +103,7 @@ public:
} }
static rclcpp::service::ServiceBase::SharedPtr static rclcpp::service::ServiceBase::SharedPtr
get_service_by_handle(void * service_handle, get_service_by_handle(void * service_handle, const WeakNodeVector & weak_nodes)
const WeakNodeVector & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -123,12 +122,11 @@ public:
} }
} }
} }
return rclcpp::service::ServiceBase::SharedPtr(); return nullptr;
} }
static rclcpp::client::ClientBase::SharedPtr static rclcpp::client::ClientBase::SharedPtr
get_client_by_handle(void * client_handle, get_client_by_handle(void * client_handle, const WeakNodeVector & weak_nodes)
const WeakNodeVector & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -147,7 +145,7 @@ public:
} }
} }
} }
return rclcpp::client::ClientBase::SharedPtr(); return nullptr;
} }
static rclcpp::node::Node::SharedPtr static rclcpp::node::Node::SharedPtr
@ -155,7 +153,7 @@ public:
const WeakNodeVector & weak_nodes) const WeakNodeVector & weak_nodes)
{ {
if (!group) { if (!group) {
return rclcpp::node::Node::SharedPtr(); return nullptr;
} }
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -169,7 +167,7 @@ public:
} }
} }
} }
return rclcpp::node::Node::SharedPtr(); return nullptr;
} }
static rclcpp::callback_group::CallbackGroup::SharedPtr static rclcpp::callback_group::CallbackGroup::SharedPtr
@ -195,14 +193,63 @@ public:
} }
} }
} }
return rclcpp::callback_group::CallbackGroup::SharedPtr(); return nullptr;
}
static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_service(
rclcpp::service::ServiceBase::SharedPtr service,
const WeakNodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & serv : group->get_service_ptrs()) {
if (serv == service) {
return group;
}
}
}
}
return nullptr;
}
static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_client(rclcpp::client::ClientBase::SharedPtr client,
const WeakNodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & cli : group->get_client_ptrs()) {
if (cli == client) {
return group;
}
}
}
}
return nullptr;
} }
}; };
} /* memory_strategy */ } // namespace memory_strategy
} /* rclcpp */ } // namespace rclcpp
#endif #endif // RCLCPP__MEMORY_STRATEGY_HPP_

View file

@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_ #ifndef RCLCPP__MESSAGE_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_ #define RCLCPP__MESSAGE_MEMORY_STRATEGY_HPP_
#include <memory> #include <memory>
#include <rclcpp/allocator/allocator_common.hpp> #include "rclcpp/allocator/allocator_common.hpp"
#include <rclcpp/macros.hpp> #include "rclcpp/macros.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -30,7 +30,6 @@ namespace message_memory_strategy
template<typename MessageT, typename Alloc = std::allocator<void>> template<typename MessageT, typename Alloc = std::allocator<void>>
class MessageMemoryStrategy class MessageMemoryStrategy
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy); RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy);
@ -43,7 +42,7 @@ public:
message_allocator_ = std::make_shared<MessageAlloc>(); message_allocator_ = std::make_shared<MessageAlloc>();
} }
MessageMemoryStrategy(std::shared_ptr<Alloc> allocator) explicit MessageMemoryStrategy(std::shared_ptr<Alloc> allocator)
{ {
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get()); message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
} }
@ -72,7 +71,7 @@ public:
MessageDeleter message_deleter_; MessageDeleter message_deleter_;
}; };
} /* message_memory_strategy */ } // namespace message_memory_strategy
} /* rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_ */ #endif // RCLCPP__MESSAGE_MEMORY_STRATEGY_HPP_

View file

@ -12,48 +12,48 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_NODE_HPP_ #ifndef RCLCPP__NODE_HPP_
#define RCLCPP_RCLCPP_NODE_HPP_ #define RCLCPP__NODE_HPP_
#include <list> #include <list>
#include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include <tuple> #include <tuple>
#include <vector>
#include <rcl_interfaces/msg/list_parameters_result.hpp> #include "rcl_interfaces/msg/list_parameters_result.hpp"
#include <rcl_interfaces/msg/parameter_descriptor.hpp> #include "rcl_interfaces/msg/parameter_descriptor.hpp"
#include <rcl_interfaces/msg/parameter_event.hpp> #include "rcl_interfaces/msg/parameter_event.hpp"
#include <rcl_interfaces/msg/set_parameters_result.hpp> #include "rcl_interfaces/msg/set_parameters_result.hpp"
#include <rosidl_generator_cpp/message_type_support.hpp> #include "rosidl_generator_cpp/message_type_support.hpp"
#include <rclcpp/callback_group.hpp> #include "rclcpp/callback_group.hpp"
#include <rclcpp/client.hpp> #include "rclcpp/client.hpp"
#include <rclcpp/context.hpp> #include "rclcpp/context.hpp"
#include <rclcpp/macros.hpp> #include "rclcpp/macros.hpp"
#include <rclcpp/message_memory_strategy.hpp> #include "rclcpp/message_memory_strategy.hpp"
#include <rclcpp/parameter.hpp> #include "rclcpp/parameter.hpp"
#include <rclcpp/publisher.hpp> #include "rclcpp/publisher.hpp"
#include <rclcpp/service.hpp> #include "rclcpp/service.hpp"
#include <rclcpp/subscription.hpp> #include "rclcpp/subscription.hpp"
#include <rclcpp/timer.hpp> #include "rclcpp/timer.hpp"
// Forward declaration of ROS middleware class // Forward declaration of ROS middleware class
namespace rmw namespace rmw
{ {
struct rmw_node_t; struct rmw_node_t;
} // namespace rmw } // namespace rmw
namespace rclcpp namespace rclcpp
{ {
namespace node namespace node
{ {
/// Node is the single point of entry for creating publishers and subscribers. /// Node is the single point of entry for creating publishers and subscribers.
class Node class Node
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(Node); RCLCPP_SMART_PTR_DEFINITIONS(Node);
@ -63,7 +63,7 @@ public:
* \param[in] use_intra_process_comms True to use the optimized intra-process communication * \param[in] use_intra_process_comms True to use the optimized intra-process communication
* pipeline to pass messages between nodes in the same process using shared memory. * pipeline to pass messages between nodes in the same process using shared memory.
*/ */
Node(const std::string & node_name, bool use_intra_process_comms = false); explicit Node(const std::string & node_name, bool use_intra_process_comms = false);
/// Create a node based on the node name and a rclcpp::context::Context. /// Create a node based on the node name and a rclcpp::context::Context.
/** /**
@ -278,9 +278,9 @@ const rosidl_message_type_support_t * Node::ipm_ts_ =
make_shared())); \ make_shared())); \
} }
#ifndef RCLCPP_RCLCPP_NODE_IMPL_HPP_ #ifndef RCLCPP__NODE_IMPL_HPP_
// Template implementations // Template implementations
#include "node_impl.hpp" #include "node_impl.hpp"
#endif #endif
#endif /* RCLCPP_RCLCPP_NODE_HPP_ */ #endif // RCLCPP__NODE_HPP_

View file

@ -12,34 +12,38 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_NODE_IMPL_HPP_ #ifndef RCLCPP__NODE_IMPL_HPP_
#define RCLCPP_RCLCPP_NODE_IMPL_HPP_ #define RCLCPP__NODE_IMPL_HPP_
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <algorithm> #include <algorithm>
#include <cstdlib> #include <cstdlib>
#include <iostream> #include <iostream>
#include <limits> #include <limits>
#include <map>
#include <memory> #include <memory>
#include <sstream> #include <sstream>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <utility>
#include <vector>
#include <rcl_interfaces/msg/intra_process_message.hpp> #include "rcl_interfaces/msg/intra_process_message.hpp"
#include <rmw/error_handling.h> #include "rosidl_generator_cpp/message_type_support.hpp"
#include <rmw/rmw.h> #include "rosidl_generator_cpp/service_type_support.hpp"
#include <rosidl_generator_cpp/message_type_support.hpp>
#include <rosidl_generator_cpp/service_type_support.hpp>
#include <rclcpp/contexts/default_context.hpp> #include "rclcpp/contexts/default_context.hpp"
#include <rclcpp/intra_process_manager.hpp> #include "rclcpp/intra_process_manager.hpp"
#include <rclcpp/parameter.hpp> #include "rclcpp/parameter.hpp"
#ifndef RCLCPP_RCLCPP_NODE_HPP_ #ifndef RCLCPP__NODE_HPP_
#include "node.hpp" #include "node.hpp"
#endif #endif
using namespace rclcpp; using namespace rclcpp;
using namespace rclcpp::node; using namespace node;
Node::Node(const std::string & node_name, bool use_intra_process_comms) Node::Node(const std::string & node_name, bool use_intra_process_comms)
: Node( : Node(
@ -189,7 +193,8 @@ Node::create_publisher(
MessageT * typed_message_ptr = static_cast<MessageT *>(msg); MessageT * typed_message_ptr = static_cast<MessageT *>(msg);
using MessageDeleter = typename publisher::Publisher<MessageT, Alloc>::MessageDeleter; using MessageDeleter = typename publisher::Publisher<MessageT, Alloc>::MessageDeleter;
std::unique_ptr<MessageT, MessageDeleter> unique_msg(typed_message_ptr); std::unique_ptr<MessageT, MessageDeleter> unique_msg(typed_message_ptr);
uint64_t message_seq = ipm->store_intra_process_message<MessageT, Alloc>(publisher_id, unique_msg); uint64_t message_seq =
ipm->store_intra_process_message<MessageT, Alloc>(publisher_id, unique_msg);
return message_seq; return message_seq;
}; };
// *INDENT-ON* // *INDENT-ON*
@ -294,7 +299,8 @@ Node::create_subscription(
throw std::runtime_error( throw std::runtime_error(
"intra process take called after destruction of intra process manager"); "intra process take called after destruction of intra process manager");
} }
ipm->take_intra_process_message<MessageT, Alloc>(publisher_id, message_sequence, subscription_id, message); ipm->take_intra_process_message<MessageT, Alloc>(
publisher_id, message_sequence, subscription_id, message);
}, },
[weak_ipm](const rmw_gid_t * sender_gid) -> bool { [weak_ipm](const rmw_gid_t * sender_gid) -> bool {
auto ipm = weak_ipm.lock(); auto ipm = weak_ipm.lock();
@ -310,7 +316,7 @@ Node::create_subscription(
// Assign to a group. // Assign to a group.
if (group) { if (group) {
if (!group_in_node(group)) { if (!group_in_node(group)) {
// TODO: use custom exception // TODO(jacquelinekay): use custom exception
throw std::runtime_error("Cannot create subscription, group not in node."); throw std::runtime_error("Cannot create subscription, group not in node.");
} }
group->add_subscription(sub_base_ptr); group->add_subscription(sub_base_ptr);
@ -354,7 +360,7 @@ Node::create_wall_timer(
auto timer = rclcpp::timer::WallTimer::make_shared(period, callback); auto timer = rclcpp::timer::WallTimer::make_shared(period, callback);
if (group) { if (group) {
if (!group_in_node(group)) { if (!group_in_node(group)) {
// TODO: use custom exception // TODO(jacquelinekay): use custom exception
throw std::runtime_error("Cannot create timer, group not in node."); throw std::runtime_error("Cannot create timer, group not in node.");
} }
group->add_timer(timer); group->add_timer(timer);
@ -449,7 +455,7 @@ Node::create_service(
auto serv_base_ptr = std::dynamic_pointer_cast<service::ServiceBase>(serv); auto serv_base_ptr = std::dynamic_pointer_cast<service::ServiceBase>(serv);
if (group) { if (group) {
if (!group_in_node(group)) { if (!group_in_node(group)) {
// TODO: use custom exception // TODO(jacquelinekay): use custom exception
throw std::runtime_error("Cannot create service, group not in node."); throw std::runtime_error("Cannot create service, group not in node.");
} }
group->add_service(serv_base_ptr); group->add_service(serv_base_ptr);
@ -497,7 +503,7 @@ Node::set_parameters_atomically(
tmp_map.insert(parameters_.begin(), parameters_.end()); tmp_map.insert(parameters_.begin(), parameters_.end());
std::swap(tmp_map, parameters_); std::swap(tmp_map, parameters_);
// TODO: handle parameter constraints // TODO(jacquelinekay): handle parameter constraints
rcl_interfaces::msg::SetParametersResult result; rcl_interfaces::msg::SetParametersResult result;
result.successful = true; result.successful = true;
@ -667,4 +673,4 @@ Node::get_callback_groups() const
return callback_groups_; return callback_groups_;
} }
#endif /* RCLCPP_RCLCPP_NODE_IMPL_HPP_ */ #endif // RCLCPP__NODE_IMPL_HPP_

View file

@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_PUBLISHER_HPP_ #ifndef RCLCPP__PUBLISHER_HPP_
#define RCLCPP_RCLCPP_PUBLISHER_HPP_ #define RCLCPP__PUBLISHER_HPP_
#include <rclcpp/macros.hpp> #include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
@ -23,12 +24,11 @@
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <rcl_interfaces/msg/intra_process_message.hpp> #include "rcl_interfaces/msg/intra_process_message.hpp"
#include <rmw/impl/cpp/demangle.hpp> #include "rmw/impl/cpp/demangle.hpp"
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <rclcpp/allocator/allocator_deleter.hpp> #include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/macros.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -37,7 +37,7 @@ namespace rclcpp
namespace node namespace node
{ {
class Node; class Node;
} // namespace node } // namespace node
namespace publisher namespace publisher
{ {
@ -346,10 +346,9 @@ protected:
std::shared_ptr<MessageAlloc> message_allocator_; std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_; MessageDeleter message_deleter_;
}; };
} /* namespace publisher */ } // namespace publisher
} /* namespace rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_PUBLISHER_HPP_ */ #endif // RCLCPP__PUBLISHER_HPP_

View file

@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_ALLOCATOR_MEMORY_STRATEGY_HPP_ #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_ALLOCATOR_MEMORY_STRATEGY_HPP_ #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <rclcpp/allocator/allocator_common.hpp> #include "rclcpp/allocator/allocator_common.hpp"
#include <rclcpp/memory_strategy.hpp> #include "rclcpp/memory_strategy.hpp"
#include <rclcpp/node.hpp> #include "rclcpp/node.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -40,7 +40,6 @@ namespace allocator_memory_strategy
template<typename Alloc = std::allocator<void>> template<typename Alloc = std::allocator<void>>
class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy<Alloc>); RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy<Alloc>);
@ -52,7 +51,7 @@ public:
using WeakNodeVector = std::vector<std::weak_ptr<node::Node>>; using WeakNodeVector = std::vector<std::weak_ptr<node::Node>>;
AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator) explicit AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator)
{ {
executable_allocator_ = std::make_shared<ExecAlloc>(*allocator.get()); executable_allocator_ = std::make_shared<ExecAlloc>(*allocator.get());
allocator_ = std::make_shared<VoidAlloc>(*allocator.get()); allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
@ -66,48 +65,34 @@ public:
size_t fill_subscriber_handles(void ** & ptr) size_t fill_subscriber_handles(void ** & ptr)
{ {
size_t max_size = subscriptions_.size();
subscriber_handles_.resize(max_size);
size_t count = 0;
for (auto & subscription : subscriptions_) { for (auto & subscription : subscriptions_) {
subscriber_handles_[count++] = subscription->get_subscription_handle()->data; subscriber_handles_.push_back(subscription->get_subscription_handle()->data);
if (subscription->get_intra_process_subscription_handle()) { if (subscription->get_intra_process_subscription_handle()) {
subscriber_handles_[count++] = subscriber_handles_.push_back(subscription->get_intra_process_subscription_handle()->data);
subscription->get_intra_process_subscription_handle()->data;
} }
} }
ptr = subscriber_handles_.data(); ptr = subscriber_handles_.data();
return count; return subscriber_handles_.size();
} }
// return the new number of services // return the new number of services
size_t fill_service_handles(void ** & ptr) size_t fill_service_handles(void ** & ptr)
{ {
size_t max_size = services_.size();
if (service_handles_.size() < max_size) {
service_handles_.resize(max_size);
}
size_t count = 0;
for (auto & service : services_) { for (auto & service : services_) {
service_handles_[count++] = service->get_service_handle()->data; service_handles_.push_back(service->get_service_handle()->data);
} }
ptr = service_handles_.data(); ptr = service_handles_.data();
return count; return service_handles_.size();
} }
// return the new number of clients // return the new number of clients
size_t fill_client_handles(void ** & ptr) size_t fill_client_handles(void ** & ptr)
{ {
size_t max_size = clients_.size();
if (client_handles_.size() < max_size) {
client_handles_.resize(max_size);
}
size_t count = 0;
for (auto & client : clients_) { for (auto & client : clients_) {
client_handles_[count++] = client->get_client_handle()->data; client_handles_.push_back(client->get_client_handle()->data);
} }
ptr = client_handles_.data(); ptr = client_handles_.data();
return count; return client_handles_.size();
} }
void clear_active_entities() void clear_active_entities()
@ -124,6 +109,37 @@ public:
client_handles_.clear(); client_handles_.clear();
} }
void revalidate_handles()
{
{
VectorRebind<void *> temp;
for (size_t i = 0; i < subscriptions_.size() * 2; i++) {
if (subscriber_handles_[i]) {
temp.push_back(subscriber_handles_[i]);
}
}
subscriber_handles_.swap(temp);
}
{
VectorRebind<void *> temp;
for (size_t i = 0; i < services_.size(); i++) {
if (service_handles_[i]) {
temp.push_back(service_handles_[i]);
}
}
service_handles_.swap(temp);
}
{
VectorRebind<void *> temp;
for (size_t i = 0; i < clients_.size(); i++) {
if (client_handles_[i]) {
temp.push_back(client_handles_[i]);
}
}
client_handles_.swap(temp);
}
}
bool collect_entities(const WeakNodeVector & weak_nodes) bool collect_entities(const WeakNodeVector & weak_nodes)
{ {
bool has_invalid_weak_nodes = false; bool has_invalid_weak_nodes = false;
@ -145,10 +161,14 @@ public:
} }
} }
for (auto & service : group->get_service_ptrs()) { for (auto & service : group->get_service_ptrs()) {
services_.push_back(service); if (service) {
services_.push_back(service);
}
} }
for (auto & client : group->get_client_ptrs()) { for (auto & client : group->get_client_ptrs()) {
clients_.push_back(client); if (client) {
clients_.push_back(client);
}
} }
} }
} }
@ -168,10 +188,8 @@ public:
const WeakNodeVector & weak_nodes) const WeakNodeVector & weak_nodes)
{ {
auto it = subscriber_handles_.begin(); auto it = subscriber_handles_.begin();
VectorRebind<typename VectorRebind<void *>::iterator> to_erase;
while (it != subscriber_handles_.end()) { while (it != subscriber_handles_.end()) {
auto subscription = memory_strategy::MemoryStrategy::get_subscription_by_handle(*it, auto subscription = get_subscription_by_handle(*it, weak_nodes);
weak_nodes);
if (subscription) { if (subscription) {
// Figure out if this is for intra-process or not. // Figure out if this is for intra-process or not.
bool is_intra_process = false; bool is_intra_process = false;
@ -183,8 +201,7 @@ public:
if (!group) { if (!group) {
// Group was not found, meaning the subscription is not valid... // Group was not found, meaning the subscription is not valid...
// Remove it from the ready list and continue looking // Remove it from the ready list and continue looking
//subscriber_handles_.erase(it++); subscriber_handles_.erase(it);
to_erase.push_back(it++);
continue; continue;
} }
if (!group->can_be_taken_from().load()) { if (!group->can_be_taken_from().load()) {
@ -201,44 +218,12 @@ public:
} }
any_exec->callback_group = group; any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes); any_exec->node = get_node_by_group(group, weak_nodes);
to_erase.push_back(it++); subscriber_handles_.erase(it);
return; return;
} }
// Else, the subscription is no longer valid, remove it and continue // Else, the subscription is no longer valid, remove it and continue
// TODO subscriber_handles_.erase(it);
to_erase.push_back(it++);
} }
for (auto & entry : to_erase) {
if (entry != subscriber_handles_.end()) {
subscriber_handles_.erase(entry);
}
}
}
static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_service(
rclcpp::service::ServiceBase::SharedPtr service,
const WeakNodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & serv : group->get_service_ptrs()) {
if (serv == service) {
return group;
}
}
}
}
return rclcpp::callback_group::CallbackGroup::SharedPtr();
} }
virtual void virtual void
@ -247,14 +232,14 @@ public:
{ {
auto it = service_handles_.begin(); auto it = service_handles_.begin();
while (it != service_handles_.end()) { while (it != service_handles_.end()) {
auto service = memory_strategy::MemoryStrategy::get_service_by_handle(*it, weak_nodes); auto service = get_service_by_handle(*it, weak_nodes);
if (service) { if (service) {
// Find the group for this handle and see if it can be serviced // Find the group for this handle and see if it can be serviced
auto group = get_group_by_service(service, weak_nodes); auto group = get_group_by_service(service, weak_nodes);
if (!group) { if (!group) {
// Group was not found, meaning the service is not valid... // Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking // Remove it from the ready list and continue looking
service_handles_.erase(it++); service_handles_.erase(it);
continue; continue;
} }
if (!group->can_be_taken_from().load()) { if (!group->can_be_taken_from().load()) {
@ -267,53 +252,27 @@ public:
any_exec->service = service; any_exec->service = service;
any_exec->callback_group = group; any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes); any_exec->node = get_node_by_group(group, weak_nodes);
service_handles_.erase(it++); service_handles_.erase(it);
return; return;
} }
// Else, the service is no longer valid, remove it and continue // Else, the service is no longer valid, remove it and continue
service_handles_.erase(it++); service_handles_.erase(it);
} }
} }
static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_client(
rclcpp::client::ClientBase::SharedPtr client,
const WeakNodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & cli : group->get_client_ptrs()) {
if (cli == client) {
return group;
}
}
}
}
return rclcpp::callback_group::CallbackGroup::SharedPtr();
}
virtual void virtual void
get_next_client(executor::AnyExecutable::SharedPtr any_exec, get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes)
const WeakNodeVector & weak_nodes)
{ {
auto it = client_handles_.begin(); auto it = client_handles_.begin();
while (it != client_handles_.end()) { while (it != client_handles_.end()) {
auto client = memory_strategy::MemoryStrategy::get_client_by_handle(*it, weak_nodes); auto client = get_client_by_handle(*it, weak_nodes);
if (client) { if (client) {
// Find the group for this handle and see if it can be serviced // Find the group for this handle and see if it can be serviced
auto group = get_group_by_client(client, weak_nodes); auto group = get_group_by_client(client, weak_nodes);
if (!group) { if (!group) {
// Group was not found, meaning the service is not valid... // Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking // Remove it from the ready list and continue looking
client_handles_.erase(it++); client_handles_.erase(it);
continue; continue;
} }
if (!group->can_be_taken_from().load()) { if (!group->can_be_taken_from().load()) {
@ -326,18 +285,18 @@ public:
any_exec->client = client; any_exec->client = client;
any_exec->callback_group = group; any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes); any_exec->node = get_node_by_group(group, weak_nodes);
client_handles_.erase(it++); client_handles_.erase(it);
return; return;
} }
// Else, the service is no longer valid, remove it and continue // Else, the service is no longer valid, remove it and continue
client_handles_.erase(it++); client_handles_.erase(it);
} }
} }
private: private:
template<typename U> template<typename T>
using VectorRebind = using VectorRebind =
std::vector<U, typename std::allocator_traits<Alloc>::template rebind_alloc<U>>; std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
VectorRebind<rclcpp::subscription::SubscriptionBase::SharedPtr> subscriptions_; VectorRebind<rclcpp::subscription::SubscriptionBase::SharedPtr> subscriptions_;
VectorRebind<rclcpp::service::ServiceBase::SharedPtr> services_; VectorRebind<rclcpp::service::ServiceBase::SharedPtr> services_;
@ -351,9 +310,9 @@ private:
std::shared_ptr<VoidAlloc> allocator_; std::shared_ptr<VoidAlloc> allocator_;
}; };
} /* allocator_memory_strategy */ } // namespace allocator_memory_strategy
} /* memory_strategies */ } // namespace memory_strategies
} /* rclcpp */ } // namespace rclcpp
#endif #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_ #ifndef RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_ #define RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_
#include <rclcpp/macros.hpp> #include <rclcpp/macros.hpp>
#include <rclcpp/message_memory_strategy.hpp> #include <rclcpp/message_memory_strategy.hpp>
@ -96,10 +96,9 @@ protected:
std::array<PoolMember, Size> pool_; std::array<PoolMember, Size> pool_;
size_t next_array_index_; size_t next_array_index_;
}; };
} /* message_pool_memory_strategy */ } // namespace message_pool_memory_strategy
} /* strategies */ } // namespace strategies
} /* rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_ */ #endif // RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_

View file

@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ #ifndef RCLCPP__SUBSCRIPTION_HPP_
#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ #define RCLCPP__SUBSCRIPTION_HPP_
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <functional> #include <functional>
#include <iostream> #include <iostream>
@ -21,13 +24,11 @@
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <rcl_interfaces/msg/intra_process_message.hpp> #include "rcl_interfaces/msg/intra_process_message.hpp"
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <rclcpp/macros.hpp> #include "rclcpp/macros.hpp"
#include <rclcpp/message_memory_strategy.hpp> #include "rclcpp/message_memory_strategy.hpp"
#include <rclcpp/any_subscription_callback.hpp> #include "rclcpp/any_subscription_callback.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -35,7 +36,7 @@ namespace rclcpp
namespace node namespace node
{ {
class Node; class Node;
} // namespace node } // namespace node
namespace subscription namespace subscription
{ {
@ -44,7 +45,6 @@ namespace subscription
/// specializations of Subscription, among other things. /// specializations of Subscription, among other things.
class SubscriptionBase class SubscriptionBase
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase); RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase);
@ -138,7 +138,6 @@ private:
std::string topic_name_; std::string topic_name_;
bool ignore_local_publications_; bool ignore_local_publications_;
}; };
using namespace any_subscription_callback; using namespace any_subscription_callback;
@ -279,7 +278,7 @@ private:
uint64_t intra_process_subscription_id_; uint64_t intra_process_subscription_id_;
}; };
} /* namespace subscription */ } // namespace subscription
} /* namespace rclcpp */ } // namespace rclcpp
#endif /* RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ */ #endif // RCLCPP__SUBSCRIPTION_HPP_

View file

@ -15,6 +15,7 @@
#include <memory> #include <memory>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <rclcpp/allocator/allocator_common.hpp> #include <rclcpp/allocator/allocator_common.hpp>
#include <rclcpp/macros.hpp> #include <rclcpp/macros.hpp>
#include <rmw/types.h> #include <rmw/types.h>
@ -115,8 +116,8 @@ public:
} }
// Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported. // Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported.
#define RCLCPP_RCLCPP_PUBLISHER_HPP_ #define RCLCPP__PUBLISHER_HPP_
#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ #define RCLCPP__SUBSCRIPTION_HPP_
// Force ipm to use our mock publisher class. // Force ipm to use our mock publisher class.
#define Publisher mock::Publisher #define Publisher mock::Publisher
#define PublisherBase mock::PublisherBase #define PublisherBase mock::PublisherBase