From 9387e7bd49070fca347130d72f2920dda7a7ec52 Mon Sep 17 00:00:00 2001 From: Zach Loafman Date: Fri, 31 May 2024 13:29:09 +0000 Subject: [PATCH] agones-{extensions,allocator}: Make servers context aware I ran the HA tests on #3843 overnight in a loop and still noticed a very low grade flake in the allocator, so I decided to go ahead and clean up the issues I noticed while working on the previous PR: * adds an `httpserver` utility package to handle the `Run` function that controller/extensions use. Make that context aware using the same method as https.Run: https://github.com/googleforgames/agones/blob/dfa414e5e4da37798833bbf8c33919acb5f3c2ea/pkg/util/https/server.go#L127-L130 (note that I think this ^ is why the extensions flake disappeared after I added a pause). * also plumbs context-awareness through the allocator run{Mux,REST,GRPC} functions, which is I suspect hy we're seeing a low-grade flake in allocator still. (I think the delay I added previously has sufficient to drive it off, too, but this PR should be more better.) --- cmd/allocator/main.go | 57 +++++++++++++++++++++++------------ cmd/controller/main.go | 27 ++--------------- cmd/extensions/main.go | 27 ++--------------- pkg/util/httpserver/server.go | 53 ++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 69 deletions(-) create mode 100644 pkg/util/httpserver/server.go diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 9e31cbda83..e9102733ca 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -52,6 +52,7 @@ import ( "agones.dev/agones/pkg/gameserverallocations" "agones.dev/agones/pkg/gameservers" "agones.dev/agones/pkg/util/fswatch" + "agones.dev/agones/pkg/util/httpserver" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/signals" ) @@ -218,9 +219,6 @@ func main() { health, closer := setupMetricsRecorder(conf) defer closer() - // http.DefaultServerMux is used for http connection, not for https - http.Handle("/", health) - kubeClient, agonesClient, err := getClients(conf) if err != nil { logger.WithError(err).Fatal("could not create clients") @@ -294,21 +292,24 @@ func main() { // If grpc and http use the same port then use a mux. if conf.GRPCPort == conf.HTTPPort { - runMux(h, conf.HTTPPort) + runMux(ctx, h, conf.HTTPPort) } else { // Otherwise, run each on a dedicated port. if validPort(conf.HTTPPort) { - runREST(h, conf.HTTPPort) + runREST(ctx, h, conf.HTTPPort) } if validPort(conf.GRPCPort) { - runGRPC(h, conf.GRPCPort) + runGRPC(ctx, h, conf.GRPCPort) } } - // Finally listen on 8080 (http) and block the main goroutine - // this is used to serve /live and /ready handlers for Kubernetes probes. - err = http.ListenAndServe(":8080", http.DefaultServeMux) - logger.WithError(err).Fatal("allocation service crashed") + // Finally listen on 8080 (http), used to serve /live and /ready handlers for Kubernetes probes. + healthserver := httpserver.Server{Logger: logger} + healthserver.Handle("/", health) + go func() { _ = healthserver.Run(ctx, 0) }() + + <-ctx.Done() + logger.Info("Shut down allocator") } func validPort(port int) bool { @@ -316,7 +317,7 @@ func validPort(port int) bool { return port >= 0 && port < maxPort } -func runMux(h *serviceHandler, httpPort int) { +func runMux(ctx context.Context, h *serviceHandler, httpPort int) { logger.Infof("Running the mux handler on port %d", httpPort) grpcServer := grpc.NewServer(h.getMuxServerOptions()...) pb.RegisterAllocationServiceServer(grpcServer, h) @@ -326,19 +327,19 @@ func runMux(h *serviceHandler, httpPort int) { panic(err) } - runHTTP(h, httpPort, grpcHandlerFunc(grpcServer, mux)) + runHTTP(ctx, h, httpPort, grpcHandlerFunc(grpcServer, mux)) } -func runREST(h *serviceHandler, httpPort int) { +func runREST(ctx context.Context, h *serviceHandler, httpPort int) { logger.WithField("port", httpPort).Info("Running the rest handler") mux := runtime.NewServerMux() if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil { panic(err) } - runHTTP(h, httpPort, mux) + runHTTP(ctx, h, httpPort, mux) } -func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) { +func runHTTP(ctx context.Context, h *serviceHandler, httpPort int, handler http.Handler) { cfg := &tls.Config{} if !h.tlsDisabled { cfg.GetCertificate = h.getTLSCert @@ -356,6 +357,11 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) { } go func() { + go func() { + <-ctx.Done() + _ = server.Close() + }() + var err error if !h.tlsDisabled { err = server.ListenAndServeTLS("", "") @@ -363,14 +369,17 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) { err = server.ListenAndServe() } - if err != nil { + if err == http.ErrServerClosed { + logger.WithError(err).Info("HTTP/HTTPS server closed") + os.Exit(0) + } else { logger.WithError(err).Fatal("Unable to start HTTP/HTTPS listener") os.Exit(1) } }() } -func runGRPC(h *serviceHandler, grpcPort int) { +func runGRPC(ctx context.Context, h *serviceHandler, grpcPort int) { logger.WithField("port", grpcPort).Info("Running the grpc handler on port") listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort)) if err != nil { @@ -382,9 +391,19 @@ func runGRPC(h *serviceHandler, grpcPort int) { pb.RegisterAllocationServiceServer(grpcServer, h) go func() { + go func() { + <-ctx.Done() + grpcServer.Stop() + }() + err := grpcServer.Serve(listener) - logger.WithError(err).Fatal("allocation service crashed") - os.Exit(1) + if err != nil { + logger.WithError(err).Fatal("allocation service crashed") + os.Exit(1) + } else { + logger.Info("allocation server closed") + os.Exit(0) + } }() } diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 6cfa4354f7..2e55f8e620 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "io" - "net/http" "os" "path/filepath" "strings" @@ -54,6 +53,7 @@ import ( "agones.dev/agones/pkg/gameservers" "agones.dev/agones/pkg/gameserversets" "agones.dev/agones/pkg/metrics" + "agones.dev/agones/pkg/util/httpserver" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/signals" ) @@ -170,7 +170,7 @@ func main() { agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) - server := &httpServer{} + server := &httpserver.Server{Logger: logger} var rs []runner var health healthcheck.Handler @@ -541,10 +541,6 @@ type runner interface { Run(ctx context.Context, workers int) error } -type httpServer struct { - http.ServeMux -} - func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, doLeaderElection bool, kubeClient *kubernetes.Clientset, namespace string, start func(_ context.Context)) { if !doLeaderElection { start(ctx) @@ -594,22 +590,3 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E }, }) } - -func (h *httpServer) Run(_ context.Context, _ int) error { - logger.Info("Starting http server...") - srv := &http.Server{ - Addr: ":8080", - Handler: h, - } - defer srv.Close() // nolint: errcheck - - if err := srv.ListenAndServe(); err != nil { - if err == http.ErrServerClosed { - logger.WithError(err).Info("http server closed") - } else { - wrappedErr := errors.Wrap(err, "Could not listen on :8080") - runtime.HandleError(logger.WithError(wrappedErr), wrappedErr) - } - } - return nil -} diff --git a/cmd/extensions/main.go b/cmd/extensions/main.go index 31987b1037..d01be204f0 100644 --- a/cmd/extensions/main.go +++ b/cmd/extensions/main.go @@ -18,7 +18,6 @@ package main import ( "context" "io" - "net/http" "os" "path/filepath" "strings" @@ -46,6 +45,7 @@ import ( "agones.dev/agones/pkg/metrics" "agones.dev/agones/pkg/util/apiserver" "agones.dev/agones/pkg/util/https" + "agones.dev/agones/pkg/util/httpserver" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/signals" "agones.dev/agones/pkg/util/webhooks" @@ -150,7 +150,7 @@ func main() { agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) - server := &httpServer{} + server := &httpserver.Server{Logger: logger} var health healthcheck.Handler // Stackdriver metrics @@ -340,26 +340,3 @@ type config struct { type runner interface { Run(ctx context.Context, workers int) error } - -type httpServer struct { - http.ServeMux -} - -func (h *httpServer) Run(_ context.Context, _ int) error { - logger.Info("Starting http server...") - srv := &http.Server{ - Addr: ":8080", - Handler: h, - } - defer srv.Close() // nolint: errcheck - - if err := srv.ListenAndServe(); err != nil { - if err == http.ErrServerClosed { - logger.WithError(err).Info("http server closed") - } else { - wrappedErr := errors.Wrap(err, "Could not listen on :8080") - runtime.HandleError(logger.WithError(wrappedErr), wrappedErr) - } - } - return nil -} diff --git a/pkg/util/httpserver/server.go b/pkg/util/httpserver/server.go new file mode 100644 index 0000000000..42dcbae4ba --- /dev/null +++ b/pkg/util/httpserver/server.go @@ -0,0 +1,53 @@ +// Copyright 2024 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package httpserver + +import ( + "context" + "net/http" + + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type Server struct { + http.ServeMux + + Logger *logrus.Entry +} + +// Run runs an http server on port :8080. +func (s *Server) Run(ctx context.Context, _ int) error { + s.Logger.Info("Starting http server...") + srv := &http.Server{ + Addr: ":8080", + Handler: s, + } + go func() { + <-ctx.Done() + _ = srv.Close() + }() + + if err := srv.ListenAndServe(); err != nil { + if err == http.ErrServerClosed { + s.Logger.WithError(err).Info("http server closed") + } else { + wrappedErr := errors.Wrap(err, "Could not listen on :8080") + runtime.HandleError(s.Logger.WithError(wrappedErr), wrappedErr) + } + } + return nil +}