Adjust for new rmw_waitset_t API
This commit is contained in:
		
							parent
							
								
									6ec5e8e974
								
							
						
					
					
						commit
						4b0ad21b3d
					
				
					 9 changed files with 87 additions and 39 deletions
				
			
		| 
						 | 
					@ -44,6 +44,24 @@ namespace executor
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT};
 | 
					enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					///
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Options to be passed to the executor constructor.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					struct ExecutorArgs
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  memory_strategy::MemoryStrategy::SharedPtr memory_strategy;
 | 
				
			||||||
 | 
					  size_t max_conditions = 0;
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static inline ExecutorArgs create_default_executor_arguments()
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  ExecutorArgs args;
 | 
				
			||||||
 | 
					  args.memory_strategy = memory_strategies::create_default_strategy();
 | 
				
			||||||
 | 
					  args.max_conditions = 0;
 | 
				
			||||||
 | 
					  return args;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// Coordinate the order and timing of available communication tasks.
 | 
					/// Coordinate the order and timing of available communication tasks.
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * Executor provides spin functions (including spin_node_once and spin_some).
 | 
					 * Executor provides spin functions (including spin_node_once and spin_some).
 | 
				
			||||||
| 
						 | 
					@ -62,8 +80,7 @@ public:
 | 
				
			||||||
  /// Default constructor.
 | 
					  /// Default constructor.
 | 
				
			||||||
  // \param[in] ms The memory strategy to be used with this executor.
 | 
					  // \param[in] ms The memory strategy to be used with this executor.
 | 
				
			||||||
  RCLCPP_PUBLIC
 | 
					  RCLCPP_PUBLIC
 | 
				
			||||||
  explicit Executor(
 | 
					  explicit Executor(const ExecutorArgs & args = create_default_executor_arguments());
 | 
				
			||||||
    memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// Default destructor.
 | 
					  /// Default destructor.
 | 
				
			||||||
  RCLCPP_PUBLIC
 | 
					  RCLCPP_PUBLIC
 | 
				
			||||||
| 
						 | 
					@ -262,9 +279,14 @@ protected:
 | 
				
			||||||
  /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
 | 
					  /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
 | 
				
			||||||
  std::atomic_bool spinning;
 | 
					  std::atomic_bool spinning;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  rmw_guard_conditions_t fixed_guard_conditions_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// Guard condition for signaling the rmw layer to wake up for special events.
 | 
					  /// Guard condition for signaling the rmw layer to wake up for special events.
 | 
				
			||||||
  rmw_guard_condition_t * interrupt_guard_condition_;
 | 
					  rmw_guard_condition_t * interrupt_guard_condition_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  /// Waitset for managing entities that the rmw layer waits on.
 | 
				
			||||||
 | 
					  rmw_waitset_t * waitset_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// The memory strategy: an interface for handling user-defined memory allocation strategies.
 | 
					  /// The memory strategy: an interface for handling user-defined memory allocation strategies.
 | 
				
			||||||
  memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
 | 
					  memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -38,7 +38,7 @@ public:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  RCLCPP_PUBLIC
 | 
					  RCLCPP_PUBLIC
 | 
				
			||||||
  MultiThreadedExecutor(
 | 
					  MultiThreadedExecutor(
 | 
				
			||||||
    memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
 | 
					    const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  RCLCPP_PUBLIC
 | 
					  RCLCPP_PUBLIC
 | 
				
			||||||
  virtual ~MultiThreadedExecutor();
 | 
					  virtual ~MultiThreadedExecutor();
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -47,7 +47,7 @@ public:
 | 
				
			||||||
  /// Default constructor. See the default constructor for Executor.
 | 
					  /// Default constructor. See the default constructor for Executor.
 | 
				
			||||||
  RCLCPP_PUBLIC
 | 
					  RCLCPP_PUBLIC
 | 
				
			||||||
  SingleThreadedExecutor(
 | 
					  SingleThreadedExecutor(
 | 
				
			||||||
    memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
 | 
					    const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// Default destrcutor.
 | 
					  /// Default destrcutor.
 | 
				
			||||||
  RCLCPP_PUBLIC
 | 
					  RCLCPP_PUBLIC
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -252,6 +252,8 @@ public:
 | 
				
			||||||
  const CallbackGroupWeakPtrList &
 | 
					  const CallbackGroupWeakPtrList &
 | 
				
			||||||
  get_callback_groups() const;
 | 
					  get_callback_groups() const;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  std::atomic_bool has_executor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
private:
 | 
					private:
 | 
				
			||||||
  RCLCPP_DISABLE_COPY(Node);
 | 
					  RCLCPP_DISABLE_COPY(Node);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -19,15 +19,57 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
using rclcpp::executor::AnyExecutable;
 | 
					using rclcpp::executor::AnyExecutable;
 | 
				
			||||||
using rclcpp::executor::Executor;
 | 
					using rclcpp::executor::Executor;
 | 
				
			||||||
 | 
					using rclcpp::executor::ExecutorArgs;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
 | 
					Executor::Executor(const ExecutorArgs & args)
 | 
				
			||||||
: spinning(false), interrupt_guard_condition_(rmw_create_guard_condition()),
 | 
					: spinning(false),
 | 
				
			||||||
  memory_strategy_(ms)
 | 
					  memory_strategy_(args.memory_strategy)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					  interrupt_guard_condition_ = rmw_create_guard_condition();
 | 
				
			||||||
 | 
					  if (!interrupt_guard_condition_) {
 | 
				
			||||||
 | 
					    throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor");
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
 | 
				
			||||||
 | 
					  // and one for the executor's guard cond (interrupt_guard_condition_)
 | 
				
			||||||
 | 
					  // These guard conditions are permanently attached to the waitset.
 | 
				
			||||||
 | 
					  const size_t number_of_guard_conds = 2;
 | 
				
			||||||
 | 
					  fixed_guard_conditions_.guard_condition_count = number_of_guard_conds;
 | 
				
			||||||
 | 
					  fixed_guard_conditions_.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // Put the global ctrl-c guard condition in
 | 
				
			||||||
 | 
					  assert(fixed_guard_conditions_.guard_condition_count > 1);
 | 
				
			||||||
 | 
					  fixed_guard_conditions_.guard_conditions[0] = \
 | 
				
			||||||
 | 
					    rclcpp::utilities::get_global_sigint_guard_condition()->data;
 | 
				
			||||||
 | 
					  // Put the executor's guard condition in
 | 
				
			||||||
 | 
					  fixed_guard_conditions_.guard_conditions[1] = interrupt_guard_condition_->data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // The waitset adds the fixed guard conditions to the middleware waitset on initialization,
 | 
				
			||||||
 | 
					  // and removes the guard conditions in rmw_destroy_waitset.
 | 
				
			||||||
 | 
					  waitset_ = rmw_create_waitset(&fixed_guard_conditions_, args.max_conditions);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (!waitset_) {
 | 
				
			||||||
 | 
					    fprintf(stderr,
 | 
				
			||||||
 | 
					      "[rclcpp::error] failed to create waitset: %s\n", rmw_get_error_string_safe());
 | 
				
			||||||
 | 
					    rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
 | 
				
			||||||
 | 
					    if (status != RMW_RET_OK) {
 | 
				
			||||||
 | 
					      fprintf(stderr,
 | 
				
			||||||
 | 
					        "[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    throw std::runtime_error("Failed to create waitset in Executor constructor");
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
Executor::~Executor()
 | 
					Executor::~Executor()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					  // Try to deallocate the waitset.
 | 
				
			||||||
 | 
					  if (waitset_) {
 | 
				
			||||||
 | 
					    rmw_ret_t status = rmw_destroy_waitset(waitset_);
 | 
				
			||||||
 | 
					    if (status != RMW_RET_OK) {
 | 
				
			||||||
 | 
					      fprintf(stderr,
 | 
				
			||||||
 | 
					        "[rclcpp::error] failed to destroy waitset: %s\n", rmw_get_error_string_safe());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
  // Try to deallocate the interrupt guard condition.
 | 
					  // Try to deallocate the interrupt guard condition.
 | 
				
			||||||
  if (interrupt_guard_condition_ != nullptr) {
 | 
					  if (interrupt_guard_condition_ != nullptr) {
 | 
				
			||||||
    rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
 | 
					    rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
 | 
				
			||||||
| 
						 | 
					@ -41,6 +83,10 @@ Executor::~Executor()
 | 
				
			||||||
void
 | 
					void
 | 
				
			||||||
Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
 | 
					Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					  // If the node already has an executor
 | 
				
			||||||
 | 
					  if (node_ptr->has_executor.exchange(true)) {
 | 
				
			||||||
 | 
					    throw std::runtime_error("Node has already been added to an executor.");
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
  // Check to ensure node not already added
 | 
					  // Check to ensure node not already added
 | 
				
			||||||
  for (auto & weak_node : weak_nodes_) {
 | 
					  for (auto & weak_node : weak_nodes_) {
 | 
				
			||||||
    auto node = weak_node.lock();
 | 
					    auto node = weak_node.lock();
 | 
				
			||||||
| 
						 | 
					@ -76,6 +122,7 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
 | 
				
			||||||
      // *INDENT-ON*
 | 
					      // *INDENT-ON*
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
  );
 | 
					  );
 | 
				
			||||||
 | 
					  node_ptr->has_executor.store(false);
 | 
				
			||||||
  if (notify) {
 | 
					  if (notify) {
 | 
				
			||||||
    // If the node was matched and removed, interrupt waiting
 | 
					    // If the node was matched and removed, interrupt waiting
 | 
				
			||||||
    if (node_removed) {
 | 
					    if (node_removed) {
 | 
				
			||||||
| 
						 | 
					@ -318,25 +365,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
 | 
				
			||||||
  client_handles.client_count =
 | 
					  client_handles.client_count =
 | 
				
			||||||
    memory_strategy_->fill_client_handles(client_handles.clients);
 | 
					    memory_strategy_->fill_client_handles(client_handles.clients);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
 | 
					  // Don't pass guard conditions to rmw_wait; they are permanent fixtures of the waitset
 | 
				
			||||||
  // and one for the executor's guard cond (interrupt_guard_condition_)
 | 
					 | 
				
			||||||
  size_t number_of_guard_conds = 2;
 | 
					 | 
				
			||||||
  rmw_guard_conditions_t guard_condition_handles;
 | 
					 | 
				
			||||||
  guard_condition_handles.guard_condition_count = number_of_guard_conds;
 | 
					 | 
				
			||||||
  guard_condition_handles.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
 | 
					 | 
				
			||||||
  if (guard_condition_handles.guard_conditions == NULL &&
 | 
					 | 
				
			||||||
    number_of_guard_conds > 0)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    // TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
 | 
					 | 
				
			||||||
    throw std::runtime_error("Could not malloc for guard condition pointers.");
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  // Put the global ctrl-c guard condition in
 | 
					 | 
				
			||||||
  assert(guard_condition_handles.guard_condition_count > 1);
 | 
					 | 
				
			||||||
  guard_condition_handles.guard_conditions[0] = \
 | 
					 | 
				
			||||||
    rclcpp::utilities::get_global_sigint_guard_condition()->data;
 | 
					 | 
				
			||||||
  // Put the executor's guard condition in
 | 
					 | 
				
			||||||
  guard_condition_handles.guard_conditions[1] = \
 | 
					 | 
				
			||||||
    interrupt_guard_condition_->data;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  rmw_time_t * wait_timeout = NULL;
 | 
					  rmw_time_t * wait_timeout = NULL;
 | 
				
			||||||
  rmw_time_t rmw_timeout;
 | 
					  rmw_time_t rmw_timeout;
 | 
				
			||||||
| 
						 | 
					@ -364,19 +393,14 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
 | 
				
			||||||
  // Now wait on the waitable subscriptions and timers
 | 
					  // Now wait on the waitable subscriptions and timers
 | 
				
			||||||
  rmw_ret_t status = rmw_wait(
 | 
					  rmw_ret_t status = rmw_wait(
 | 
				
			||||||
    &subscriber_handles,
 | 
					    &subscriber_handles,
 | 
				
			||||||
    &guard_condition_handles,
 | 
					    nullptr,
 | 
				
			||||||
    &service_handles,
 | 
					    &service_handles,
 | 
				
			||||||
    &client_handles,
 | 
					    &client_handles,
 | 
				
			||||||
 | 
					    waitset_,
 | 
				
			||||||
    wait_timeout);
 | 
					    wait_timeout);
 | 
				
			||||||
  if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) {
 | 
					  if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) {
 | 
				
			||||||
    throw std::runtime_error(rmw_get_error_string_safe());
 | 
					    throw std::runtime_error(rmw_get_error_string_safe());
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  // If ctrl-c guard condition, return directly
 | 
					 | 
				
			||||||
  if (guard_condition_handles.guard_conditions[0] != 0) {
 | 
					 | 
				
			||||||
    // Make sure to free or clean memory
 | 
					 | 
				
			||||||
    memory_strategy_->clear_handles();
 | 
					 | 
				
			||||||
    return;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  memory_strategy_->remove_null_handles();
 | 
					  memory_strategy_->remove_null_handles();
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -27,4 +27,5 @@ rclcpp::spin(node::Node::SharedPtr node_ptr)
 | 
				
			||||||
  rclcpp::executors::SingleThreadedExecutor exec;
 | 
					  rclcpp::executors::SingleThreadedExecutor exec;
 | 
				
			||||||
  exec.add_node(node_ptr);
 | 
					  exec.add_node(node_ptr);
 | 
				
			||||||
  exec.spin();
 | 
					  exec.spin();
 | 
				
			||||||
 | 
					  exec.remove_node(node_ptr);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -23,8 +23,8 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor;
 | 
					using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
 | 
					MultiThreadedExecutor::MultiThreadedExecutor(const rclcpp::executor::ExecutorArgs & args)
 | 
				
			||||||
: executor::Executor(ms)
 | 
					: executor::Executor(args)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  number_of_threads_ = std::thread::hardware_concurrency();
 | 
					  number_of_threads_ = std::thread::hardware_concurrency();
 | 
				
			||||||
  if (number_of_threads_ == 0) {
 | 
					  if (number_of_threads_ == 0) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -17,10 +17,8 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor;
 | 
					using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
SingleThreadedExecutor::SingleThreadedExecutor(
 | 
					SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::executor::ExecutorArgs & args)
 | 
				
			||||||
  rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
 | 
					: executor::Executor(args) {}
 | 
				
			||||||
: executor::Executor(ms) {}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
SingleThreadedExecutor::~SingleThreadedExecutor() {}
 | 
					SingleThreadedExecutor::~SingleThreadedExecutor() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -38,6 +38,7 @@ Node::Node(
 | 
				
			||||||
  number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
 | 
					  number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
 | 
				
			||||||
  use_intra_process_comms_(use_intra_process_comms)
 | 
					  use_intra_process_comms_(use_intra_process_comms)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					  has_executor.store(false);
 | 
				
			||||||
  size_t domain_id = 0;
 | 
					  size_t domain_id = 0;
 | 
				
			||||||
  char * ros_domain_id = nullptr;
 | 
					  char * ros_domain_id = nullptr;
 | 
				
			||||||
  const char * env_var = "ROS_DOMAIN_ID";
 | 
					  const char * env_var = "ROS_DOMAIN_ID";
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue