diff --git a/balancer/balancer.go b/balancer/balancer.go index 30ba1811ad92..b78401eb4cf2 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -129,6 +129,11 @@ type NewSubConnOptions struct { // HealthCheckEnabled indicates whether health check service should be // enabled on this SubConn HealthCheckEnabled bool + // StateListener is called when the state of the subconn changes. If nil, + // Balancer.UpdateSubConnState will be called instead. Will never be + // invoked until after Connect() is called on the SubConn created with + // these options. + StateListener func(SubConnState) } // State contains the balancer's state relevant to the gRPC ClientConn. @@ -349,6 +354,9 @@ type Balancer interface { ResolverError(error) // UpdateSubConnState is called by gRPC when the state of a SubConn // changes. + // + // Deprecated: Use NewSubConnOptions.StateListener when creating the + // SubConn instead. UpdateSubConnState(SubConn, SubConnState) // Close closes the balancer. The balancer is not required to call // ClientConn.RemoveSubConn for its existing SubConns. diff --git a/balancer/weightedtarget/weightedtarget_test.go b/balancer/weightedtarget/weightedtarget_test.go index 5658f302a49b..adc3e32b0452 100644 --- a/balancer/weightedtarget/weightedtarget_test.go +++ b/balancer/weightedtarget/weightedtarget_test.go @@ -199,15 +199,15 @@ func (s) TestWeightedTarget(t *testing.T) { // Send subconn state change. sc1 := <-cc.NewSubConnCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test pick with one backend. for i := 0; i < 5; i++ { gotSCSt, _ := p.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc1 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } @@ -243,21 +243,21 @@ func (s) TestWeightedTarget(t *testing.T) { // The subconn for cluster_1 should be removed. scRemoved := <-cc.RemoveSubConnCh - if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scRemoved != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } - wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) sc2 := <-cc.NewSubConnCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p = <-cc.NewPickerCh // Test pick with one backend. for i := 0; i < 5; i++ { gotSCSt, _ := p.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc2 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) } } @@ -288,22 +288,22 @@ func (s) TestWeightedTarget(t *testing.T) { // The subconn from the test_config_balancer should be removed. scRemoved = <-cc.RemoveSubConnCh - if !cmp.Equal(scRemoved, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scRemoved != sc2 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } - wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) // Send subconn state change. sc3 := <-cc.NewSubConnCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p = <-cc.NewPickerCh // Test pick with one backend. for i := 0; i < 5; i++ { gotSCSt, _ := p.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc3 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3) } } @@ -361,15 +361,15 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) { // Expect one SubConn, and move it to READY. sc1 := <-cc.NewSubConnCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test pick with one backend. for i := 0; i < 5; i++ { gotSCSt, _ := p.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc1 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } @@ -390,9 +390,9 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) { // Expect one new SubConn, and move it to READY. sc2 := <-cc.NewSubConnCh // Update the SubConn to become READY. - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p = <-cc.NewPickerCh // Test round robin pick. @@ -411,16 +411,16 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) { // Expect one SubConn to be removed. scRemoved := <-cc.RemoveSubConnCh - if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scRemoved != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } - wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) p = <-cc.NewPickerCh // Test pick with only the second SubConn. for i := 0; i < 5; i++ { gotSC, _ := p.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSC.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSC.SubConn != sc2 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2) } } @@ -471,17 +471,17 @@ func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) { }) // We expect a single subConn on each subBalancer. - sc1 := scs["cluster_1"][0].sc - sc2 := scs["cluster_2"][0].sc + sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn) + sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) // Send state changes for both SubConns, and wait for the picker. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test roundrobin on the last picker. @@ -541,27 +541,27 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { }) // We expect two subConns on each subBalancer. - sc1 := scs["cluster_1"][0].sc - sc2 := scs["cluster_1"][1].sc - sc3 := scs["cluster_2"][0].sc - sc4 := scs["cluster_2"][1].sc + sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn) + sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn) + sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) + sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn) // Send state changes for all SubConns, and wait for the picker. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test roundrobin on the last picker. RPCs should be sent equally to all @@ -572,7 +572,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { } // Turn sc2's connection down, should be RR between balancers. - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) p = <-cc.NewPickerCh want = []balancer.SubConn{sc1, sc1, sc3, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { @@ -591,10 +591,10 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } scRemoved := <-cc.RemoveSubConnCh - if !cmp.Equal(scRemoved, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scRemoved != sc3 { t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scRemoved) } - wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) p = <-cc.NewPickerCh want = []balancer.SubConn{sc1, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { @@ -603,7 +603,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { // Turn sc1's connection down. wantSubConnErr := errors.New("subConn connection error") - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ + sc1.UpdateState(balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) @@ -614,7 +614,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { } // Turn last connection to connecting. - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) p = <-cc.NewPickerCh for i := 0; i < 5; i++ { if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { @@ -623,7 +623,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { } // Turn all connections down. - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ + sc4.UpdateState(balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) @@ -685,27 +685,27 @@ func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *test }) // We expect two subConns on each subBalancer. - sc1 := scs["cluster_1"][0].sc - sc2 := scs["cluster_1"][1].sc - sc3 := scs["cluster_2"][0].sc - sc4 := scs["cluster_2"][1].sc + sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn) + sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn) + sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) + sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn) // Send state changes for all SubConns, and wait for the picker. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test roundrobin on the last picker. Twice the number of RPCs should be @@ -769,22 +769,22 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { }) // We expect one subConn on each subBalancer. - sc1 := scs["cluster_1"][0].sc - sc2 := scs["cluster_2"][0].sc - sc3 := scs["cluster_3"][0].sc + sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn) + sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) + sc3 := scs["cluster_3"][0].sc.(*testutils.TestSubConn) // Send state changes for all SubConns, and wait for the picker. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh want := []balancer.SubConn{sc1, sc2, sc3} @@ -824,7 +824,7 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { p = <-cc.NewPickerCh scRemoved := <-cc.RemoveSubConnCh - if !cmp.Equal(scRemoved, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scRemoved != sc2 { t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scRemoved) } want = []balancer.SubConn{sc1, sc3} @@ -834,7 +834,7 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { // Move balancer 3 into transient failure. wantSubConnErr := errors.New("subConn connection error") - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ + sc3.UpdateState(balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) @@ -866,7 +866,7 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { // picker which ensures that the removed subBalancer is not picked for RPCs. scRemoved = <-cc.RemoveSubConnCh - if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scRemoved != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } @@ -927,27 +927,27 @@ func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing }) // We expect two subConns on each subBalancer. - sc1 := scs["cluster_1"][0].sc - sc2 := scs["cluster_1"][1].sc - sc3 := scs["cluster_2"][0].sc - sc4 := scs["cluster_2"][1].sc + sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn) + sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn) + sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) + sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn) // Send state changes for all SubConns, and wait for the picker. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test roundrobin on the last picker. Twice the number of RPCs should be @@ -1041,12 +1041,12 @@ func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) { }) // We expect a single subConn on each subBalancer. - sc1 := scs["cluster_1"][0].sc + sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn) _ = scs["cluster_2"][0].sc // Set one subconn to TransientFailure, this will trigger one sub-balancer // to report transient failure. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) p := <-cc.NewPickerCh for i := 0; i < 5; i++ { @@ -1103,18 +1103,18 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes }) // We expect a single subConn on each subBalancer. - sc1 := scs["cluster_1"][0].sc - sc2 := scs["cluster_2"][0].sc + sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn) + sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) // Set both subconn to TransientFailure, this will put both sub-balancers in // transient failure. wantSubConnErr := errors.New("subConn connection error") - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ + sc1.UpdateState(balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ + sc2.UpdateState(balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) @@ -1127,7 +1127,7 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes } // Set one subconn to Connecting, it shouldn't change the overall state. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) select { case <-time.After(100 * time.Millisecond): case <-cc.NewPickerCh: @@ -1260,7 +1260,7 @@ func (s) TestInitialIdle(t *testing.T) { // in the address is cleared. for range addrs { sc := <-cc.NewSubConnCh - wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) } if state := <-cc.NewStateCh; state != connectivity.Idle { @@ -1299,8 +1299,8 @@ func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) { } sc := <-cc.NewSubConnCh - wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) - wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // Verify that the SubConnState update from TF to Connecting is ignored. if len(cc.states) != 2 || cc.states[0].ConnectivityState != connectivity.Connecting || cc.states[1].ConnectivityState != connectivity.TransientFailure { diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index b48419a96190..338d106098fb 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -125,7 +125,9 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { ccb.mu.Lock() ccb.serializer.Schedule(func(_ context.Context) { - ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) + // Even though it is optional for balancers, gracefulswitch ensures + // opts.StateListener is set, so this cannot ever be nil. + sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) }) ccb.mu.Unlock() } @@ -300,7 +302,11 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) return nil, err } - acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} + acbw := &acBalancerWrapper{ + ac: ac, + producers: make(map[balancer.ProducerBuilder]*refCountedProducer), + stateListener: opts.StateListener, + } ac.acbw = acbw return acbw, nil } @@ -366,7 +372,8 @@ func (ccb *ccBalancerWrapper) Target() string { // acBalancerWrapper is a wrapper on top of ac for balancers. // It implements balancer.SubConn interface. type acBalancerWrapper struct { - ac *addrConn // read-only + ac *addrConn // read-only + stateListener func(balancer.SubConnState) mu sync.Mutex producers map[balancer.ProducerBuilder]*refCountedProducer diff --git a/internal/balancer/gracefulswitch/gracefulswitch.go b/internal/balancer/gracefulswitch/gracefulswitch.go index 08666f62a7cb..65a59201c23a 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/internal/balancer/gracefulswitch/gracefulswitch.go @@ -200,8 +200,8 @@ func (gsb *Balancer) ExitIdle() { } } -// UpdateSubConnState forwards the update to the appropriate child. -func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { +// updateSubConnState forwards the update to the appropriate child. +func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) { gsb.currentMu.Lock() defer gsb.currentMu.Unlock() gsb.mu.Lock() @@ -214,13 +214,26 @@ func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubC } else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] { balToUpdate = gsb.balancerPending } - gsb.mu.Unlock() if balToUpdate == nil { // SubConn belonged to a stale lb policy that has not yet fully closed, // or the balancer was already closed. + gsb.mu.Unlock() return } - balToUpdate.UpdateSubConnState(sc, state) + if state.ConnectivityState == connectivity.Shutdown { + delete(balToUpdate.subconns, sc) + } + gsb.mu.Unlock() + if cb != nil { + cb(state) + } else { + balToUpdate.UpdateSubConnState(sc, state) + } +} + +// UpdateSubConnState forwards the update to the appropriate child. +func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + gsb.updateSubConnState(sc, state, nil) } // Close closes any active child balancers. @@ -254,18 +267,6 @@ type balancerWrapper struct { subconns map[balancer.SubConn]bool // subconns created by this balancer } -func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { - if state.ConnectivityState == connectivity.Shutdown { - bw.gsb.mu.Lock() - delete(bw.subconns, sc) - bw.gsb.mu.Unlock() - } - // There is no need to protect this read with a mutex, as the write to the - // Balancer field happens in SwitchTo, which completes before this can be - // called. - bw.Balancer.UpdateSubConnState(sc, state) -} - // Close closes the underlying LB policy and removes the subconns it created. bw // must not be referenced via balancerCurrent or balancerPending in gsb when // called. gsb.mu must not be held. Does not panic with a nil receiver. @@ -335,6 +336,9 @@ func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.Ne } bw.gsb.mu.Unlock() + var sc balancer.SubConn + oldListener := opts.StateListener + opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) } sc, err := bw.gsb.cc.NewSubConn(addrs, opts) if err != nil { return nil, err diff --git a/internal/balancer/gracefulswitch/gracefulswitch_test.go b/internal/balancer/gracefulswitch/gracefulswitch_test.go index 265e1f78e12d..6262bd24c13b 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch_test.go +++ b/internal/balancer/gracefulswitch/gracefulswitch_test.go @@ -345,9 +345,7 @@ func (s) TestCurrentLeavingReady(t *testing.T) { // TestBalancerSubconns tests the SubConn functionality of the graceful switch // load balancer. This tests the SubConn update flow in both directions, and -// make sure updates end up at the correct component. Also, it tests that on an -// UpdateSubConnState() call from the ClientConn, the graceful switch load -// balancer forwards it to the correct child balancer. +// make sure updates end up at the correct component. func (s) TestBalancerSubconns(t *testing.T) { tcc, gsb := setup(t) gsb.SwitchTo(mockBalancerBuilder1{}) @@ -365,7 +363,7 @@ func (s) TestBalancerSubconns(t *testing.T) { case <-ctx.Done(): t.Fatalf("timeout while waiting for an NewSubConn call on the ClientConn") case sc := <-tcc.NewSubConnCh: - if !cmp.Equal(sc1, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { + if sc != sc1 { t.Fatalf("NewSubConn, want %v, got %v", sc1, sc) } } @@ -380,7 +378,7 @@ func (s) TestBalancerSubconns(t *testing.T) { case <-ctx.Done(): t.Fatalf("timeout while waiting for an NewSubConn call on the ClientConn") case sc := <-tcc.NewSubConnCh: - if !cmp.Equal(sc2, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { + if sc != sc2 { t.Fatalf("NewSubConn, want %v, got %v", sc2, sc) } } @@ -388,39 +386,12 @@ func (s) TestBalancerSubconns(t *testing.T) { // Updating the SubConnState for sc1 should cause the graceful switch // balancer to forward the Update to balancerCurrent for sc1, as that is the // balancer that created this SubConn. - gsb.UpdateSubConnState(sc1, scState) - - // This update should get forwarded to balancerCurrent, as that is the LB - // that created this SubConn. - if err := gsb.balancerCurrent.Balancer.(*mockBalancer).waitForSubConnUpdate(ctx, subConnWithState{sc: sc1, state: scState}); err != nil { - t.Fatal(err) - } - // This update should not get forwarded to balancerPending, as that is not - // the LB that created this SubConn. - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if err := gsb.balancerPending.Balancer.(*mockBalancer).waitForSubConnUpdate(sCtx, subConnWithState{sc: sc1, state: scState}); err == nil { - t.Fatalf("balancerPending should not have received a subconn update for sc1") - } + sc1.(*testutils.TestSubConn).UpdateState(scState) // Updating the SubConnState for sc2 should cause the graceful switch // balancer to forward the Update to balancerPending for sc2, as that is the // balancer that created this SubConn. - gsb.UpdateSubConnState(sc2, scState) - - // This update should get forwarded to balancerPending, as that is the LB - // that created this SubConn. - if err := gsb.balancerPending.Balancer.(*mockBalancer).waitForSubConnUpdate(ctx, subConnWithState{sc: sc2, state: scState}); err != nil { - t.Fatal(err) - } - - // This update should not get forwarded to balancerCurrent, as that is not - // the LB that created this SubConn. - sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if err := gsb.balancerCurrent.Balancer.(*mockBalancer).waitForSubConnUpdate(sCtx, subConnWithState{sc: sc2, state: scState}); err == nil { - t.Fatalf("balancerCurrent should not have received a subconn update for sc2") - } + sc2.(*testutils.TestSubConn).UpdateState(scState) // Updating the addresses for both SubConns and removing both SubConns // should get forwarded to the ClientConn. @@ -448,7 +419,7 @@ func (s) TestBalancerSubconns(t *testing.T) { case <-ctx.Done(): t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn") case sc := <-tcc.RemoveSubConnCh: - if !cmp.Equal(sc1, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { + if sc != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, sc) } } @@ -458,7 +429,7 @@ func (s) TestBalancerSubconns(t *testing.T) { case <-ctx.Done(): t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn") case sc := <-tcc.RemoveSubConnCh: - if !cmp.Equal(sc2, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { + if sc != sc2 { t.Fatalf("RemoveSubConn, want %v, got %v", sc2, sc) } } @@ -476,7 +447,8 @@ func (s) TestBalancerClose(t *testing.T) { gsb.SwitchTo(mockBalancerBuilder1{}) gsb.SwitchTo(mockBalancerBuilder2{}) - sc1, err := gsb.balancerCurrent.Balancer.(*mockBalancer).newSubConn([]resolver.Address{}, balancer.NewSubConnOptions{}) // Will eventually get back a SubConn with an identifying property id 1 + sc1, err := gsb.balancerCurrent.Balancer.(*mockBalancer).newSubConn([]resolver.Address{}, balancer.NewSubConnOptions{}) + // Will eventually get back a SubConn with an identifying property id 1 if err != nil { t.Fatalf("error constructing newSubConn in gsb: %v", err) } @@ -488,7 +460,8 @@ func (s) TestBalancerClose(t *testing.T) { case <-tcc.NewSubConnCh: } - sc2, err := gsb.balancerPending.Balancer.(*mockBalancer).newSubConn([]resolver.Address{}, balancer.NewSubConnOptions{}) // Will eventually get back a SubConn with an identifying property id 2 + sc2, err := gsb.balancerPending.Balancer.(*mockBalancer).newSubConn([]resolver.Address{}, balancer.NewSubConnOptions{}) + // Will eventually get back a SubConn with an identifying property id 2 if err != nil { t.Fatalf("error constructing newSubConn in gsb: %v", err) } @@ -512,10 +485,8 @@ func (s) TestBalancerClose(t *testing.T) { case <-ctx.Done(): t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn") case sc := <-tcc.RemoveSubConnCh: - if !cmp.Equal(sc1, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { - if !cmp.Equal(sc2, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("RemoveSubConn, want either %v or %v, got %v", sc1, sc2, sc) - } + if sc != sc1 && sc != sc2 { + t.Fatalf("RemoveSubConn, want either %v or %v, got %v", sc1, sc2, sc) } } @@ -525,10 +496,8 @@ func (s) TestBalancerClose(t *testing.T) { case <-ctx.Done(): t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn") case sc := <-tcc.RemoveSubConnCh: - if !cmp.Equal(sc1, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { - if !cmp.Equal(sc2, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("RemoveSubConn, want either %v or %v, got %v", sc1, sc2, sc) - } + if sc != sc1 && sc != sc2 { + t.Fatalf("RemoveSubConn, want either %v or %v, got %v", sc1, sc2, sc) } } @@ -654,7 +623,7 @@ func (s) TestPendingReplacedByAnotherPending(t *testing.T) { case <-ctx.Done(): t.Fatalf("timeout while waiting for a RemoveSubConn call on the ClientConn") case sc := <-tcc.RemoveSubConnCh: - if !cmp.Equal(sc1, sc, cmp.AllowUnexported(testutils.TestSubConn{})) { + if sc != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, sc) } } @@ -735,7 +704,7 @@ func (s) TestUpdateSubConnStateRace(t *testing.T) { return default: } - gsb.UpdateSubConnState(sc, balancer.SubConnState{ + sc.(*testutils.TestSubConn).UpdateState(balancer.SubConnState{ ConnectivityState: connectivity.Ready, }) } @@ -771,7 +740,7 @@ func (s) TestInlineCallbackInBuild(t *testing.T) { } select { case <-ctx.Done(): - t.Fatalf("timeout while waiting for an NewSubConn() call on the ClientConn") + t.Fatalf("timeout while waiting for a NewSubConn() call on the ClientConn") case <-tcc.NewSubConnCh: } select { @@ -796,7 +765,7 @@ func (s) TestInlineCallbackInBuild(t *testing.T) { } select { case <-ctx.Done(): - t.Fatalf("timeout while waiting for an NewSubConn() call on the ClientConn") + t.Fatalf("timeout while waiting for a NewSubConn() call on the ClientConn") case <-tcc.NewSubConnCh: } select { @@ -945,20 +914,6 @@ func (mb1 *mockBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS ba return nil } -// waitForSubConnUpdate verifies if the mockBalancer receives the provided -// SubConn update before the context expires. -func (mb1 *mockBalancer) waitForSubConnUpdate(ctx context.Context, wantSCS subConnWithState) error { - scs, err := mb1.scStateCh.Receive(ctx) - if err != nil { - return fmt.Errorf("error waiting for SubConnUpdate: %v", err) - } - gotSCS := scs.(subConnWithState) - if !cmp.Equal(gotSCS, wantSCS, cmp.AllowUnexported(subConnWithState{}, testutils.TestSubConn{})) { - return fmt.Errorf("error in SubConnUpdate: received SubConnState: %+v, want %+v", gotSCS, wantSCS) - } - return nil -} - // waitForResolverError verifies if the mockBalancer receives the provided // resolver error before the context expires. func (mb1 *mockBalancer) waitForResolverError(ctx context.Context, wantErr error) error { @@ -994,7 +949,10 @@ func (mb1 *mockBalancer) updateState(state balancer.State) { mb1.cc.UpdateState(state) } -func (mb1 *mockBalancer) newSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { +func (mb1 *mockBalancer) newSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) { + if opts.StateListener == nil { + opts.StateListener = func(state balancer.SubConnState) { mb1.UpdateSubConnState(sc, state) } + } return mb1.cc.NewSubConn(addrs, opts) } @@ -1061,7 +1019,10 @@ func (vb *verifyBalancer) Close() { vb.closed.Fire() } -func (vb *verifyBalancer) newSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { +func (vb *verifyBalancer) newSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) { + if opts.StateListener == nil { + opts.StateListener = func(state balancer.SubConnState) { vb.UpdateSubConnState(sc, state) } + } return vb.cc.NewSubConn(addrs, opts) } @@ -1111,7 +1072,10 @@ func (bcb *buildCallbackBal) updateState(state balancer.State) { bcb.cc.UpdateState(state) } -func (bcb *buildCallbackBal) newSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { +func (bcb *buildCallbackBal) newSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) { + if opts.StateListener == nil { + opts.StateListener = func(state balancer.SubConnState) { bcb.UpdateSubConnState(sc, state) } + } return bcb.cc.NewSubConn(addrs, opts) } diff --git a/internal/balancergroup/balancergroup.go b/internal/balancergroup/balancergroup.go index c1f7e75c3ec8..99deb9260ccd 100644 --- a/internal/balancergroup/balancergroup.go +++ b/internal/balancergroup/balancergroup.go @@ -449,9 +449,9 @@ func (bg *BalancerGroup) connect(sb *subBalancerWrapper) { // Following are actions from the parent grpc.ClientConn, forward to sub-balancers. -// UpdateSubConnState handles the state for the subconn. It finds the -// corresponding balancer and forwards the update. -func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { +// updateSubConnState handles the state for the subconn. It finds the +// corresponding balancer and forwards the update to cb. +func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) { bg.incomingMu.Lock() config, ok := bg.scToSubBalancer[sc] if !ok { @@ -465,10 +465,20 @@ func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer. bg.incomingMu.Unlock() bg.outgoingMu.Lock() - config.updateSubConnState(sc, state) + if cb != nil { + cb(state) + } else { + config.updateSubConnState(sc, state) + } bg.outgoingMu.Unlock() } +// UpdateSubConnState handles the state for the subconn. It finds the +// corresponding balancer and forwards the update. +func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + bg.updateSubConnState(sc, state, nil) +} + // UpdateClientConnState handles ClientState (including balancer config and // addresses) from resolver. It finds the balancer and forwards the update. func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error { @@ -507,6 +517,9 @@ func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver bg.incomingMu.Unlock() return nil, fmt.Errorf("NewSubConn is called after balancer group is closed") } + var sc balancer.SubConn + oldListener := opts.StateListener + opts.StateListener = func(state balancer.SubConnState) { bg.updateSubConnState(sc, state, oldListener) } sc, err := bg.cc.NewSubConn(addrs, opts) if err != nil { bg.incomingMu.Unlock() diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index 90c5c20f4158..fbe1da9f5a28 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -98,8 +98,8 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { addrs := <-cc.NewSubConnAddrsCh sc := <-cc.NewSubConnCh m1[addrs[0]] = sc - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) } // Test roundrobin on the last picker. @@ -116,7 +116,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { gator.Stop() bg.Close() for i := 0; i < 4; i++ { - bg.UpdateSubConnState(<-cc.RemoveSubConnCh, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + (<-cc.RemoveSubConnCh).UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) } // Add b3, weight 1, backends [1,2]. @@ -140,8 +140,8 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { addrs := <-cc.NewSubConnAddrsCh sc := <-cc.NewSubConnCh m2[addrs[0]] = sc - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) } // Test roundrobin on the last picker. @@ -201,7 +201,7 @@ func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() { // Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer // is removed later, so the balancer group returned has one sub-balancer in its // own map, and one sub-balancer in cache. -func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]balancer.SubConn) { +func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]*testutils.TestSubConn) { cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() @@ -218,13 +218,13 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat bg.Start() - m1 := make(map[resolver.Address]balancer.SubConn) + m1 := make(map[resolver.Address]*testutils.TestSubConn) for i := 0; i < 4; i++ { addrs := <-cc.NewSubConnAddrsCh sc := <-cc.NewSubConnCh m1[addrs[0]] = sc - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) } // Test roundrobin on the last picker. @@ -270,7 +270,7 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) { // Turn down subconn for addr2, shouldn't get picker update because // sub-balancer1 was removed. - bg.UpdateSubConnState(addrToSC[testBackendAddrs[2]], balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + addrToSC[testBackendAddrs[2]].UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) for i := 0; i < 10; i++ { select { case <-cc.NewPickerCh: @@ -430,8 +430,8 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test scToAdd[addr[0]] = c - 1 sc := <-cc.NewSubConnCh addrToSC[addr[0]] = sc - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) case <-newSCTimeout: t.Fatalf("timeout waiting for subConns (from new sub-balancer) to be newed") } @@ -567,8 +567,8 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { sc := <-cc.NewSubConnCh m1[addrs[0]] = sc scs[sc] = true - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) } p1 := <-cc.NewPickerCh @@ -576,7 +576,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { m1[testBackendAddrs[0]], m1[testBackendAddrs[1]], } if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) + t.Fatal(err) } // The balancer type for testBalancersIDs[0] is currently Round Robin. Now, @@ -599,7 +599,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { // Update the pick first balancers SubConn as CONNECTING. This will cause // the pick first balancer to UpdateState() with CONNECTING, which shouldn't send // a Picker update back, as the Graceful Switch process is not complete. - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer cancel() select { @@ -613,7 +613,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { // Picker update back, as the Graceful Switch process is complete. This // Picker should always pick the pick first's created SubConn which // corresponds to address 3. - bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p2 := <-cc.NewPickerCh pr, err := p2.Pick(balancer.PickInfo{}) if err != nil { diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index 8927823d09da..3ef4bfb97bd6 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -53,8 +53,9 @@ func init() { // TestSubConn implements the SubConn interface, to be used in tests. type TestSubConn struct { - id string - ConnectCh chan struct{} + id string + ConnectCh chan struct{} + stateListener func(balancer.SubConnState) } // UpdateAddresses is a no-op. @@ -73,6 +74,14 @@ func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.P return nil, nil } +// UpdateState pushes the state to the listener, if one is registered. +func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) { + if tsc.stateListener != nil { + tsc.stateListener(state) + return + } +} + // String implements stringer to print human friendly error message. func (tsc *TestSubConn) String() string { return tsc.id @@ -83,8 +92,8 @@ type TestClientConn struct { logger testingLogger NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn. - NewSubConnCh chan balancer.SubConn // the last 10 subconn created. - RemoveSubConnCh chan balancer.SubConn // the last 10 subconn removed. + NewSubConnCh chan *TestSubConn // the last 10 subconn created. + RemoveSubConnCh chan *TestSubConn // the last 10 subconn removed. UpdateAddressesAddrsCh chan []resolver.Address // last updated address via UpdateAddresses(). NewPickerCh chan balancer.Picker // the last picker updated. @@ -100,8 +109,8 @@ func NewTestClientConn(t *testing.T) *TestClientConn { logger: t, NewSubConnAddrsCh: make(chan []resolver.Address, 10), - NewSubConnCh: make(chan balancer.SubConn, 10), - RemoveSubConnCh: make(chan balancer.SubConn, 10), + NewSubConnCh: make(chan *TestSubConn, 10), + RemoveSubConnCh: make(chan *TestSubConn, 10), UpdateAddressesAddrsCh: make(chan []resolver.Address, 1), NewPickerCh: make(chan balancer.Picker, 1), @@ -113,8 +122,8 @@ func NewTestClientConn(t *testing.T) *TestClientConn { // NewSubConn creates a new SubConn. func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) { sc := TestSubConns[tcc.subConnIdx] + sc.stateListener = o.StateListener tcc.subConnIdx++ - tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc) select { case tcc.NewSubConnAddrsCh <- a: @@ -133,7 +142,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) { tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc) select { - case tcc.RemoveSubConnCh <- sc: + case tcc.RemoveSubConnCh <- sc.(*TestSubConn): default: } } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index bcdeaf681ab5..ddbcdf30ee25 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -580,6 +580,8 @@ func (ccw *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubC for i, addr := range addrs { newAddrs[i] = xdsinternal.SetHandshakeInfo(addr, ccw.xdsHI) } + // No need to override opts.StateListener; just forward all calls to the + // child that created the SubConn. return ccw.ClientConn.NewSubConn(newAddrs, opts) } diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index 9a36db4dc7d8..106166878685 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -148,7 +148,7 @@ func (s) TestDropByCategory(t *testing.T) { } continue } - if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if err != nil || gotSCSt.SubConn != sc1 { return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) } if gotSCSt.Done != nil { @@ -215,7 +215,7 @@ func (s) TestDropByCategory(t *testing.T) { } continue } - if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if err != nil || gotSCSt.SubConn != sc1 { return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) } if gotSCSt.Done != nil { @@ -621,14 +621,14 @@ func (s) TestLoadReporting(t *testing.T) { if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error { for i := 0; i < successCount; i++ { gotSCSt, err := p.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc1 { return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) } gotSCSt.Done(balancer.DoneInfo{}) } for i := 0; i < errorCount; i++ { gotSCSt, err := p.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc1 { return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) } gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index d316d6a62a91..2c18fbe1a973 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -279,7 +279,7 @@ func (b *clusterImplBalancer) ResolverError(err error) { b.child.ResolverError(err) } -func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { +func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) { if b.closed.HasFired() { b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s) return @@ -305,7 +305,15 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer } } b.scWrappersMu.Unlock() - b.child.UpdateSubConnState(sc, s) + if cb != nil { + cb(s) + } else { + b.child.UpdateSubConnState(sc, s) + } +} + +func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { + b.updateSubConnState(sc, s, nil) } func (b *clusterImplBalancer) Close() { @@ -378,6 +386,9 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName) lID = xdsinternal.GetLocalityID(newAddrs[i]) } + var sc balancer.SubConn + oldListener := opts.StateListener + opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) } sc, err := b.ClientConn.NewSubConn(newAddrs, opts) if err != nil { return nil, err diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index 7d5966339444..dec07b240c0b 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -120,7 +120,7 @@ func testPick(t *testing.T, p balancer.Picker, info balancer.PickInfo, wantSC ba if fmt.Sprint(err) != fmt.Sprint(wantErr) { t.Fatalf("picker.Pick(%+v), got error %v, want %v", info, err, wantErr) } - if !cmp.Equal(gotSCSt.SubConn, wantSC, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != wantSC { t.Fatalf("picker.Pick(%+v), got %v, want SubConn=%v", info, gotSCSt, wantSC) } } diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index a5749aa9aff8..ab836a5b30a7 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -88,6 +88,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal b.resourceWatcher = newResourceResolver(b, b.logger) b.cc = &ccWrapper{ ClientConn: cc, + b: b, resourceWatcher: b.resourceWatcher, } @@ -405,12 +406,26 @@ func (b *clusterResolverBalancer) ExitIdle() { } // ccWrapper overrides ResolveNow(), so that re-resolution from the child -// policies will trigger the DNS resolver in cluster_resolver balancer. +// policies will trigger the DNS resolver in cluster_resolver balancer. It +// also intercepts NewSubConn calls in case children don't set the +// StateListener, to allow redirection to happen via this cluster_resolver +// balancer. type ccWrapper struct { balancer.ClientConn + b *clusterResolverBalancer resourceWatcher *resourceResolver } func (c *ccWrapper) ResolveNow(resolver.ResolveNowOptions) { c.resourceWatcher.resolveNow() } + +func (c *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) { + if opts.StateListener == nil { + // If already set, just allow updates to be sent directly to the + // child's listener. Otherwise, we are responsible for forwarding the + // update we'll receive to the proper child. + opts.StateListener = func(state balancer.SubConnState) { c.b.UpdateSubConnState(sc, state) } + } + return c.ClientConn.NewSubConn(addrs, opts) +} diff --git a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go index 69b7c51cf8fa..29928be21086 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go @@ -329,6 +329,19 @@ type wrappedPriorityBuilder struct { lbCfgCh chan serviceconfig.LoadBalancingConfig } +// wpbCCWrapper wraps a ClientConn and intercepts NewSubConn calls so the +// wrapped priority balancer can intercept SubConn state updates. +type wpbCCWrapper struct { + balancer.ClientConn + b *wrappedPriorityBalancer +} + +func (c *wpbCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) { + oldListener := opts.StateListener + opts.StateListener = func(state balancer.SubConnState) { c.b.updateSubConnState(sc, state, oldListener) } + return c.ClientConn.NewSubConn(addrs, opts) +} + func newWrappedPriorityBuilder(b balancer.Builder) *wrappedPriorityBuilder { return &wrappedPriorityBuilder{ scStateCh: buffer.NewUnbounded(), @@ -339,12 +352,13 @@ func newWrappedPriorityBuilder(b balancer.Builder) *wrappedPriorityBuilder { } func (b *wrappedPriorityBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - priorityLB := b.Builder.Build(cc, opts) - return &wrappedPriorityBalancer{ - Balancer: priorityLB, + wpb := &wrappedPriorityBalancer{ scStateCh: b.scStateCh, lbCfgCh: b.lbCfgCh, } + priorityLB := b.Builder.Build(&wpbCCWrapper{cc, wpb}, opts) + wpb.Balancer = priorityLB + return wpb } type wrappedPriorityBalancer struct { @@ -353,9 +367,18 @@ type wrappedPriorityBalancer struct { lbCfgCh chan serviceconfig.LoadBalancingConfig } +// UpdateSubConnState does nothing, as we ensure all SubConns created by this +// balancer have a StateListener set. func (b *wrappedPriorityBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { +} + +func (b *wrappedPriorityBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) { b.scStateCh.Put(state) - b.Balancer.UpdateSubConnState(sc, state) + if cb != nil { + cb(state) + } else { + b.Balancer.UpdateSubConnState(sc, state) + } } func (b *wrappedPriorityBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index eaf4f7fc9ab7..5630b6fcce7f 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -142,6 +142,7 @@ func (bb) Name() string { type scUpdate struct { scw *subConnWrapper state balancer.SubConnState + cb func(balancer.SubConnState) } type ejectionUpdate struct { @@ -345,7 +346,7 @@ func (b *outlierDetectionBalancer) ResolverError(err error) { b.child.ResolverError(err) } -func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { +func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) { b.mu.Lock() defer b.mu.Unlock() scw, ok := b.scWrappers[sc] @@ -361,9 +362,14 @@ func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state b.scUpdateCh.Put(&scUpdate{ scw: scw, state: state, + cb: cb, }) } +func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + b.updateSubConnState(sc, state, nil) +} + func (b *outlierDetectionBalancer) Close() { b.closed.Fire() <-b.done.Done() @@ -466,6 +472,9 @@ func (b *outlierDetectionBalancer) UpdateState(s balancer.State) { } func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + var sc balancer.SubConn + oldListener := opts.StateListener + opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) } sc, err := b.cc.NewSubConn(addrs, opts) if err != nil { return nil, err @@ -615,7 +624,11 @@ func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { scw.latestState = u.state if !scw.ejected { b.childMu.Lock() - b.child.UpdateSubConnState(scw, u.state) + if u.cb != nil { + u.cb(u.state) + } else { + b.child.UpdateSubConnState(scw, u.state) + } b.childMu.Unlock() } } diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index 3d1efe8dcd56..4c3051f8fef3 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -852,7 +852,7 @@ func (s) TestUpdateAddresses(t *testing.T) { } func scwsEqual(gotSCWS subConnWithState, wantSCWS subConnWithState) error { - if !cmp.Equal(gotSCWS, wantSCWS, cmp.AllowUnexported(subConnWithState{}, testutils.TestSubConn{}, subConnWrapper{}, addressInfo{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) { + if gotSCWS.sc != wantSCWS.sc || !cmp.Equal(gotSCWS.state, wantSCWS.state, cmp.AllowUnexported(subConnWrapper{}, addressInfo{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) { return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCWS, wantSCWS) } return nil diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index e08ddc98ea79..b1f59a3c545c 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -107,7 +107,9 @@ func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, for _, addr := range addrs { wantAddrCount[addr.Addr]++ } + gotAddrCount := make(map[string]int) for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + gotAddrCount = make(map[string]int) // Perform 3 iterations. var iterations [][]string for i := 0; i < 3; i++ { @@ -122,7 +124,6 @@ func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, iterations = append(iterations, iteration) } // Ensure the the first iteration contains all addresses in addrs. - gotAddrCount := make(map[string]int) for _, addr := range iterations[0] { gotAddrCount[addr]++ } @@ -135,7 +136,7 @@ func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, } return nil } - return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs) + return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v; got: %v", addrs, gotAddrCount) } // TestOutlierDetectionAlgorithmsE2E tests the Outlier Detection Success Rate diff --git a/xds/internal/balancer/priority/balancer_test.go b/xds/internal/balancer/priority/balancer_test.go index 22ecca84bf25..4e6b7c39f7b5 100644 --- a/xds/internal/balancer/priority/balancer_test.go +++ b/xds/internal/balancer/priority/balancer_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" @@ -328,7 +327,7 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { // p2 SubConns are removed. scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scToRemove != sc2 { t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove) } @@ -426,7 +425,7 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { // p1 subconn should be removed. scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scToRemove != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove) } @@ -618,10 +617,7 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { // With localities caching, the lower priorities are closed after a timeout, // in goroutines. The order is no longer guaranteed. scToRemove := []balancer.SubConn{<-cc.RemoveSubConnCh, <-cc.RemoveSubConnCh} - if !(cmp.Equal(scToRemove[0], sc1, cmp.AllowUnexported(testutils.TestSubConn{})) && - cmp.Equal(scToRemove[1], sc2, cmp.AllowUnexported(testutils.TestSubConn{}))) && - !(cmp.Equal(scToRemove[0], sc2, cmp.AllowUnexported(testutils.TestSubConn{})) && - cmp.Equal(scToRemove[1], sc1, cmp.AllowUnexported(testutils.TestSubConn{}))) { + if !(scToRemove[0] == sc1 && scToRemove[1] == sc2) && !(scToRemove[0] == sc2 && scToRemove[1] == sc1) { t.Errorf("RemoveSubConn, want [%v, %v], got %v", sc1, sc2, scToRemove) } @@ -771,7 +767,7 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { // p0 subconn should be removed. scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scToRemove != sc0 { t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove) } @@ -842,7 +838,7 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { // p1 subconn should be removed. scToRemove1 := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove1, sc11, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scToRemove1 != sc11 { t.Fatalf("RemoveSubConn, want %v, got %v", sc11, scToRemove1) } @@ -1085,7 +1081,7 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { // Old subconn should be removed. scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scToRemove != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) } @@ -1186,7 +1182,7 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) { // Old subconn from child-0 should be removed. scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scToRemove != sc0 { t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove) } @@ -1279,7 +1275,7 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) { // Old subconn from child-1 should be removed. scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scToRemove != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) } @@ -1469,7 +1465,7 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) { // Old subconn should be removed. scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if scToRemove != sc1 { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) } diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index e5b10556e982..3db8a258124b 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -89,7 +89,7 @@ func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.TestClientCon sc1 := <-cc.NewSubConnCh // All the SubConns start in Idle, and should not Connect(). select { - case <-sc1.(*testutils.TestSubConn).ConnectCh: + case <-sc1.ConnectCh: t.Errorf("unexpected Connect() from SubConn %v", sc1) case <-time.After(defaultTestShortTimeout): }