Skip to content

Commit

Permalink
Merge branch 'grpc:master' into proxy_pr2
Browse files Browse the repository at this point in the history
  • Loading branch information
eshitachandwani authored Dec 27, 2024
2 parents 7c5b1b3 + 724f450 commit cb6b09c
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 67 deletions.
25 changes: 15 additions & 10 deletions balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,12 @@ func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
cc.Connect()

// At this point, the RLS LB policy should have received its config, and
// should have created a child policy for the default target.
Expand Down Expand Up @@ -448,11 +449,12 @@ func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
cc.Connect()

// At this point, the RLS LB policy should have received its config, and
// should have created a child policy for the default target.
Expand Down Expand Up @@ -603,11 +605,12 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
cc.Connect()

<-clientConnUpdateDone

Expand Down Expand Up @@ -769,11 +772,12 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
r.InitialState(resolver.State{ServiceConfig: sc})

cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("create grpc.Dial() failed: %v", err)
t.Fatalf("create grpc.NewClient() failed: %v", err)
}
defer cc.Close()
cc.Connect()

<-clientConnUpdateDone

Expand Down Expand Up @@ -1151,11 +1155,12 @@ func (s) TestUpdateStatePauses(t *testing.T) {
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
r.InitialState(resolver.State{ServiceConfig: sc})

cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
cc.Connect()

// Wait for the clientconn update to be processed by the RLS LB policy.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down
11 changes: 7 additions & 4 deletions examples/features/csm_observability/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/examples/features/proto/echo"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/stats/opentelemetry/csm"
_ "google.golang.org/grpc/xds" // To install the xds resolvers and balancers.
Expand All @@ -40,9 +40,12 @@ import (
"go.opentelemetry.io/otel/sdk/metric"
)

const defaultName = "world"

var (
target = flag.String("target", "xds:///helloworld:50051", "the server address to connect to")
prometheusEndpoint = flag.String("prometheus_endpoint", ":9464", "the Prometheus exporter endpoint")
name = flag.String("name", defaultName, "Name to greet")
)

func main() {
Expand All @@ -68,15 +71,15 @@ func main() {
log.Fatalf("Failed to start NewClient: %v", err)
}
defer cc.Close()
c := echo.NewEchoClient(cc)
c := pb.NewGreeterClient(cc)

// Make an RPC every second. This should trigger telemetry to be emitted from
// the client and the server.
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
r, err := c.UnaryEcho(ctx, &echo.EchoRequest{Message: "this is examples/opentelemetry"})
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Printf("UnaryEcho failed: %v", err)
log.Fatalf("Could not greet: %v", err)
}
fmt.Println(r)
time.Sleep(time.Second)
Expand Down
15 changes: 8 additions & 7 deletions examples/features/csm_observability/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"net/http"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
pb "google.golang.org/grpc/examples/features/proto/echo"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/stats/opentelemetry/csm"
"google.golang.org/grpc/xds"
Expand All @@ -45,13 +44,15 @@ var (
prometheusEndpoint = flag.String("prometheus_endpoint", ":9464", "the Prometheus exporter endpoint")
)

type echoServer struct {
pb.UnimplementedEchoServer
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
addr string
}

func (s *echoServer) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(_ context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func main() {
Expand Down Expand Up @@ -80,7 +81,7 @@ func main() {
if err != nil {
log.Fatalf("Failed to start xDS Server: %v", err)
}
pb.RegisterEchoServer(s, &echoServer{addr: ":" + *port})
pb.RegisterGreeterServer(s, &server{addr: ":" + *port})

log.Printf("Serving on %s\n", *port)

Expand Down
28 changes: 15 additions & 13 deletions internal/idle/idle_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
grpc.WithIdleTimeout(0), // Disable idleness.
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
cc.Connect()

// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
Expand Down Expand Up @@ -177,12 +178,13 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

cc.Connect()
// Start a test backend and push an address update via the resolver.
lis := testutils.NewListenerWrapper(t, nil)
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
Expand Down Expand Up @@ -265,12 +267,12 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

cc.Connect()
// Start a test backend that keeps the RPC call active by blocking
// on a channel that is closed by the test later on.
blockCh := make(chan struct{})
Expand Down Expand Up @@ -354,12 +356,12 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

cc.Connect()
// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
Expand Down Expand Up @@ -423,12 +425,12 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

cc.Connect()
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion internal/testutils/blocking_context_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s) TestBlockingDialer_HoldWaitFail(t *testing.T) {
}()

if !h.Wait(ctx) {
t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr)
t.Fatal("Timeout while waiting for a connection attempt to " + h.addr)
}
select {
case err = <-dialError:
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,5 +498,5 @@ func mapRecvMsgError(err error) error {
if strings.Contains(err.Error(), "body closed by handler") {
return status.Error(codes.Canceled, err.Error())
}
return connectionErrorf(true, err, err.Error())
return connectionErrorf(true, err, "%s", err.Error())
}
3 changes: 3 additions & 0 deletions internal/xds/rbac/rbac_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func newRPCData(ctx context.Context) (*rpcData, error) {
if !ok {
return nil, errors.New("missing method in incoming context")
}
// gRPC-Go strips :path from the headers given to the application, but RBAC should be
// able to match against it.
md[":path"] = []string{mn}

// The connection is needed in order to find the destination address and
// port of the incoming RPC Call.
Expand Down
26 changes: 19 additions & 7 deletions orca/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,17 @@ func (s) TestProducer(t *testing.T) {
li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis, opts: lisOpts}
addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)
r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
dopts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`),
grpc.WithResolvers(r),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cc, err := grpc.NewClient("whatever:///whatever", dopts...)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

cc.Connect()
// Set a few metrics and wait for them on the client side.
smr.SetCPUUtilization(10)
smr.SetMemoryUtilization(0.1)
Expand Down Expand Up @@ -319,10 +324,16 @@ func (s) TestProducerBackoff(t *testing.T) {
lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval}
li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis, opts: lisOpts}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
dopts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`),
grpc.WithResolvers(r),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cc, err := grpc.NewClient("whatever:///whatever", dopts...)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
t.Fatalf("grpc.NewClient failed: %v", err)
}
cc.Connect()
defer cc.Close()

// Define a load report to send and expect the client to see.
Expand Down Expand Up @@ -431,10 +442,11 @@ func (s) TestProducerMultipleListeners(t *testing.T) {
lisOpts1 := orca.OOBListenerOptions{ReportInterval: reportInterval1}
li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis1, opts: lisOpts1}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.NewClient("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
cc.Connect()
defer cc.Close()

// Ensure the OOB listener is stopped before the client is closed to avoid
Expand Down
6 changes: 4 additions & 2 deletions resolver/manual/manual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ func TestResolver(t *testing.T) {
})

t.Run("happy_path", func(t *testing.T) {
_, err := grpc.Dial("whatever://localhost",
cc, err := grpc.NewClient("whatever://localhost",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r))
if err != nil {
t.Errorf("dial setup error: %v", err)
t.Errorf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
cc.Connect()
r.UpdateState(resolver.State{Addresses: []resolver.Address{
{Addr: "ok"},
}})
Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
return err
}
if err == io.ErrUnexpectedEOF {
err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr(err)
}
Expand Down
15 changes: 5 additions & 10 deletions test/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s) TestUnixCustomDialer(t *testing.T) {
}
}

// TestColonPortAuthority does an end to end test with the target for grpc.Dial
// TestColonPortAuthority does an end to end test with the target for grpc.NewClient
// being ":[port]". Ensures authority is "localhost:[port]".
func (s) TestColonPortAuthority(t *testing.T) {
expectedAuthority := ""
Expand All @@ -189,16 +189,11 @@ func (s) TestColonPortAuthority(t *testing.T) {
authorityMu.Lock()
expectedAuthority = "localhost:" + port
authorityMu.Unlock()
// ss.Start dials, but not the ":[port]" target that is being tested here.
// Dial again, with ":[port]" as the target.
//
// Append "localhost" before calling net.Dial, in case net.Dial on certain
// platforms doesn't work well for address without the IP.
cc, err := grpc.Dial(":"+port, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "tcp", "localhost"+addr)
}))
// ss.Start dials the server, but we explicitly test with ":[port]"
// as the target.
cc, err := grpc.NewClient(":"+port, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial(%q) = %v", ss.Target, err)
t.Fatalf("grpc.NewClient(%q) = %v", ss.Target, err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down
2 changes: 1 addition & 1 deletion test/creds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ type methodTestCreds struct{}

func (m *methodTestCreds) GetRequestMetadata(ctx context.Context, _ ...string) (map[string]string, error) {
ri, _ := credentials.RequestInfoFromContext(ctx)
return nil, status.Errorf(codes.Unknown, ri.Method)
return nil, status.Error(codes.Unknown, ri.Method)
}

func (m *methodTestCreds) RequireTransportSecurity() bool { return false }
Expand Down
Loading

0 comments on commit cb6b09c

Please sign in to comment.