protect subscriber_statistics_collectors_ with a mutex (#1084)

* subscriber_statistics_collectors_ should be protected with mutex.

Co-Authored-By: William Woodall <william+github@osrfoundation.org>
Signed-off-by: Tomoya.Fujita <Tomoya.Fujita@sony.com>
This commit is contained in:
tomoya 2020-04-23 15:55:04 +09:00 committed by GitHub
parent cdeed8903d
commit c3d599fc8c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -94,6 +94,8 @@ public:
/// Handle a message received by the subscription to collect statistics. /// Handle a message received by the subscription to collect statistics.
/** /**
* This method acquires a lock to prevent race conditions to collectors list.
*
* \param received_message the message received by the subscription * \param received_message the message received by the subscription
* \param now_nanoseconds current time in nanoseconds * \param now_nanoseconds current time in nanoseconds
*/ */
@ -101,6 +103,7 @@ public:
const CallbackMessageT & received_message, const CallbackMessageT & received_message,
const rclcpp::Time now_nanoseconds) const const rclcpp::Time now_nanoseconds) const
{ {
std::lock_guard<std::mutex> lock(mutex_);
for (const auto & collector : subscriber_statistics_collectors_) { for (const auto & collector : subscriber_statistics_collectors_) {
collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds()); collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds());
} }
@ -116,21 +119,32 @@ public:
} }
/// Publish a populated MetricsStatisticsMessage. /// Publish a populated MetricsStatisticsMessage.
/**
* This method acquires a lock to prevent race conditions to collectors list.
*/
virtual void publish_message() virtual void publish_message()
{ {
std::vector<MetricsMessage> msgs;
rclcpp::Time window_end{get_current_nanoseconds_since_epoch()}; rclcpp::Time window_end{get_current_nanoseconds_since_epoch()};
for (auto & collector : subscriber_statistics_collectors_) { {
const auto collected_stats = collector->GetStatisticsResults(); std::lock_guard<std::mutex> lock(mutex_);
for (auto & collector : subscriber_statistics_collectors_) {
const auto collected_stats = collector->GetStatisticsResults();
auto message = libstatistics_collector::collector::GenerateStatisticMessage( auto message = libstatistics_collector::collector::GenerateStatisticMessage(
node_name_, node_name_,
collector->GetMetricName(), collector->GetMetricName(),
collector->GetMetricUnit(), collector->GetMetricUnit(),
window_start_, window_start_,
window_end, window_end,
collected_stats); collected_stats);
publisher_->publish(message); msgs.push_back(message);
}
}
for (auto & msg : msgs) {
publisher_->publish(msg);
} }
window_start_ = window_end; window_start_ = window_end;
} }
@ -138,11 +152,14 @@ public:
protected: protected:
/// Return a vector of all the currently collected data. /// Return a vector of all the currently collected data.
/** /**
* This method acquires a lock to prevent race conditions to collectors list.
*
* \return a vector of all the collected data * \return a vector of all the collected data
*/ */
std::vector<StatisticData> get_current_collector_data() const std::vector<StatisticData> get_current_collector_data() const
{ {
std::vector<StatisticData> data; std::vector<StatisticData> data;
std::lock_guard<std::mutex> lock(mutex_);
for (const auto & collector : subscriber_statistics_collectors_) { for (const auto & collector : subscriber_statistics_collectors_) {
data.push_back(collector->GetStatisticsResults()); data.push_back(collector->GetStatisticsResults());
} }
@ -151,23 +168,35 @@ protected:
private: private:
/// Construct and start all collectors and set window_start_. /// Construct and start all collectors and set window_start_.
/**
* This method acquires a lock to prevent race conditions to collectors list.
*/
void bring_up() void bring_up()
{ {
auto received_message_period = std::make_unique<ReceivedMessagePeriod>(); auto received_message_period = std::make_unique<ReceivedMessagePeriod>();
received_message_period->Start(); received_message_period->Start();
subscriber_statistics_collectors_.emplace_back(std::move(received_message_period)); {
std::lock_guard<std::mutex> lock(mutex_);
subscriber_statistics_collectors_.emplace_back(std::move(received_message_period));
}
window_start_ = rclcpp::Time(get_current_nanoseconds_since_epoch()); window_start_ = rclcpp::Time(get_current_nanoseconds_since_epoch());
} }
/// Stop all collectors, clear measurements, stop publishing timer, and reset publisher. /// Stop all collectors, clear measurements, stop publishing timer, and reset publisher.
/**
* This method acquires a lock to prevent race conditions to collectors list.
*/
void tear_down() void tear_down()
{ {
for (auto & collector : subscriber_statistics_collectors_) { {
collector->Stop(); std::lock_guard<std::mutex> lock(mutex_);
} for (auto & collector : subscriber_statistics_collectors_) {
collector->Stop();
}
subscriber_statistics_collectors_.clear(); subscriber_statistics_collectors_.clear();
}
if (publisher_timer_) { if (publisher_timer_) {
publisher_timer_->cancel(); publisher_timer_->cancel();
@ -187,6 +216,8 @@ private:
return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count(); return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
} }
/// Mutex to protect the subsequence vectors
mutable std::mutex mutex_;
/// Collection of statistics collectors /// Collection of statistics collectors
std::vector<std::unique_ptr<TopicStatsCollector>> subscriber_statistics_collectors_{}; std::vector<std::unique_ptr<TopicStatsCollector>> subscriber_statistics_collectors_{};
/// Node name used to generate topic statistics messages to be published /// Node name used to generate topic statistics messages to be published