[native] Fix the shuffle hanging problem when fetch data from presto …
…java (coordinator)

The Presto Java worker doesn't set the next token in its response to a get data
size request; it simply returns the buffered data from the source. The Prestissimo
worker updates 'sequence_' with the ack sequence encoded in the next token and uses
it as the data stream offset when fetching data from the source. If the next token
string is not set in the data response header, the ack sequence is parsed as zero,
which resets 'sequence_' to zero. Subsequent fetches then start from the rolled-back
'sequence_', which can cause a hang when a Prestissimo worker fetches data from a
Presto Java worker. This can only happen when the Presto Java worker is run by the
coordinator, since some metadata operations can only execute on the coordinator.
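
To illustrate the failure mode described above, here is a minimal sketch of the pre-fix parsing. `legacyAckSequence` and `LegacyCursor` are hypothetical names used only for illustration and are not part of PrestoExchangeSource; the only fact relied on is that `atol` returns 0 when given an empty string.

```cpp
#include <cstdint>
#include <cstdlib>
#include <string>

// Hypothetical stand-in for the pre-fix parsing: the header value is passed
// to atol() unconditionally, so a missing (empty) header parses as 0.
int64_t legacyAckSequence(const std::string& nextTokenHeader) {
  return static_cast<int64_t>(std::atol(nextTokenHeader.c_str()));
}

// Hypothetical cursor that plays the role of 'sequence_'.
struct LegacyCursor {
  int64_t sequence{0};

  // Always overwrites the offset, even when the header was absent.
  void onResponse(const std::string& nextTokenHeader) {
    sequence = legacyAckSequence(nextTokenHeader);
  }
};
```

With this sketch, a response carrying next token "5" moves `sequence` to 5, and a later response with an empty header moves it back to 0, which is the rollback the message refers to.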

This PR fixes the issue by not updating 'sequence_' when the next token is not set
in the data response header.
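
The approach can be sketched in isolation as follows. This is a simplified illustration under assumptions, not the code from the commit: `parseAckSequence` and `ExchangeCursor` are hypothetical names, and the real handler reads the header through proxygen rather than from a plain string.

```cpp
#include <cstdint>
#include <cstdlib>
#include <optional>
#include <string>

// Hypothetical helper: returns the ack sequence only if the next token
// header was actually present (non-empty).
std::optional<int64_t> parseAckSequence(const std::string& nextTokenHeader) {
  if (nextTokenHeader.empty()) {
    return std::nullopt;
  }
  return static_cast<int64_t>(std::atol(nextTokenHeader.c_str()));
}

// Hypothetical cursor that plays the role of 'sequence_'.
struct ExchangeCursor {
  int64_t sequence{0};

  // Keeps the current offset when the response carries no next token.
  void onResponse(const std::string& nextTokenHeader) {
    if (const auto ack = parseAckSequence(nextTokenHeader)) {
      sequence = *ack;
    }
  }
};
```

Here a response with an empty header leaves `sequence` unchanged, so the next fetch continues from the last acknowledged offset instead of restarting from zero.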
xiaoxmeng committed Dec 26, 2024
1 parent dba4fca commit 7998114
Showing 3 changed files with 23 additions and 11 deletions.
20 changes: 14 additions & 6 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
@@ -265,7 +265,7 @@ void PrestoExchangeSource::processDataResponse(
VELOX_CHECK(
!headers->getIsChunked(),
"Chunked http transferring encoding is not supported.");
-uint64_t contentLength =
+const uint64_t contentLength =
atol(headers->getHeaders()
.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_LENGTH)
.c_str());
@@ -291,10 +291,16 @@ void PrestoExchangeSource::processDataResponse(
}
}

-int64_t ackSequence =
-atol(headers->getHeaders()
-.getSingleOrEmpty(protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER)
-.c_str());
+std::optional<int64_t> ackSequenceOpt;
+const auto nextTokenStr = headers->getHeaders().getSingleOrEmpty(
+protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER);
+if (!nextTokenStr.empty()) {
+// NOTE: when getting the data size from the Presto coordinator, it might
+// not set the next token, so we shouldn't update 'sequence_' if it is
+// empty. Otherwise, 'sequence_' gets reset and we can't fetch any data
+// from the source with the rolled-back 'sequence_'.
+ackSequenceOpt = atol(nextTokenStr.c_str());
+}

std::unique_ptr<exec::SerializedPage> page;
const bool empty = response->empty();
@@ -357,7 +363,9 @@ void PrestoExchangeSource::processDataResponse(
queue_->enqueueLocked(nullptr, queuePromises);
}

-sequence_ = ackSequence;
+if (ackSequenceOpt.has_value()) {
+sequence_ = ackSequenceOpt.value();
+}
requestPending_ = false;
requestPromise = std::move(promise_);
}
12 changes: 8 additions & 4 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
@@ -59,13 +59,13 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
}

private:
-int64_t maxWaitMs_;
-int64_t startMs_;
-size_t numTries_{0};
-
static constexpr int64_t kMinBackoffMs = 100;
static constexpr int64_t kMaxBackoffMs = 10000;
static constexpr double kJitterParam = 0.1;
+
+int64_t maxWaitMs_;
+int64_t startMs_;
+size_t numTries_{0};
};

PrestoExchangeSource(
@@ -157,6 +157,10 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
return failedAttempts_;
}

+uint64_t testingSequence() const {
+return sequence_;
+}
+
/// Invoked to track the node-wise memory usage queued in
/// PrestoExchangeSource. If 'updateBytes' > 0, then increment the usage,
/// otherwise decrement the usage.
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.cpp
@@ -176,7 +176,7 @@ void getData(
bytes += next->length();
iobuf->prev()->appendChain(std::move(next));
}
-nextSequence++;
+++nextSequence;
} else {
complete = true;
}