Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

authz: modify the tests to use stubserver instead of testservice implementations #7888

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
13 changes: 4 additions & 9 deletions authz/audit/audit_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"crypto/x509"
"encoding/json"
"io"
"net"
"os"
"testing"
"time"
Expand Down Expand Up @@ -271,17 +270,13 @@ func (s) TestAuditLogger(t *testing.T) {
grpc.ChainUnaryInterceptor(i.UnaryInterceptor),
grpc.ChainStreamInterceptor(i.StreamInterceptor))
defer s.Stop()
testgrpc.RegisterTestServiceServer(s, ss)
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error listening: %v", err)
}
go s.Serve(lis)
ss.S = s
stubserver.StartTestService(t, ss)
purnesh42H marked this conversation as resolved.
Show resolved Hide resolved

// Setup gRPC test client with certificates containing a SPIFFE Id.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(clientCreds))
clientConn, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(clientCreds))
if err != nil {
t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.NewClient(%v) failed: %v", ss.Address, err)
}
defer clientConn.Close()
client := testgrpc.NewTestServiceClient(clientConn)
Expand Down
116 changes: 82 additions & 34 deletions authz/grpc_authz_end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/testdata"
Expand All @@ -42,26 +43,6 @@ import (
testpb "google.golang.org/grpc/interop/grpc_testing"
)

type testServer struct {
testgrpc.UnimplementedTestServiceServer
}

func (s *testServer) UnaryCall(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
}

func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
for {
_, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&testpb.StreamingInputCallResponse{})
}
if err != nil {
return err
}
}
}

type s struct {
grpctest.Tester
}
Expand Down Expand Up @@ -317,13 +298,30 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) {
grpc.ChainUnaryInterceptor(i.UnaryInterceptor),
grpc.ChainStreamInterceptor(i.StreamInterceptor))
defer s.Stop()
testgrpc.RegisterTestServiceServer(s, &testServer{})

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
go s.Serve(lis)
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
for {
_, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&testpb.StreamingInputCallResponse{})
}
if err != nil {
return err
}
}
},
}
stub.S = s
stubserver.StartTestService(t, stub)

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -387,13 +385,19 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t *
grpc.Creds(creds),
grpc.ChainUnaryInterceptor(i.UnaryInterceptor))
defer s.Stop()
testgrpc.RegisterTestServiceServer(s, &testServer{})

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
go s.Serve(lis)
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}
stub.S = s
stubserver.StartTestService(t, stub)

// Establish a connection to the server.
creds, err = credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com")
Expand Down Expand Up @@ -452,13 +456,19 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t
grpc.Creds(creds),
grpc.ChainUnaryInterceptor(i.UnaryInterceptor))
defer s.Stop()
testgrpc.RegisterTestServiceServer(s, &testServer{})

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
go s.Serve(lis)
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}
stub.S = s
stubserver.StartTestService(t, stub)

// Establish a connection to the server.
cert, err = tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem"))
Expand Down Expand Up @@ -506,14 +516,31 @@ func (s) TestFileWatcherEnd2End(t *testing.T) {
grpc.ChainUnaryInterceptor(i.UnaryInterceptor),
grpc.ChainStreamInterceptor(i.StreamInterceptor))
defer s.Stop()
testgrpc.RegisterTestServiceServer(s, &testServer{})

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
defer lis.Close()
go s.Serve(lis)
stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
for {
_, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&testpb.StreamingInputCallResponse{})
}
if err != nil {
return err
}
}
},
}
stub.S = s
stubserver.StartTestService(t, stub)

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -575,14 +602,21 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) {
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor))
defer s.Stop()
testgrpc.RegisterTestServiceServer(s, &testServer{})

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
defer lis.Close()
go s.Serve(lis)

stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}
stub.S = s
stubserver.StartTestService(t, stub)

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -623,14 +657,21 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) {
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor))
defer s.Stop()
testgrpc.RegisterTestServiceServer(s, &testServer{})

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
defer lis.Close()
go s.Serve(lis)

stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}
stub.S = s
stubserver.StartTestService(t, stub)

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -674,14 +715,21 @@ func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) {
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor))
defer s.Stop()
testgrpc.RegisterTestServiceServer(s, &testServer{})

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
defer lis.Close()
go s.Serve(lis)

stub := &stubserver.StubServer{
Listener: lis,
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}
stub.S = s
stubserver.StartTestService(t, stub)

// Establish a connection to the server.
clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down
18 changes: 15 additions & 3 deletions internal/stubserver/stubserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@
testgrpc.TestServiceServer

// Customizable implementations of server handlers.
EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error
EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error
StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janardhanvissa i think we don't need to create extra functions for input and output. Can you just use the FullDuplexCallF for streaming calls

FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. We need to use stream.CloseAndRecv() so we need a client streaming handler. We should rename it to ClientStreamingInputCall and ClientStreamingOutputCall though

StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error

// A client connected to this service the test may use. Created in Start().
Client testgrpc.TestServiceClient
Expand Down Expand Up @@ -101,6 +103,16 @@
return ss.FullDuplexCallF(stream)
}

// StreamingInputCall is the handler for testpb.StreamingInputCall
func (ss *StubServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
purnesh42H marked this conversation as resolved.
Show resolved Hide resolved
return ss.StreamingInputCallF(stream)
}

// StreamingOutputCall is the handler for testpb.StreamingOutputCall
func (ss *StubServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
purnesh42H marked this conversation as resolved.
Show resolved Hide resolved
return ss.StreamingOutputCallF(req, stream)

Check warning on line 113 in internal/stubserver/stubserver.go

View check run for this annotation

Codecov / codecov/patch

internal/stubserver/stubserver.go#L112-L113

Added lines #L112 - L113 were not covered by tests
}

// Start starts the server and creates a client connected to it.
func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
if err := ss.StartServer(sopts...); err != nil {
Expand Down
Loading