From 2b52132b843fac06ca8a35486158022d52a04710 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Thu, 11 Apr 2024 20:48:02 -0700 Subject: [PATCH] Let task memory reclaimer not abort when task is running (#9426) Summary: During reclaimer abort, if task is completed within 60s wait, we can safely propagate abortion.Otherwise long running operators might be in the middle of processing, making it unsafe to force abort. In this case we let running operators finish by hitting operator boundary, and rely on cleanup mechanism to release the resource. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9426 Reviewed By: xiaoxmeng Differential Revision: D56004733 Pulled By: tanjialiang fbshipit-source-id: 9f40d8006a6ffc33878bfa43774ccab8bb92fafd --- velox/connectors/tpch/tests/SpeedTest.cpp | 3 +- velox/exec/Driver.cpp | 2 +- velox/exec/Task.cpp | 23 ++-- velox/exec/Task.h | 5 +- velox/exec/tests/DriverTest.cpp | 24 +++- velox/exec/tests/ExchangeFuzzer.cpp | 2 +- velox/exec/tests/LocalPartitionTest.cpp | 4 +- velox/exec/tests/PartitionedOutputTest.cpp | 4 +- velox/exec/tests/TaskTest.cpp | 125 ++++++++++++++++++++- velox/exec/tests/utils/Cursor.cpp | 4 +- velox/exec/tests/utils/QueryAssertions.cpp | 4 +- 11 files changed, 173 insertions(+), 27 deletions(-) diff --git a/velox/connectors/tpch/tests/SpeedTest.cpp b/velox/connectors/tpch/tests/SpeedTest.cpp index d212386de16e..fbf0337769c0 100644 --- a/velox/connectors/tpch/tests/SpeedTest.cpp +++ b/velox/connectors/tpch/tests/SpeedTest.cpp @@ -103,8 +103,7 @@ class TpchSpeedTest { } // Wait for the task to finish. - auto& inlineExecutor = folly::QueuedImmediateExecutor::instance(); - task->taskCompletionFuture(0).via(&inlineExecutor).wait(); + task->taskCompletionFuture().wait(); std::chrono::duration elapsed = system_clock::now() - startTime; LOG(INFO) << "Summary:"; diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 8d8fe4d47530..a7949bc3d74d 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -343,7 +343,7 @@ RowVectorPtr Driver::next(std::shared_ptr& blockingState) { // error. VELOX_CHECK( stop == StopReason::kBlock || stop == StopReason::kAtEnd || - stop == StopReason::kAlreadyTerminated); + stop == StopReason::kAlreadyTerminated || stop == StopReason::kTerminate); return result; } diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index b503ed80996e..4ce96369d4da 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2174,7 +2174,7 @@ ContinueFuture Task::stateChangeFuture(uint64_t maxWaitMicros) { return std::move(future); } -ContinueFuture Task::taskCompletionFuture(uint64_t maxWaitMicros) { +ContinueFuture Task::taskCompletionFuture() { std::lock_guard l(mutex_); // If 'this' is running, the future is realized on timeout or when // this no longer is running. @@ -2185,9 +2185,6 @@ ContinueFuture Task::taskCompletionFuture(uint64_t maxWaitMicros) { auto [promise, future] = makeVeloxContinuePromiseContract( fmt::format("Task::taskCompletionFuture {}", taskId_)); taskCompletionPromises_.emplace_back(std::move(promise)); - if (maxWaitMicros > 0) { - return std::move(future).within(std::chrono::microseconds(maxWaitMicros)); - } return std::move(future); } @@ -2761,11 +2758,21 @@ void Task::MemoryReclaimer::abort( return; } VELOX_CHECK_EQ(task->pool()->name(), pool->name()); + task->setError(error); - const static int maxTaskAbortWaitUs = 60'000'000; // 60s - // Set timeout to zero to infinite wait until task completes. - task->taskCompletionFuture(maxTaskAbortWaitUs).wait(); - memory::MemoryReclaimer::abort(pool, error); + const static uint32_t maxTaskAbortWaitUs = 6'000'000; // 60s + if (task->taskCompletionFuture().wait( + std::chrono::microseconds(maxTaskAbortWaitUs))) { + // If task is completed within 60s wait, we can safely propagate abortion. + // Otherwise long running operators might be in the middle of processing, + // making it unsafe to force abort. In this case we let running operators + // finish by hitting operator boundary, and rely on cleanup mechanism to + // release the resource. + memory::MemoryReclaimer::abort(pool, error); + } else { + LOG(WARNING) + << "Timeout waiting for task to complete during query memory aborting."; + } } } // namespace facebook::velox::exec diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 2f5a1081a223..c6aa57233410 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -247,9 +247,8 @@ class Task : public std::enable_shared_from_this { /// Returns a future which is realized when the task is no longer in /// running state. /// If the task is not in running state at the time of call, the future is - /// immediately realized. The future is realized with an exception after - /// maxWaitMicros. A zero max wait means no timeout. - ContinueFuture taskCompletionFuture(uint64_t maxWaitMicros); + /// immediately realized. + ContinueFuture taskCompletionFuture(); /// Returns task execution error or nullptr if no error occurred. std::exception_ptr error() const { diff --git a/velox/exec/tests/DriverTest.cpp b/velox/exec/tests/DriverTest.cpp index df5986d42030..10b06d8f9562 100644 --- a/velox/exec/tests/DriverTest.cpp +++ b/velox/exec/tests/DriverTest.cpp @@ -174,8 +174,10 @@ class DriverTest : public OperatorTestBase { // To be realized either after 1s wall time or when the corresponding Task // is no longer running. auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = - tasks_.back()->taskCompletionFuture(1'000'000).via(&executor); + auto future = tasks_.back() + ->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .via(&executor); stateFutures_.emplace(threadId, std::move(future)); EXPECT_FALSE(stateFutures_.at(threadId).isReady()); @@ -450,7 +452,10 @@ TEST_F(DriverTest, error) { EXPECT_EQ(numRead, 0); EXPECT_TRUE(stateFutures_.at(0).isReady()); // Realized immediately since task not running. - EXPECT_TRUE(tasks_[0]->taskCompletionFuture(1'000'000).isReady()); + EXPECT_TRUE(tasks_[0] + ->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .isReady()); EXPECT_EQ(tasks_[0]->state(), TaskState::kFailed); } @@ -472,7 +477,10 @@ TEST_F(DriverTest, cancel) { } EXPECT_GE(numRead, 1'000'000); auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = tasks_[0]->taskCompletionFuture(1'000'000).via(&executor); + auto future = tasks_[0] + ->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .via(&executor); future.wait(); EXPECT_TRUE(stateFutures_.at(0).isReady()); @@ -525,7 +533,10 @@ TEST_F(DriverTest, slow) { // are updated some tens of instructions after this. Determinism // requires a barrier. auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = tasks_[0]->taskCompletionFuture(1'000'000).via(&executor); + auto future = tasks_[0] + ->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .via(&executor); future.wait(); // Note that the driver count drops after the last thread stops and // realizes the future. @@ -561,7 +572,8 @@ TEST_F(DriverTest, pause) { readResults(params, ResultOperation::kPause, 370'000'000, &numRead); // Each thread will fully read the 1M rows in values. EXPECT_EQ(numRead, 10 * hits); - auto stateFuture = tasks_[0]->taskCompletionFuture(100'000'000); + auto stateFuture = tasks_[0]->taskCompletionFuture().within( + std::chrono::microseconds(100'000'000)); auto& executor = folly::QueuedImmediateExecutor::instance(); auto state = std::move(stateFuture).via(&executor); state.wait(); diff --git a/velox/exec/tests/ExchangeFuzzer.cpp b/velox/exec/tests/ExchangeFuzzer.cpp index ff3a24da3de9..c6076778a327 100644 --- a/velox/exec/tests/ExchangeFuzzer.cpp +++ b/velox/exec/tests/ExchangeFuzzer.cpp @@ -182,7 +182,7 @@ class ExchangeFuzzer : public VectorTestBase { // cleaning up the query if any portition of it fails. for (const auto& otherTask : tasks) { auto* taskPtr = otherTask.get(); - otherTask->taskCompletionFuture(0) + otherTask->taskCompletionFuture() .via(executor_.get()) .thenValue([&tasks, taskPtr](auto) { VELOX_CHECK(!taskPtr->isRunning()); diff --git a/velox/exec/tests/LocalPartitionTest.cpp b/velox/exec/tests/LocalPartitionTest.cpp index 8c4ad45f9d80..86549b636a5a 100644 --- a/velox/exec/tests/LocalPartitionTest.cpp +++ b/velox/exec/tests/LocalPartitionTest.cpp @@ -78,7 +78,9 @@ class LocalPartitionTest : public HiveConnectorTestBase { exec::TaskState expected) { if (task->state() != expected) { auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = task->taskCompletionFuture(1'000'000).via(&executor); + auto future = task->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .via(&executor); future.wait(); EXPECT_EQ(expected, task->state()); } diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp index 470fa517f200..3d2ca6f834f8 100644 --- a/velox/exec/tests/PartitionedOutputTest.cpp +++ b/velox/exec/tests/PartitionedOutputTest.cpp @@ -125,7 +125,9 @@ TEST_F(PartitionedOutputTest, flush) { const auto taskWaitUs = std::chrono::duration_cast( std::chrono::seconds{10}) .count(); - auto future = task->taskCompletionFuture(taskWaitUs).via(executor_.get()); + auto future = task->taskCompletionFuture() + .within(std::chrono::microseconds(taskWaitUs)) + .via(executor_.get()); future.wait(); ASSERT_TRUE(waitForTaskDriversToFinish(task.get(), taskWaitUs)); diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 7963c9ad2a16..dc049e032420 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -280,8 +280,8 @@ class ExternalBlocker { }; // A test node that normally just re-project/passthrough the output from input -// When the node is blocked by external even (via externalBlocker), the operator -// will signal kBlocked. The pipeline can ONLY proceed again when it is +// When the node is blocked by external event (via externalBlocker), the +// operator will signal kBlocked. The pipeline can ONLY proceed again when it is // unblocked externally. class TestExternalBlockableNode : public core::PlanNode { public: @@ -1616,6 +1616,127 @@ DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) { waitForAllTasksToBeDeleted(); } +DEBUG_ONLY_TEST_F( + TaskTest, + singleThreadedLongRunningOperatorInTaskReclaimerAbort) { + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + + // Filter + Project. + auto plan = + PlanBuilder().values({data, data, data}).project({"c0"}).planFragment(); + + auto queryCtx = std::make_shared(driverExecutor_.get()); + + auto blockingTask = Task::create("blocking.task.0", plan, 0, queryCtx); + + // Before we block, we expect `next` to get data normally. + EXPECT_NE(nullptr, blockingTask->next()); + + // Now, we want to block the pipeline by blocking Values operator. We expect + // `next` to return null. The `future` should be updated for the caller to + // wait before calling next() again + folly::EventCount getOutputWait; + std::atomic_bool getOutputWaitFlag{false}; + folly::EventCount abortWait; + std::atomic_bool abortWaitFlag{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Values::getOutput", + std::function([&](Values* /*unused*/) { + abortWaitFlag = true; + abortWait.notify(); + getOutputWait.await([&]() { return getOutputWaitFlag.load(); }); + })); + + const std::string abortErrorMessage("Synthetic Exception"); + auto thread = std::thread( + [&]() { VELOX_ASSERT_THROW(blockingTask->next(), abortErrorMessage); }); + + try { + VELOX_FAIL(abortErrorMessage); + } catch (VeloxException& e) { + abortWait.await([&]() { return abortWaitFlag.load(); }); + blockingTask->pool()->abort(std::current_exception()); + } + + waitForTaskCompletion(blockingTask.get(), 5'000'000); + + // We expect that abort does not trigger the operator abort by checking if the + // memory pool memory has been released or not. + blockingTask->pool()->visitChildren([](auto* child) { + if (child->isLeaf()) { + EXPECT_EQ(child->stats().numReleases, 0); + } + return true; + }); + + getOutputWaitFlag = true; + getOutputWait.notify(); + + thread.join(); + + blockingTask->taskCompletionFuture().wait(); + blockingTask->pool()->visitChildren([](auto* child) { + if (child->isLeaf()) { + EXPECT_EQ(child->stats().numReleases, 1); + } + return true; + }); +} + +DEBUG_ONLY_TEST_F(TaskTest, longRunningOperatorInTaskReclaimerAbort) { + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + folly::EventCount getOutputWait; + std::atomic_bool getOutputWaitFlag{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Values::getOutput", + std::function([&](Values* /*unused*/) { + getOutputWait.await([&]() { return getOutputWaitFlag.load(); }); + })); + + // Project only dummy plan + auto plan = + PlanBuilder().values({data, data, data}).project({"c0"}).planFragment(); + + auto queryCtx = std::make_shared(driverExecutor_.get()); + + auto blockingTask = Task::create("blocking.task.0", plan, 0, queryCtx); + + blockingTask->start(4, 1); + const std::string abortErrorMessage("Synthetic Exception"); + try { + VELOX_FAIL(abortErrorMessage); + } catch (VeloxException& e) { + blockingTask->pool()->abort(std::current_exception()); + } + waitForTaskCompletion(blockingTask.get()); + + // We expect that arbitration does not trigger release of the operator pools. + blockingTask->pool()->visitChildren([](auto* child) { + if (child->isLeaf()) { + EXPECT_EQ(child->stats().numReleases, 0); + } + return true; + }); + + getOutputWaitFlag = true; + getOutputWait.notify(); + + VELOX_ASSERT_THROW( + std::rethrow_exception(blockingTask->error()), abortErrorMessage); + + blockingTask->taskCompletionFuture().wait(); + blockingTask->pool()->visitChildren([](auto* child) { + if (child->isLeaf()) { + EXPECT_EQ(child->stats().numReleases, 1); + } + return true; + }); +} + DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) { const auto data = makeRowVector({ makeFlatVector(50, folly::identity), diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 936a3b663811..a49a27ddfb0d 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -277,7 +277,9 @@ class MultiThreadedTaskCursor : public TaskCursorBase { if (task_->error()) { // Wait for the task to finish (there's' a small period of time between // when the error is set on the Task and terminate is called). - task_->taskCompletionFuture(1'000'000).wait(); + task_->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .wait(); // Wait for all task drivers to finish to avoid destroying the executor_ // before task_ finished using it and causing a crash. diff --git a/velox/exec/tests/utils/QueryAssertions.cpp b/velox/exec/tests/utils/QueryAssertions.cpp index ba5bc922ea08..c271b0d4a49f 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -1423,7 +1423,9 @@ bool waitForTaskStateChange( // Wait for task to transition to finished state. if (task->state() != state) { auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = task->taskCompletionFuture(maxWaitMicros).via(&executor); + auto future = task->taskCompletionFuture() + .within(std::chrono::microseconds(maxWaitMicros)) + .via(&executor); future.wait(); }