Skip to content

Commit

Permalink
Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time …
Browse files Browse the repository at this point in the history
…if environment variable GRPC_SERIALIZE_RETRIES == true.
  • Loading branch information
larry-safran committed Sep 12, 2024
1 parent 5de65a6 commit a7937a8
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 9 deletions.
97 changes: 90 additions & 7 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,26 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private int numTf = 0;
private boolean firstPass = true;
@Nullable
private ScheduledHandle scheduleConnectionTask;
private ScheduledHandle scheduleConnectionTask = null;
private ConnectivityState rawConnectivityState = IDLE;
private ConnectivityState concludedState = IDLE;
private final boolean enableHappyEyeballs =
PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private final boolean enableHappyEyeballs = !isSerializingRetries()
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy
private Status lastError = null;
private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
private BackoffPolicy reconnectPolicy;
@Nullable
private ScheduledHandle reconnectTask = null;

/**
 * Creates a balancer that works through the supplied helper.
 *
 * @param helper the helper stored on this instance; passed through
 *     {@code checkNotNull(helper, "helper")} before being assigned
 */
PickFirstLeafLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
}

/**
 * Reports whether the {@code GRPC_SERIALIZE_RETRIES} flag is enabled.
 *
 * <p>The flag is looked up through {@code GrpcUtil.getFlag} on every call; nothing is cached
 * in this method. NOTE(review): the {@code false} argument is presumably the value used when
 * the flag is unset, and the lookup source (environment variable and/or system property) is
 * decided inside {@code GrpcUtil.getFlag} - confirm there.
 *
 * @return the flag value as returned by {@code GrpcUtil.getFlag}
 */
static boolean isSerializingRetries() {
return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
}

@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (rawConnectivityState == SHUTDOWN) {
Expand Down Expand Up @@ -208,10 +217,17 @@ public void handleNameResolutionError(Status error) {
subchannels.clear();
addressIndex.updateGroups(ImmutableList.of());
rawConnectivityState = TRANSIENT_FAILURE;
if (lastError == null) {
lastError = error;
}
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
}

void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
if (rawConnectivityState == SHUTDOWN) {
return;
}

ConnectivityState newState = stateInfo.getState();

// Shutdown channels/previously relevant subchannels can still callback with state updates.
Expand Down Expand Up @@ -264,6 +280,11 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
break;

case READY:
lastError = null;
if (scheduleConnectionTask != null) {
scheduleConnectionTask.cancel();
scheduleConnectionTask = null;
}
shutdownRemaining(subchannelData);
addressIndex.seekTo(getAddress(subchannelData.subchannel));
rawConnectivityState = READY;
Expand All @@ -274,13 +295,16 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
// If we are looking at current channel, request a connection if possible
if (addressIndex.isValid()
&& subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
lastError = stateInfo.getStatus();
if (addressIndex.increment()) {
cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses
} else {
scheduleBackoff();
}
}

if (isPassComplete()) {
if (rawConnectivityState != SHUTDOWN && isPassComplete()) {
rawConnectivityState = TRANSIENT_FAILURE;
updateBalancingState(TRANSIENT_FAILURE,
new Picker(PickResult.withError(stateInfo.getStatus())));
Expand All @@ -304,6 +328,42 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
}
}

/**
 * Arms a one-shot backoff timer that restarts connection attempts from the first address.
 *
 * <p>This is a no-op unless {@link #isSerializingRetries()} is true. It is also a no-op while
 * a previously scheduled backoff task is still pending; that task is left to fire on its own.
 * Otherwise the reconnect policy is created on first use, the next delay is taken from it, and
 * a task is scheduled on the helper's synchronization context. When the task runs it clears
 * {@code reconnectTask} and, unless the balancer is in SHUTDOWN, resets the address index and
 * calls {@link #requestConnection()}.
 */
private void scheduleBackoff() {
  // Either retries are not serialized, or a backoff is already in flight and will trigger
  // by itself when its delay elapses.
  if (!isSerializingRetries() || reconnectTask != null) {
    return;
  }

  class EndOfCurrentBackoff implements Runnable {
    @Override
    public void run() {
      // Mark the timer as consumed first so a later failure can schedule a new one.
      reconnectTask = null;
      if (rawConnectivityState != SHUTDOWN) {
        addressIndex.reset();
        requestConnection();
      }
    }
  }

  // The policy is created lazily and kept so successive calls draw successive delays from it.
  if (reconnectPolicy == null) {
    reconnectPolicy = bkoffPolProvider.get();
  }
  long backoffNanos = reconnectPolicy.nextBackoffNanos();
  Runnable restartFromFirstAddress = new EndOfCurrentBackoff();
  reconnectTask = helper.getSynchronizationContext().schedule(
      restartFromFirstAddress,
      backoffNanos,
      TimeUnit.NANOSECONDS,
      helper.getScheduledExecutorService());
}

private void updateHealthCheckedState(SubchannelData subchannelData) {
if (subchannelData.state != READY) {
return;
Expand Down Expand Up @@ -337,6 +397,10 @@ public void shutdown() {
rawConnectivityState = SHUTDOWN;
concludedState = SHUTDOWN;
cancelScheduleTask();
if (reconnectTask != null) {
reconnectTask.cancel();
reconnectTask = null;
}

for (SubchannelData subchannelData : subchannels.values()) {
subchannelData.getSubchannel().shutdown();
Expand Down Expand Up @@ -370,7 +434,12 @@ private void shutdownRemaining(SubchannelData activeSubchannelData) {
*/
@Override
public void requestConnection() {
if (!addressIndex.isValid() || rawConnectivityState == SHUTDOWN) {
if (rawConnectivityState == SHUTDOWN) {
return;
}

if (!addressIndex.isValid()) {
scheduleBackoff();
return;
}

Expand All @@ -391,8 +460,21 @@ public void requestConnection() {
scheduleNextConnection();
break;
case TRANSIENT_FAILURE:
if (scheduleConnectionTask != null) {
break; // let the already scheduled task do its job
}
addressIndex.increment();
requestConnection();
if (!isSerializingRetries()) {
requestConnection();
} else {
if (!addressIndex.isValid()) {
scheduleBackoff();
} else {
subchannelData.subchannel.shutdown(); // shutdown the previous subchannel
subchannels.remove(currentAddress);
requestConnection();
}
}
break;
default:
// Wait for current subchannel to change state
Expand Down Expand Up @@ -458,7 +540,8 @@ private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs)
}

private boolean isPassComplete() {
if (addressIndex.isValid() || subchannels.size() < addressIndex.size()) {
if ((!isSerializingRetries() && addressIndex.isValid())
|| subchannels.size() < addressIndex.size()) {
return false;
}
for (SubchannelData sc : subchannels.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
import static io.grpc.LoadBalancer.IS_PETIOLE_POLICY;
import static io.grpc.internal.PickFirstLeafLoadBalancer.CONNECTION_DELAY_INTERVAL_MS;
import static io.grpc.internal.PickFirstLeafLoadBalancer.isSerializingRetries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -95,7 +96,11 @@ public class PickFirstLeafLoadBalancerTest {

@Parameterized.Parameters(name = "{0}")
public static List<Boolean> enableHappyEyeballs() {
return Arrays.asList(true, false);
if (PickFirstLeafLoadBalancer.isSerializingRetries()) {
return Arrays.asList(false);
} else {
return Arrays.asList(false, true);
}
}

@Parameterized.Parameter
Expand Down Expand Up @@ -143,7 +148,8 @@ public void setUp() {
originalHappyEyeballsEnabledValue =
System.getProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS);
System.setProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS,
enableHappyEyeballs ? "true" : "false");
!PickFirstLeafLoadBalancer.isSerializingRetries() && enableHappyEyeballs
? "true" : "false");

for (int i = 1; i <= 5; i++) {
SocketAddress addr = new FakeSocketAddress("server" + i);
Expand Down Expand Up @@ -498,6 +504,9 @@ public void healthCheckFlow() {
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs)
.getSubchannel()).isSameInstanceAs(mockSubchannel1);
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);

healthListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
verifyNoMoreInteractions(mockHelper);
Expand Down Expand Up @@ -589,6 +598,8 @@ public void pickAfterResolutionAfterTransientValue() {

// Transition from TRANSIENT_FAILURE to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
}
Expand Down Expand Up @@ -619,6 +630,8 @@ public void pickWithDupAddressesUpDownUp() {

// Transition from TRANSIENT_FAILURE to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

Expand Down Expand Up @@ -651,6 +664,8 @@ public void pickWithDupEagsUpDownUp() {

// Transition from TRANSIENT_FAILURE to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

Expand Down Expand Up @@ -1518,6 +1533,8 @@ public void updateAddresses_intersecting_ready() {

@Test
public void updateAddresses_intersecting_transient_failure() {
Assume.assumeTrue(!isSerializingRetries());

// Starting first connection attempt
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2,
mockSubchannel3, mockSubchannel4); // captor: captures
Expand Down Expand Up @@ -1782,6 +1799,8 @@ public void updateAddresses_identical_ready() {

@Test
public void updateAddresses_identical_transient_failure() {
Assume.assumeTrue(!isSerializingRetries());

InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2,
mockSubchannel3, mockSubchannel4);
// Creating first set of endpoints/addresses
Expand Down

0 comments on commit a7937a8

Please sign in to comment.