Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 5, 2024
1 parent 0b4c5fb commit 78e61ea
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 83 deletions.
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3535,6 +3535,7 @@ grpc_cc_library(
"retry_service_config",
"retry_throttle",
"sleep",
"//:backoff",
],
)

Expand Down
198 changes: 125 additions & 73 deletions src/core/client_channel/retry_interceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,133 @@ size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
}
} // namespace

namespace retry_detail {

absl::optional<Duration> RetryState::ShouldRetry(
const ServerMetadata& md, bool committed,
absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
// If no retry policy, don't retry.
if (retry_policy_ == nullptr) {
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " no retry policy";
return absl::nullopt;
}
const auto status = md.get(GrpcStatusMetadata());
if (status.has_value()) {
if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
if (retry_throttle_data_ != nullptr) {
retry_throttle_data_->RecordSuccess();
}
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " call succeeded";
return absl::nullopt;
}
// Status is not OK. Check whether the status is retryable.
if (!retry_policy_->retryable_status_codes().Contains(*status)) {
GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() << ": status "
<< grpc_status_code_to_string(*status)
<< " not configured as retryable";
return absl::nullopt;
}
}
// Record the failure and check whether retries are throttled.
// Note that it's important for this check to come after the status
// code check above, since we should only record failures whose statuses
// match the configured retryable status codes, so that we don't count
// things like failures due to malformed requests (INVALID_ARGUMENT).
// Conversely, it's important for this to come before the remaining
// checks, so that we don't fail to record failures due to other factors.
if (retry_throttle_data_ != nullptr &&
!retry_throttle_data_->RecordFailure()) {
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " retries throttled";
return absl::nullopt;
}
// Check whether the call is committed.
if (committed) {
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " retries already committed";
return absl::nullopt;
}
// Check whether we have retries remaining.
++num_attempts_completed_;
if (num_attempts_completed_ >= retry_policy_->max_attempts()) {
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " exceeded "
<< retry_policy_->max_attempts() << " retry attempts";
return absl::nullopt;
}
// Check server push-back.
const auto server_pushback = md.get(GrpcRetryPushbackMsMetadata());
if (server_pushback.has_value() && server_pushback < Duration::Zero()) {
GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string()
<< " not retrying due to server push-back";
return absl::nullopt;
}
// We should retry.
Duration next_attempt_timeout;
if (server_pushback.has_value()) {
CHECK_GE(*server_pushback, Duration::Zero());
next_attempt_timeout = *server_pushback;
retry_backoff_.Reset();
} else {
next_attempt_timeout = retry_backoff_.NextAttemptDelay();
}
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " server push-back: retry in "
<< next_attempt_timeout;
return next_attempt_timeout;
}

absl::StatusOr<RefCountedPtr<internal::ServerRetryThrottleData>>
ServerRetryThrottleDataFromChannelArgs(const ChannelArgs& args) {
// Get retry throttling parameters from service config.
auto* service_config = args.GetObject<ServiceConfig>();
if (service_config == nullptr) return nullptr;
const auto* config = static_cast<const internal::RetryGlobalConfig*>(
service_config->GetGlobalParsedConfig(
internal::RetryServiceConfigParser::ParserIndex()));
if (config == nullptr) return nullptr;
// Get server name from target URI.
auto server_uri = args.GetString(GRPC_ARG_SERVER_URI);
if (!server_uri.has_value()) {
return GRPC_ERROR_CREATE(
"server URI channel arg missing or wrong type in client channel "
"filter");
}
absl::StatusOr<URI> uri = URI::Parse(*server_uri);
if (!uri.ok() || uri->path().empty()) {
return GRPC_ERROR_CREATE("could not extract server name from target URI");
}
std::string server_name(absl::StripPrefix(uri->path(), "/"));
// Get throttling config for server_name.
return internal::ServerRetryThrottleMap::Get()->GetDataForServer(
server_name, config->max_milli_tokens(), config->milli_token_ratio());
}

} // namespace retry_detail

////////////////////////////////////////////////////////////////////////////////
// RetryInterceptor

RetryInterceptor::RetryInterceptor(const ChannelArgs& args)
absl::StatusOr<RefCountedPtr<RetryInterceptor>> RetryInterceptor::Create(
const ChannelArgs& args, const FilterArgs&) {
auto retry_throttle_data =
retry_detail::ServerRetryThrottleDataFromChannelArgs(args);
if (!retry_throttle_data.ok()) {
return retry_throttle_data.status();
}
return MakeRefCounted<RetryInterceptor>(args,
std::move(*retry_throttle_data));
}

RetryInterceptor::RetryInterceptor(
const ChannelArgs& args,
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)
: per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
service_config_parser_index_(
internal::RetryServiceConfigParser::ParserIndex()) {}
internal::RetryServiceConfigParser::ParserIndex()),
retry_throttle_data_(std::move(retry_throttle_data)) {}

void RetryInterceptor::InterceptCall(
UnstartedCallHandler unstarted_call_handler) {
Expand All @@ -65,9 +185,10 @@ RetryInterceptor::Call::Call(RefCountedPtr<RetryInterceptor> interceptor,
CallHandler call_handler)
: call_handler_(std::move(call_handler)),
interceptor_(std::move(interceptor)),
retry_policy_(interceptor_->GetRetryPolicy()) {
retry_state_(interceptor_->GetRetryPolicy(),
interceptor_->retry_throttle_data_) {
GRPC_TRACE_LOG(retry, INFO)
<< DebugTag() << " retry call created: " << *retry_policy_;
<< DebugTag() << " retry call created: " << retry_state_;
}

auto RetryInterceptor::Call::ClientToBuffer() {
Expand Down Expand Up @@ -128,75 +249,6 @@ void RetryInterceptor::Call::MaybeCommit(size_t buffered) {
}
}

absl::optional<Duration> RetryInterceptor::Call::ShouldRetry(
const ServerMetadata& md,
absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
// If no retry policy, don't retry.
if (retry_policy_ == nullptr) {
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " no retry policy";
return absl::nullopt;
}
const auto status = md.get(GrpcStatusMetadata());
if (status.has_value()) {
if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
if (retry_throttle_data_ != nullptr) {
retry_throttle_data_->RecordSuccess();
}
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " call succeeded";
return absl::nullopt;
}
// Status is not OK. Check whether the status is retryable.
if (!retry_policy_->retryable_status_codes().Contains(*status)) {
GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() << ": status "
<< grpc_status_code_to_string(*status)
<< " not configured as retryable";
return absl::nullopt;
}
}
// Record the failure and check whether retries are throttled.
// Note that it's important for this check to come after the status
// code check above, since we should only record failures whose statuses
// match the configured retryable status codes, so that we don't count
// things like failures due to malformed requests (INVALID_ARGUMENT).
// Conversely, it's important for this to come before the remaining
// checks, so that we don't fail to record failures due to other factors.
if (retry_throttle_data_ != nullptr &&
!retry_throttle_data_->RecordFailure()) {
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " retries throttled";
return absl::nullopt;
}
// Check whether the call is committed.
if (request_buffer_.committed()) {
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " retries already committed";
return absl::nullopt;
}
// Check whether we have retries remaining.
++num_attempts_completed_;
if (num_attempts_completed_ >= retry_policy_->max_attempts()) {
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " exceeded "
<< retry_policy_->max_attempts() << " retry attempts";
return absl::nullopt;
}
// Check server push-back.
auto server_pushback =
md.get(GrpcRetryPushbackMsMetadata()).value_or(Duration::Zero());
if (server_pushback < Duration::Zero()) {
GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string()
<< " not retrying due to server push-back";
return absl::nullopt;
}
// We should retry.
GRPC_TRACE_LOG(retry, INFO)
<< lazy_attempt_debug_string() << " server push-back: retry in "
<< server_pushback;
return server_pushback;
}

std::string RetryInterceptor::Call::DebugTag() {
return absl::StrFormat("%s call:%p", Activity::current()->DebugTag(), this);
}
Expand Down
76 changes: 66 additions & 10 deletions src/core/client_channel/retry_interceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,69 @@
#include "src/core/client_channel/retry_throttle.h"
#include "src/core/filter/filter_args.h"
#include "src/core/lib/transport/interception_chain.h"
#include "src/core/util/backoff.h"

// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"

namespace grpc_core {

namespace retry_detail {
class RetryState {
public:
RetryState(
const internal::RetryMethodConfig* retry_policy,
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)
: retry_policy_(retry_policy),
retry_throttle_data_(std::move(retry_throttle_data)),
retry_backoff_(
BackOff::Options()
.set_initial_backoff(retry_policy_ == nullptr
? Duration::Zero()
: retry_policy_->initial_backoff())
.set_multiplier(retry_policy_ == nullptr
? 0
: retry_policy_->backoff_multiplier())
// This value was picked arbitrarily. It can be changed if
// there is any even moderately compelling reason to do so.
.set_jitter(0.2)
.set_max_backoff(retry_policy_ == nullptr
? Duration::Zero()
: retry_policy_->max_backoff())) {}

// if nullopt --> commit & don't retry
// if duration --> retry after duration
absl::optional<Duration> ShouldRetry(
const ServerMetadata& md, bool committed,
absl::FunctionRef<std::string()> lazy_attempt_debug_string);
int num_attempts_completed() const { return num_attempts_completed_; }

template <typename Sink>
friend void AbslStringify(Sink& sink, const RetryState& state) {
sink.Append(absl::StrCat("policy:{", *state.retry_policy_, "} throttle:",
state.retry_throttle_data_ != nullptr,
" attempts:", state.num_attempts_completed_));
}

private:
const internal::RetryMethodConfig* const retry_policy_;
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
int num_attempts_completed_ = 0;
BackOff retry_backoff_;
};

absl::StatusOr<RefCountedPtr<internal::ServerRetryThrottleData>>
ServerRetryThrottleDataFromChannelArgs(const ChannelArgs& args);
} // namespace retry_detail

class RetryInterceptor : public Interceptor {
public:
explicit RetryInterceptor(const ChannelArgs& args);
RetryInterceptor(
const ChannelArgs& args,
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data);

static RefCountedPtr<RetryInterceptor> Create(const ChannelArgs& args,
const FilterArgs&) {
return MakeRefCounted<RetryInterceptor>(args);
}
static absl::StatusOr<RefCountedPtr<RetryInterceptor>> Create(
const ChannelArgs& args, const FilterArgs&);

void Orphaned() override {}

Expand All @@ -54,8 +106,13 @@ class RetryInterceptor : public Interceptor {
// if duration --> retry after duration
absl::optional<Duration> ShouldRetry(
const ServerMetadata& md,
absl::FunctionRef<std::string()> lazy_attempt_debug_string);
int num_attempts_completed() const { return num_attempts_completed_; }
absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
return retry_state_.ShouldRetry(md, request_buffer_.committed(),
lazy_attempt_debug_string);
}
int num_attempts_completed() const {
return retry_state_.num_attempts_completed();
}

std::string DebugTag();

Expand All @@ -67,9 +124,7 @@ class RetryInterceptor : public Interceptor {
CallHandler call_handler_;
RefCountedPtr<RetryInterceptor> interceptor_;
RefCountedPtr<Attempt> current_attempt_;
const internal::RetryMethodConfig* const retry_policy_;
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
int num_attempts_completed_ = 0;
retry_detail::RetryState retry_state_;
};

class Attempt
Expand Down Expand Up @@ -98,6 +153,7 @@ class RetryInterceptor : public Interceptor {

const size_t per_rpc_retry_buffer_size_;
const size_t service_config_parser_index_;
const RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
};

} // namespace grpc_core
Expand Down

0 comments on commit 78e61ea

Please sign in to comment.