Skip to content

Commit

Permalink
using cc.Connect() instead of making an RPC call
Browse files Browse the repository at this point in the history
  • Loading branch information
janardhankrishna-sai committed Dec 23, 2024
1 parent 8fab779 commit d28f9f8
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 46 deletions.
48 changes: 14 additions & 34 deletions internal/idle/idle_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,16 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
cc.Connect()

// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Trigger the resolver by initiating an RPC.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
go func() {
_ = cc.Invoke(ctx, "/test/method", nil, nil)
}()
// Verify that the ClientConn moves to READY.
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Verify that the ClientConn stays in READY.
Expand Down Expand Up @@ -186,20 +183,16 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

cc.Connect()
// Start a test backend and push an address update via the resolver.
lis := testutils.NewListenerWrapper(t, nil)
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY and trigger the resolver by
// initiating an RPC.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
go func() {
_ = cc.Invoke(ctx, "/test/method", nil, nil)
}()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Retrieve the wrapped conn from the listener.
Expand Down Expand Up @@ -278,7 +271,7 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

cc.Connect()
// Start a test backend that keeps the RPC call active by blocking
// on a channel that is closed by the test later on.
blockCh := make(chan struct{})
Expand All @@ -299,15 +292,11 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {

// Push an address update containing the address of the above
// backend via the manual resolver.
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY and trigger the resolver by
// initiating an RPC.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
go func() {
_ = cc.Invoke(ctx, "/test/method", nil, nil)
}()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Spawn a goroutine to check for expected behavior while a blocking
Expand Down Expand Up @@ -371,19 +360,15 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

cc.Connect()
// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY and trigger the resolver by
// initiating an RPC.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
go func() {
_ = cc.Invoke(ctx, "/test/method", nil, nil)
}()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// For a duration of three times the configured idle timeout, making RPCs
Expand Down Expand Up @@ -444,15 +429,10 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

// Verify that the ClientConn moves to READY and trigger the resolver by
// initiating an RPC.
cc.Connect()
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
go func() {
_ = cc.Invoke(ctx, "/test/method", nil, nil)
}()

testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Verify that the ClientConn moves to IDLE as there is no activity.
Expand Down
14 changes: 12 additions & 2 deletions orca/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,12 @@ func (s) TestProducer(t *testing.T) {
li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis, opts: lisOpts}
addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)
r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
cc, err := grpc.NewClient("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
dopts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`),
grpc.WithResolvers(r),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cc, err := grpc.NewClient("whatever:///whatever", dopts...)
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
Expand Down Expand Up @@ -319,7 +324,12 @@ func (s) TestProducerBackoff(t *testing.T) {
lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval}
li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis, opts: lisOpts}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
cc, err := grpc.NewClient("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
dopts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`),
grpc.WithResolvers(r),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cc, err := grpc.NewClient("whatever:///whatever", dopts...)
if err != nil {
t.Fatalf("grpc.NewClient failed: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions resolver/manual/manual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ func TestResolver(t *testing.T) {
})

t.Run("happy_path", func(t *testing.T) {
r.InitialState(resolver.State{Addresses: []resolver.Address{
{Addr: "ok"},
}})
cc, err := grpc.NewClient("whatever://localhost",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r))
Expand All @@ -70,6 +67,9 @@ func TestResolver(t *testing.T) {
}
defer cc.Close()
cc.Connect()
r.UpdateState(resolver.State{Addresses: []resolver.Address{
{Addr: "ok"},
}})
r.ReportError(errors.New("example"))
})
}
7 changes: 1 addition & 6 deletions test/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,7 @@ func (s) TestColonPortAuthority(t *testing.T) {
//
// Append "localhost" before calling net.Dial, in case net.Dial on certain
// platforms doesn't work well for address without the IP.
cc, err := grpc.NewClient(":"+port, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
if len(addr) > 0 && addr[0] == ':' {
addr = "localhost" + addr
}
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
}))
cc, err := grpc.NewClient(":"+port, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) = %v", ss.Target, err)
}
Expand Down
2 changes: 1 addition & 1 deletion test/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (s) TestRoundRobin_UpdateAddressAttributes(t *testing.T) {
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(rrServiceConfig),
}
// Send a resolver update with no address attributes.
// Set an initial resolver update with no address attributes.
addr := resolver.Address{Addr: backend.Address}
r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
Expand Down

0 comments on commit d28f9f8

Please sign in to comment.