// distbench_engine.h -- from a fork of google/distbench.
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef DISTBENCH_DISTBENCH_ENGINE_H_
#define DISTBENCH_DISTBENCH_ENGINE_H_
#include <unordered_set>
#include "distbench.grpc.pb.h"
#include "distbench_utils.h"
#include "protocol_driver.h"
#include "absl/random/random.h"
namespace distbench {
// Abstract clock interface. DistBenchEngine receives a SimpleClock* at
// construction time and uses it instead of calling absl time functions
// directly (presumably so that tests can substitute a fake clock -- confirm
// against callers).
class SimpleClock {
 public:
  virtual ~SimpleClock() = default;

  // Returns the current time according to this clock.
  virtual absl::Time Now() = 0;

  // Acquires `mu` exclusively (see the lock annotation), waiting on
  // `condition` until `deadline` as measured by this clock. The meaning of
  // the returned bool is defined by the implementation; RealClock forwards
  // the result of absl::Mutex::LockWhenWithDeadline().
  virtual bool MutexLockWhenWithDeadline(absl::Mutex* mu,
                                         const absl::Condition& condition,
                                         absl::Time deadline)
      ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) = 0;
};
// SimpleClock implementation backed by the real wall clock: every call is
// forwarded straight to the corresponding Abseil primitive.
class RealClock : public SimpleClock {
 public:
  ~RealClock() override = default;

  // Returns absl::Now().
  absl::Time Now() override { return absl::Now(); }

  // Forwards to mu->LockWhenWithDeadline(condition, deadline) and returns
  // its result unchanged. `mu` is held exclusively when this returns.
  bool MutexLockWhenWithDeadline(absl::Mutex* mu,
                                 const absl::Condition& condition,
                                 absl::Time deadline)
      ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) override {
    const bool lock_result = mu->LockWhenWithDeadline(condition, deadline);
    return lock_result;
  }
};
class DistBenchEngine : public ConnectionSetup::Service {
public:
explicit DistBenchEngine(
std::unique_ptr<ProtocolDriver> pd, SimpleClock* clock);
~DistBenchEngine() override;
absl::Status Initialize(
const DistributedSystemDescription& global_description,
std::string_view service_name,
int service_instance,
int* port);
absl::Status ConfigurePeers(const ServiceEndpointMap& peers);
absl::Status RunTraffic(const RunTrafficRequest* request);
void CancelTraffic();
ServicePerformanceLog FinishTrafficAndGetLogs();
grpc::Status SetupConnection(grpc::ServerContext* context,
const ConnectRequest* request,
ConnectResponse* response) override;
private:
struct StochasticDist{
float probability;
int nb_targets;
};
struct RpcDefinition {
// Original proto
RpcSpec rpc_spec;
// Used to store decoded stochastic fanout
bool is_stochastic_fanout;
std::vector<StochasticDist> stochastic_dist;
// Decoded
int request_payload_size;
int response_payload_size;
};
struct PeerMetadata {
PeerMetadata() {}
PeerMetadata(const PeerMetadata& from) {
from.mutex.Lock();
log_name = from.log_name;
log = from.log;
trace_id = from.trace_id;
from.mutex.Unlock();
}
std::string log_name;
std::string endpoint_address;
int trace_id;
int pd_id = -1;
PeerPerformanceLog log ABSL_GUARDED_BY(mutex);
mutable absl::Mutex mutex;
};
struct SimulatedServerRpc {
std::vector<GenericResponse> response_table;
int handler_action_list_index = -1;
RpcDefinition rpc_definition;
};
struct SimulatedClientRpc {
int service_index;
std::vector<GenericRequest> request_table;
RpcDefinition rpc_definition;
std::atomic<int64_t> rpc_tracing_counter = 0;
std::vector<int> pending_requests_per_peer;
};
struct ActionTableEntry {
Action proto;
// index into peers_ that identifies the target(s) of this RPC
int rpc_service_index = -1;
// index into the client_rpc_table_/server_rpc_table_;
int rpc_index = -1;
int actionlist_index = -1;
std::vector<int> dependent_action_indices;
};
struct ActionListTableEntry {
ActionList proto;
std::vector<ActionTableEntry> list_actions;
bool has_rpcs = false;
};
struct ActionListState;
struct ActionState;
struct ActionIterationState {
struct ActionState* action_state = nullptr;
int iteration_number = 0;
std::vector<ClientRpcState> rpc_states;
std::atomic<int> remaining_rpcs = 0;
};
struct ActionState {
bool started = false;
bool finished = false;
ActionListState* s;
absl::Mutex iteration_mutex;
int next_iteration ABSL_GUARDED_BY(iteration_mutex);
int finished_iterations ABSL_GUARDED_BY(iteration_mutex);
absl::Time next_iteration_time = absl::InfiniteFuture();
int64_t iteration_limit = std::numeric_limits<int64_t>::max();
absl::Time time_limit = absl::InfiniteFuture();
const ActionTableEntry* action = nullptr;
int rpc_index;
int rpc_service_index;
std::function<void(std::shared_ptr<ActionIterationState> iteration_state)>
iteration_function;
std::function<void(void)> all_done_callback;
std::map<int, std::vector<int>> partially_randomized_vectors;
};
struct ActionListState {
void FinishAction(int action_index);
void WaitForAllPendingActions();
void RecordLatency(
size_t rpc_index,
size_t service_type,
size_t instance,
const ClientRpcState* state);
const ServerRpcState* incoming_rpc_state = nullptr; // may be nullptr
std::unique_ptr<ActionState[]> state_table;
const ActionListTableEntry* action_list;
absl::Mutex action_mu;
std::vector<int> finished_action_indices;
std::vector<std::vector<PeerPerformanceLog>> peer_logs ABSL_GUARDED_BY(action_mu);
};
absl::Status InitializeTables();
absl::Status InitializePayloadsMap();
absl::Status InitializeRpcDefinitionStochastic(
RpcDefinition& rpc_def);
absl::Status InitializeRpcDefinitionsMap();
void RunActionList(int list_index, const ServerRpcState* incoming_rpc_state);
void RunAction(ActionState* action_state);
void StartOpenLoopIteration(ActionState* action_state);
void StartIteration(std::shared_ptr<ActionIterationState> iteration_state);
void FinishIteration(std::shared_ptr<ActionIterationState> iteration_state);
void RunRpcActionIteration(
std::shared_ptr<ActionIterationState> iteration_state);
std::vector<int> PickRpcFanoutTargets(ActionState* action_state);
std::unique_ptr<SimulatedClientRpc[]> client_rpc_table_;
std::vector<SimulatedServerRpc> server_rpc_table_;
std::vector<ActionListTableEntry> action_lists_;
absl::Status ConnectToPeers();
void RpcHandler(ServerRpcState* state);
int get_payload_size(const std::string& name);
absl::Notification canceled_;
DistributedSystemDescription traffic_config_;
ServiceEndpointMap service_map_;
std::string service_name_;
ServiceSpec service_spec_;
std::set<std::string> dependent_services_;
int service_index_;
int service_instance_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<ProtocolDriver> pd_;
std::thread engine_main_thread_;
// Payloads definitions
std::map<std::string, PayloadSpec> payload_map_;
std::map<std::string, RpcDefinition> rpc_map_;
// The first index is the service, the second is the instance.
std::vector<std::vector<PeerMetadata>> peers_;
int trace_id_ = -1;
SimpleClock* clock_ = nullptr;
// Random
absl::BitGen random_generator;
};
} // namespace distbench
#endif // DISTBENCH_DISTBENCH_ENGINE_H_