diff --git a/examples/examples_test.sh b/examples/examples_test.sh index ef0d34769482..28f050c0a0e8 100755 --- a/examples/examples_test.sh +++ b/examples/examples_test.sh @@ -71,6 +71,7 @@ EXAMPLES=( "features/orca" "features/retry" "features/unix_abstract" + "features/gracefulstop" ) declare -A SERVER_ARGS=( @@ -129,6 +130,7 @@ declare -A EXPECTED_SERVER_OUTPUT=( ["features/retry"]="request succeeded count: 4" ["features/unix_abstract"]="serving on @abstract-unix-socket" ["features/advancedtls"]="" + ["features/gracefulstop"]="Server stopped gracefully." ) declare -A EXPECTED_CLIENT_OUTPUT=( @@ -154,6 +156,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=( ["features/retry"]="UnaryEcho reply: message:\"Try and Success\"" ["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket" ["features/advancedtls"]="" + ["features/gracefulstop"]="Successful unary requests processed by server and made by client are same." ) cd ./examples diff --git a/examples/features/gracefulstop/README.md b/examples/features/gracefulstop/README.md new file mode 100644 index 000000000000..b9a7eff07ec2 --- /dev/null +++ b/examples/features/gracefulstop/README.md @@ -0,0 +1,45 @@ +# Graceful Stop + +This example demonstrates how to gracefully stop a gRPC server using +`Server.GracefulStop()`. The graceful shutdown process involves two key steps: + +- Initiate `Server.GracefulStop()`. This function blocks until all currently + running RPCs have completed. This ensures that in-flight requests are + allowed to finish processing. + +- It's crucial to call `Server.Stop()` with a timeout before calling + `GracefulStop()`. This acts as a safety net, ensuring that the server + eventually shuts down even if some in-flight RPCs don't complete within a + reasonable timeframe. This prevents indefinite blocking. + +## Try it + +``` +go run server/main.go +``` + +``` +go run client/main.go +``` + +## Explanation + +The server starts with a client streaming and unary request handler. When +client streaming is started, client streaming handler signals the server to +initiate graceful stop and waits for the stream to be closed or aborted. Until +the`Server.GracefulStop()` is initiated, server will continue to accept unary +requests. Once `Server.GracefulStop()` is initiated, server will not accept +new unary requests. + +Client will start the client stream to the server and starts making unary +requests until receiving an error. Error will indicate that the server graceful +shutdown is initiated so client will stop making further unary requests and +closes the client stream. + +Server and client will keep track of number of unary requests processed on +their side. Once the client has successfully closed the stream, server returns +the total number of unary requests processed as response. The number from +stream response should be equal to the number of unary requests tracked by +client. This indicates that server has processed all in-flight requests before +shutting down. + diff --git a/examples/features/gracefulstop/client/main.go b/examples/features/gracefulstop/client/main.go new file mode 100644 index 000000000000..61f5874b3940 --- /dev/null +++ b/examples/features/gracefulstop/client/main.go @@ -0,0 +1,80 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Binary client demonstrates sending multiple requests to server and observe +// graceful stop. +package main + +import ( + "context" + "flag" + "fmt" + "log" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + pb "google.golang.org/grpc/examples/features/proto/echo" +) + +var addr = flag.String("addr", "localhost:50052", "the address to connect to") + +func main() { + flag.Parse() + + conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("Failed to create new client: %v", err) + } + defer conn.Close() + c := pb.NewEchoClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + // Start a client stream and keep calling the `c.UnaryEcho` until receiving + // an error. Error will indicate that server graceful stop is initiated and + // it won't accept any new requests. + stream, err := c.ClientStreamingEcho(ctx) + if err != nil { + log.Fatalf("Error starting stream: %v", err) + } + + // Keep track of successful unary requests which can be compared later to + // the successful unary requests reported by the server. + unaryRequests := 0 + for { + r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "Hello"}) + if err != nil { + log.Printf("Error calling `UnaryEcho`. Server graceful stop initiated: %v", err) + break + } + unaryRequests++ + time.Sleep(200 * time.Millisecond) + log.Printf(r.Message) + } + log.Printf("Successful unary requests made by client: %d", unaryRequests) + + r, err := stream.CloseAndRecv() + if err != nil { + log.Fatalf("Error closing stream: %v", err) + } + if fmt.Sprintf("%d", unaryRequests) != r.Message { + log.Fatalf("Got %s successful unary requests processed from server, want: %d", r.Message, unaryRequests) + } + log.Printf("Successful unary requests processed by server and made by client are same.") +} diff --git a/examples/features/gracefulstop/server/main.go b/examples/features/gracefulstop/server/main.go new file mode 100644 index 000000000000..7b9233f82fe3 --- /dev/null +++ b/examples/features/gracefulstop/server/main.go @@ -0,0 +1,105 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Binary server demonstrates how to gracefully stop a gRPC server. +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "log" + "net" + "sync/atomic" + "time" + + "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/features/proto/echo" +) + +var ( + port = flag.Int("port", 50052, "port number") +) + +type server struct { + pb.UnimplementedEchoServer + + unaryRequests atomic.Int32 // to track number of unary RPCs processed + streamStart chan struct{} // to signal if server streaming started +} + +// ClientStreamingEcho implements the EchoService.ClientStreamingEcho method. +// It signals the server that streaming has started and waits for the stream to +// be done or aborted. If `io.EOF` is received on stream that means client +// has successfully closed the stream using `stream.CloseAndRecv()`, so it +// returns an `EchoResponse` with the total number of unary RPCs processed +// otherwise, it returns the error indicating stream is aborted. +func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error { + // Signal streaming start to initiate graceful stop which should wait until + // server streaming finishes. + s.streamStart <- struct{}{} + + if err := stream.RecvMsg(&pb.EchoResponse{}); err != nil { + if errors.Is(err, io.EOF) { + stream.SendAndClose(&pb.EchoResponse{Message: fmt.Sprintf("%d", s.unaryRequests.Load())}) + return nil + } + return err + } + + return nil +} + +// UnaryEcho implements the EchoService.UnaryEcho method. It increments +// `s.unaryRequests` on every call and returns it as part of `EchoResponse`. +func (s *server) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { + s.unaryRequests.Add(1) + return &pb.EchoResponse{Message: req.Message}, nil +} + +func main() { + flag.Parse() + + address := fmt.Sprintf(":%v", *port) + lis, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + s := grpc.NewServer() + ss := &server{streamStart: make(chan struct{})} + pb.RegisterEchoServer(s, ss) + + go func() { + <-ss.streamStart // wait until server streaming starts + time.Sleep(1 * time.Second) + log.Println("Initiating graceful shutdown...") + timer := time.AfterFunc(10*time.Second, func() { + log.Println("Server couldn't stop gracefully in time. Doing force stop.") + s.Stop() + }) + defer timer.Stop() + s.GracefulStop() // gracefully stop server after in-flight server streaming rpc finishes + log.Println("Server stopped gracefully.") + }() + + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +}