From e7c58ff5b3264ba9815466e9655608afaeb86e44 Mon Sep 17 00:00:00 2001 From: Kurt Wilson Date: Mon, 22 May 2023 11:58:17 -0400 Subject: [PATCH] fix race condition with callback groups --- .../src/priority_memory_strategy.cpp | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/src/priority_executor/src/priority_memory_strategy.cpp b/src/priority_executor/src/priority_memory_strategy.cpp index 2c84e0d..8264840 100644 --- a/src/priority_executor/src/priority_memory_strategy.cpp +++ b/src/priority_executor/src/priority_memory_strategy.cpp @@ -432,19 +432,7 @@ void PriorityMemoryStrategy<>::get_next_executable( // std::cout << "all_executables_.size():" << all_executables_.size() << std::endl; // log contents of all_executables_ // std::cout << exec->name << ": " << exec->handle << " : " << exec->sched_type << std::endl; - std::ostringstream oss; - oss << "{\"operation\":\"get_next_executable\""; - // output names and handles of all_executables_ in json array - oss << ",\"all_executables\":["; - for (auto exec : all_executables_) - { - // if (exec->can_be_run) - oss << "{\"name\":\"" << exec->name << "\", \"sched_type\":\"" << exec->sched_type << "\", \"can_be_run\":\"" << exec->can_be_run << "\"},"; - } - // remove trailing comma - oss.seekp(-1, oss.cur); - oss << "]}"; - log_entry(logger_, oss.str()); + std::set skip_groups; // while (!all_executables_.empty()) for (auto exec : all_executables_) @@ -471,11 +459,18 @@ void PriorityMemoryStrategy<>::get_next_executable( // it = subscription_handles_.erase(it); continue; } + if (skip_groups.find(group) != skip_groups.end()) + { + // group was used at some point during this loop. skip it so we can re-check it next time + continue; + } + if (!group->can_be_taken_from().load()) { // Group is mutually exclusive and is being used, so skip it for now // Leave it to be checked next time, but continue searching // ++it; + skip_groups.insert(group); continue; } any_exec.callback_group = group; @@ -555,11 +550,17 @@ void PriorityMemoryStrategy<>::get_next_executable( // it = subscription_handles_.erase(it); continue; } + if (skip_groups.find(group) != skip_groups.end()) + { + // group was used at some point during this loop. skip it so we can re-check it next time + continue; + } if (!group->can_be_taken_from().load()) { // Group is mutually exclusive and is being used, so skip it for now // Leave it to be checked next time, but continue searching // ++it; + skip_groups.insert(group); continue; } any_exec.callback_group = group; @@ -605,11 +606,28 @@ void PriorityMemoryStrategy<>::get_next_executable( { // std::cout << "running first in chain deadline" << std::endl; } + /* std::ostringstream oss; + oss << "{\"operation\":\"get_next_executable\""; + // output names and handles of all_executables_ in json array + oss << ",\"all_executables\":["; + for (auto exec : all_executables_) + { + // if (exec->can_be_run) + oss << "{\"name\":\"" << exec->name << "\", \"sched_type\":\"" << exec->sched_type << "\", \"can_be_run\":\"" << exec->can_be_run << "\"},"; + } + // remove trailing comma + oss.seekp(-1, oss.cur); + oss << "]}"; + log_entry(logger_, oss.str()); + oss.str(""); + oss.clear(); + oss << "{\"operation\": \"select_task\", \"task\": \"" << exec->name << "\"}"; + log_entry(logger_, oss.str()); */ + // returning with an executable // remove from all_executables_ map all_executables_.erase(next_exec); - std::ostringstream oss; - oss << "{\"operation\": \"select_task\", \"task\": \"" << exec->name << "\"}"; + return; } }