diff --git a/BUILD b/BUILD index 7bbf3ed6f477e..b2837b2796937 100644 --- a/BUILD +++ b/BUILD @@ -1945,6 +1945,7 @@ grpc_cc_library( "//src/core:channel_stack_type", "//src/core:closure", "//src/core:compression", + "//src/core:filter_args", "//src/core:connectivity_state", "//src/core:context", "//src/core:default_event_engine", @@ -3730,6 +3731,7 @@ grpc_cc_library( "//src/core:grpc_channel_idle_filter", "//src/core:grpc_service_config", "//src/core:idle_filter_state", + "//src/core:retry_interceptor", "//src/core:init_internally", "//src/core:interception_chain", "//src/core:iomgr_fwd", diff --git a/src/core/BUILD b/src/core/BUILD index 7a4aa2a4b6f15..f05753990d46f 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3531,6 +3531,8 @@ grpc_cc_library( "interception_chain", "map", "request_buffer", + "retry_service_config", + "retry_throttle", "sleep", ], ) @@ -8553,6 +8555,7 @@ grpc_cc_library( "metadata", "ref_counted", "//:gpr_platform", + "//:grpc_trace", ], ) diff --git a/src/core/call/request_buffer.h b/src/core/call/request_buffer.h index 719a562bdba80..de5ee7297e8b9 100644 --- a/src/core/call/request_buffer.h +++ b/src/core/call/request_buffer.h @@ -105,6 +105,11 @@ class RequestBuffer { // reader. All other readers will see failure. void Commit(Reader* winner); + bool committed() const { + MutexLock lock(&mu_); + return winner_ != nullptr; + } + private: // Buffering state: we're collecting metadata and messages. struct Buffering { @@ -168,7 +173,7 @@ class RequestBuffer { readers_.erase(reader); } - Mutex mu_; + mutable Mutex mu_; Reader* winner_ ABSL_GUARDED_BY(mu_){nullptr}; State state_ ABSL_GUARDED_BY(mu_){Buffering{}}; // TODO(ctiller): change this to an intrusively linked list to avoid diff --git a/src/core/client_channel/retry_interceptor.cc b/src/core/client_channel/retry_interceptor.cc index 38f9e98c5f0a4..92c762298009e 100644 --- a/src/core/client_channel/retry_interceptor.cc +++ b/src/core/client_channel/retry_interceptor.cc @@ -21,9 +21,23 @@ namespace grpc_core { +namespace { +size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) { + // By default, we buffer 256 KiB per RPC for retries. + // TODO(roth): Do we have any data to suggest a better value? + static constexpr int kDefaultPerRpcRetryBufferSize = (256 << 10); + return Clamp(args.GetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE) + .value_or(kDefaultPerRpcRetryBufferSize), + 0, INT_MAX); +} +} // namespace + //////////////////////////////////////////////////////////////////////////////// // RetryInterceptor +RetryInterceptor::RetryInterceptor(const ChannelArgs& args) + : per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)) {} + void RetryInterceptor::InterceptCall( UnstartedCallHandler unstarted_call_handler) { auto call_handler = unstarted_call_handler.StartCall(); @@ -87,11 +101,78 @@ void RetryInterceptor::Call::StartAttempt() { } void RetryInterceptor::Call::MaybeCommit(size_t buffered) { - if (buffered >= interceptor_->MaxBuffered()) { + if (buffered >= interceptor_->per_rpc_retry_buffer_size_) { current_attempt_->Commit(); } } +absl::optional RetryInterceptor::Call::ShouldRetry( + const ServerMetadata& md, + absl::FunctionRef lazy_attempt_debug_string) { + // If no retry policy, don't retry. + if (retry_policy_ == nullptr) return absl::nullopt; + const auto status = md.get(GrpcStatusMetadata()); + if (status.has_value()) { + if (GPR_LIKELY(*status == GRPC_STATUS_OK)) { + if (retry_throttle_data_ != nullptr) { + retry_throttle_data_->RecordSuccess(); + } + GRPC_TRACE_LOG(retry, INFO) + << lazy_attempt_debug_string() << ": call succeeded"; + return absl::nullopt; + } + // Status is not OK. Check whether the status is retryable. + if (!retry_policy_->retryable_status_codes().Contains(*status)) { + GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() << ": status " + << grpc_status_code_to_string(*status) + << " not configured as retryable"; + return absl::nullopt; + } + } + // Record the failure and check whether retries are throttled. + // Note that it's important for this check to come after the status + // code check above, since we should only record failures whose statuses + // match the configured retryable status codes, so that we don't count + // things like failures due to malformed requests (INVALID_ARGUMENT). + // Conversely, it's important for this to come before the remaining + // checks, so that we don't fail to record failures due to other factors. + if (retry_throttle_data_ != nullptr && + !retry_throttle_data_->RecordFailure()) { + GRPC_TRACE_LOG(retry, INFO) + << lazy_attempt_debug_string() << ": retries throttled"; + return absl::nullopt; + } + // Check whether the call is committed. + if (request_buffer_.committed()) { + GRPC_TRACE_LOG(retry, INFO) + << lazy_attempt_debug_string() << ": retries already committed"; + return absl::nullopt; + } + // Check whether we have retries remaining. + ++num_attempts_completed_; + if (num_attempts_completed_ >= retry_policy_->max_attempts()) { + GRPC_TRACE_LOG(retry, INFO) + << lazy_attempt_debug_string() << ": exceeded " + << retry_policy_->max_attempts() << " retry attempts"; + return absl::nullopt; + } + // Check server push-back. + auto server_pushback = md.get(GrpcRetryPushbackMsMetadata()); + if (server_pushback.has_value()) { + if (*server_pushback < Duration::Zero()) { + GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() + << ": not retrying due to server push-back"; + return absl::nullopt; + } else { + GRPC_TRACE_LOG(retry, INFO) + << lazy_attempt_debug_string() << ": server push-back: retry in " + << server_pushback->millis() << " ms"; + } + } + // We should retry. + return server_pushback; +} + //////////////////////////////////////////////////////////////////////////////// // RetryInterceptor::Attempt @@ -118,7 +199,12 @@ auto RetryInterceptor::Attempt::ServerToClientGotInitialMetadata( auto RetryInterceptor::Attempt::ServerToClientGotTrailersOnlyResponse() { return Seq(initiator_.PullServerTrailingMetadata(), [self = Ref()](ServerMetadataHandle md) { - auto pushback = self->call_->ShouldRetry(*md); + auto pushback = self->call_->ShouldRetry( + *md, [self = self.get()]() -> std::string { + return absl::StrFormat("call:%s attempt:%p", + Activity::current()->DebugTag(), + self); + }); return If( pushback.has_value(), [self, pushback]() { @@ -174,4 +260,6 @@ void RetryInterceptor::Attempt::Start() { }); } +void RetryInterceptor::Attempt::Cancel() { initiator_.SpawnCancel(); } + } // namespace grpc_core diff --git a/src/core/client_channel/retry_interceptor.h b/src/core/client_channel/retry_interceptor.h index 5b64318b0eeb0..f00f6f5d65ffa 100644 --- a/src/core/client_channel/retry_interceptor.h +++ b/src/core/client_channel/retry_interceptor.h @@ -16,6 +16,8 @@ #define RETRY_INTERCEPTOR_H #include "src/core/call/request_buffer.h" +#include "src/core/client_channel/retry_service_config.h" +#include "src/core/client_channel/retry_throttle.h" #include "src/core/filter/filter_args.h" #include "src/core/lib/transport/interception_chain.h" @@ -23,9 +25,11 @@ namespace grpc_core { class RetryInterceptor : public Interceptor { public: - static RefCountedPtr Create(const ChannelArgs&, + explicit RetryInterceptor(const ChannelArgs& args); + + static RefCountedPtr Create(const ChannelArgs& args, const FilterArgs&) { - return MakeRefCounted(); + return MakeRefCounted(args); } void Orphaned() override {} @@ -48,7 +52,9 @@ class RetryInterceptor : public Interceptor { RetryInterceptor* interceptor() { return interceptor_.get(); } // if nullopt --> commit & don't retry // if duration --> retry after duration - absl::optional ShouldRetry(const ServerMetadata& md); + absl::optional ShouldRetry( + const ServerMetadata& md, + absl::FunctionRef lazy_attempt_debug_string); private: void MaybeCommit(size_t buffered); @@ -58,6 +64,9 @@ class RetryInterceptor : public Interceptor { CallHandler call_handler_; RefCountedPtr interceptor_; RefCountedPtr current_attempt_; + const internal::RetryMethodConfig* retry_policy_ = nullptr; + RefCountedPtr retry_throttle_data_; + int num_attempts_completed_ = 0; }; class Attempt @@ -80,7 +89,7 @@ class RetryInterceptor : public Interceptor { CallInitiator initiator_; }; - size_t MaxBuffered() const; + const size_t per_rpc_retry_buffer_size_; }; } // namespace grpc_core diff --git a/src/core/lib/transport/interception_chain.cc b/src/core/lib/transport/interception_chain.cc index 202fd17fe8266..555c79abf8111 100644 --- a/src/core/lib/transport/interception_chain.cc +++ b/src/core/lib/transport/interception_chain.cc @@ -18,6 +18,7 @@ #include +#include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/call_destination.h" #include "src/core/lib/transport/call_filters.h" #include "src/core/lib/transport/call_spine.h" diff --git a/test/core/end2end/end2end_test_suites.cc b/test/core/end2end/end2end_test_suites.cc index eedc27878a015..9ca17ec75befd 100644 --- a/test/core/end2end/end2end_test_suites.cc +++ b/test/core/end2end/end2end_test_suites.cc @@ -544,9 +544,8 @@ class ChaoticGoodFixture : public CoreTestFixture { grpc_channel* MakeClient(const ChannelArgs& args, grpc_completion_queue*) override { - auto* client = grpc_chaotic_good_channel_create( - localaddr_.c_str(), - args.Set(GRPC_ARG_ENABLE_RETRIES, false).ToC().get()); + auto* client = + grpc_chaotic_good_channel_create(localaddr_.c_str(), args.ToC().get()); return client; } @@ -1002,7 +1001,6 @@ std::vector DefaultConfigs() { #endif CoreTestConfiguration{"ChaoticGoodFullStack", FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_DOES_NOT_SUPPORT_RETRY | FEATURE_MASK_DOES_NOT_SUPPORT_WRITE_BUFFERING | FEATURE_MASK_IS_CALL_V3, nullptr,