-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
ring_buffer
218 lines (178 loc) · 10.7 KB
/
ring_buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
/*
Copyright (C) 2018-2024 Geoffrey Daniels. https://gpdaniels.com/
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3 of the License only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef GTL_CONTAINER_RING_BUFFER_HPP
#define GTL_CONTAINER_RING_BUFFER_HPP
// Summary: Dynamically sized thread-safe multi-producer multi-consumer ring-buffer.
#ifndef NDEBUG
# if defined(_MSC_VER)
# define __builtin_trap() __debugbreak()
# endif
/// @brief A simple assert macro to break the program if the ring_buffer is misused.
# define GTL_RING_BUFFER_ASSERT(ASSERTION, MESSAGE) static_cast<void>((ASSERTION) || (__builtin_trap(), 0))
#else
/// @brief At release time the assert macro is implemented as a nop.
# define GTL_RING_BUFFER_ASSERT(ASSERTION, MESSAGE) static_cast<void>(0)
#endif
#if defined(_MSC_VER)
# pragma warning(push, 0)
#endif
#include <atomic>
#if defined(_MSC_VER)
# pragma warning(pop)
#endif
namespace gtl {
/// @brief The ring_buffer class implements a thread-safe multi-producer multi-consumer ring-buffer.
template <typename data_type>
class ring_buffer final {
public:
/// @brief Make the data type publically accessible.
using type = data_type;
private:
/// @brief Structure that holds a read and write index.
struct index_type final {
unsigned int read;
unsigned int write;
};
private:
/// @brief Reader holds the current write and pending read locations.
std::atomic<index_type> reader;
/// @brief Writer holds the current read and pending write locations.
std::atomic<index_type> writer;
/// @brief The size of the data array.
unsigned int data_size;
/// @brief The ring buffer data array.
type* data;
public:
/// @brief Defaulted destructor.
~ring_buffer() {
delete[] this->data;
}
/// @brief Constructor zeros the atomic reader and writer structures.
ring_buffer(unsigned int size)
: reader{}
, writer{}
, data_size(size)
, data(new type[size]) {
GTL_RING_BUFFER_ASSERT(data_size != 0, "Data size must be greater than 0.");
GTL_RING_BUFFER_ASSERT(data_size <= 0x80000000, "Data size must be less than 2^31.");
}
ring_buffer(const ring_buffer& other) = delete;
ring_buffer(ring_buffer&& other) = delete;
ring_buffer& operator=(const ring_buffer& other) = delete;
ring_buffer& operator=(ring_buffer&& other) = delete;
public:
/// @brief Get a boolean that represents if the ring buffer is empty, ignoring pending reads and writess.
/// @return true if the ring buffer is empty, false otherwise.
bool empty() const {
//Both the reader and writer are used here to get both the current read and current write locations.
const index_type current_reader = this->reader.load();
const index_type current_writer = this->writer.load();
// If the current read is equal to the pending write then the ring buffer is full.
return (current_writer.read == current_reader.write);
}
/// @brief Get a boolean that represents if the ring buffer is full, ignoring pending reads and writes.
/// @return true if the ring buffer is full, false otherwise.
bool full() const {
//Both the reader and writer are used here to get both the current read and current write locations.
const index_type current_reader = this->reader.load();
const index_type current_writer = this->writer.load();
// First calculate the current size, as the locations can be either size of one another use the ternary operator to compare them first.
const unsigned int current_size = (current_writer.read > current_reader.write) ? (current_writer.read - current_reader.write) : (current_reader.write - current_writer.read);
// If the data size is zero we are full, otherwise if the current size is zero we are empty, otherwise if the current size is a multiple of the buffer size we are full.
return ((this->data_size == 0) || ((current_size != 0) && ((current_size % this->data_size) == 0)));
}
public:
/// @brief Get the size of the ring buffer that is filled with elements, ignoring pending reads and writes.
/// @return The number of items pushed into the ring buffer that have not been popped out.
unsigned long long size() const {
//Both the reader and writer are used here to get both the current read and current write locations.
const index_type current_reader = this->writer.load();
const index_type current_writer = this->reader.load();
// First calculate the current size, as the locations can be either size of one another use the ternary operator to compare them first.
const unsigned int current_size = (current_writer.read > current_reader.write) ? (current_writer.read - current_reader.write) : (current_reader.write - current_writer.read);
// If the current size is zero then size is zero, otherwise if current size is a multiple of the data size we are full, otherwise the size is the current size remainder.
return (current_size == 0) ? 0 : (((current_size % this->data_size) == 0) ? this->data_size : current_size % this->data_size);
}
public:
/// @brief Attempt to push a value into the ring buffer.
/// @param value An input parameter to providing the value to push into the ring buffer.
/// @return true if value was successfully stored in the ring buffer, false otherwise.
bool try_push(const type& value) {
// Create a local copy of the writer.
index_type current_writer = this->writer.load();
// Check if the ring buffer is full.
const unsigned int current_size = (current_writer.read > current_writer.write) ? (current_writer.read - current_writer.write) : (current_writer.write - current_writer.read);
if ((current_size != 0) && ((current_size % this->data_size) == 0)) {
return false;
}
// Create a new writer location to hold the assigned/allocated/reserved pending write index.
index_type new_writer = { current_writer.read, (current_writer.write + 1) % (2 * this->data_size) };
// Attempt to assign/allocate/reserve an index for writing using atomic_compare_exchange_weak.
// Remember the writer holds the current read and pending write locations, so this tries to increment the pending write location.
// if (this->writer == current_writer) { this->writer = new_writer; } else { current_writer = this->writer; }
if (!std::atomic_compare_exchange_weak(&this->writer, ¤t_writer, new_writer)) {
return false;
}
// Write the value to the buffer at the pending write index.
this->data[current_writer.write % this->data_size] = value;
// Create a local copy of the reader.
index_type current_reader = this->reader.load();
// Update the reader to publish the pending write using atomic_compare_exchange_weak.
// Remember the reader holds the current write and the pending read locations, so this tries to set the current write location.
// if (this->reader == current_reader) { this->reader = { ... }; } else { current_reader = this->reader; }
do {
// Overwrite the readers current write location to ensure that pushes are finalised in order.
current_reader.write = current_writer.write;
}
while (!std::atomic_compare_exchange_weak(&this->reader, ¤t_reader, { current_reader.read, new_writer.write }));
// Success.
return true;
}
/// @brief Attempt to pop a value from the ring buffer.
/// @param[out] value An output parameter to store the value that is popped out of the ring buffer.
/// @return true if value was successfully recovered from the ring buffer, false otherwise.
bool try_pop(type& value) {
// Create a local copy of the reader.
index_type current_reader = this->reader.load();
// Check if the ring buffer is empty.
if (current_reader.read == current_reader.write) {
return false;
}
// Create a new reader location to hold the assigned/allocated/reserved pending read index.
index_type new_reader = { (current_reader.read + 1) % (2 * this->data_size), current_reader.write };
// Attempt to assign/allocate/reserve an index for reading using atomic_compare_exchange_weak.
// Remember the reader holds the current write and pending read locations, so this tries to increment the pending read location.
// if (this->reader == current_reader) { this->reader = new_reader; } else { current_reader = this->reader; }
if (!std::atomic_compare_exchange_weak(&this->reader, ¤t_reader, new_reader)) {
return false;
}
// Read the value from the buffer at the pending read index.
value = this->data[current_reader.read % this->data_size];
// Create a local copy of the writer.
index_type current_writer = this->writer.load();
// Update the writer to publish the pending read using atomic_compare_exchange_weak.
// Remember the writer holds the current read and the pending write locations, so this tries to set the current read location.
// if (this->writer == current_writer) { this->writer = { ... }; } else { current_writer = this->writer; }
do {
// Overwrite the writers current read location to ensure that pops are finalised in order.
current_writer.read = current_reader.read;
}
while (!std::atomic_compare_exchange_weak(&this->writer, ¤t_writer, { new_reader.read, current_writer.write }));
// Success.
return true;
}
};
}
#undef GTL_RING_BUFFER_ASSERT
#endif // GTL_CONTAINER_RING_BUFFER_HPP