Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 4, 2024
1 parent a882abf commit d0e1d78
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 11 deletions.
2 changes: 2 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,7 @@ grpc_cc_library(
"//src/core:channel_stack_type",
"//src/core:closure",
"//src/core:compression",
"//src/core:filter_args",
"//src/core:connectivity_state",
"//src/core:context",
"//src/core:default_event_engine",
Expand Down Expand Up @@ -3730,6 +3731,7 @@ grpc_cc_library(
"//src/core:grpc_channel_idle_filter",
"//src/core:grpc_service_config",
"//src/core:idle_filter_state",
"//src/core:retry_interceptor",
"//src/core:init_internally",
"//src/core:interception_chain",
"//src/core:iomgr_fwd",
Expand Down
3 changes: 3 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3531,6 +3531,8 @@ grpc_cc_library(
"interception_chain",
"map",
"request_buffer",
"retry_service_config",
"retry_throttle",
"sleep",
],
)
Expand Down Expand Up @@ -8553,6 +8555,7 @@ grpc_cc_library(
"metadata",
"ref_counted",
"//:gpr_platform",
"//:grpc_trace",
],
)

Expand Down
7 changes: 6 additions & 1 deletion src/core/call/request_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ class RequestBuffer {
// reader. All other readers will see failure.
void Commit(Reader* winner);

// Reports whether a winning reader is currently recorded, i.e. whether
// `winner_` is non-null. `mu_` is held while `winner_` is read.
bool committed() const {
  MutexLock lock(&mu_);
  const bool has_winner = (winner_ != nullptr);
  return has_winner;
}

private:
// Buffering state: we're collecting metadata and messages.
struct Buffering {
Expand Down Expand Up @@ -168,7 +173,7 @@ class RequestBuffer {
readers_.erase(reader);
}

Mutex mu_;
mutable Mutex mu_;
Reader* winner_ ABSL_GUARDED_BY(mu_){nullptr};
State state_ ABSL_GUARDED_BY(mu_){Buffering{}};
// TODO(ctiller): change this to an intrusively linked list to avoid
Expand Down
92 changes: 90 additions & 2 deletions src/core/client_channel/retry_interceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,23 @@

namespace grpc_core {

namespace {
// Reads GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE from the channel args, falling
// back to the default when the arg is absent, and clamps the result to
// [0, INT_MAX] before returning it as a size_t.
size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
  // By default, we buffer 256 KiB per RPC for retries.
  // TODO(roth): Do we have any data to suggest a better value?
  static constexpr int kDefaultPerRpcRetryBufferSize = (256 << 10);
  const int requested_size =
      args.GetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE)
          .value_or(kDefaultPerRpcRetryBufferSize);
  return Clamp(requested_size, 0, INT_MAX);
}
} // namespace

////////////////////////////////////////////////////////////////////////////////
// RetryInterceptor

RetryInterceptor::RetryInterceptor(const ChannelArgs& args)
: per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)) {}

void RetryInterceptor::InterceptCall(
UnstartedCallHandler unstarted_call_handler) {
auto call_handler = unstarted_call_handler.StartCall();
Expand Down Expand Up @@ -87,11 +101,78 @@ void RetryInterceptor::Call::StartAttempt() {
}

// Commits the current attempt once `buffered` reaches the interceptor's
// per-RPC retry buffer limit.
// NOTE(review): this text comes from a rendered diff; the two consecutive
// `if` lines below look like the pre-change (`MaxBuffered()`) and post-change
// (`per_rpc_retry_buffer_size_`) versions of the same condition -- confirm
// against the actual file before relying on the brace structure shown here.
void RetryInterceptor::Call::MaybeCommit(size_t buffered) {
if (buffered >= interceptor_->MaxBuffered()) {
if (buffered >= interceptor_->per_rpc_retry_buffer_size_) {
current_attempt_->Commit();
}
}

// Decides whether the attempt that produced the server trailing metadata `md`
// should be followed by another attempt.
//
// Returns absl::nullopt when the call must NOT be retried: there is no retry
// policy, the status is OK, the status is not in the policy's retryable set,
// the throttle rejects the failure, the request buffer is already committed,
// the attempt budget is exhausted, or the server sent a negative push-back.
// Otherwise returns the delay to wait before starting the next attempt: the
// server push-back when one was sent, else a backoff derived from the retry
// policy. The returned optional is always engaged when a retry is wanted,
// because callers treat an empty optional as "commit & don't retry".
//
// `lazy_attempt_debug_string` supplies the prefix for retry trace messages.
//
// Side effects: records success/failure on `retry_throttle_data_` (when set)
// and increments `num_attempts_completed_` once the throttle and commit
// checks have passed.
absl::optional<Duration> RetryInterceptor::Call::ShouldRetry(
    const ServerMetadata& md,
    absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
  // If no retry policy, don't retry.
  if (retry_policy_ == nullptr) return absl::nullopt;
  const auto status = md.get(GrpcStatusMetadata());
  if (status.has_value()) {
    if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
      if (retry_throttle_data_ != nullptr) {
        retry_throttle_data_->RecordSuccess();
      }
      GRPC_TRACE_LOG(retry, INFO)
          << lazy_attempt_debug_string() << ": call succeeded";
      return absl::nullopt;
    }
    // Status is not OK.  Check whether the status is retryable.
    if (!retry_policy_->retryable_status_codes().Contains(*status)) {
      GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() << ": status "
                                  << grpc_status_code_to_string(*status)
                                  << " not configured as retryable";
      return absl::nullopt;
    }
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (retry_throttle_data_ != nullptr &&
      !retry_throttle_data_->RecordFailure()) {
    GRPC_TRACE_LOG(retry, INFO)
        << lazy_attempt_debug_string() << ": retries throttled";
    return absl::nullopt;
  }
  // Check whether the call is committed.
  if (request_buffer_.committed()) {
    GRPC_TRACE_LOG(retry, INFO)
        << lazy_attempt_debug_string() << ": retries already committed";
    return absl::nullopt;
  }
  // Check whether we have retries remaining.
  ++num_attempts_completed_;
  if (num_attempts_completed_ >= retry_policy_->max_attempts()) {
    GRPC_TRACE_LOG(retry, INFO)
        << lazy_attempt_debug_string() << ": exceeded "
        << retry_policy_->max_attempts() << " retry attempts";
    return absl::nullopt;
  }
  // Check server push-back.
  const auto server_pushback = md.get(GrpcRetryPushbackMsMetadata());
  if (server_pushback.has_value()) {
    if (*server_pushback < Duration::Zero()) {
      GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string()
                                  << ": not retrying due to server push-back";
      return absl::nullopt;
    }
    GRPC_TRACE_LOG(retry, INFO)
        << lazy_attempt_debug_string() << ": server push-back: retry in "
        << server_pushback->millis() << " ms";
    // The server told us exactly how long to wait.
    return server_pushback;
  }
  // We should retry, but the server gave no push-back.  We must still return
  // an engaged optional here: returning the empty `server_pushback` would be
  // read by the caller as "don't retry", silently disabling every retry that
  // is not accompanied by server push-back.
  // Derive the delay from the policy: initial_backoff scaled by
  // backoff_multiplier once per previously completed attempt, capped at
  // max_backoff.
  // TODO(review): this is deterministic (no jitter); switch to a stateful
  // BackOff helper owned by the call if jitter is required here.
  const Duration max_backoff = retry_policy_->max_backoff();
  Duration backoff = retry_policy_->initial_backoff();
  for (int i = 1; i < num_attempts_completed_ && backoff < max_backoff; ++i) {
    backoff = backoff * retry_policy_->backoff_multiplier();
  }
  if (backoff > max_backoff) backoff = max_backoff;
  return backoff;
}

////////////////////////////////////////////////////////////////////////////////
// RetryInterceptor::Attempt

Expand All @@ -118,7 +199,12 @@ auto RetryInterceptor::Attempt::ServerToClientGotInitialMetadata(
auto RetryInterceptor::Attempt::ServerToClientGotTrailersOnlyResponse() {
return Seq(initiator_.PullServerTrailingMetadata(),
[self = Ref()](ServerMetadataHandle md) {
auto pushback = self->call_->ShouldRetry(*md);
auto pushback = self->call_->ShouldRetry(
*md, [self = self.get()]() -> std::string {
return absl::StrFormat("call:%s attempt:%p",
Activity::current()->DebugTag(),
self);
});
return If(
pushback.has_value(),
[self, pushback]() {
Expand Down Expand Up @@ -174,4 +260,6 @@ void RetryInterceptor::Attempt::Start() {
});
}

// Cancels this attempt by asking its call initiator to spawn a cancellation.
void RetryInterceptor::Attempt::Cancel() {
  initiator_.SpawnCancel();
}

} // namespace grpc_core
17 changes: 13 additions & 4 deletions src/core/client_channel/retry_interceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
#define RETRY_INTERCEPTOR_H

#include "src/core/call/request_buffer.h"
#include "src/core/client_channel/retry_service_config.h"
#include "src/core/client_channel/retry_throttle.h"
#include "src/core/filter/filter_args.h"
#include "src/core/lib/transport/interception_chain.h"

namespace grpc_core {

class RetryInterceptor : public Interceptor {
public:
static RefCountedPtr<RetryInterceptor> Create(const ChannelArgs&,
explicit RetryInterceptor(const ChannelArgs& args);

static RefCountedPtr<RetryInterceptor> Create(const ChannelArgs& args,
const FilterArgs&) {
return MakeRefCounted<RetryInterceptor>();
return MakeRefCounted<RetryInterceptor>(args);
}

void Orphaned() override {}
Expand All @@ -48,7 +52,9 @@ class RetryInterceptor : public Interceptor {
RetryInterceptor* interceptor() { return interceptor_.get(); }
// if nullopt --> commit & don't retry
// if duration --> retry after duration
absl::optional<Duration> ShouldRetry(const ServerMetadata& md);
absl::optional<Duration> ShouldRetry(
const ServerMetadata& md,
absl::FunctionRef<std::string()> lazy_attempt_debug_string);

private:
void MaybeCommit(size_t buffered);
Expand All @@ -58,6 +64,9 @@ class RetryInterceptor : public Interceptor {
CallHandler call_handler_;
RefCountedPtr<RetryInterceptor> interceptor_;
RefCountedPtr<Attempt> current_attempt_;
const internal::RetryMethodConfig* retry_policy_ = nullptr;
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
int num_attempts_completed_ = 0;
};

class Attempt
Expand All @@ -80,7 +89,7 @@ class RetryInterceptor : public Interceptor {
CallInitiator initiator_;
};

size_t MaxBuffered() const;
const size_t per_rpc_retry_buffer_size_;
};

} // namespace grpc_core
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/transport/interception_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <cstddef>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/call_destination.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/call_spine.h"
Expand Down
6 changes: 2 additions & 4 deletions test/core/end2end/end2end_test_suites.cc
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,8 @@ class ChaoticGoodFixture : public CoreTestFixture {

// Creates the client channel for this fixture by calling
// grpc_chaotic_good_channel_create() with `localaddr_` and the C form of
// `args`. The completion queue parameter is unnamed and unused.
// NOTE(review): this text comes from a rendered diff; the two `auto* client`
// declarations below look like the pre-change version (which forced
// GRPC_ARG_ENABLE_RETRIES to false) followed by the post-change version
// (which passes `args` through untouched) -- confirm against the actual file.
grpc_channel* MakeClient(const ChannelArgs& args,
grpc_completion_queue*) override {
auto* client = grpc_chaotic_good_channel_create(
localaddr_.c_str(),
args.Set(GRPC_ARG_ENABLE_RETRIES, false).ToC().get());
auto* client =
grpc_chaotic_good_channel_create(localaddr_.c_str(), args.ToC().get());
return client;
}

Expand Down Expand Up @@ -1002,7 +1001,6 @@ std::vector<CoreTestConfiguration> DefaultConfigs() {
#endif
CoreTestConfiguration{"ChaoticGoodFullStack",
FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL |
FEATURE_MASK_DOES_NOT_SUPPORT_RETRY |
FEATURE_MASK_DOES_NOT_SUPPORT_WRITE_BUFFERING |
FEATURE_MASK_IS_CALL_V3,
nullptr,
Expand Down

0 comments on commit d0e1d78

Please sign in to comment.