Merge pull request #171 from ros2/add_additional_mutex
Lock mapped_ring_buffer
This commit is contained in:
commit
3688bb3215
3 changed files with 10 additions and 2 deletions
|
@ -139,13 +139,14 @@ public:
|
||||||
uint64_t intra_process_publisher_id,
|
uint64_t intra_process_publisher_id,
|
||||||
uint64_t & message_seq)
|
uint64_t & message_seq)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(runtime_mutex_);
|
||||||
auto it = publishers_.find(intra_process_publisher_id);
|
auto it = publishers_.find(intra_process_publisher_id);
|
||||||
if (it == publishers_.end()) {
|
if (it == publishers_.end()) {
|
||||||
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
|
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
|
||||||
}
|
}
|
||||||
PublisherInfo & info = it->second;
|
PublisherInfo & info = it->second;
|
||||||
// Calculate the next message sequence number.
|
// Calculate the next message sequence number.
|
||||||
message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
|
message_seq = info.sequence_number.fetch_add(1);
|
||||||
|
|
||||||
return info.buffer;
|
return info.buffer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include <cstddef>
|
#include <cstddef>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "rclcpp/allocator/allocator_common.hpp"
|
#include "rclcpp/allocator/allocator_common.hpp"
|
||||||
|
@ -97,6 +98,7 @@ public:
|
||||||
void
|
void
|
||||||
get_copy_at_key(uint64_t key, ElemUniquePtr & value)
|
get_copy_at_key(uint64_t key, ElemUniquePtr & value)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(data_mutex_);
|
||||||
auto it = get_iterator_of_key(key);
|
auto it = get_iterator_of_key(key);
|
||||||
value = nullptr;
|
value = nullptr;
|
||||||
if (it != elements_.end() && it->in_use) {
|
if (it != elements_.end() && it->in_use) {
|
||||||
|
@ -128,6 +130,7 @@ public:
|
||||||
void
|
void
|
||||||
get_ownership_at_key(uint64_t key, ElemUniquePtr & value)
|
get_ownership_at_key(uint64_t key, ElemUniquePtr & value)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(data_mutex_);
|
||||||
auto it = get_iterator_of_key(key);
|
auto it = get_iterator_of_key(key);
|
||||||
value = nullptr;
|
value = nullptr;
|
||||||
if (it != elements_.end() && it->in_use) {
|
if (it != elements_.end() && it->in_use) {
|
||||||
|
@ -155,6 +158,7 @@ public:
|
||||||
void
|
void
|
||||||
pop_at_key(uint64_t key, ElemUniquePtr & value)
|
pop_at_key(uint64_t key, ElemUniquePtr & value)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(data_mutex_);
|
||||||
auto it = get_iterator_of_key(key);
|
auto it = get_iterator_of_key(key);
|
||||||
value = nullptr;
|
value = nullptr;
|
||||||
if (it != elements_.end() && it->in_use) {
|
if (it != elements_.end() && it->in_use) {
|
||||||
|
@ -177,6 +181,7 @@ public:
|
||||||
bool
|
bool
|
||||||
push_and_replace(uint64_t key, ElemUniquePtr & value)
|
push_and_replace(uint64_t key, ElemUniquePtr & value)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(data_mutex_);
|
||||||
bool did_replace = elements_[head_].in_use;
|
bool did_replace = elements_[head_].in_use;
|
||||||
elements_[head_].key = key;
|
elements_[head_].key = key;
|
||||||
elements_[head_].value.swap(value);
|
elements_[head_].value.swap(value);
|
||||||
|
@ -196,6 +201,7 @@ public:
|
||||||
bool
|
bool
|
||||||
has_key(uint64_t key)
|
has_key(uint64_t key)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(data_mutex_);
|
||||||
return elements_.end() != get_iterator_of_key(key);
|
return elements_.end() != get_iterator_of_key(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,6 +231,7 @@ private:
|
||||||
std::vector<element, VectorAlloc> elements_;
|
std::vector<element, VectorAlloc> elements_;
|
||||||
size_t head_;
|
size_t head_;
|
||||||
std::shared_ptr<ElemAlloc> allocator_;
|
std::shared_ptr<ElemAlloc> allocator_;
|
||||||
|
std::mutex data_mutex_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace mapped_ring_buffer
|
} // namespace mapped_ring_buffer
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
#include <rmw/error_handling.h>
|
#include <rmw/error_handling.h>
|
||||||
#include <rmw/rmw.h>
|
#include <rmw/rmw.h>
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue