-
Notifications
You must be signed in to change notification settings - Fork 30
/
ConcurrentQueue.h
158 lines (124 loc) · 2.78 KB
/
ConcurrentQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <boost/optional.hpp>
/*!
* Implements a queue which can be used concurrently.
*
* \tparam T Generic type parameter.
*/
template <typename T>
class ConcurrentQueue
{
public:
/*!
* Initializes a new instance of this class.
*/
ConcurrentQueue();
/*!
* Removes the top item from the queue and returns it. This function is non-blocking.
*
* \return The object formerly at the top of the queue.
*/
boost::optional<T> Pop();
/*!
* Removes the top item from the queue and returns it. This function is blocking, and if the
* queue is empty, it will wait until a new item is available.
*
* \return The object formerly at the top of the queue.
*/
T PopWait();
/*!
* Removes the top item from the queue and returns it. This function is blocking, and if the
* queue is empty, it will wait until a new item is available or the timeout period has expired.
*
* \param timeout The number of milliseconds to wait for a new item.
*
* \return The object formerly at the top of the queue.
*/
boost::optional<T> PopWait(int timeout);
/*!
* Pushes an object onto the queue.
*
* \param item The item to push.
*/
void Push(T& item);
/*!
* Frees up the resources allocated during the lifetime of this instance.
*/
~ConcurrentQueue();
private:
/*!
* The queue instance used internally.
*/
std::queue<T> lst;
/*!
* The mutex instance used to protect accesses to the internal queue.
*/
std::mutex mtx;
/*!
* The conditional variable used to signal state changes.
*/
std::condition_variable cvar;
};
template <typename T>
ConcurrentQueue<T>::ConcurrentQueue()
{
}
template <typename T>
boost::optional<T> ConcurrentQueue<T>::Pop()
{
using namespace std;
unique_lock<mutex> mlock(mtx);
if (lst.empty())
{
return boost::optional<T>();
}
auto item = lst.front();
lst.pop();
return boost::optional<T>(item);
}
template <typename T>
T ConcurrentQueue<T>::PopWait()
{
using namespace std;
unique_lock<mutex> mlock(mtx);
while (lst.empty())
{
cvar.wait(mlock);
}
auto item = lst.front();
lst.pop();
return item;
}
template <typename T>
boost::optional<T> ConcurrentQueue<T>::PopWait(int timeout)
{
using namespace std;
unique_lock<mutex> mlock(mtx);
auto due = chrono::system_clock::now() + chrono::milliseconds(timeout);
while (lst.empty())
{
if (cvar.wait_until(mlock, due) == cv_status::timeout)
{
return boost::optional<T>();
}
}
auto item = lst.front();
lst.pop();
return boost::optional<T>(item);
}
template <typename T>
void ConcurrentQueue<T>::Push(T& item)
{
using namespace std;
unique_lock<mutex> mlock(mtx);
lst.push(item);
mlock.unlock();
cvar.notify_one();
}
template <typename T>
ConcurrentQueue<T>::~ConcurrentQueue()
{
}