add a way to get logs out of the default executor
This commit is contained in:
parent
675c2affc4
commit
65b0cf2072
3 changed files with 160 additions and 26 deletions
|
@ -50,6 +50,18 @@ ament_target_dependencies(multithread_priority_executor
|
|||
simple_timer
|
||||
)
|
||||
|
||||
add_library(default_executor src/default_executor.cpp)
|
||||
target_include_directories(default_executor PUBLIC
|
||||
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||
$<INSTALL_INTERFACE:include>
|
||||
)
|
||||
ament_target_dependencies(default_executor
|
||||
rmw
|
||||
rclcpp
|
||||
rcl
|
||||
simple_timer
|
||||
)
|
||||
|
||||
add_executable(usage_example src/usage_example.cpp)
|
||||
target_include_directories(usage_example PUBLIC
|
||||
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||
|
@ -72,7 +84,7 @@ install(TARGETS usage_example
|
|||
DESTINATION lib/${PROJECT_NAME}
|
||||
)
|
||||
|
||||
install(TARGETS priority_executor multithread_priority_executor
|
||||
install(TARGETS priority_executor multithread_priority_executor default_executor
|
||||
ARCHIVE DESTINATION lib
|
||||
LIBRARY DESTINATION lib
|
||||
RUNTIME DESTINATION bin
|
||||
|
@ -92,4 +104,5 @@ endif()
|
|||
ament_export_include_directories(include)
|
||||
ament_export_libraries(priority_executor)
|
||||
ament_export_libraries(multithread_priority_executor)
|
||||
ament_export_libraries(default_executor)
|
||||
ament_package()
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include "rclcpp/executor.hpp"
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "rclcpp/macros.hpp"
|
||||
#include "rclcpp/memory_strategies.hpp"
|
||||
#include "rclcpp/node.hpp"
|
||||
|
@ -30,17 +31,42 @@
|
|||
#include "rclcpp/rate.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
#include "priority_executor/priority_memory_strategy.hpp"
|
||||
#include "simple_timer/rt-sched.hpp"
|
||||
|
||||
class RTISTimed
|
||||
{
|
||||
public:
|
||||
node_time_logger logger_ = create_logger();
|
||||
};
|
||||
|
||||
class ROSDefaultMultithreadedExecutor : public rclcpp::executors::MultiThreadedExecutor, public RTISTimed
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor)
|
||||
|
||||
/// Default constructor. See the default constructor for Executor.
|
||||
RCLCPP_PUBLIC explicit ROSDefaultMultithreadedExecutor(
|
||||
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(),
|
||||
size_t number_of_threads = 0,
|
||||
bool yield_before_execute = false,
|
||||
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)) : rclcpp::executors::MultiThreadedExecutor(options, number_of_threads, yield_before_execute, timeout){};
|
||||
|
||||
/// Default destructor.
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~ROSDefaultMultithreadedExecutor();
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
|
||||
};
|
||||
|
||||
/// Single-threaded executor implementation.
|
||||
/**
|
||||
* This is the default executor created by rclcpp::spin.
|
||||
*/
|
||||
class ROSDefaultExecutor : public rclcpp::Executor
|
||||
class ROSDefaultExecutor : public rclcpp::Executor, public RTISTimed
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor)
|
||||
std::unordered_map<std::shared_ptr<const void>, PriorityExecutable> priority_map;
|
||||
node_time_logger logger;
|
||||
|
||||
/// Default constructor. See the default constructor for Executor.
|
||||
RCLCPP_PUBLIC
|
||||
|
@ -64,6 +90,10 @@ public:
|
|||
spin() override;
|
||||
bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(ROSDefaultExecutor)
|
||||
};
|
||||
|
|
|
@ -12,25 +12,100 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rcpputils/scope_exit.hpp"
|
||||
|
||||
#include "rclcpp/any_executable.hpp"
|
||||
#include "priority_executor/priority_memory_strategy.hpp"
|
||||
#include "priority_executor/default_executor.hpp"
|
||||
#include "priority_executor/primes_workload.hpp"
|
||||
#include "rcpputils/scope_exit.hpp"
|
||||
#include "rclcpp/any_executable.hpp"
|
||||
#include "simple_timer/rt-sched.hpp"
|
||||
|
||||
ROSDefaultExecutor::ROSDefaultExecutor(const rclcpp::ExecutorOptions &options)
|
||||
: rclcpp::Executor(options) {}
|
||||
: rclcpp::Executor(options)
|
||||
{
|
||||
logger_ = create_logger();
|
||||
}
|
||||
|
||||
ROSDefaultExecutor::~ROSDefaultExecutor() {}
|
||||
|
||||
ROSDefaultMultithreadedExecutor::~ROSDefaultMultithreadedExecutor() {}
|
||||
|
||||
|
||||
void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
|
||||
|
||||
// Collect the subscriptions and timers to be waited on
|
||||
memory_strategy_->clear_handles();
|
||||
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
|
||||
|
||||
// Clean up any invalid nodes, if they were detected
|
||||
if (has_invalid_weak_nodes)
|
||||
{
|
||||
auto node_it = weak_nodes_.begin();
|
||||
auto gc_it = guard_conditions_.begin();
|
||||
while (node_it != weak_nodes_.end())
|
||||
{
|
||||
if (node_it->expired())
|
||||
{
|
||||
node_it = weak_nodes_.erase(node_it);
|
||||
memory_strategy_->remove_guard_condition(*gc_it);
|
||||
gc_it = guard_conditions_.erase(gc_it);
|
||||
}
|
||||
else
|
||||
{
|
||||
++node_it;
|
||||
++gc_it;
|
||||
}
|
||||
}
|
||||
}
|
||||
// clear wait set
|
||||
rcl_ret_t ret = rcl_wait_set_clear(&wait_set_);
|
||||
if (ret != RCL_RET_OK)
|
||||
{
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set");
|
||||
}
|
||||
|
||||
// The size of waitables are accounted for in size of the other entities
|
||||
ret = rcl_wait_set_resize(
|
||||
&wait_set_, memory_strategy_->number_of_ready_subscriptions(),
|
||||
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
|
||||
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
|
||||
memory_strategy_->number_of_ready_events());
|
||||
if (RCL_RET_OK != ret)
|
||||
{
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set");
|
||||
}
|
||||
|
||||
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_))
|
||||
{
|
||||
throw std::runtime_error("Couldn't fill wait set");
|
||||
}
|
||||
}
|
||||
rcl_ret_t status =
|
||||
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
|
||||
if (status == RCL_RET_WAIT_SET_EMPTY)
|
||||
{
|
||||
RCUTILS_LOG_WARN_NAMED(
|
||||
"rclcpp",
|
||||
"empty wait set received in rcl_wait(). This should never happen.");
|
||||
}
|
||||
else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT)
|
||||
{
|
||||
using rclcpp::exceptions::throw_from_rcl_error;
|
||||
throw_from_rcl_error(status, "rcl_wait() failed");
|
||||
}
|
||||
|
||||
// check the null handles in the wait set and remove them from the handles in memory strategy
|
||||
// for callback-based entities
|
||||
memory_strategy_->remove_null_handles(&wait_set_);
|
||||
}
|
||||
|
||||
bool ROSDefaultExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout)
|
||||
{
|
||||
bool success = false;
|
||||
// Check to see if there are any subscriptions or timers needing service
|
||||
// TODO(wjwwood): improve run to run efficiency of this function
|
||||
// sched_yield();
|
||||
wait_for_work(std::chrono::milliseconds(1));
|
||||
wait_for_work(timeout);
|
||||
success = get_next_ready_executable(any_executable);
|
||||
return success;
|
||||
}
|
||||
|
@ -47,21 +122,37 @@ void ROSDefaultExecutor::spin()
|
|||
rclcpp::AnyExecutable any_executable;
|
||||
if (get_next_executable(any_executable))
|
||||
{
|
||||
if (any_executable.timer)
|
||||
{
|
||||
if (priority_map.find(any_executable.timer->get_timer_handle()) != priority_map.end())
|
||||
{
|
||||
timespec current_time;
|
||||
clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time);
|
||||
uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000);
|
||||
PriorityExecutable next_exec = priority_map[any_executable.timer->get_timer_handle()];
|
||||
|
||||
auto timer = next_exec.timer_handle;
|
||||
// TODO: this is really a fire
|
||||
// log_entry(logger, "timer_" + std::to_string(next_exec.chain_id) + "_release_" + std::to_string(millis + (timer->time_until_trigger().count() / 1000000)));
|
||||
}
|
||||
}
|
||||
execute_any_executable(any_executable);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout)
|
||||
{
|
||||
bool success = false;
|
||||
// Check to see if there are any subscriptions or timers needing service
|
||||
// TODO(wjwwood): improve run to run efficiency of this function
|
||||
|
||||
// try to get an executable
|
||||
success = get_next_ready_executable(any_executable);
|
||||
std::stringstream oss;
|
||||
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\"}";
|
||||
log_entry(logger_, oss.str());
|
||||
|
||||
// If there are none
|
||||
if (!success)
|
||||
{
|
||||
// Wait for subscriptions or timers to work on
|
||||
// queue refresh
|
||||
wait_for_work(timeout);
|
||||
if (!spinning.load())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
// Try again
|
||||
success = get_next_ready_executable(any_executable);
|
||||
oss.str("");
|
||||
oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success << "\"}";
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue