Skip to content

Commit

Permalink
[pick_first] don't finish Happy Eyeballs pass until all subchannels f…
Browse files Browse the repository at this point in the history
…ail at least once (grpc#34717)
  • Loading branch information
markdroth authored Oct 18, 2023
1 parent ccfc5b9 commit b2d5a3c
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class PickFirst : public LoadBalancingPolicy {
// Cancels any pending connectivity watch and unrefs the subchannel.
void ShutdownLocked();

bool seen_transient_failure() const { return seen_transient_failure_; }

private:
// Watcher for subchannel connectivity state.
class Watcher
Expand Down Expand Up @@ -199,6 +201,7 @@ class PickFirst : public LoadBalancingPolicy {
// Data updated by the watcher.
absl::optional<grpc_connectivity_state> connectivity_state_;
absl::Status connectivity_status_;
bool seen_transient_failure_ = false;
};

SubchannelList(RefCountedPtr<PickFirst> policy,
Expand All @@ -214,6 +217,17 @@ class PickFirst : public LoadBalancingPolicy {

void Orphan() override;

bool IsHappyEyeballsPassComplete() const {
// Checking attempting_index_ here is just an optimization -- if
// we haven't actually tried all subchannels yet, then we don't
// need to iterate.
if (attempting_index_ < size()) return false;
for (const SubchannelData& sd : subchannels_) {
if (!sd.seen_transient_failure()) return false;
}
return true;
}

private:
// Returns true if all subchannels have seen their initial
// connectivity state notifications.
Expand All @@ -222,11 +236,16 @@ class PickFirst : public LoadBalancingPolicy {
// Looks through subchannels_ starting from attempting_index_ to
// find the first one not currently in TRANSIENT_FAILURE, then
// triggers a connection attempt for that subchannel. If there are
// no more subchannels not in TRANSIENT_FAILURE (i.e., the Happy
// Eyeballs pass is complete), transitions to a mode where we
// try to connect to all subchannels in parallel.
// no more subchannels not in TRANSIENT_FAILURE, calls
// MaybeFinishHappyEyeballsPass().
void StartConnectingNextSubchannel();

// Checks to see if the initial Happy Eyeballs pass is complete --
// i.e., all subchannels have seen TRANSIENT_FAILURE state at least once.
// If so, transitions to a mode where we try to connect to all subchannels
// in parallel and returns true.
void MaybeFinishHappyEyeballsPass();

// Backpointer to owning policy.
RefCountedPtr<PickFirst> policy_;

Expand Down Expand Up @@ -254,6 +273,9 @@ class PickFirst : public LoadBalancingPolicy {
// After the initial Happy Eyeballs pass, the number of failures
// we've seen. Every size() failures, we trigger re-resolution.
size_t num_failures_ = 0;

// The status from the last subchannel that reported TRANSIENT_FAILURE.
absl::Status last_failure_;
};

class HealthWatcher
Expand Down Expand Up @@ -607,16 +629,17 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
"[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
"status=%s, shutting_down=%d, pending_watcher=%p, "
"p->selected_=%p, p->subchannel_list_=%p, "
"p->latest_pending_subchannel_list_=%p",
"seen_transient_failure=%d, p->selected_=%p, "
"p->subchannel_list_=%p, p->latest_pending_subchannel_list_=%p",
p, subchannel_list_, Index(), subchannel_list_->size(),
subchannel_.get(),
(connectivity_state_.has_value()
? ConnectivityStateName(*connectivity_state_)
: "N/A"),
ConnectivityStateName(new_state), status.ToString().c_str(),
subchannel_list_->shutting_down_, pending_watcher_, p->selected_,
p->subchannel_list_.get(), p->latest_pending_subchannel_list_.get());
subchannel_list_->shutting_down_, pending_watcher_,
seen_transient_failure_, p->selected_, p->subchannel_list_.get(),
p->latest_pending_subchannel_list_.get());
}
if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return;
// The notification must be for a subchannel in either the current or
Expand All @@ -626,7 +649,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
connectivity_state_ = new_state;
connectivity_status_ = status;
connectivity_status_ = std::move(status);
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get());
Expand Down Expand Up @@ -657,14 +680,12 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (IsPickFirstHappyEyeballsEnabled()
? (p->subchannel_list_->attempting_index_ ==
p->subchannel_list_->size())
? p->subchannel_list_->IsHappyEyeballsPassComplete()
: p->subchannel_list_->in_transient_failure_) {
absl::Status status = absl::UnavailableError(absl::StrCat(
status = absl::UnavailableError(absl::StrCat(
"selected subchannel failed; switching to pending update; "
"last failure: ",
p->subchannel_list_->subchannels_.back()
.connectivity_status_.ToString()));
p->subchannel_list_->last_failure_.ToString()));
p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
} else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
Expand Down Expand Up @@ -698,6 +719,12 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
ProcessUnselectedReadyLocked();
return;
}
// Make sure we note when a subchannel has seen TRANSIENT_FAILURE.
bool prev_seen_transient_failure = seen_transient_failure_;
if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
seen_transient_failure_ = true;
subchannel_list_->last_failure_ = connectivity_status_;
}
// If we haven't yet seen the initial connectivity state notification
// for all subchannels, do nothing.
if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
Expand All @@ -724,17 +751,24 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// Otherwise, process connectivity state change.
switch (*connectivity_state_) {
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
// If a connection attempt fails before the timer fires, then
// cancel the timer and start connecting on the next subchannel.
if (Index() == subchannel_list_->attempting_index_) {
if (subchannel_list_->timer_handle_.has_value()) {
p->channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_list_->timer_handle_);
// If this is the first failure we've seen on this subchannel,
// then we're still in the Happy Eyeballs pass.
if (!prev_seen_transient_failure && seen_transient_failure_) {
// If a connection attempt fails before the timer fires, then
// cancel the timer and start connecting on the next subchannel.
if (Index() == subchannel_list_->attempting_index_) {
if (subchannel_list_->timer_handle_.has_value()) {
p->channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_list_->timer_handle_);
}
++subchannel_list_->attempting_index_;
subchannel_list_->StartConnectingNextSubchannel();
} else {
// If this was the last subchannel to fail, check if the Happy
// Eyeballs pass is complete.
subchannel_list_->MaybeFinishHappyEyeballsPass();
}
++subchannel_list_->attempting_index_;
subchannel_list_->StartConnectingNextSubchannel();
} else if (subchannel_list_->attempting_index_ ==
subchannel_list_->size()) {
} else if (subchannel_list_->IsHappyEyeballsPassComplete()) {
// We're done with the initial Happy Eyeballs pass and in a mode
// where we're attempting to connect to every subchannel in
// parallel. We count the number of failed connection attempts,
Expand All @@ -749,7 +783,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
++subchannel_list_->num_failures_;
if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
p->channel_control_helper()->RequestReresolution();
absl::Status status = absl::UnavailableError(absl::StrCat(
status = absl::UnavailableError(absl::StrCat(
(p->omit_status_message_prefix_
? ""
: "failed to connect to all addresses; last error: "),
Expand All @@ -764,7 +798,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// If we've finished the first Happy Eyeballs pass, then we go
// into a mode where we immediately try to connect to every
// subchannel in parallel.
if (subchannel_list_->attempting_index_ == subchannel_list_->size()) {
if (subchannel_list_->IsHappyEyeballsPassComplete()) {
subchannel_->RequestConnection();
}
break;
Expand Down Expand Up @@ -1082,6 +1116,15 @@ void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
return;
}
}
// If we didn't find a subchannel to request a connection on, check to
// see if the Happy Eyeballs pass is complete.
MaybeFinishHappyEyeballsPass();
}

void PickFirst::SubchannelList::MaybeFinishHappyEyeballsPass() {
// Make sure all subchannels have finished a connection attempt before
// we consider the Happy Eyeballs pass complete.
if (!IsHappyEyeballsPassComplete()) return;
// We didn't find another subchannel not in state TRANSIENT_FAILURE,
// so report TRANSIENT_FAILURE and switch to a mode in which we try to
// connect to all addresses in parallel.
Expand Down Expand Up @@ -1115,7 +1158,7 @@ void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
absl::StrCat((policy_->omit_status_message_prefix_
? ""
: "failed to connect to all addresses; last error: "),
subchannels_.back().connectivity_status().ToString()));
last_failure_.ToString()));
policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
}
Expand Down
21 changes: 12 additions & 9 deletions test/core/client_channel/lb_policy/lb_policy_test_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -1008,18 +1008,18 @@ class LoadBalancingPolicyTest : public ::testing::Test {

// Expects a state update for the specified state and status, and then
// expects the resulting picker to queue picks.
void ExpectStateAndQueuingPicker(
bool ExpectStateAndQueuingPicker(
grpc_connectivity_state expected_state,
absl::Status expected_status = absl::OkStatus(),
SourceLocation location = SourceLocation()) {
auto picker = ExpectState(expected_state, expected_status, location);
ExpectPickQueued(picker.get(), {}, location);
return ExpectPickQueued(picker.get(), {}, location);
}

// Convenient frontend to ExpectStateAndQueuingPicker() for CONNECTING.
void ExpectConnectingUpdate(SourceLocation location = SourceLocation()) {
ExpectStateAndQueuingPicker(GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
location);
bool ExpectConnectingUpdate(SourceLocation location = SourceLocation()) {
return ExpectStateAndQueuingPicker(GRPC_CHANNEL_CONNECTING,
absl::OkStatus(), location);
}

static std::unique_ptr<LoadBalancingPolicy::MetadataInterface> MakeMetadata(
Expand All @@ -1038,15 +1038,18 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}

// Requests a pick on picker and expects a Queue result.
void ExpectPickQueued(LoadBalancingPolicy::SubchannelPicker* picker,
bool ExpectPickQueued(LoadBalancingPolicy::SubchannelPicker* picker,
const CallAttributes call_attributes = {},
SourceLocation location = SourceLocation()) {
ASSERT_NE(picker, nullptr);
EXPECT_NE(picker, nullptr) << location.file() << ":" << location.line();
if (picker == nullptr) return false;
auto pick_result = DoPick(picker, call_attributes);
ASSERT_TRUE(absl::holds_alternative<LoadBalancingPolicy::PickResult::Queue>(
EXPECT_TRUE(absl::holds_alternative<LoadBalancingPolicy::PickResult::Queue>(
pick_result.result))
<< PickResultString(pick_result) << "\nat " << location.file() << ":"
<< location.line();
return absl::holds_alternative<LoadBalancingPolicy::PickResult::Queue>(
pick_result.result);
}

// Requests a pick on picker and expects a Complete result.
Expand Down Expand Up @@ -1257,7 +1260,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void DrainConnectingUpdates(SourceLocation location = SourceLocation()) {
gpr_log(GPR_INFO, "Draining CONNECTING updates...");
while (!helper_->QueueEmpty()) {
ExpectConnectingUpdate(location);
ASSERT_TRUE(ExpectConnectingUpdate(location));
}
gpr_log(GPR_INFO, "Done draining CONNECTING updates");
}
Expand Down
85 changes: 82 additions & 3 deletions test/core/client_channel/lb_policy/pick_first_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,10 @@ TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) {
subchannel3->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
ExpectQueueEmpty();
// Eventually, the first subchannel fails as well.
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
// The LB policy should report TRANSIENT_FAILURE.
Expand All @@ -540,14 +544,19 @@ TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) {
// mode where we try to connect to all subchannels in parallel.
// Subchannel 2 was already in state IDLE, so the LB policy will
// immediately trigger a connection request on it. It will not do so
// for subchannels 1 (in CONNECTING) or 3 (in TRANSIENT_FAILURE).
// for subchannels 1 or 3, which are in TRANSIENT_FAILURE.
EXPECT_FALSE(subchannel->ConnectionRequested());
EXPECT_TRUE(subchannel2->ConnectionRequested());
EXPECT_FALSE(subchannel3->ConnectionRequested());
// Subchannel 2 reports CONNECTING.
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Now subchannel 1 reports TF. This is the first failure since we
// finished Happy Eyeballs.
// Now subchannel 1 reports IDLE. This should trigger another
// connection attempt.
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Now subchannel 1 reports TRANSIENT_FAILURE. This is the first failure
// since we finished Happy Eyeballs.
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
EXPECT_FALSE(subchannel->ConnectionRequested());
Expand Down Expand Up @@ -589,6 +598,76 @@ TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) {
}
}

TEST_F(PickFirstTest,
HappyEyeballsLastSubchannelFailsWhileAnotherIsStillPending) {
if (!IsPickFirstHappyEyeballsEnabled()) return;
// Send an update containing three addresses.
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
auto* subchannel2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel2, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// The second subchannel should not be connecting.
EXPECT_FALSE(subchannel2->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second subchannel.
EXPECT_TRUE(subchannel2->ConnectionRequested());
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// The second subchannel fails.
subchannel2->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
// The LB policy should not yet report TRANSIENT_FAILURE, because the
// first subchannel is still CONNECTING.
DrainConnectingUpdates();
// Set subchannel 2 back to IDLE, so it's already in that state when
// Happy Eyeballs fails.
subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE);
// Now the first subchannel fails.
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
// The LB policy should report TRANSIENT_FAILURE.
WaitForConnectionFailed([&](const absl::Status& status) {
EXPECT_EQ(status, absl::UnavailableError(
"failed to connect to all addresses; "
"last error: UNAVAILABLE: failed to connect"));
});
// We are now done with the Happy Eyeballs pass, and we move into a
// mode where we try to connect to all subchannels in parallel.
// Subchannel 2 was already in state IDLE, so the LB policy will
// immediately trigger a connection request on it. It will not do so
// for subchannel 1, which is still in TRANSIENT_FAILURE.
EXPECT_FALSE(subchannel->ConnectionRequested());
EXPECT_TRUE(subchannel2->ConnectionRequested());
// Subchannel 2 reports CONNECTING.
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Subchannel 2 reports READY.
subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report READY.
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
}
}

TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
if (!IsPickFirstHappyEyeballsEnabled()) return;
// Send an update containing three IPv4 addresses followed by three
Expand Down

0 comments on commit b2d5a3c

Please sign in to comment.