Skip to content

Commit

Permalink
Add execution mode enforcement (facebookincubator#9489)
Browse files Browse the repository at this point in the history
Summary:
Velox task has two execution modes: one is the sequential execution mode used by streaming service and the other is the parallel execution mode used by query engine like Prestissimo. The two execution modes use different APIs but we don't have any reliable way to tell if a task is under sequential or parallel execution modes internally, such as parallel execution mode use a driver executor but sequential mode not.
This PR defines execution mode for the task/query to add sanity checks for different api calls.

Pull Request resolved: facebookincubator#9489

Reviewed By: xiaoxmeng

Differential Revision: D56227111

Pulled By: tanjialiang

fbshipit-source-id: ce95bf693fee4274ad5743fb6ea7a15e0654550e
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Apr 21, 2024
1 parent 09efd20 commit 53f6bf7
Show file tree
Hide file tree
Showing 20 changed files with 344 additions and 49 deletions.
6 changes: 4 additions & 2 deletions velox/examples/ScanAndSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ int main(int argc, char** argv) {
"my_write_task",
writerPlanFragment,
/*destination=*/0,
std::make_shared<core::QueryCtx>(executor.get()));
std::make_shared<core::QueryCtx>(executor.get()),
exec::Task::ExecutionMode::kParallel);

// next() starts execution using the client thread. The loop pumps output
// vectors out of the task (there are none in this query fragment).
Expand Down Expand Up @@ -159,7 +160,8 @@ int main(int argc, char** argv) {
"my_read_task",
readPlanFragment,
/*destination=*/0,
std::make_shared<core::QueryCtx>(executor.get()));
std::make_shared<core::QueryCtx>(executor.get()),
exec::Task::ExecutionMode::kParallel);

// Now that we have the query fragment and Task structure set up, we will
// add data to it via `splits`.
Expand Down
78 changes: 72 additions & 6 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,23 @@ void movePromisesOut(
}
from.clear();
}

} // namespace

std::string executionModeString(Task::ExecutionMode mode) {
switch (mode) {
case Task::ExecutionMode::kSerial:
return "Serial";
case Task::ExecutionMode::kParallel:
return "Parallel";
default:
return fmt::format("Unknown {}", static_cast<int>(mode));
}
}

std::ostream& operator<<(std::ostream& out, Task::ExecutionMode mode) {
return out << executionModeString(mode);
}

std::string taskStateString(TaskState state) {
switch (state) {
case TaskState::kRunning:
Expand Down Expand Up @@ -226,6 +240,45 @@ bool unregisterTaskListener(const std::shared_ptr<TaskListener>& listener) {
}

// static.
std::shared_ptr<Task> Task::create(
const std::string& taskId,
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
Consumer consumer,
std::function<void(std::exception_ptr)> onError) {
return Task::create(
taskId,
std::move(planFragment),
destination,
std::move(queryCtx),
mode,
(consumer ? [c = std::move(consumer)]() { return c; }
: ConsumerSupplier{}),
std::move(onError));
}

std::shared_ptr<Task> Task::create(
const std::string& taskId,
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError) {
auto task = std::shared_ptr<Task>(new Task(
taskId,
std::move(planFragment),
destination,
std::move(queryCtx),
mode,
std::move(consumerSupplier),
std::move(onError)));
task->initTaskPool();
return task;
}

std::shared_ptr<Task> Task::create(
const std::string& taskId,
core::PlanFragment planFragment,
Expand Down Expand Up @@ -255,6 +308,7 @@ std::shared_ptr<Task> Task::create(
std::move(planFragment),
destination,
std::move(queryCtx),
Task::ExecutionMode::kParallel,
std::move(consumerSupplier),
std::move(onError)));
task->initTaskPool();
Expand All @@ -266,17 +320,26 @@ Task::Task(
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError)
: uuid_{makeUuid()},
taskId_(taskId),
planFragment_(std::move(planFragment)),
destination_(destination),
queryCtx_(std::move(queryCtx)),
mode_(mode),
consumerSupplier_(std::move(consumerSupplier)),
onError_(onError),
splitsStates_(buildSplitStates(planFragment_.planNode)),
bufferManager_(OutputBufferManager::getInstance()) {}
bufferManager_(OutputBufferManager::getInstance()) {
// NOTE: the executor must not be folly::InlineLikeExecutor for parallel
// execution.
if (mode_ == Task::ExecutionMode::kParallel) {
VELOX_CHECK_NULL(
dynamic_cast<const folly::InlineLikeExecutor*>(queryCtx_->executor()));
}
}

Task::~Task() {
// TODO(spershin): Temporary code designed to reveal what causes SIGABRT in
Expand Down Expand Up @@ -525,6 +588,7 @@ bool Task::supportsSingleThreadedExecution() const {
}

RowVectorPtr Task::next(ContinueFuture* future) {
checkExecutionMode(ExecutionMode::kSerial);
// NOTE: Task::next() is single-threaded execution so locking is not required
// to access Task object.
VELOX_CHECK_EQ(
Expand Down Expand Up @@ -645,6 +709,7 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
facebook::velox::process::ThreadDebugInfo threadDebugInfo{
queryCtx()->queryId(), taskId_, nullptr};
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(threadDebugInfo);
checkExecutionMode(ExecutionMode::kParallel);

try {
VELOX_CHECK_GE(
Expand Down Expand Up @@ -688,6 +753,10 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
}
}

void Task::checkExecutionMode(ExecutionMode mode) {
VELOX_CHECK_EQ(mode, mode_, "Inconsistent task execution mode.")
}

void Task::createDriverFactoriesLocked(uint32_t maxDrivers) {
VELOX_CHECK(isRunningLocked());
VELOX_CHECK(driverFactories_.empty());
Expand Down Expand Up @@ -716,6 +785,7 @@ void Task::createDriverFactoriesLocked(uint32_t maxDrivers) {
}

void Task::createAndStartDrivers(uint32_t concurrentSplitGroups) {
checkExecutionMode(Task::ExecutionMode::kParallel);
std::unique_lock<std::timed_mutex> l(mutex_);
VELOX_CHECK(
isRunningLocked(),
Expand Down Expand Up @@ -766,10 +836,6 @@ void Task::createAndStartDrivers(uint32_t concurrentSplitGroups) {
// cancellations and pauses have the well-defined timing. For example, do
// not pause and restart a task while it is still adding Drivers.
//
// NOTE: the executor must not be folly::InlineLikeExecutor for parallel
// execution. Task::next() is used for the single-threaded execution.
VELOX_CHECK_NULL(
dynamic_cast<const folly::InlineLikeExecutor*>(queryCtx()->executor()));
// We might have first slots taken for grouped execution drivers, so need
// only to enqueue the ungrouped execution drivers.
for (auto it = drivers_.end() - numDriversUngrouped_; it != drivers_.end();
Expand Down
59 changes: 58 additions & 1 deletion velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ using ConnectorSplitPreloadFunc =

class Task : public std::enable_shared_from_this<Task> {
public:
/// Threading mode the task is executed.
enum class ExecutionMode {
/// Mode that executes the query serially (single-threaded) on the calling
/// thread. Task is executed via the Task::next() API.
kSerial,
/// Mode that executes the query in parallel (multi-threaded) using provided
/// executor. Task is executed via the Task::start() API that starts up
/// multiple driver threads and manages their lifecycle.
kParallel,
};

/// Creates a task to execute a plan fragment, but doesn't start execution
/// until Task::start() method is called.
/// @param taskId Unique task identifier.
Expand All @@ -48,11 +59,33 @@ class Task : public std::enable_shared_from_this<Task> {
/// @param queryCtx Query context containing MemoryPool and MemoryAllocator
/// instances to use for memory allocations during execution, executor to
/// schedule operators on, and session properties.
/// @param mode Execution mode for this task. The task can be executed in
/// Serial and Parallel mode.
/// @param consumer Optional factory function to get callbacks to pass the
/// results of the execution. In a multi-threaded execution, results from each
/// results of the execution. In a parallel execution mode, results from each
/// thread are passed on to a separate consumer.
/// @param onError Optional callback to receive an exception if task
/// execution fails.
static std::shared_ptr<Task> create(
const std::string& taskId,
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
Consumer consumer = nullptr,
std::function<void(std::exception_ptr)> onError = nullptr);

static std::shared_ptr<Task> create(
const std::string& taskId,
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

/// TODO: Delete following two overloads once all callers are migrated to the
/// above ones
static std::shared_ptr<Task> create(
const std::string& taskId,
core::PlanFragment planFragment,
Expand Down Expand Up @@ -651,9 +684,14 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

// Consistency check of the task execution to make sure the execution mode
// stays the same.
void checkExecutionMode(ExecutionMode mode);

// Creates driver factories.
void createDriverFactoriesLocked(uint32_t maxDrivers);

Expand Down Expand Up @@ -958,6 +996,10 @@ class Task : public std::enable_shared_from_this<Task> {
const int destination_;
const std::shared_ptr<core::QueryCtx> queryCtx_;

// The execution mode of the task. It is enforced that a task can only be
// executed in a single mode throughout its lifetime
const ExecutionMode mode_;

// Root MemoryPool for this Task. All member variables that hold references
// to pool_ must be defined after pool_, childPools_.
std::shared_ptr<memory::MemoryPool> pool_;
Expand Down Expand Up @@ -1144,4 +1186,19 @@ bool registerTaskListener(std::shared_ptr<TaskListener> listener);
/// unregistered successfuly, false if listener was not found.
bool unregisterTaskListener(const std::shared_ptr<TaskListener>& listener);

std::string executionModeString(Task::ExecutionMode mode);

std::ostream& operator<<(std::ostream& out, Task::ExecutionMode mode);

} // namespace facebook::velox::exec

template <>
struct fmt::formatter<facebook::velox::exec::Task::ExecutionMode>
: formatter<std::string> {
auto format(
facebook::velox::exec::Task::ExecutionMode m,
format_context& ctx) {
return formatter<std::string>::format(
facebook::velox::exec::executionModeString(m), ctx);
}
};
1 change: 1 addition & 0 deletions velox/exec/benchmarks/ExchangeBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ class ExchangeBenchmark : public VectorTestBase {
std::move(planFragment),
destination,
std::move(queryCtx),
Task::ExecutionMode::kParallel,
std::move(consumer));
}

Expand Down
3 changes: 3 additions & 0 deletions velox/exec/tests/DriverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class DriverTest : public OperatorTestBase {
plan,
0,
std::make_shared<core::QueryCtx>(driverExecutor_.get()),
Task::ExecutionMode::kParallel,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
});
Expand Down Expand Up @@ -1503,6 +1504,7 @@ DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) {
0,
std::make_shared<core::QueryCtx>(
driverExecutor_.get(), std::move(queryConfig)),
Task::ExecutionMode::kParallel,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
});
Expand Down Expand Up @@ -1537,6 +1539,7 @@ TEST_F(OpCallStatusTest, basic) {
0,
std::make_shared<core::QueryCtx>(
driverExecutor_.get(), std::move(queryConfig)),
Task::ExecutionMode::kParallel,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
});
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ class ExchangeClientTest : public testing::Test,
queryCtx->testingOverrideMemoryPool(
memory::memoryManager()->addRootPool(queryCtx->queryId()));
return Task::create(
taskId, core::PlanFragment{planNode}, 0, std::move(queryCtx));
taskId,
core::PlanFragment{planNode},
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
}

int32_t enqueue(
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/ExchangeFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ class ExchangeFuzzer : public VectorTestBase {
std::move(planFragment),
destination,
std::move(queryCtx),
Task::ExecutionMode::kParallel,
std::move(consumer));
}

Expand Down
Loading

0 comments on commit 53f6bf7

Please sign in to comment.