-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRelationalOperator.hpp
340 lines (305 loc) · 11.4 KB
/
RelationalOperator.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_RELATIONAL_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_RELATIONAL_OPERATOR_HPP_
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_set>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
namespace tmb { class MessageBus; }
namespace quickstep {
class StorageManager;
class WorkOrderProtosContainer;
class WorkOrdersContainer;
/** \addtogroup RelationalOperators
* @{
*/
/**
* @brief An operator in a relational query plan. The query plan is
* a directed acyclic graph of RelationalOperators.
**/
class RelationalOperator {
 public:
  /**
   * @brief Virtual destructor.
   **/
  virtual ~RelationalOperator() {}

  /**
   * @brief Enumeration of the operator types.
   **/
  enum OperatorType : std::uint8_t {
    kAggregation = 0,
    kBuildAggregationExistenceMap,
    kBuildHash,
    kBuildLIPFilter,
    kCreateIndex,
    kCreateTable,
    kDelete,
    kDestroyAggregationState,
    kDestroyHash,
    kDropTable,
    kFinalizeAggregation,
    kInitializeAggregation,
    kInnerJoin,
    kInsert,
    kLeftAntiJoin,
    kLeftOuterJoin,
    kLeftSemiJoin,
    kNestedLoopsJoin,
    kSample,
    kSaveBlocks,
    kSelect,
    kSortMergeRun,
    kSortRunGeneration,
    kTableExport,
    kTableGenerator,
    kTextScan,
    kUnionAll,
    kUpdate,
    kWindowAggregation,
    kMockOperator
  };

  /**
   * @brief Get the type of the operator.
   *
   * @return The OperatorType of this relational operator.
   **/
  virtual OperatorType getOperatorType() const = 0;

  /**
   * @brief Get the name of this relational operator.
   *
   * @return The name of this relational operator.
   */
  virtual std::string getName() const = 0;

  /**
   * @brief Generate all the next WorkOrders for this RelationalOperator.
   *
   * @note If a RelationalOperator has blocking dependencies, it should not
   *       generate workorders unless all of the blocking dependencies have been
   *       met.
   *
   * @note If a RelationalOperator is not parallelizeable on a block-level, then
   *       only one WorkOrder consisting of all the work for this
   *       RelationalOperator should be generated.
   *
   * @param container A pointer to a WorkOrdersContainer to be used to store the
   *        generated WorkOrders.
   * @param query_context The QueryContext that stores query execution states.
   * @param storage_manager The StorageManager to use.
   * @param scheduler_client_id The TMB client ID of the scheduler thread.
   * @param bus A pointer to the TMB.
   *
   * @return Whether the operator has finished generating work orders. If \c
   *         false, the execution engine will invoke this method after at least
   *         one pending work order has finished executing.
   **/
  virtual bool getAllWorkOrders(WorkOrdersContainer *container,
                                QueryContext *query_context,
                                StorageManager *storage_manager,
                                const tmb::client_id scheduler_client_id,
                                tmb::MessageBus *bus) = 0;

  /**
   * @brief For the distributed version, generate all the next WorkOrder protos
   *        for this RelationalOperator.
   *
   * @note If a RelationalOperator has blocking dependencies, it should not
   *       generate workorders unless all of the blocking dependencies have been
   *       met.
   *
   * @note If a RelationalOperator is not parallelizeable on a block-level, then
   *       only one WorkOrder consisting of all the work for this
   *       RelationalOperator should be generated.
   *
   * @param container A pointer to a WorkOrderProtosContainer to be used to
   *        store the generated WorkOrder protos.
   *
   * @return Whether the operator has finished generating work order protos. If
   *         \c false, the execution engine will invoke this method after at
   *         least one pending work order has finished executing.
   **/
  virtual bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) = 0;

  /**
   * @brief Update Catalog upon the completion of this RelationalOperator, if
   *        necessary.
   *
   * @note The default implementation does nothing.
   **/
  virtual void updateCatalogOnCompletion() {
  }

  /**
   * @brief Receive input blocks for this RelationalOperator.
   *
   * @note The default implementation ignores the block.
   *
   * @param input_block_id The ID of the input block.
   * @param input_relation_id The ID of the relation that produced this
   *        input_block.
   * @param part_id The partition ID of 'input_block_id'.
   **/
  virtual void feedInputBlock(const block_id input_block_id,
                              const relation_id input_relation_id,
                              const partition_id part_id) {}

  /**
   * @brief Signal the end of feeding of input blocks for this
   *        RelationalOperator.
   *
   * @param rel_id ID of the relation that finished producing blocks.
   *
   * @note A RelationalOperator that does not override this method can use \c
   *       done_feeding_input_relation_ member variable to figure out that this
   *       event was received. However, this does not distinguish between input
   *       relations. If the operator uses more than one input relation, it will
   *       need to override this method to figure out when the event is called
   *       for each of its relations.
   *
   * @note This method is invoked for both streaming and non-streaming
   *       operators. In the streaming case, this corresponds to end input to
   *       specified relation. In both the cases, this corresponds to event of
   *       the end of execution of downstream operator feeding input to this
   *       operator by specified relation.
   **/
  virtual void doneFeedingInputBlocks(const relation_id rel_id) {
    // The default implementation records the event without tracking rel_id.
    done_feeding_input_relation_ = true;
  }

  /**
   * @brief Get the InsertDestination index in the QueryContext, if used by
   *        this RelationalOperator for storing the output of the workorders'
   *        execution.
   *
   * @return If there is an InsertDestination for this RelationalOperator,
   *         return the non-negative id, otherwise return
   *         kInvalidInsertDestinationId.
   **/
  virtual QueryContext::insert_destination_id getInsertDestinationID() const {
    return QueryContext::kInvalidInsertDestinationId;
  }

  /**
   * @brief Get the relation ID of the output relation.
   *
   * @note If a derived RelationalOperator produces a output relation using
   *       InsertDestination, it should override this virtual method to return
   *       its relation_id.
   *
   * @note If a derived RelationalOperator (e.g., DeleteOperator) does not use
   *       InsertDestination to write blocks to the output, it should also
   *       override this virtual method to return its relation_id.
   *
   * @return The output relation ID; the default implementation returns -1.
   **/
  virtual const relation_id getOutputRelationID() const {
    return -1;
  }

  /**
   * @brief Callback to receive feedback messages from work orders.
   *
   * @param message Feedback message received by the Foreman from work order on
   *        behalf of the relational operator.
   * @note The relational operator needs to override this default
   *       implementation to receive messages. Most operators don't need to use
   *       this API. Only multi-pass operators like the sort-merge-runs
   *       operator and the text-scan operator (with parallelized loads) will
   *       need to use this API to convey information from work orders to the
   *       relational operator.
   **/
  virtual void receiveFeedbackMessage(
      const WorkOrder::FeedbackMessage &message) {
    // Reaching the default implementation is logged as a fatal error.
    LOG(FATAL) << "Received a feedback message on default interface. "
               << "Operator has no implementation.";
  }

  /**
   * @brief Set the index of this operator in the query plan DAG.
   *
   * @param operator_index The index of this operator in the query plan DAG.
   **/
  void setOperatorIndex(const std::size_t operator_index) {
    op_index_ = operator_index;
  }

  /**
   * @brief Get the index of this operator in the query plan DAG.
   *
   * @return The index of this operator in the query plan DAG. This is zero
   *         until setOperatorIndex() has been called.
   */
  std::size_t getOperatorIndex() const {
    return op_index_;
  }

  /**
   * @brief Get the number of partitions of the input relation in the operator.
   *
   * @return The number of partitions of the input relation.
   */
  std::size_t getNumPartitions() const {
    return num_partitions_;
  }

  /**
   * @brief Whether this operator does repartition.
   *
   * @return True if this node does repartition. Otherwise, false.
   */
  bool hasRepartition() const {
    return has_repartition_;
  }

  /**
   * @brief Get the number of partitions of the output relation in the operator.
   *
   * @return The number of partitions of the output relation.
   */
  std::size_t getOutputNumPartitions() const {
    return output_num_partitions_;
  }

  /**
   * @brief Deploy a group of LIPFilters to this operator.
   *
   * @param lip_deployment_index The LIP deployment index to store in this
   *        operator.
   * @param lip_filter_indexes The set of LIP filter indexes; it is copied into
   *        this operator.
   */
  void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index,
                        const std::unordered_set<QueryContext::lip_filter_id> &lip_filter_indexes) {
    lip_deployment_index_ = lip_deployment_index;
    lip_filter_indexes_ = lip_filter_indexes;
  }

 protected:
  /**
   * @brief Constructor
   *
   * @param query_id The ID of the query to which this operator belongs.
   * @param num_partitions The number of partitions of the input relation.
   *        Pass zero for create table / index. Pass one if the relation has
   *        no partitions.
   * @param has_repartition Whether this operator does repartition.
   * @param output_num_partitions The number of partitions of the output
   *        relation. Pass zero if there is no output. Pass one if the output
   *        has no partitions.
   **/
  explicit RelationalOperator(const std::size_t query_id,
                              const std::size_t num_partitions = 1u,
                              const bool has_repartition = false,
                              const std::size_t output_num_partitions = 0u)
      : query_id_(query_id),
        num_partitions_(num_partitions),
        output_num_partitions_(output_num_partitions),
        has_repartition_(has_repartition),
        done_feeding_input_relation_(false),
        // Give the DAG index a defined value so that getOperatorIndex() never
        // reads an indeterminate member before setOperatorIndex() is called.
        op_index_(0u),
        lip_deployment_index_(QueryContext::kInvalidLIPDeploymentId) {}

  // NOTE: Members are initialized in declaration order; keep the constructor's
  // initializer list in sync with the order below.
  const std::size_t query_id_;
  const std::size_t num_partitions_;
  const std::size_t output_num_partitions_;
  const bool has_repartition_;

  // Set to true by the default doneFeedingInputBlocks().
  bool done_feeding_input_relation_;

  // Index of this operator in the query plan DAG; see setOperatorIndex().
  std::size_t op_index_;

  // LIP filter deployment state; see deployLIPFilters().
  QueryContext::lip_deployment_id lip_deployment_index_;
  std::unordered_set<QueryContext::lip_filter_id> lip_filter_indexes_;

 private:
  DISALLOW_COPY_AND_ASSIGN(RelationalOperator);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_RELATIONAL_OPERATORS_RELATIONAL_OPERATOR_HPP_