Skip to content

Commit

Permalink
Let task memory reclaimer not abort when task is running (facebookinc…
Browse files Browse the repository at this point in the history
…ubator#9426)

Summary:
During reclaimer abort, if task is completed within 60s wait, we can safely propagate abortion.Otherwise long running operators might be in the middle of processing, making it unsafe to force abort. In this case we let running operators finish by hitting operator boundary, and rely on cleanup mechanism to release the resource.

Pull Request resolved: facebookincubator#9426

Reviewed By: xiaoxmeng

Differential Revision: D56004733

Pulled By: tanjialiang

fbshipit-source-id: 9f40d8006a6ffc33878bfa43774ccab8bb92fafd
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Apr 12, 2024
1 parent 4b0fd74 commit 2b52132
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 27 deletions.
3 changes: 1 addition & 2 deletions velox/connectors/tpch/tests/SpeedTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ class TpchSpeedTest {
}

// Wait for the task to finish.
auto& inlineExecutor = folly::QueuedImmediateExecutor::instance();
task->taskCompletionFuture(0).via(&inlineExecutor).wait();
task->taskCompletionFuture().wait();

std::chrono::duration<double> elapsed = system_clock::now() - startTime;
LOG(INFO) << "Summary:";
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ RowVectorPtr Driver::next(std::shared_ptr<BlockingState>& blockingState) {
// error.
VELOX_CHECK(
stop == StopReason::kBlock || stop == StopReason::kAtEnd ||
stop == StopReason::kAlreadyTerminated);
stop == StopReason::kAlreadyTerminated || stop == StopReason::kTerminate);

return result;
}
Expand Down
23 changes: 15 additions & 8 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2174,7 +2174,7 @@ ContinueFuture Task::stateChangeFuture(uint64_t maxWaitMicros) {
return std::move(future);
}

ContinueFuture Task::taskCompletionFuture(uint64_t maxWaitMicros) {
ContinueFuture Task::taskCompletionFuture() {
std::lock_guard<std::timed_mutex> l(mutex_);
// If 'this' is running, the future is realized on timeout or when
// this no longer is running.
Expand All @@ -2185,9 +2185,6 @@ ContinueFuture Task::taskCompletionFuture(uint64_t maxWaitMicros) {
auto [promise, future] = makeVeloxContinuePromiseContract(
fmt::format("Task::taskCompletionFuture {}", taskId_));
taskCompletionPromises_.emplace_back(std::move(promise));
if (maxWaitMicros > 0) {
return std::move(future).within(std::chrono::microseconds(maxWaitMicros));
}
return std::move(future);
}

Expand Down Expand Up @@ -2761,11 +2758,21 @@ void Task::MemoryReclaimer::abort(
return;
}
VELOX_CHECK_EQ(task->pool()->name(), pool->name());

task->setError(error);
const static int maxTaskAbortWaitUs = 60'000'000; // 60s
// Set timeout to zero to infinite wait until task completes.
task->taskCompletionFuture(maxTaskAbortWaitUs).wait();
memory::MemoryReclaimer::abort(pool, error);
const static uint32_t maxTaskAbortWaitUs = 6'000'000; // 60s
if (task->taskCompletionFuture().wait(
std::chrono::microseconds(maxTaskAbortWaitUs))) {
// If task is completed within 60s wait, we can safely propagate abortion.
// Otherwise long running operators might be in the middle of processing,
// making it unsafe to force abort. In this case we let running operators
// finish by hitting operator boundary, and rely on cleanup mechanism to
// release the resource.
memory::MemoryReclaimer::abort(pool, error);
} else {
LOG(WARNING)
<< "Timeout waiting for task to complete during query memory aborting.";
}
}

} // namespace facebook::velox::exec
5 changes: 2 additions & 3 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,8 @@ class Task : public std::enable_shared_from_this<Task> {
/// Returns a future which is realized when the task is no longer in
/// running state.
/// If the task is not in running state at the time of call, the future is
/// immediately realized. The future is realized with an exception after
/// maxWaitMicros. A zero max wait means no timeout.
ContinueFuture taskCompletionFuture(uint64_t maxWaitMicros);
/// immediately realized.
ContinueFuture taskCompletionFuture();

/// Returns task execution error or nullptr if no error occurred.
std::exception_ptr error() const {
Expand Down
24 changes: 18 additions & 6 deletions velox/exec/tests/DriverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,10 @@ class DriverTest : public OperatorTestBase {
// To be realized either after 1s wall time or when the corresponding Task
// is no longer running.
auto& executor = folly::QueuedImmediateExecutor::instance();
auto future =
tasks_.back()->taskCompletionFuture(1'000'000).via(&executor);
auto future = tasks_.back()
->taskCompletionFuture()
.within(std::chrono::microseconds(1'000'000))
.via(&executor);
stateFutures_.emplace(threadId, std::move(future));

EXPECT_FALSE(stateFutures_.at(threadId).isReady());
Expand Down Expand Up @@ -450,7 +452,10 @@ TEST_F(DriverTest, error) {
EXPECT_EQ(numRead, 0);
EXPECT_TRUE(stateFutures_.at(0).isReady());
// Realized immediately since task not running.
EXPECT_TRUE(tasks_[0]->taskCompletionFuture(1'000'000).isReady());
EXPECT_TRUE(tasks_[0]
->taskCompletionFuture()
.within(std::chrono::microseconds(1'000'000))
.isReady());
EXPECT_EQ(tasks_[0]->state(), TaskState::kFailed);
}

Expand All @@ -472,7 +477,10 @@ TEST_F(DriverTest, cancel) {
}
EXPECT_GE(numRead, 1'000'000);
auto& executor = folly::QueuedImmediateExecutor::instance();
auto future = tasks_[0]->taskCompletionFuture(1'000'000).via(&executor);
auto future = tasks_[0]
->taskCompletionFuture()
.within(std::chrono::microseconds(1'000'000))
.via(&executor);
future.wait();
EXPECT_TRUE(stateFutures_.at(0).isReady());

Expand Down Expand Up @@ -525,7 +533,10 @@ TEST_F(DriverTest, slow) {
// are updated some tens of instructions after this. Determinism
// requires a barrier.
auto& executor = folly::QueuedImmediateExecutor::instance();
auto future = tasks_[0]->taskCompletionFuture(1'000'000).via(&executor);
auto future = tasks_[0]
->taskCompletionFuture()
.within(std::chrono::microseconds(1'000'000))
.via(&executor);
future.wait();
// Note that the driver count drops after the last thread stops and
// realizes the future.
Expand Down Expand Up @@ -561,7 +572,8 @@ TEST_F(DriverTest, pause) {
readResults(params, ResultOperation::kPause, 370'000'000, &numRead);
// Each thread will fully read the 1M rows in values.
EXPECT_EQ(numRead, 10 * hits);
auto stateFuture = tasks_[0]->taskCompletionFuture(100'000'000);
auto stateFuture = tasks_[0]->taskCompletionFuture().within(
std::chrono::microseconds(100'000'000));
auto& executor = folly::QueuedImmediateExecutor::instance();
auto state = std::move(stateFuture).via(&executor);
state.wait();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/ExchangeFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class ExchangeFuzzer : public VectorTestBase {
// cleaning up the query if any portition of it fails.
for (const auto& otherTask : tasks) {
auto* taskPtr = otherTask.get();
otherTask->taskCompletionFuture(0)
otherTask->taskCompletionFuture()
.via(executor_.get())
.thenValue([&tasks, taskPtr](auto) {
VELOX_CHECK(!taskPtr->isRunning());
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/LocalPartitionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ class LocalPartitionTest : public HiveConnectorTestBase {
exec::TaskState expected) {
if (task->state() != expected) {
auto& executor = folly::QueuedImmediateExecutor::instance();
auto future = task->taskCompletionFuture(1'000'000).via(&executor);
auto future = task->taskCompletionFuture()
.within(std::chrono::microseconds(1'000'000))
.via(&executor);
future.wait();
EXPECT_EQ(expected, task->state());
}
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/PartitionedOutputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ TEST_F(PartitionedOutputTest, flush) {
const auto taskWaitUs = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::seconds{10})
.count();
auto future = task->taskCompletionFuture(taskWaitUs).via(executor_.get());
auto future = task->taskCompletionFuture()
.within(std::chrono::microseconds(taskWaitUs))
.via(executor_.get());
future.wait();

ASSERT_TRUE(waitForTaskDriversToFinish(task.get(), taskWaitUs));
Expand Down
125 changes: 123 additions & 2 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ class ExternalBlocker {
};

// A test node that normally just re-project/passthrough the output from input
// When the node is blocked by external even (via externalBlocker), the operator
// will signal kBlocked. The pipeline can ONLY proceed again when it is
// When the node is blocked by external event (via externalBlocker), the
// operator will signal kBlocked. The pipeline can ONLY proceed again when it is
// unblocked externally.
class TestExternalBlockableNode : public core::PlanNode {
public:
Expand Down Expand Up @@ -1616,6 +1616,127 @@ DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) {
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(
TaskTest,
singleThreadedLongRunningOperatorInTaskReclaimerAbort) {
auto data = makeRowVector({
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
});

// Filter + Project.
auto plan =
PlanBuilder().values({data, data, data}).project({"c0"}).planFragment();

auto queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());

auto blockingTask = Task::create("blocking.task.0", plan, 0, queryCtx);

// Before we block, we expect `next` to get data normally.
EXPECT_NE(nullptr, blockingTask->next());

// Now, we want to block the pipeline by blocking Values operator. We expect
// `next` to return null. The `future` should be updated for the caller to
// wait before calling next() again
folly::EventCount getOutputWait;
std::atomic_bool getOutputWaitFlag{false};
folly::EventCount abortWait;
std::atomic_bool abortWaitFlag{false};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Values::getOutput",
std::function<void(Values*)>([&](Values* /*unused*/) {
abortWaitFlag = true;
abortWait.notify();
getOutputWait.await([&]() { return getOutputWaitFlag.load(); });
}));

const std::string abortErrorMessage("Synthetic Exception");
auto thread = std::thread(
[&]() { VELOX_ASSERT_THROW(blockingTask->next(), abortErrorMessage); });

try {
VELOX_FAIL(abortErrorMessage);
} catch (VeloxException& e) {
abortWait.await([&]() { return abortWaitFlag.load(); });
blockingTask->pool()->abort(std::current_exception());
}

waitForTaskCompletion(blockingTask.get(), 5'000'000);

// We expect that abort does not trigger the operator abort by checking if the
// memory pool memory has been released or not.
blockingTask->pool()->visitChildren([](auto* child) {
if (child->isLeaf()) {
EXPECT_EQ(child->stats().numReleases, 0);
}
return true;
});

getOutputWaitFlag = true;
getOutputWait.notify();

thread.join();

blockingTask->taskCompletionFuture().wait();
blockingTask->pool()->visitChildren([](auto* child) {
if (child->isLeaf()) {
EXPECT_EQ(child->stats().numReleases, 1);
}
return true;
});
}

DEBUG_ONLY_TEST_F(TaskTest, longRunningOperatorInTaskReclaimerAbort) {
auto data = makeRowVector({
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
});
folly::EventCount getOutputWait;
std::atomic_bool getOutputWaitFlag{false};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Values::getOutput",
std::function<void(Values*)>([&](Values* /*unused*/) {
getOutputWait.await([&]() { return getOutputWaitFlag.load(); });
}));

// Project only dummy plan
auto plan =
PlanBuilder().values({data, data, data}).project({"c0"}).planFragment();

auto queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());

auto blockingTask = Task::create("blocking.task.0", plan, 0, queryCtx);

blockingTask->start(4, 1);
const std::string abortErrorMessage("Synthetic Exception");
try {
VELOX_FAIL(abortErrorMessage);
} catch (VeloxException& e) {
blockingTask->pool()->abort(std::current_exception());
}
waitForTaskCompletion(blockingTask.get());

// We expect that arbitration does not trigger release of the operator pools.
blockingTask->pool()->visitChildren([](auto* child) {
if (child->isLeaf()) {
EXPECT_EQ(child->stats().numReleases, 0);
}
return true;
});

getOutputWaitFlag = true;
getOutputWait.notify();

VELOX_ASSERT_THROW(
std::rethrow_exception(blockingTask->error()), abortErrorMessage);

blockingTask->taskCompletionFuture().wait();
blockingTask->pool()->visitChildren([](auto* child) {
if (child->isLeaf()) {
EXPECT_EQ(child->stats().numReleases, 1);
}
return true;
});
}

DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) {
const auto data = makeRowVector({
makeFlatVector<int64_t>(50, folly::identity),
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/utils/Cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
if (task_->error()) {
// Wait for the task to finish (there's' a small period of time between
// when the error is set on the Task and terminate is called).
task_->taskCompletionFuture(1'000'000).wait();
task_->taskCompletionFuture()
.within(std::chrono::microseconds(1'000'000))
.wait();

// Wait for all task drivers to finish to avoid destroying the executor_
// before task_ finished using it and causing a crash.
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/utils/QueryAssertions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,9 @@ bool waitForTaskStateChange(
// Wait for task to transition to finished state.
if (task->state() != state) {
auto& executor = folly::QueuedImmediateExecutor::instance();
auto future = task->taskCompletionFuture(maxWaitMicros).via(&executor);
auto future = task->taskCompletionFuture()
.within(std::chrono::microseconds(maxWaitMicros))
.via(&executor);
future.wait();
}

Expand Down

0 comments on commit 2b52132

Please sign in to comment.