From cb0a479556e92d5e685ec49d87fc33e5c4c9d486 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 2 Dec 2024 13:58:39 +0000 Subject: [PATCH 01/10] Initializing StreamingInputCall and SteamingOutputCall in the stubserver --- internal/stubserver/stubserver.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index 2e404e294bf6..3c3f4fb067f2 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -56,9 +56,11 @@ type StubServer struct { testgrpc.TestServiceServer // Customizable implementations of server handlers. - EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) - UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error + StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error + StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient @@ -101,6 +103,16 @@ func (ss *StubServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallS return ss.FullDuplexCallF(stream) } +// StreamingInputCall is the handler for testpb.StreamingInputCall +func (ss *StubServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { + return ss.StreamingInputCallF(stream) +} + +// StreamingOutputCall is the handler for testpb.StreamingOutputCall +func (ss *StubServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { + return ss.StreamingOutputCallF(req, stream) +} + // Start starts the server and creates a client connected to it. func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { if err := ss.StartServer(sopts...); err != nil { From d364fdc3d9e0ed208343a1c78f3a05c4e424d851 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Tue, 3 Dec 2024 07:00:39 +0000 Subject: [PATCH 02/10] switching to stubserver in tests instead of testserviceimpl --- authz/grpc_authz_end2end_test.go | 116 ++++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 4e798f7ca3d7..8008a2275d40 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc/testdata" @@ -42,26 +43,6 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" ) -type testServer struct { - testgrpc.UnimplementedTestServiceServer -} - -func (s *testServer) UnaryCall(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{}, nil -} - -func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { - for { - _, err := stream.Recv() - if err == io.EOF { - return stream.SendAndClose(&testpb.StreamingInputCallResponse{}) - } - if err != nil { - return err - } - } -} - type s struct { grpctest.Tester } @@ -317,13 +298,30 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - go s.Serve(lis) + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&testpb.StreamingInputCallResponse{}) + } + if err != nil { + return err + } + } + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -387,13 +385,19 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t * grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - go s.Serve(lis) + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. creds, err = credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com") @@ -452,13 +456,19 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - go s.Serve(lis) + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. cert, err = tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem")) @@ -506,14 +516,31 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } defer lis.Close() - go s.Serve(lis) + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&testpb.StreamingInputCallResponse{}) + } + if err != nil { + return err + } + } + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -575,14 +602,21 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { s := grpc.NewServer( grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } defer lis.Close() - go s.Serve(lis) + + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -623,14 +657,21 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { s := grpc.NewServer( grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } defer lis.Close() - go s.Serve(lis) + + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -674,14 +715,21 @@ func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { s := grpc.NewServer( grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, &testServer{}) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } defer lis.Close() - go s.Serve(lis) + + stub := &stubserver.StubServer{ + Listener: lis, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + stub.S = s + stubserver.StartTestService(t, stub) // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) From 309abd15146c379dd73e53fcb9c064780d577c3e Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Tue, 3 Dec 2024 12:02:40 +0000 Subject: [PATCH 03/10] switching to stubserver instead of testservice --- authz/audit/audit_logging_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/authz/audit/audit_logging_test.go b/authz/audit/audit_logging_test.go index ea84db099608..2344cc12329f 100644 --- a/authz/audit/audit_logging_test.go +++ b/authz/audit/audit_logging_test.go @@ -271,7 +271,8 @@ func (s) TestAuditLogger(t *testing.T) { grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) defer s.Stop() - testgrpc.RegisterTestServiceServer(s, ss) + ss.S = s + stubserver.StartTestService(t, ss) lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Error listening: %v", err) From fa74a99642833e691eec69eeb5675ae1ee1ff633 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Fri, 6 Dec 2024 06:42:57 +0000 Subject: [PATCH 04/10] removing redundant server code --- authz/audit/audit_logging_test.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/authz/audit/audit_logging_test.go b/authz/audit/audit_logging_test.go index 2344cc12329f..5db4487e227f 100644 --- a/authz/audit/audit_logging_test.go +++ b/authz/audit/audit_logging_test.go @@ -24,7 +24,6 @@ import ( "crypto/x509" "encoding/json" "io" - "net" "os" "testing" "time" @@ -273,16 +272,11 @@ func (s) TestAuditLogger(t *testing.T) { defer s.Stop() ss.S = s stubserver.StartTestService(t, ss) - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error listening: %v", err) - } - go s.Serve(lis) // Setup gRPC test client with certificates containing a SPIFFE Id. - clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(clientCreds)) + clientConn, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(clientCreds)) if err != nil { - t.Fatalf("grpc.NewClient(%v) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%v) failed: %v", ss.Address, err) } defer clientConn.Close() client := testgrpc.NewTestServiceClient(clientConn) From 1e7a099279df1f99ccd3e6518cd4b00a0b88a689 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 18 Dec 2024 09:49:28 +0000 Subject: [PATCH 05/10] renaming streamingcalls --- authz/grpc_authz_end2end_test.go | 4 ++-- internal/stubserver/stubserver.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 8008a2275d40..24ceacd43d81 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -308,7 +308,7 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + ClientStreamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error { for { _, err := stream.Recv() if err == io.EOF { @@ -527,7 +527,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + ClientStreamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error { for { _, err := stream.Recv() if err == io.EOF { diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index 3c3f4fb067f2..a9d766a31a39 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -56,11 +56,11 @@ type StubServer struct { testgrpc.TestServiceServer // Customizable implementations of server handlers. - EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) - UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error - StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error - StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error + ClientStreamingInputCall func(stream testgrpc.TestService_StreamingInputCallServer) error + ClientStreamingOutputCall func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient @@ -105,12 +105,12 @@ func (ss *StubServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallS // StreamingInputCall is the handler for testpb.StreamingInputCall func (ss *StubServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { - return ss.StreamingInputCallF(stream) + return ss.ClientStreamingInputCall(stream) } // StreamingOutputCall is the handler for testpb.StreamingOutputCall func (ss *StubServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { - return ss.StreamingOutputCallF(req, stream) + return ss.ClientStreamingOutputCall(req, stream) } // Start starts the server and creates a client connected to it. From 924a9e74781cbc9d9e672da86f8aeaa2c46ec472 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 18 Dec 2024 10:28:28 +0000 Subject: [PATCH 06/10] moving initialization of NewServer inside stub --- authz/grpc_authz_end2end_test.go | 70 +++++++++++++------------------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 24ceacd43d81..45dcc30f69f9 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -294,10 +294,6 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { t.Run(name, func(t *testing.T) { // Start a gRPC server with gRPC authz unary and stream server interceptors. i, _ := authz.NewStatic(test.authzPolicy) - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor), - grpc.ChainStreamInterceptor(i.StreamInterceptor)) - defer s.Stop() lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -319,9 +315,12 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { } } }, + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor), + grpc.ChainStreamInterceptor(i.StreamInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -381,10 +380,6 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t * if err != nil { t.Fatalf("failed to generate credentials: %v", err) } - s := grpc.NewServer( - grpc.Creds(creds), - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -395,9 +390,12 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t * UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + S: grpc.NewServer( + grpc.Creds(creds), + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. creds, err = credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com") @@ -452,10 +450,6 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t Certificates: []tls.Certificate{cert}, ClientCAs: certPool, }) - s := grpc.NewServer( - grpc.Creds(creds), - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -466,9 +460,12 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + S: grpc.NewServer( + grpc.Creds(creds), + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. cert, err = tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem")) @@ -511,12 +508,6 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { i, _ := authz.NewFileWatcher(file, 1*time.Second) defer i.Close() - // Start a gRPC server with gRPC authz unary and stream server interceptors. - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor), - grpc.ChainStreamInterceptor(i.StreamInterceptor)) - defer s.Stop() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) @@ -538,9 +529,13 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { } } }, + // Start a gRPC server with gRPC authz unary and stream server interceptors. + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor), + grpc.ChainStreamInterceptor(i.StreamInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -598,11 +593,6 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() - // Start a gRPC server with gRPC authz unary server interceptor. - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) @@ -614,9 +604,12 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + // Start a gRPC server with gRPC authz unary server interceptor. + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -653,11 +646,6 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { i, _ := authz.NewFileWatcher(file, 20*time.Millisecond) defer i.Close() - // Start a gRPC server with gRPC authz unary server interceptors. - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) @@ -669,9 +657,12 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + // Start a gRPC server with gRPC authz unary server interceptors. + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -705,17 +696,12 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { } } -func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { +func TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { valid1 := authzTests["DeniesRPCMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "recovers_from_reload_failure", []byte(valid1.authzPolicy)) i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() - // Start a gRPC server with gRPC authz unary server interceptors. - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - defer s.Stop() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) @@ -727,9 +713,11 @@ func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, + S: grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)), } - stub.S = s stubserver.StartTestService(t, stub) + defer stub.S.Stop() // Establish a connection to the server. clientConn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) From ecdab23b72f154d6e6efa74abf06574526121fd4 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Fri, 20 Dec 2024 09:55:48 +0000 Subject: [PATCH 07/10] renaming client and server handlers --- authz/grpc_authz_end2end_test.go | 4 ++-- internal/stubserver/stubserver.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 45dcc30f69f9..4c0340111cd8 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -304,7 +304,7 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - ClientStreamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error { + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { for { _, err := stream.Recv() if err == io.EOF { @@ -518,7 +518,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) { UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - ClientStreamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error { + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { for { _, err := stream.Recv() if err == io.EOF { diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index a9d766a31a39..1c92793d8df6 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -56,11 +56,11 @@ type StubServer struct { testgrpc.TestServiceServer // Customizable implementations of server handlers. - EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) - UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error - ClientStreamingInputCall func(stream testgrpc.TestService_StreamingInputCallServer) error - ClientStreamingOutputCall func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error + StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error // ClientStreaming + StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // ServerStreaming // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient @@ -105,12 +105,12 @@ func (ss *StubServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallS // StreamingInputCall is the handler for testpb.StreamingInputCall func (ss *StubServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { - return ss.ClientStreamingInputCall(stream) + return ss.StreamingInputCallF(stream) } // StreamingOutputCall is the handler for testpb.StreamingOutputCall func (ss *StubServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { - return ss.ClientStreamingOutputCall(req, stream) + return ss.StreamingOutputCallF(req, stream) } // Start starts the server and creates a client connected to it. From 72879839ecb0434f9c91b7182a936fa2daa48635 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Fri, 20 Dec 2024 10:10:26 +0000 Subject: [PATCH 08/10] updating grpc_authz_end2end_test.go --- authz/grpc_authz_end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index 4c0340111cd8..d54c88d67ff6 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -696,7 +696,7 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { } } -func TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { +func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) { valid1 := authzTests["DeniesRPCMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "recovers_from_reload_failure", []byte(valid1.authzPolicy)) i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) From 4588fc111159824ce72e97d9590481d2dc8efe75 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Sun, 22 Dec 2024 06:23:31 +0000 Subject: [PATCH 09/10] updating client and server streaming comments --- internal/stubserver/stubserver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/stubserver/stubserver.go b/internal/stubserver/stubserver.go index 1c92793d8df6..92262af877a6 100644 --- a/internal/stubserver/stubserver.go +++ b/internal/stubserver/stubserver.go @@ -59,8 +59,8 @@ type StubServer struct { EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) FullDuplexCallF func(stream testgrpc.TestService_FullDuplexCallServer) error - StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error // ClientStreaming - StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // ServerStreaming + StreamingInputCallF func(stream testgrpc.TestService_StreamingInputCallServer) error // Client-Streaming request + StreamingOutputCallF func(req *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error // Server-streaming response // A client connected to this service the test may use. Created in Start(). Client testgrpc.TestServiceClient From 4298e86ab2788f20e6d69f37f521cd6904a34607 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Sun, 22 Dec 2024 06:32:33 +0000 Subject: [PATCH 10/10] removing empty lines for re-trigger --- authz/grpc_authz_end2end_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/authz/grpc_authz_end2end_test.go b/authz/grpc_authz_end2end_test.go index d54c88d67ff6..6ddc8dbf78f0 100644 --- a/authz/grpc_authz_end2end_test.go +++ b/authz/grpc_authz_end2end_test.go @@ -598,7 +598,6 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) { t.Fatalf("error listening: %v", err) } defer lis.Close() - stub := &stubserver.StubServer{ Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { @@ -651,7 +650,6 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) { t.Fatalf("error listening: %v", err) } defer lis.Close() - stub := &stubserver.StubServer{ Listener: lis, UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {