Skip to content

Commit

Permalink
Native source sync at revision @2453c26
Browse files Browse the repository at this point in the history
  • Loading branch information
dennycd authored and HannahShiSFB committed Dec 20, 2024
1 parent 2453c26 commit c7cb8a8
Show file tree
Hide file tree
Showing 29 changed files with 1,667 additions and 808 deletions.
3 changes: 1 addition & 2 deletions gRPC-Core.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ Pod::Spec.new do |s|
ss.libraries = 'z'
ss.dependency "#{s.name}/Interface", version
ss.dependency "#{s.name}/Privacy", version
ss.dependency 'BoringSSL-GRPC', '0.0.37'
ss.dependency 'BoringSSL-GRPC', '0.0.38'
ss.dependency 'abseil/algorithm/container', abseil_version
ss.dependency 'abseil/base/base', abseil_version
ss.dependency 'abseil/base/config', abseil_version
Expand Down Expand Up @@ -1963,7 +1963,6 @@ Pod::Spec.new do |s|
'src/core/tsi/transport_security_interface.h',
'src/core/util/alloc.cc',
'src/core/util/alloc.h',
'src/core/util/atm.cc',
'src/core/util/atomic_utils.h',
'src/core/util/avl.h',
'src/core/util/backoff.cc',
Expand Down
13 changes: 0 additions & 13 deletions include/grpc/support/atm.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,4 @@
#error could not determine platform for atm
#endif

#ifdef __cplusplus
extern "C" {
#endif

/** Adds \a delta to \a *value, clamping the result to the range specified
by \a min and \a max. Returns the new value. */
gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm* value, gpr_atm delta,
gpr_atm min, gpr_atm max);

#ifdef __cplusplus
}
#endif

#endif /* GRPC_SUPPORT_ATM_H */
51 changes: 33 additions & 18 deletions src/core/client_channel/retry_throttle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,33 @@

#include "src/core/client_channel/retry_throttle.h"

#include <grpc/support/atm.h>
#include <grpc/support/port_platform.h>

#include <atomic>
#include <cstdint>
#include <limits>
#include <map>
#include <string>
#include <utility>

#include "src/core/util/useful.h"

namespace grpc_core {
namespace internal {

namespace {
template <typename T>
T ClampedAdd(std::atomic<T>& value, T delta, T min, T max) {
T prev_value = value.load(std::memory_order_relaxed);
T new_value;
do {
new_value = Clamp(SaturatingAdd(prev_value, delta), min, max);
} while (!value.compare_exchange_weak(prev_value, new_value,
std::memory_order_relaxed));
return new_value;
}
} // namespace

//
// ServerRetryThrottleData
//
Expand All @@ -44,26 +61,24 @@ ServerRetryThrottleData::ServerRetryThrottleData(
// we will start out doing the same thing on the new one.
if (old_throttle_data != nullptr) {
double token_fraction =
static_cast<uintptr_t>(
gpr_atm_acq_load(&old_throttle_data->milli_tokens_)) /
static_cast<double>(
old_throttle_data->milli_tokens_.load(std::memory_order_relaxed)) /
static_cast<double>(old_throttle_data->max_milli_tokens_);
initial_milli_tokens =
static_cast<uintptr_t>(token_fraction * max_milli_tokens);
}
gpr_atm_rel_store(&milli_tokens_, static_cast<gpr_atm>(initial_milli_tokens));
milli_tokens_.store(initial_milli_tokens, std::memory_order_relaxed);
// If there was a pre-existing entry, mark it as stale and give it a
// pointer to the new entry, which is its replacement.
if (old_throttle_data != nullptr) {
Ref().release(); // Ref held by pre-existing entry.
gpr_atm_rel_store(&old_throttle_data->replacement_,
reinterpret_cast<gpr_atm>(this));
old_throttle_data->replacement_.store(this, std::memory_order_release);
}
}

ServerRetryThrottleData::~ServerRetryThrottleData() {
ServerRetryThrottleData* replacement =
reinterpret_cast<ServerRetryThrottleData*>(
gpr_atm_acq_load(&replacement_));
replacement_.load(std::memory_order_acquire);
if (replacement != nullptr) {
replacement->Unref();
}
Expand All @@ -73,8 +88,7 @@ void ServerRetryThrottleData::GetReplacementThrottleDataIfNeeded(
ServerRetryThrottleData** throttle_data) {
while (true) {
ServerRetryThrottleData* new_throttle_data =
reinterpret_cast<ServerRetryThrottleData*>(
gpr_atm_acq_load(&(*throttle_data)->replacement_));
(*throttle_data)->replacement_.load(std::memory_order_acquire);
if (new_throttle_data == nullptr) return;
*throttle_data = new_throttle_data;
}
Expand All @@ -85,10 +99,10 @@ bool ServerRetryThrottleData::RecordFailure() {
ServerRetryThrottleData* throttle_data = this;
GetReplacementThrottleDataIfNeeded(&throttle_data);
// We decrement milli_tokens by 1000 (1 token) for each failure.
const uintptr_t new_value =
static_cast<uintptr_t>(gpr_atm_no_barrier_clamped_add(
&throttle_data->milli_tokens_, gpr_atm{-1000}, gpr_atm{0},
static_cast<gpr_atm>(throttle_data->max_milli_tokens_)));
const uintptr_t new_value = ClampedAdd<intptr_t>(
throttle_data->milli_tokens_, -1000, 0,
std::min<uintptr_t>(throttle_data->max_milli_tokens_,
std::numeric_limits<intptr_t>::max()));
// Retries are allowed as long as the new value is above the threshold
// (max_milli_tokens / 2).
return new_value > throttle_data->max_milli_tokens_ / 2;
Expand All @@ -99,10 +113,11 @@ void ServerRetryThrottleData::RecordSuccess() {
ServerRetryThrottleData* throttle_data = this;
GetReplacementThrottleDataIfNeeded(&throttle_data);
// We increment milli_tokens by milli_token_ratio for each success.
gpr_atm_no_barrier_clamped_add(
&throttle_data->milli_tokens_,
static_cast<gpr_atm>(throttle_data->milli_token_ratio_), gpr_atm{0},
static_cast<gpr_atm>(throttle_data->max_milli_tokens_));
ClampedAdd<intptr_t>(
throttle_data->milli_tokens_, throttle_data->milli_token_ratio_, 0,
std::max<intptr_t>(
0, std::min<uintptr_t>(throttle_data->max_milli_tokens_,
std::numeric_limits<intptr_t>::max())));
}

//
Expand Down
6 changes: 3 additions & 3 deletions src/core/client_channel/retry_throttle.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_THROTTLE_H
#define GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_THROTTLE_H

#include <grpc/support/atm.h>
#include <grpc/support/port_platform.h>
#include <stdint.h>

#include <atomic>
#include <map>
#include <string>

Expand Down Expand Up @@ -58,11 +58,11 @@ class ServerRetryThrottleData final

const uintptr_t max_milli_tokens_;
const uintptr_t milli_token_ratio_;
gpr_atm milli_tokens_;
std::atomic<intptr_t> milli_tokens_;
// A pointer to the replacement for this ServerRetryThrottleData entry.
// If non-nullptr, then this entry is stale and must not be used.
// We hold a reference to the replacement.
gpr_atm replacement_ = 0;
std::atomic<ServerRetryThrottleData*> replacement_{nullptr};
};

/// Global map of server name to retry throttle data.
Expand Down
2 changes: 2 additions & 0 deletions src/core/lib/debug/trace_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ DebugOnlyTraceFlag fd_trace_trace(false, "fd_trace");
DebugOnlyTraceFlag lb_policy_refcount_trace(false, "lb_policy_refcount");
DebugOnlyTraceFlag party_state_trace(false, "party_state");
DebugOnlyTraceFlag pending_tags_trace(false, "pending_tags");
DebugOnlyTraceFlag ph2_trace(false, "ph2");
DebugOnlyTraceFlag polling_trace(false, "polling");
DebugOnlyTraceFlag polling_api_trace(false, "polling_api");
DebugOnlyTraceFlag promise_primitives_trace(false, "promise_primitives");
Expand Down Expand Up @@ -225,6 +226,7 @@ const absl::flat_hash_map<std::string, TraceFlag*>& GetAllTraceFlags() {
{"lb_policy_refcount", &lb_policy_refcount_trace},
{"party_state", &party_state_trace},
{"pending_tags", &pending_tags_trace},
{"ph2", &ph2_trace},
{"polling", &polling_trace},
{"polling_api", &polling_api_trace},
{"promise_primitives", &promise_primitives_trace},
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/debug/trace_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern DebugOnlyTraceFlag fd_trace_trace;
extern DebugOnlyTraceFlag lb_policy_refcount_trace;
extern DebugOnlyTraceFlag party_state_trace;
extern DebugOnlyTraceFlag pending_tags_trace;
extern DebugOnlyTraceFlag ph2_trace;
extern DebugOnlyTraceFlag polling_trace;
extern DebugOnlyTraceFlag polling_api_trace;
extern DebugOnlyTraceFlag promise_primitives_trace;
Expand Down
39 changes: 0 additions & 39 deletions src/core/lib/experiments/experiments.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@ const char* const additional_constraints_backoff_cap_initial_at_max = "{}";
const char* const description_call_tracer_in_transport =
"Transport directly passes byte counts to CallTracer.";
const char* const additional_constraints_call_tracer_in_transport = "{}";
const char* const description_canary_client_privacy =
"If set, canary client privacy";
const char* const additional_constraints_canary_client_privacy = "{}";
const char* const description_chaotic_good_legacy_protocol =
"If set, use the first version of the chaotic-good protocol when that "
"protocol is enabled.";
const char* const additional_constraints_chaotic_good_legacy_protocol = "{}";
const char* const description_client_privacy = "If set, client privacy";
const char* const additional_constraints_client_privacy = "{}";
const char* const description_disable_buffer_hint_on_high_memory_pressure =
"Disable buffer hint flag parsing in the transport under high memory "
"pressure.";
Expand Down Expand Up @@ -118,8 +113,6 @@ const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
"{}";
const char* const description_server_privacy = "If set, server privacy";
const char* const additional_constraints_server_privacy = "{}";
const char* const description_tcp_frame_size_tuning =
"If set, enables TCP to use RPC size estimation made by higher layers. TCP "
"would not indicate completion of a read operation until a specified "
Expand Down Expand Up @@ -154,13 +147,9 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_backoff_cap_initial_at_max, nullptr, 0, true, true},
{"call_tracer_in_transport", description_call_tracer_in_transport,
additional_constraints_call_tracer_in_transport, nullptr, 0, true, true},
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"chaotic_good_legacy_protocol", description_chaotic_good_legacy_protocol,
additional_constraints_chaotic_good_legacy_protocol, nullptr, 0, false,
true},
{"client_privacy", description_client_privacy,
additional_constraints_client_privacy, nullptr, 0, false, false},
{"disable_buffer_hint_on_high_memory_pressure",
description_disable_buffer_hint_on_high_memory_pressure,
additional_constraints_disable_buffer_hint_on_high_memory_pressure,
Expand Down Expand Up @@ -218,8 +207,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,
true},
{"server_privacy", description_server_privacy,
additional_constraints_server_privacy, nullptr, 0, false, false},
{"tcp_frame_size_tuning", description_tcp_frame_size_tuning,
additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true},
{"tcp_rcv_lowat", description_tcp_rcv_lowat,
Expand All @@ -246,15 +233,10 @@ const char* const additional_constraints_backoff_cap_initial_at_max = "{}";
const char* const description_call_tracer_in_transport =
"Transport directly passes byte counts to CallTracer.";
const char* const additional_constraints_call_tracer_in_transport = "{}";
const char* const description_canary_client_privacy =
"If set, canary client privacy";
const char* const additional_constraints_canary_client_privacy = "{}";
const char* const description_chaotic_good_legacy_protocol =
"If set, use the first version of the chaotic-good protocol when that "
"protocol is enabled.";
const char* const additional_constraints_chaotic_good_legacy_protocol = "{}";
const char* const description_client_privacy = "If set, client privacy";
const char* const additional_constraints_client_privacy = "{}";
const char* const description_disable_buffer_hint_on_high_memory_pressure =
"Disable buffer hint flag parsing in the transport under high memory "
"pressure.";
Expand Down Expand Up @@ -335,8 +317,6 @@ const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
"{}";
const char* const description_server_privacy = "If set, server privacy";
const char* const additional_constraints_server_privacy = "{}";
const char* const description_tcp_frame_size_tuning =
"If set, enables TCP to use RPC size estimation made by higher layers. TCP "
"would not indicate completion of a read operation until a specified "
Expand Down Expand Up @@ -371,13 +351,9 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_backoff_cap_initial_at_max, nullptr, 0, true, true},
{"call_tracer_in_transport", description_call_tracer_in_transport,
additional_constraints_call_tracer_in_transport, nullptr, 0, true, true},
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"chaotic_good_legacy_protocol", description_chaotic_good_legacy_protocol,
additional_constraints_chaotic_good_legacy_protocol, nullptr, 0, false,
true},
{"client_privacy", description_client_privacy,
additional_constraints_client_privacy, nullptr, 0, false, false},
{"disable_buffer_hint_on_high_memory_pressure",
description_disable_buffer_hint_on_high_memory_pressure,
additional_constraints_disable_buffer_hint_on_high_memory_pressure,
Expand Down Expand Up @@ -435,8 +411,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,
true},
{"server_privacy", description_server_privacy,
additional_constraints_server_privacy, nullptr, 0, false, false},
{"tcp_frame_size_tuning", description_tcp_frame_size_tuning,
additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true},
{"tcp_rcv_lowat", description_tcp_rcv_lowat,
Expand All @@ -463,15 +437,10 @@ const char* const additional_constraints_backoff_cap_initial_at_max = "{}";
const char* const description_call_tracer_in_transport =
"Transport directly passes byte counts to CallTracer.";
const char* const additional_constraints_call_tracer_in_transport = "{}";
const char* const description_canary_client_privacy =
"If set, canary client privacy";
const char* const additional_constraints_canary_client_privacy = "{}";
const char* const description_chaotic_good_legacy_protocol =
"If set, use the first version of the chaotic-good protocol when that "
"protocol is enabled.";
const char* const additional_constraints_chaotic_good_legacy_protocol = "{}";
const char* const description_client_privacy = "If set, client privacy";
const char* const additional_constraints_client_privacy = "{}";
const char* const description_disable_buffer_hint_on_high_memory_pressure =
"Disable buffer hint flag parsing in the transport under high memory "
"pressure.";
Expand Down Expand Up @@ -552,8 +521,6 @@ const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
const char* const additional_constraints_schedule_cancellation_over_write =
"{}";
const char* const description_server_privacy = "If set, server privacy";
const char* const additional_constraints_server_privacy = "{}";
const char* const description_tcp_frame_size_tuning =
"If set, enables TCP to use RPC size estimation made by higher layers. TCP "
"would not indicate completion of a read operation until a specified "
Expand Down Expand Up @@ -588,13 +555,9 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_backoff_cap_initial_at_max, nullptr, 0, true, true},
{"call_tracer_in_transport", description_call_tracer_in_transport,
additional_constraints_call_tracer_in_transport, nullptr, 0, true, true},
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"chaotic_good_legacy_protocol", description_chaotic_good_legacy_protocol,
additional_constraints_chaotic_good_legacy_protocol, nullptr, 0, false,
true},
{"client_privacy", description_client_privacy,
additional_constraints_client_privacy, nullptr, 0, false, false},
{"disable_buffer_hint_on_high_memory_pressure",
description_disable_buffer_hint_on_high_memory_pressure,
additional_constraints_disable_buffer_hint_on_high_memory_pressure,
Expand Down Expand Up @@ -652,8 +615,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_schedule_cancellation_over_write,
additional_constraints_schedule_cancellation_over_write, nullptr, 0, false,
true},
{"server_privacy", description_server_privacy,
additional_constraints_server_privacy, nullptr, 0, false, false},
{"tcp_frame_size_tuning", description_tcp_frame_size_tuning,
additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true},
{"tcp_rcv_lowat", description_tcp_rcv_lowat,
Expand Down
Loading

0 comments on commit c7cb8a8

Please sign in to comment.