-
Notifications
You must be signed in to change notification settings - Fork 69
/
Copy pathcondition_variable.cpp
140 lines (117 loc) · 4.4 KB
/
condition_variable.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#include <condition_variable>
#include <chrono>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
/*
* ------------------------------------------------------------
* Setup and helpers. You don't need to change any code here.
* ------------------------------------------------------------
*/
using namespace std::chrono_literals; // We can write 1s
// Print contents of the stream to cout in a thread-safe manner.
// This class consumes stream inputs, buffers them, and writes them
// out when destructed respecting the cout_mutex.
class SafeCout {
std::stringstream stream;
inline static std::mutex cout_mutex; // We need this to synchronise printing
public:
~SafeCout() {
std::scoped_lock<std::mutex> coutLock{cout_mutex};
std::cout << stream.str();
}
template<typename T>
SafeCout & operator<<(T&& arg) {
stream << std::forward<T>(arg);
return *this;
}
};
// A mock data object
struct Data {
bool isReady() const {
return _isReady;
}
bool _isReady = false;
bool _isConsistent = false;
};
// The function that processes the data. You don't need to touch it.
// It will check whether the data are in a consistent state, and
// idle a bit to simulate longer data processing.
// Note: It should run in parallel.
bool process(unsigned int threadIdx, Data const & data) {
bool processingOK = true;
SafeCout{} << '[' << threadIdx << "] I'm starting to process the data now\n";
if (!data._isConsistent) {
processingOK = false;
SafeCout{} << '[' << threadIdx << "] ERROR data isn't fully ready! Race condition!\n";
}
// Burn some CPU cycles to simulate intensive data processing.
const auto startTime = std::chrono::high_resolution_clock::now();
unsigned dummyCounter = 0;
while (std::chrono::high_resolution_clock::now() - startTime < 5s) {
++dummyCounter;
}
return processingOK;
}
/*
* ------------------------------------------------------------
* Exercise code you need to work on.
* ------------------------------------------------------------
* This program tries to implement a producer/multi-consumer dependency between threads.
* One producer provides some data, and multiple consumers wait for the data to become ready.
*
* The threads communicate via std::condition_variable, but the implementation is incomplete.
*
* Tasks:
* 1. Run the program and understand why the consumer threads start running too early (on some
* platforms, at least ...).
* Run it multiple times if necessary.
* Why do they wake up although they should remain sleeping?
* 2. Fix the race condition by protecting the data production phase with a lock.
* 3. When you run the program now, the consumers should start running only after the data are ready.
* Why do they run one by one, though?
* Check the CPU consumption with a tool like top in a second shell.
* Ideally, we want 400% utilisation.
* 4. Fix the consumer waiting phase like in the lecture to make the consumers run in parallel.
* Check the CPU consumption again.
*/
int main() {
std::mutex mutex;
std::condition_variable cond;
Data data;
// DATA-PROCESSING THREADS
// Here we start the processing threads. They have to wait for the data to be ready,
// and then they should process it in parallel.
auto processData = [&](unsigned int threadIdx){
SafeCout{} << '[' << threadIdx << "] I'm starting to wait\n";
std::unique_lock<std::mutex> lock{mutex};
cond.wait(lock, [&](){ return data.isReady(); });
auto result = process(threadIdx, data);
SafeCout{} << '[' << threadIdx << "] Data processing completed " << (result ? "OK" : "with failure!") << '\n';
};
std::vector<std::thread> consumers;
for (unsigned int i=0; i < 4; ++i) {
consumers.emplace_back(processData, i);
}
// DATA-PRODUCER THREAD
// This thread produces the data. We simulate a complicated way of producing the data
// by making the thread wait for a few seconds during the data production.
std::thread producer([&](){
SafeCout{} << "[p] Starting to produce data\n";
data._isReady = true;
// Sleep a bit to simulate a complicated set up phase
std::this_thread::sleep_for(6s);
data._isConsistent = true;
SafeCout{} << "[p] Data ready now\n";
// Wake up all threads
cond.notify_all();
});
// Join all threads, so we don't terminate prematurely
producer.join();
for (auto & t : consumers) {
t.join();
}
return 0;
}