From 78e61eab088789a3ffd3f75eb4fe2f29599651ed Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Wed, 4 Dec 2024 23:00:30 -0800
Subject: [PATCH] x

---
Notes:
  * Moves the retry decision logic out of RetryInterceptor::Call into
    retry_detail::RetryState, and uses BackOff to pick the retry delay when
    the server sends no push-back.
  * RetryInterceptor::Create() now returns absl::StatusOr; it fails when a
    retry global config is present but the server URI channel arg is missing
    or cannot be parsed.
  * RetryState's AbslStringify tolerates a null retry policy (it is streamed
    into the trace log from the Call constructor).

 src/core/BUILD                               |   1 +
 src/core/client_channel/retry_interceptor.cc | 198 ++++++++++++-------
 src/core/client_channel/retry_interceptor.h  |  80 ++++++-
 3 files changed, 196 insertions(+), 83 deletions(-)

diff --git a/src/core/BUILD b/src/core/BUILD
index 460f0aaf62cc7..14c23d212032f 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -3535,6 +3535,7 @@ grpc_cc_library(
         "retry_service_config",
         "retry_throttle",
         "sleep",
+        "//:backoff",
     ],
 )
 
diff --git a/src/core/client_channel/retry_interceptor.cc b/src/core/client_channel/retry_interceptor.cc
index ed49968569322..cdf2147489982 100644
--- a/src/core/client_channel/retry_interceptor.cc
+++ b/src/core/client_channel/retry_interceptor.cc
@@ -33,13 +33,133 @@ size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
 }
 }  // namespace
 
+namespace retry_detail {
+
+absl::optional<Duration> RetryState::ShouldRetry(
+    const ServerMetadata& md, bool committed,
+    absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
+  // If no retry policy, don't retry.
+  if (retry_policy_ == nullptr) {
+    GRPC_TRACE_LOG(retry, INFO)
+        << lazy_attempt_debug_string() << " no retry policy";
+    return absl::nullopt;
+  }
+  const auto status = md.get(GrpcStatusMetadata());
+  if (status.has_value()) {
+    if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
+      if (retry_throttle_data_ != nullptr) {
+        retry_throttle_data_->RecordSuccess();
+      }
+      GRPC_TRACE_LOG(retry, INFO)
+          << lazy_attempt_debug_string() << " call succeeded";
+      return absl::nullopt;
+    }
+    // Status is not OK.  Check whether the status is retryable.
+    if (!retry_policy_->retryable_status_codes().Contains(*status)) {
+      GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() << ": status "
+                                  << grpc_status_code_to_string(*status)
+                                  << " not configured as retryable";
+      return absl::nullopt;
+    }
+  }
+  // Record the failure and check whether retries are throttled.
+  // Note that it's important for this check to come after the status
+  // code check above, since we should only record failures whose statuses
+  // match the configured retryable status codes, so that we don't count
+  // things like failures due to malformed requests (INVALID_ARGUMENT).
+  // Conversely, it's important for this to come before the remaining
+  // checks, so that we don't fail to record failures due to other factors.
+  if (retry_throttle_data_ != nullptr &&
+      !retry_throttle_data_->RecordFailure()) {
+    GRPC_TRACE_LOG(retry, INFO)
+        << lazy_attempt_debug_string() << " retries throttled";
+    return absl::nullopt;
+  }
+  // Check whether the call is committed.
+  if (committed) {
+    GRPC_TRACE_LOG(retry, INFO)
+        << lazy_attempt_debug_string() << " retries already committed";
+    return absl::nullopt;
+  }
+  // Check whether we have retries remaining.
+  ++num_attempts_completed_;
+  if (num_attempts_completed_ >= retry_policy_->max_attempts()) {
+    GRPC_TRACE_LOG(retry, INFO)
+        << lazy_attempt_debug_string() << " exceeded "
+        << retry_policy_->max_attempts() << " retry attempts";
+    return absl::nullopt;
+  }
+  // Check server push-back.
+  const auto server_pushback = md.get(GrpcRetryPushbackMsMetadata());
+  if (server_pushback.has_value() && server_pushback < Duration::Zero()) {
+    GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string()
+                                << " not retrying due to server push-back";
+    return absl::nullopt;
+  }
+  // We should retry.
+  Duration next_attempt_timeout;
+  if (server_pushback.has_value()) {
+    CHECK_GE(*server_pushback, Duration::Zero());
+    next_attempt_timeout = *server_pushback;
+    retry_backoff_.Reset();
+  } else {
+    next_attempt_timeout = retry_backoff_.NextAttemptDelay();
+  }
+  GRPC_TRACE_LOG(retry, INFO)
+      << lazy_attempt_debug_string() << " server push-back: retry in "
+      << next_attempt_timeout;
+  return next_attempt_timeout;
+}
+
+absl::StatusOr<RefCountedPtr<internal::ServerRetryThrottleData>>
+ServerRetryThrottleDataFromChannelArgs(const ChannelArgs& args) {
+  // Get retry throttling parameters from service config.
+  auto* service_config = args.GetObject<ServiceConfig>();
+  if (service_config == nullptr) return nullptr;
+  const auto* config = static_cast<const internal::RetryGlobalConfig*>(
+      service_config->GetGlobalParsedConfig(
+          internal::RetryServiceConfigParser::ParserIndex()));
+  if (config == nullptr) return nullptr;
+  // Get server name from target URI.
+  auto server_uri = args.GetString(GRPC_ARG_SERVER_URI);
+  if (!server_uri.has_value()) {
+    return GRPC_ERROR_CREATE(
+        "server URI channel arg missing or wrong type in client channel "
+        "filter");
+  }
+  absl::StatusOr<URI> uri = URI::Parse(*server_uri);
+  if (!uri.ok() || uri->path().empty()) {
+    return GRPC_ERROR_CREATE("could not extract server name from target URI");
+  }
+  std::string server_name(absl::StripPrefix(uri->path(), "/"));
+  // Get throttling config for server_name.
+  return internal::ServerRetryThrottleMap::Get()->GetDataForServer(
+      server_name, config->max_milli_tokens(), config->milli_token_ratio());
+}
+
+}  // namespace retry_detail
+
 ////////////////////////////////////////////////////////////////////////////////
 // RetryInterceptor
 
-RetryInterceptor::RetryInterceptor(const ChannelArgs& args)
+absl::StatusOr<RefCountedPtr<RetryInterceptor>> RetryInterceptor::Create(
+    const ChannelArgs& args, const FilterArgs&) {
+  auto retry_throttle_data =
+      retry_detail::ServerRetryThrottleDataFromChannelArgs(args);
+  if (!retry_throttle_data.ok()) {
+    return retry_throttle_data.status();
+  }
+  return MakeRefCounted<RetryInterceptor>(args,
+                                          std::move(*retry_throttle_data));
+}
+
+RetryInterceptor::RetryInterceptor(
+    const ChannelArgs& args,
+    RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)
     : per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
       service_config_parser_index_(
-          internal::RetryServiceConfigParser::ParserIndex()) {}
+          internal::RetryServiceConfigParser::ParserIndex()),
+      retry_throttle_data_(std::move(retry_throttle_data)) {}
 
 void RetryInterceptor::InterceptCall(
     UnstartedCallHandler unstarted_call_handler) {
@@ -65,9 +185,10 @@ RetryInterceptor::Call::Call(RefCountedPtr<RetryInterceptor> interceptor,
                              CallHandler call_handler)
     : call_handler_(std::move(call_handler)),
       interceptor_(std::move(interceptor)),
-      retry_policy_(interceptor_->GetRetryPolicy()) {
+      retry_state_(interceptor_->GetRetryPolicy(),
+                   interceptor_->retry_throttle_data_) {
   GRPC_TRACE_LOG(retry, INFO)
-      << DebugTag() << " retry call created: " << *retry_policy_;
+      << DebugTag() << " retry call created: " << retry_state_;
 }
 
 auto RetryInterceptor::Call::ClientToBuffer() {
@@ -128,75 +249,6 @@ void RetryInterceptor::Call::MaybeCommit(size_t buffered) {
   }
 }
 
-absl::optional<Duration> RetryInterceptor::Call::ShouldRetry(
-    const ServerMetadata& md,
-    absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
-  // If no retry policy, don't retry.
-  if (retry_policy_ == nullptr) {
-    GRPC_TRACE_LOG(retry, INFO)
-        << lazy_attempt_debug_string() << " no retry policy";
-    return absl::nullopt;
-  }
-  const auto status = md.get(GrpcStatusMetadata());
-  if (status.has_value()) {
-    if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
-      if (retry_throttle_data_ != nullptr) {
-        retry_throttle_data_->RecordSuccess();
-      }
-      GRPC_TRACE_LOG(retry, INFO)
-          << lazy_attempt_debug_string() << " call succeeded";
-      return absl::nullopt;
-    }
-    // Status is not OK.  Check whether the status is retryable.
-    if (!retry_policy_->retryable_status_codes().Contains(*status)) {
-      GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() << ": status "
-                                  << grpc_status_code_to_string(*status)
-                                  << " not configured as retryable";
-      return absl::nullopt;
-    }
-  }
-  // Record the failure and check whether retries are throttled.
-  // Note that it's important for this check to come after the status
-  // code check above, since we should only record failures whose statuses
-  // match the configured retryable status codes, so that we don't count
-  // things like failures due to malformed requests (INVALID_ARGUMENT).
-  // Conversely, it's important for this to come before the remaining
-  // checks, so that we don't fail to record failures due to other factors.
-  if (retry_throttle_data_ != nullptr &&
-      !retry_throttle_data_->RecordFailure()) {
-    GRPC_TRACE_LOG(retry, INFO)
-        << lazy_attempt_debug_string() << " retries throttled";
-    return absl::nullopt;
-  }
-  // Check whether the call is committed.
-  if (request_buffer_.committed()) {
-    GRPC_TRACE_LOG(retry, INFO)
-        << lazy_attempt_debug_string() << " retries already committed";
-    return absl::nullopt;
-  }
-  // Check whether we have retries remaining.
-  ++num_attempts_completed_;
-  if (num_attempts_completed_ >= retry_policy_->max_attempts()) {
-    GRPC_TRACE_LOG(retry, INFO)
-        << lazy_attempt_debug_string() << " exceeded "
-        << retry_policy_->max_attempts() << " retry attempts";
-    return absl::nullopt;
-  }
-  // Check server push-back.
-  auto server_pushback =
-      md.get(GrpcRetryPushbackMsMetadata()).value_or(Duration::Zero());
-  if (server_pushback < Duration::Zero()) {
-    GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string()
-                                << " not retrying due to server push-back";
-    return absl::nullopt;
-  }
-  // We should retry.
-  GRPC_TRACE_LOG(retry, INFO)
-      << lazy_attempt_debug_string() << " server push-back: retry in "
-      << server_pushback;
-  return server_pushback;
-}
-
 std::string RetryInterceptor::Call::DebugTag() {
   return absl::StrFormat("%s call:%p", Activity::current()->DebugTag(), this);
 }
diff --git a/src/core/client_channel/retry_interceptor.h b/src/core/client_channel/retry_interceptor.h
index 8f772e4de4683..58a7655dfd90f 100644
--- a/src/core/client_channel/retry_interceptor.h
+++ b/src/core/client_channel/retry_interceptor.h
@@ -20,17 +20,73 @@
 #include "src/core/client_channel/retry_throttle.h"
 #include "src/core/filter/filter_args.h"
 #include "src/core/lib/transport/interception_chain.h"
+#include "src/core/util/backoff.h"
+
+// Channel arg key for server URI string.
+#define GRPC_ARG_SERVER_URI "grpc.server_uri"
 
 namespace grpc_core {
 
+namespace retry_detail {
+class RetryState {
+ public:
+  RetryState(
+      const internal::RetryMethodConfig* retry_policy,
+      RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)
+      : retry_policy_(retry_policy),
+        retry_throttle_data_(std::move(retry_throttle_data)),
+        retry_backoff_(
+            BackOff::Options()
+                .set_initial_backoff(retry_policy_ == nullptr
+                                         ? Duration::Zero()
+                                         : retry_policy_->initial_backoff())
+                .set_multiplier(retry_policy_ == nullptr
+                                    ? 0
+                                    : retry_policy_->backoff_multiplier())
+                // This value was picked arbitrarily.  It can be changed if
+                // there is any even moderately compelling reason to do so.
+                .set_jitter(0.2)
+                .set_max_backoff(retry_policy_ == nullptr
+                                     ? Duration::Zero()
+                                     : retry_policy_->max_backoff())) {}
+
+  // if nullopt --> commit & don't retry
+  // if duration --> retry after duration
+  absl::optional<Duration> ShouldRetry(
+      const ServerMetadata& md, bool committed,
+      absl::FunctionRef<std::string()> lazy_attempt_debug_string);
+  int num_attempts_completed() const { return num_attempts_completed_; }
+
+  template <typename Sink>
+  friend void AbslStringify(Sink& sink, const RetryState& state) {
+    // retry_policy_ is null when no retry policy applies; don't deref it.
+    sink.Append(absl::StrCat(
+        "policy:{",
+        state.retry_policy_ != nullptr ? absl::StrCat(*state.retry_policy_)
+                                       : "none",
+        "} throttle:", state.retry_throttle_data_ != nullptr,
+        " attempts:", state.num_attempts_completed_));
+  }
+
+ private:
+  const internal::RetryMethodConfig* const retry_policy_;
+  RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
+  int num_attempts_completed_ = 0;
+  BackOff retry_backoff_;
+};
+
+absl::StatusOr<RefCountedPtr<internal::ServerRetryThrottleData>>
+ServerRetryThrottleDataFromChannelArgs(const ChannelArgs& args);
+}  // namespace retry_detail
+
 class RetryInterceptor : public Interceptor {
  public:
-  explicit RetryInterceptor(const ChannelArgs& args);
+  RetryInterceptor(
+      const ChannelArgs& args,
+      RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data);
 
-  static RefCountedPtr<RetryInterceptor> Create(const ChannelArgs& args,
-                                                const FilterArgs&) {
-    return MakeRefCounted<RetryInterceptor>(args);
-  }
+  static absl::StatusOr<RefCountedPtr<RetryInterceptor>> Create(
+      const ChannelArgs& args, const FilterArgs&);
 
   void Orphaned() override {}
 
@@ -54,8 +110,13 @@ class RetryInterceptor : public Interceptor {
     // if duration --> retry after duration
     absl::optional<Duration> ShouldRetry(
         const ServerMetadata& md,
-        absl::FunctionRef<std::string()> lazy_attempt_debug_string);
-    int num_attempts_completed() const { return num_attempts_completed_; }
+        absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
+      return retry_state_.ShouldRetry(md, request_buffer_.committed(),
+                                      lazy_attempt_debug_string);
+    }
+    int num_attempts_completed() const {
+      return retry_state_.num_attempts_completed();
+    }
 
     std::string DebugTag();
 
@@ -67,9 +128,7 @@ class RetryInterceptor : public Interceptor {
     CallHandler call_handler_;
     RefCountedPtr<RetryInterceptor> interceptor_;
     RefCountedPtr<Attempt> current_attempt_;
-    const internal::RetryMethodConfig* const retry_policy_;
-    RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
-    int num_attempts_completed_ = 0;
+    retry_detail::RetryState retry_state_;
   };
 
   class Attempt
@@ -98,6 +157,7 @@ class RetryInterceptor : public Interceptor {
 
   const size_t per_rpc_retry_buffer_size_;
   const size_t service_config_parser_index_;
+  const RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
 };
 
 }  // namespace grpc_core