From 0d30acd7d2fa525763f655bd82a037dab0b55384 Mon Sep 17 00:00:00 2001 From: Zach Loafman Date: Fri, 31 May 2024 13:29:09 +0000 Subject: [PATCH 1/4] agones-{extensions,allocator}: Make servers context aware, add gRPC health check * adds an `httpserver` utility package to handle the `Run` function that controller/extensions use. Make that context aware using the same method as https.Run: https://github.com/googleforgames/agones/blob/dfa414e5e4da37798833bbf8c33919acb5f3c2ea/pkg/util/https/server.go#L127-L130 * also plumbs context-awareness through the allocator run{Mux,REST,GRPC} functions. * adds a gRPC health server to the allocator, calls .Shutdown() on it during graceful termination - this seems to push the client off correctly. Tested with e2e in a loop. Towards #3853 --- cmd/allocator/main.go | 74 +++++--- cmd/controller/main.go | 27 +-- cmd/extensions/main.go | 27 +-- go.mod | 4 +- pkg/util/httpserver/server.go | 59 +++++++ test/e2e/allocator/pod_termination_test.go | 72 ++++---- test/e2e/allocator_test.go | 16 +- test/e2e/allochelper/helper_func.go | 65 ++++--- .../google.golang.org/grpc/health/client.go | 117 +++++++++++++ .../google.golang.org/grpc/health/logging.go | 23 +++ .../google.golang.org/grpc/health/server.go | 163 ++++++++++++++++++ vendor/modules.txt | 1 + 12 files changed, 505 insertions(+), 143 deletions(-) create mode 100644 pkg/util/httpserver/server.go create mode 100644 vendor/google.golang.org/grpc/health/client.go create mode 100644 vendor/google.golang.org/grpc/health/logging.go create mode 100644 vendor/google.golang.org/grpc/health/server.go diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 9e31cbda83..2bbe7cc18a 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -35,6 +35,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + grpchealth "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,6 +54,7 @@ import ( "agones.dev/agones/pkg/gameserverallocations" "agones.dev/agones/pkg/gameservers" "agones.dev/agones/pkg/util/fswatch" + "agones.dev/agones/pkg/util/httpserver" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/signals" ) @@ -218,9 +221,6 @@ func main() { health, closer := setupMetricsRecorder(conf) defer closer() - // http.DefaultServerMux is used for http connection, not for https - http.Handle("/", health) - kubeClient, agonesClient, err := getClients(conf) if err != nil { logger.WithError(err).Fatal("could not create clients") @@ -229,8 +229,9 @@ func main() { // This will test the connection to agones on each readiness probe // so if one of the allocator pod can't reach Kubernetes it will be removed // from the Kubernetes service. - ctx, cancelCtx := context.WithCancel(context.Background()) + listenCtx, cancelListenCtx := context.WithCancel(context.Background()) podReady = true + grpcHealth := grpchealth.NewServer() // only used for gRPC, ignored o/w health.AddReadinessCheck("allocator-agones-client", func() error { if !podReady { return errors.New("asked to shut down, failed readiness check") @@ -245,16 +246,14 @@ func main() { signals.NewSigTermHandler(func() { logger.Info("Pod shutdown has been requested, failing readiness check") podReady = false + grpcHealth.Shutdown() time.Sleep(conf.ReadinessShutdownDuration) - cancelCtx() - logger.Infof("Readiness shutdown duration has passed, context cancelled") - time.Sleep(1 * time.Second) // allow a brief time for cleanup, but force exit if main doesn't - os.Exit(0) + cancelListenCtx() }) grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode) - h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode) + h := newServiceHandler(context.Background(), kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode) if !h.tlsDisabled { cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() { @@ -294,21 +293,26 @@ func main() { // If grpc and http use the same port then use a mux. if conf.GRPCPort == conf.HTTPPort { - runMux(h, conf.HTTPPort) + runMux(listenCtx, h, grpcHealth, conf.HTTPPort) } else { // Otherwise, run each on a dedicated port. if validPort(conf.HTTPPort) { - runREST(h, conf.HTTPPort) + runREST(listenCtx, h, conf.HTTPPort) } if validPort(conf.GRPCPort) { - runGRPC(h, conf.GRPCPort) + runGRPC(listenCtx, h, grpcHealth, conf.GRPCPort) } } - // Finally listen on 8080 (http) and block the main goroutine - // this is used to serve /live and /ready handlers for Kubernetes probes. - err = http.ListenAndServe(":8080", http.DefaultServeMux) - logger.WithError(err).Fatal("allocation service crashed") + // Finally listen on 8080 (http), used to serve /live and /ready handlers for Kubernetes probes. + healthserver := httpserver.Server{Logger: logger} + healthserver.Handle("/", health) + go func() { _ = healthserver.Run(listenCtx, 0) }() + + <-listenCtx.Done() + logger.Infof("Listen context cancelled") + time.Sleep(5 * time.Second) // allow a brief time for cleanup/shutdown + logger.Info("Shut down allocator") } func validPort(port int) bool { @@ -316,29 +320,30 @@ func validPort(port int) bool { return port >= 0 && port < maxPort } -func runMux(h *serviceHandler, httpPort int) { +func runMux(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, httpPort int) { logger.Infof("Running the mux handler on port %d", httpPort) grpcServer := grpc.NewServer(h.getMuxServerOptions()...) pb.RegisterAllocationServiceServer(grpcServer, h) + grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth) mux := runtime.NewServerMux() if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil { panic(err) } - runHTTP(h, httpPort, grpcHandlerFunc(grpcServer, mux)) + runHTTP(ctx, h, httpPort, grpcHandlerFunc(grpcServer, mux)) } -func runREST(h *serviceHandler, httpPort int) { +func runREST(ctx context.Context, h *serviceHandler, httpPort int) { logger.WithField("port", httpPort).Info("Running the rest handler") mux := runtime.NewServerMux() if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil { panic(err) } - runHTTP(h, httpPort, mux) + runHTTP(ctx, h, httpPort, mux) } -func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) { +func runHTTP(ctx context.Context, h *serviceHandler, httpPort int, handler http.Handler) { cfg := &tls.Config{} if !h.tlsDisabled { cfg.GetCertificate = h.getTLSCert @@ -356,6 +361,11 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) { } go func() { + go func() { + <-ctx.Done() + _ = server.Close() + }() + var err error if !h.tlsDisabled { err = server.ListenAndServeTLS("", "") @@ -363,14 +373,17 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) { err = server.ListenAndServe() } - if err != nil { + if err == http.ErrServerClosed { + logger.WithError(err).Info("HTTP/HTTPS server closed") + os.Exit(0) + } else { logger.WithError(err).Fatal("Unable to start HTTP/HTTPS listener") os.Exit(1) } }() } -func runGRPC(h *serviceHandler, grpcPort int) { +func runGRPC(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, grpcPort int) { logger.WithField("port", grpcPort).Info("Running the grpc handler on port") listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort)) if err != nil { @@ -380,11 +393,22 @@ func runGRPC(h *serviceHandler, grpcPort int) { grpcServer := grpc.NewServer(h.getGRPCServerOptions()...) pb.RegisterAllocationServiceServer(grpcServer, h) + grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth) go func() { + go func() { + <-ctx.Done() + grpcServer.Stop() + }() + err := grpcServer.Serve(listener) - logger.WithError(err).Fatal("allocation service crashed") - os.Exit(1) + if err != nil { + logger.WithError(err).Fatal("allocation service crashed") + os.Exit(1) + } else { + logger.Info("allocation server closed") + os.Exit(0) + } }() } diff --git a/cmd/controller/main.go b/cmd/controller/main.go index a48dceaa41..115a8519af 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "io" - "net/http" "os" "path/filepath" "strings" @@ -54,6 +53,7 @@ import ( "agones.dev/agones/pkg/gameservers" "agones.dev/agones/pkg/gameserversets" "agones.dev/agones/pkg/metrics" + "agones.dev/agones/pkg/util/httpserver" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/signals" ) @@ -171,7 +171,7 @@ func main() { agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) - server := &httpServer{} + server := &httpserver.Server{Logger: logger} var rs []runner var health healthcheck.Handler @@ -547,10 +547,6 @@ type runner interface { Run(ctx context.Context, workers int) error } -type httpServer struct { - http.ServeMux -} - func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, doLeaderElection bool, kubeClient *kubernetes.Clientset, namespace string, start func(_ context.Context)) { if !doLeaderElection { start(ctx) @@ -600,22 +596,3 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E }, }) } - -func (h *httpServer) Run(_ context.Context, _ int) error { - logger.Info("Starting http server...") - srv := &http.Server{ - Addr: ":8080", - Handler: h, - } - defer srv.Close() // nolint: errcheck - - if err := srv.ListenAndServe(); err != nil { - if err == http.ErrServerClosed { - logger.WithError(err).Info("http server closed") - } else { - wrappedErr := errors.Wrap(err, "Could not listen on :8080") - runtime.HandleError(logger.WithError(wrappedErr), wrappedErr) - } - } - return nil -} diff --git a/cmd/extensions/main.go b/cmd/extensions/main.go index 31987b1037..d01be204f0 100644 --- a/cmd/extensions/main.go +++ b/cmd/extensions/main.go @@ -18,7 +18,6 @@ package main import ( "context" "io" - "net/http" "os" "path/filepath" "strings" @@ -46,6 +45,7 @@ import ( "agones.dev/agones/pkg/metrics" "agones.dev/agones/pkg/util/apiserver" "agones.dev/agones/pkg/util/https" + "agones.dev/agones/pkg/util/httpserver" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/signals" "agones.dev/agones/pkg/util/webhooks" @@ -150,7 +150,7 @@ func main() { agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) - server := &httpServer{} + server := &httpserver.Server{Logger: logger} var health healthcheck.Handler // Stackdriver metrics @@ -340,26 +340,3 @@ type config struct { type runner interface { Run(ctx context.Context, workers int) error } - -type httpServer struct { - http.ServeMux -} - -func (h *httpServer) Run(_ context.Context, _ int) error { - logger.Info("Starting http server...") - srv := &http.Server{ - Addr: ":8080", - Handler: h, - } - defer srv.Close() // nolint: errcheck - - if err := srv.ListenAndServe(); err != nil { - if err == http.ErrServerClosed { - logger.WithError(err).Info("http server closed") - } else { - wrappedErr := errors.Wrap(err, "Could not listen on :8080") - runtime.HandleError(logger.WithError(wrappedErr), wrappedErr) - } - } - return nil -} diff --git a/go.mod b/go.mod index 1dcc663754..bc3f239596 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( contrib.go.opencensus.io/exporter/stackdriver v0.8.0 fortio.org/fortio v1.3.1 github.com/ahmetb/gen-crd-api-reference-docs v0.3.0 + github.com/evanphx/json-patch v4.12.0+incompatible github.com/fsnotify/fsnotify v1.6.0 github.com/go-openapi/spec v0.19.5 github.com/google/go-cmp v0.5.9 @@ -23,7 +24,6 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/sirupsen/logrus v1.9.0 - github.com/spf13/cast v1.3.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.0 github.com/stretchr/testify v1.8.2 @@ -60,7 +60,6 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect - github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect @@ -92,6 +91,7 @@ require ( github.com/prometheus/procfs v0.10.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spf13/afero v1.9.2 // indirect + github.com/spf13/cast v1.3.0 // indirect github.com/spf13/jwalterweatherman v1.0.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect golang.org/x/crypto v0.21.0 // indirect diff --git a/pkg/util/httpserver/server.go b/pkg/util/httpserver/server.go new file mode 100644 index 0000000000..e23e0fb614 --- /dev/null +++ b/pkg/util/httpserver/server.go @@ -0,0 +1,59 @@ +// Copyright 2024 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package httpserver implements an http server that conforms to the +// controller runner interface. +package httpserver + +import ( + "context" + "net/http" + + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// Server is a HTTPs server that conforms to the runner interface +// we use in /cmd/controller. +// +//nolint:govet // ignore field alignment complaint, this is a singleton +type Server struct { + http.ServeMux + + Logger *logrus.Entry +} + +// Run runs an http server on port :8080. +func (s *Server) Run(ctx context.Context, _ int) error { + s.Logger.Info("Starting http server...") + srv := &http.Server{ + Addr: ":8080", + Handler: s, + } + go func() { + <-ctx.Done() + _ = srv.Close() + }() + + if err := srv.ListenAndServe(); err != nil { + if err == http.ErrServerClosed { + s.Logger.WithError(err).Info("http server closed") + } else { + wrappedErr := errors.Wrap(err, "Could not listen on :8080") + runtime.HandleError(s.Logger.WithError(wrappedErr), wrappedErr) + } + } + return nil +} diff --git a/test/e2e/allocator/pod_termination_test.go b/test/e2e/allocator/pod_termination_test.go index fc051b739a..c18a7d38ed 100644 --- a/test/e2e/allocator/pod_termination_test.go +++ b/test/e2e/allocator/pod_termination_test.go @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc/status" - v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,34 +41,23 @@ func TestAllocatorAfterDeleteReplica(t *testing.T) { ctx := context.Background() logger := e2e.TestLogger(t) - var list *v1.PodList - - dep, err := framework.KubeClient.AppsV1().Deployments("agones-system").Get(ctx, "agones-allocator", metav1.GetOptions{}) - require.NoError(t, err, "Failed to get replicas") - replicaCnt := int(*(dep.Spec.Replicas)) - logger.Infof("Replica count config is %d", replicaCnt) + // initialize gRPC client, which tests the connection + grpcClient, err := helper.GetAllocatorClient(ctx, t, framework) + require.NoError(t, err, "Could not initialize rpc client") - // poll and wait until all allocator pods are running - _ = wait.PollUntilContextTimeout(context.Background(), retryInterval, retryTimeout, true, func(ctx context.Context) (done bool, err error) { - list, err = helper.GetAgonesAllocatorPods(ctx, framework) + // poll and wait until all allocator pods are available + err = wait.PollUntilContextTimeout(context.Background(), retryInterval, retryTimeout, true, func(ctx context.Context) (done bool, err error) { + deployment, err := framework.KubeClient.AppsV1().Deployments("agones-system").Get(ctx, "agones-allocator", metav1.GetOptions{}) if err != nil { return true, err } - - if len(list.Items) != replicaCnt { + if deployment.Status.Replicas != deployment.Status.AvailableReplicas { + logger.Infof("Waiting for agones-allocator to stabilize: %d/%d replicas available", deployment.Status.AvailableReplicas, deployment.Status.ReadyReplicas) return false, nil } - - for _, allocpod := range list.Items { - podstatus := string(allocpod.Status.Phase) - logger.Infof("Allocator Pod %s, has status of %s", allocpod.ObjectMeta.Name, podstatus) - if podstatus != "Running" { - return false, nil - } - } - return true, nil }) + require.NoError(t, err, "Failed to stabilize agones-allocator") // create fleet flt, err := helper.CreateFleet(ctx, framework.Namespace, framework) @@ -77,7 +66,23 @@ func TestAllocatorAfterDeleteReplica(t *testing.T) { } framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(flt.Spec.Replicas)) - var response *pb.AllocationResponse + logger.Infof("=== agones-allocator available, gRPC client initialized ===") + + // One probe into the test, delete all of the allocators except 1 + go func() { + time.Sleep(retryInterval) + + list, err := framework.KubeClient.CoreV1().Pods("agones-system").List( + ctx, metav1.ListOptions{LabelSelector: labels.Set{"multicluster.agones.dev/role": "allocator"}.String()}) + if assert.NoError(t, err, "Could not list allocator pods") { + for _, pod := range list.Items[1:] { + logger.Infof("Deleting Pod %s", pod.ObjectMeta.Name) + err = helper.DeleteAgonesPod(ctx, pod.ObjectMeta.Name, "agones-system", framework) + assert.NoError(t, err, "Could not delete allocator pod") + } + } + }() + request := &pb.AllocationRequest{ Namespace: framework.Namespace, PreferredGameServerSelectors: []*pb.GameServerSelector{{MatchLabels: map[string]string{agonesv1.FleetNameLabel: flt.ObjectMeta.Name}}}, @@ -85,23 +90,18 @@ func TestAllocatorAfterDeleteReplica(t *testing.T) { Metadata: &pb.MetaPatch{Labels: map[string]string{"gslabel": "allocatedbytest"}}, } - // delete all of the allocators except 1 - for _, pod := range list.Items[1:] { - err = helper.DeleteAgonesAllocatorPod(ctx, pod.ObjectMeta.Name, framework) - require.NoError(t, err, "Could not delete allocator pod") - } - - grpcClient, err := helper.GetAllocatorClient(ctx, t, framework) - require.NoError(t, err, "Could not initialize rpc client") - // Wait and keep making calls till we know the draining time has passed _ = wait.PollUntilContextTimeout(context.Background(), retryInterval, retryTimeout, true, func(ctx context.Context) (bool, error) { - response, err = grpcClient.Allocate(context.Background(), request) + ctx, cancelCtx := context.WithTimeout(ctx, retryInterval) + defer cancelCtx() + + response, err := grpcClient.Allocate(ctx, request) logger.Infof("err = %v (code = %v), response = %v", err, status.Code(err), response) - helper.ValidateAllocatorResponse(t, response) - require.NoError(t, err, "Failed grpc allocation request") - err = helper.DeleteAgonesPod(ctx, response.GameServerName, framework.Namespace, framework) - require.NoError(t, err, "Failed to delete game server pod %s", response.GameServerName) + if assert.NoError(t, err, "Failed grpc allocation request") { + helper.ValidateAllocatorResponse(t, response) + err = helper.DeleteAgonesPod(ctx, response.GameServerName, framework.Namespace, framework) + assert.NoError(t, err, "Failed to delete game server pod %s", response.GameServerName) + } return false, nil }) } diff --git a/test/e2e/allocator_test.go b/test/e2e/allocator_test.go index 0997a95a65..4e79a3831b 100644 --- a/test/e2e/allocator_test.go +++ b/test/e2e/allocator_test.go @@ -87,12 +87,12 @@ func TestAllocatorWithDeprecatedRequired(t *testing.T) { // wait for the allocation system to come online err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { // create the grpc client each time, as we may end up looking at an old cert - dialOpts, err := helper.CreateRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) + dialOpts, err := helper.CreateRemoteClusterDialOptions(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) if err != nil { return false, err } - conn, err := grpc.Dial(requestURL, dialOpts) + conn, err := grpc.Dial(requestURL, dialOpts...) if err != nil { logrus.WithError(err).Info("failing grpc.Dial") return false, nil @@ -181,12 +181,12 @@ func TestAllocatorWithSelectors(t *testing.T) { // wait for the allocation system to come online err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { // create the grpc client each time, as we may end up looking at an old cert - dialOpts, err := helper.CreateRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) + dialOpts, err := helper.CreateRemoteClusterDialOptions(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) if err != nil { return false, err } - conn, err := grpc.Dial(requestURL, dialOpts) + conn, err := grpc.Dial(requestURL, dialOpts...) if err != nil { logrus.WithError(err).Info("failing grpc.Dial") return false, nil @@ -357,11 +357,11 @@ func TestAllocatorWithCountersAndLists(t *testing.T) { }, } err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { - dialOpts, err := helper.CreateRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) + dialOpts, err := helper.CreateRemoteClusterDialOptions(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) if err != nil { return false, err } - conn, err := grpc.Dial(requestURL, dialOpts) + conn, err := grpc.Dial(requestURL, dialOpts...) if err != nil { logrus.WithError(err).Info("failing grpc.Dial") return false, nil @@ -612,12 +612,12 @@ func TestAllocatorCrossNamespace(t *testing.T) { // wait for the allocation system to come online err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { // create the grpc client each time, as we may end up looking at an old cert - dialOpts, err := helper.CreateRemoteClusterDialOption(ctx, namespaceA, allocatorClientSecretName, tlsCA, framework) + dialOpts, err := helper.CreateRemoteClusterDialOptions(ctx, namespaceA, allocatorClientSecretName, tlsCA, framework) if err != nil { return false, err } - conn, err := grpc.Dial(requestURL, dialOpts) + conn, err := grpc.Dial(requestURL, dialOpts...) if err != nil { logrus.WithError(err).Info("failing grpc.Dial") return false, nil diff --git a/test/e2e/allochelper/helper_func.go b/test/e2e/allochelper/helper_func.go index 9f2ff6721b..c668d6dce2 100644 --- a/test/e2e/allochelper/helper_func.go +++ b/test/e2e/allochelper/helper_func.go @@ -38,8 +38,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" - corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -56,6 +56,21 @@ const ( allocatorClientSecretName = "allocator-client.default" allocatorClientSecretNamespace = "default" replicasCount = 5 + + gRPCRetryPolicy = `{ + "methodConfig": [{ + "name": [{}], + "waitForReady": true, + + "retryPolicy": { + "MaxAttempts": 4, + "InitialBackoff": ".01s", + "MaxBackoff": ".01s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": [ "UNAVAILABLE" ] + } + }] + }` ) // CopyDefaultAllocatorClientSecret copys the allocator client secret @@ -106,16 +121,26 @@ func GetAllocatorEndpoint(ctx context.Context, t *testing.T, framework *e2e.Fram return svc.Status.LoadBalancer.Ingress[0].IP, port.Port } -// CreateRemoteClusterDialOption creates a grpc client dial option with proper certs to make a remote call. -// -//nolint:unparam -func CreateRemoteClusterDialOption(ctx context.Context, namespace, clientSecretName string, tlsCA []byte, framework *e2e.Framework) (grpc.DialOption, error) { +// CreateRemoteClusterDialOptions creates a grpc client dial option with proper certs to make a remote call. +func CreateRemoteClusterDialOptions(ctx context.Context, namespace, clientSecretName string, tlsCA []byte, framework *e2e.Framework) ([]grpc.DialOption, error) { tlsConfig, err := GetTLSConfig(ctx, namespace, clientSecretName, tlsCA, framework) if err != nil { return nil, err } - return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil + return []grpc.DialOption{ + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), + grpc.WithDefaultServiceConfig(gRPCRetryPolicy), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Duration(100) * time.Millisecond, + Multiplier: 1.6, + Jitter: 0.2, + MaxDelay: 30 * time.Second, + }, + MinConnectTimeout: time.Second, + }), + }, nil } // GetTLSConfig gets the namesapce client secret @@ -260,11 +285,6 @@ func ValidateAllocatorResponse(t *testing.T, resp *pb.AllocationResponse) { assert.NotEmpty(t, resp.Metadata.Annotations) } -// DeleteAgonesAllocatorPod deletes a Agones allocator pod -func DeleteAgonesAllocatorPod(ctx context.Context, podName string, framework *e2e.Framework) error { - return DeleteAgonesPod(ctx, podName, "agones-system", framework) -} - // DeleteAgonesPod deletes an Agones pod with the specified namespace and podname func DeleteAgonesPod(ctx context.Context, podName string, namespace string, framework *e2e.Framework) error { policy := metav1.DeletePropagationBackground @@ -273,14 +293,9 @@ func DeleteAgonesPod(ctx context.Context, podName string, namespace string, fram return err } -// GetAgonesAllocatorPods returns all the Agones allocator pods -func GetAgonesAllocatorPods(ctx context.Context, framework *e2e.Framework) (*corev1.PodList, error) { - opts := metav1.ListOptions{LabelSelector: labels.Set{"multicluster.agones.dev/role": "allocator"}.String()} - return framework.KubeClient.CoreV1().Pods("agones-system").List(ctx, opts) -} - // GetAllocatorClient creates a client and ensure that it can be connected to func GetAllocatorClient(ctx context.Context, t *testing.T, framework *e2e.Framework) (pb.AllocationServiceClient, error) { + logger := e2e.TestLogger(t) ip, port := GetAllocatorEndpoint(ctx, t, framework) requestURL := fmt.Sprintf(allocatorReqURLFmt, ip, port) tlsCA := RefreshAllocatorTLSCerts(ctx, t, ip, framework) @@ -291,16 +306,22 @@ func GetAllocatorClient(ctx context.Context, t *testing.T, framework *e2e.Framew } framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(flt.Spec.Replicas)) - dialOpts, err := CreateRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) + dialOpts, err := CreateRemoteClusterDialOptions(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) if err != nil { return nil, err } - conn, err := grpc.Dial(requestURL, dialOpts) + conn, err := grpc.Dial(requestURL, dialOpts...) require.NoError(t, err, "Failed grpc.Dial") go func() { - <-ctx.Done() - conn.Close() // nolint: errcheck + for { + state := conn.GetState() + logger.Infof("allocation client state: %v", state) + if notDone := conn.WaitForStateChange(ctx, state); !notDone { + break + } + } + _ = conn.Close() }() grpcClient := pb.NewAllocationServiceClient(conn) @@ -316,7 +337,7 @@ func GetAllocatorClient(ctx context.Context, t *testing.T, framework *e2e.Framew err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { response, err = grpcClient.Allocate(context.Background(), request) if err != nil { - logrus.WithError(err).Info("failing Allocate request") + logger.WithError(err).Info("failing Allocate request") return false, nil } ValidateAllocatorResponse(t, response) diff --git a/vendor/google.golang.org/grpc/health/client.go b/vendor/google.golang.org/grpc/health/client.go new file mode 100644 index 0000000000..b5bee48380 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/client.go @@ -0,0 +1,117 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package health + +import ( + "context" + "fmt" + "io" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/backoff" + "google.golang.org/grpc/status" +) + +var ( + backoffStrategy = backoff.DefaultExponential + backoffFunc = func(ctx context.Context, retries int) bool { + d := backoffStrategy.Backoff(retries) + timer := time.NewTimer(d) + select { + case <-timer.C: + return true + case <-ctx.Done(): + timer.Stop() + return false + } + } +) + +func init() { + internal.HealthCheckFunc = clientHealthCheck +} + +const healthCheckMethod = "/grpc.health.v1.Health/Watch" + +// This function implements the protocol defined at: +// https://github.com/grpc/grpc/blob/master/doc/health-checking.md +func clientHealthCheck(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), service string) error { + tryCnt := 0 + +retryConnection: + for { + // Backs off if the connection has failed in some way without receiving a message in the previous retry. + if tryCnt > 0 && !backoffFunc(ctx, tryCnt-1) { + return nil + } + tryCnt++ + + if ctx.Err() != nil { + return nil + } + setConnectivityState(connectivity.Connecting, nil) + rawS, err := newStream(healthCheckMethod) + if err != nil { + continue retryConnection + } + + s, ok := rawS.(grpc.ClientStream) + // Ideally, this should never happen. But if it happens, the server is marked as healthy for LBing purposes. + if !ok { + setConnectivityState(connectivity.Ready, nil) + return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS) + } + + if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF { + // Stream should have been closed, so we can safely continue to create a new stream. + continue retryConnection + } + s.CloseSend() + + resp := new(healthpb.HealthCheckResponse) + for { + err = s.RecvMsg(resp) + + // Reports healthy for the LBing purposes if health check is not implemented in the server. + if status.Code(err) == codes.Unimplemented { + setConnectivityState(connectivity.Ready, nil) + return err + } + + // Reports unhealthy if server's Watch method gives an error other than UNIMPLEMENTED. + if err != nil { + setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err)) + continue retryConnection + } + + // As a message has been received, removes the need for backoff for the next retry by resetting the try count. + tryCnt = 0 + if resp.Status == healthpb.HealthCheckResponse_SERVING { + setConnectivityState(connectivity.Ready, nil) + } else { + setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", resp.Status)) + } + } + } +} diff --git a/vendor/google.golang.org/grpc/health/logging.go b/vendor/google.golang.org/grpc/health/logging.go new file mode 100644 index 0000000000..83c6acf55e --- /dev/null +++ b/vendor/google.golang.org/grpc/health/logging.go @@ -0,0 +1,23 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package health + +import "google.golang.org/grpc/grpclog" + +var logger = grpclog.Component("health_service") diff --git a/vendor/google.golang.org/grpc/health/server.go b/vendor/google.golang.org/grpc/health/server.go new file mode 100644 index 0000000000..cce6312d77 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/server.go @@ -0,0 +1,163 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package health provides a service that exposes server's health and it must be +// imported to enable support for client-side health checks. +package health + +import ( + "context" + "sync" + + "google.golang.org/grpc/codes" + healthgrpc "google.golang.org/grpc/health/grpc_health_v1" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +// Server implements `service Health`. +type Server struct { + healthgrpc.UnimplementedHealthServer + mu sync.RWMutex + // If shutdown is true, it's expected all serving status is NOT_SERVING, and + // will stay in NOT_SERVING. + shutdown bool + // statusMap stores the serving status of the services this Server monitors. + statusMap map[string]healthpb.HealthCheckResponse_ServingStatus + updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus +} + +// NewServer returns a new Server. +func NewServer() *Server { + return &Server{ + statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING}, + updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus), + } +} + +// Check implements `service Health`. +func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if servingStatus, ok := s.statusMap[in.Service]; ok { + return &healthpb.HealthCheckResponse{ + Status: servingStatus, + }, nil + } + return nil, status.Error(codes.NotFound, "unknown service") +} + +// Watch implements `service Health`. +func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + service := in.Service + // update channel is used for getting service status updates. + update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) + s.mu.Lock() + // Puts the initial status to the channel. + if servingStatus, ok := s.statusMap[service]; ok { + update <- servingStatus + } else { + update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN + } + + // Registers the update channel to the correct place in the updates map. + if _, ok := s.updates[service]; !ok { + s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus) + } + s.updates[service][stream] = update + defer func() { + s.mu.Lock() + delete(s.updates[service], stream) + s.mu.Unlock() + }() + s.mu.Unlock() + + var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1 + for { + select { + // Status updated. Sends the up-to-date status to the client. + case servingStatus := <-update: + if lastSentStatus == servingStatus { + continue + } + lastSentStatus = servingStatus + err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) + if err != nil { + return status.Error(codes.Canceled, "Stream has ended.") + } + // Context done. Removes the update channel from the updates map. + case <-stream.Context().Done(): + return status.Error(codes.Canceled, "Stream has ended.") + } + } +} + +// SetServingStatus is called when need to reset the serving status of a service +// or insert a new service entry into the statusMap. +func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { + s.mu.Lock() + defer s.mu.Unlock() + if s.shutdown { + logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus) + return + } + + s.setServingStatusLocked(service, servingStatus) +} + +func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { + s.statusMap[service] = servingStatus + for _, update := range s.updates[service] { + // Clears previous updates, that are not sent to the client, from the channel. + // This can happen if the client is not reading and the server gets flow control limited. + select { + case <-update: + default: + } + // Puts the most recent update to the channel. + update <- servingStatus + } +} + +// Shutdown sets all serving status to NOT_SERVING, and configures the server to +// ignore all future status changes. +// +// This changes serving status for all services. To set status for a particular +// services, call SetServingStatus(). +func (s *Server) Shutdown() { + s.mu.Lock() + defer s.mu.Unlock() + s.shutdown = true + for service := range s.statusMap { + s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING) + } +} + +// Resume sets all serving status to SERVING, and configures the server to +// accept all future status changes. +// +// This changes serving status for all services. To set status for a particular +// services, call SetServingStatus(). +func (s *Server) Resume() { + s.mu.Lock() + defer s.mu.Unlock() + s.shutdown = false + for service := range s.statusMap { + s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 68d6b14de4..fdd895811d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -513,6 +513,7 @@ google.golang.org/grpc/credentials/oauth google.golang.org/grpc/encoding google.golang.org/grpc/encoding/proto google.golang.org/grpc/grpclog +google.golang.org/grpc/health google.golang.org/grpc/health/grpc_health_v1 google.golang.org/grpc/internal google.golang.org/grpc/internal/backoff From d3d5fcbcabc991e713df85763d6547e7a01648c9 Mon Sep 17 00:00:00 2001 From: Zach Loafman Date: Tue, 11 Jun 2024 15:51:59 +0000 Subject: [PATCH 2/4] Move from context.Background() --- cmd/allocator/main.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 2bbe7cc18a..259496bbf6 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -230,6 +230,8 @@ func main() { // so if one of the allocator pod can't reach Kubernetes it will be removed // from the Kubernetes service. listenCtx, cancelListenCtx := context.WithCancel(context.Background()) + workerCtx, cancelWorkerCtx := context.WithCancel(context.Background()) + podReady = true grpcHealth := grpchealth.NewServer() // only used for gRPC, ignored o/w health.AddReadinessCheck("allocator-agones-client", func() error { @@ -253,7 +255,7 @@ func main() { grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode) - h := newServiceHandler(context.Background(), kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode) + h := newServiceHandler(workerCtx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode) if !h.tlsDisabled { cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() { @@ -309,9 +311,14 @@ func main() { healthserver.Handle("/", health) go func() { _ = healthserver.Run(listenCtx, 0) }() + // TODO: This is messy. Contexts are the wrong way to handle this - we should be using shutdown, + // and a cascading graceful shutdown instead of multiple contexts and sleeps. <-listenCtx.Done() logger.Infof("Listen context cancelled") - time.Sleep(5 * time.Second) // allow a brief time for cleanup/shutdown + time.Sleep(5 * time.Second) + cancelWorkerCtx() + logger.Infof("Worker context cancelled") + time.Sleep(1 * time.Second) logger.Info("Shut down allocator") } From a34905825b41addc4525c4d0df486bc8ad42faa1 Mon Sep 17 00:00:00 2001 From: Zach Loafman Date: Fri, 14 Jun 2024 17:55:22 +0000 Subject: [PATCH 3/4] Use Shutdown/GracefulStop --- cmd/allocator/main.go | 26 +++++++++++++------------- pkg/util/https/server.go | 4 ++-- pkg/util/https/server_test.go | 2 +- pkg/util/httpserver/server.go | 2 +- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 259496bbf6..b7168f2b2b 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -226,12 +226,11 @@ func main() { logger.WithError(err).Fatal("could not create clients") } + listenCtx, cancelListenCtx := context.WithCancel(context.Background()) + // This will test the connection to agones on each readiness probe // so if one of the allocator pod can't reach Kubernetes it will be removed // from the Kubernetes service. - listenCtx, cancelListenCtx := context.WithCancel(context.Background()) - workerCtx, cancelWorkerCtx := context.WithCancel(context.Background()) - podReady = true grpcHealth := grpchealth.NewServer() // only used for gRPC, ignored o/w health.AddReadinessCheck("allocator-agones-client", func() error { @@ -255,6 +254,7 @@ func main() { grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode) + workerCtx, cancelWorkerCtx := context.WithCancel(context.Background()) h := newServiceHandler(workerCtx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode) if !h.tlsDisabled { @@ -295,11 +295,11 @@ func main() { // If grpc and http use the same port then use a mux. if conf.GRPCPort == conf.HTTPPort { - runMux(listenCtx, h, grpcHealth, conf.HTTPPort) + runMux(listenCtx, workerCtx, h, grpcHealth, conf.HTTPPort) } else { // Otherwise, run each on a dedicated port. if validPort(conf.HTTPPort) { - runREST(listenCtx, h, conf.HTTPPort) + runREST(listenCtx, workerCtx, h, conf.HTTPPort) } if validPort(conf.GRPCPort) { runGRPC(listenCtx, h, grpcHealth, conf.GRPCPort) @@ -327,7 +327,7 @@ func validPort(port int) bool { return port >= 0 && port < maxPort } -func runMux(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, httpPort int) { +func runMux(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, httpPort int) { logger.Infof("Running the mux handler on port %d", httpPort) grpcServer := grpc.NewServer(h.getMuxServerOptions()...) pb.RegisterAllocationServiceServer(grpcServer, h) @@ -338,19 +338,19 @@ func runMux(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Serve panic(err) } - runHTTP(ctx, h, httpPort, grpcHandlerFunc(grpcServer, mux)) + runHTTP(listenCtx, workerCtx, h, httpPort, grpcHandlerFunc(grpcServer, mux)) } -func runREST(ctx context.Context, h *serviceHandler, httpPort int) { +func runREST(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, httpPort int) { logger.WithField("port", httpPort).Info("Running the rest handler") mux := runtime.NewServerMux() if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil { panic(err) } - runHTTP(ctx, h, httpPort, mux) + runHTTP(listenCtx, workerCtx, h, httpPort, mux) } -func runHTTP(ctx context.Context, h *serviceHandler, httpPort int, handler http.Handler) { +func runHTTP(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, httpPort int, handler http.Handler) { cfg := &tls.Config{} if !h.tlsDisabled { cfg.GetCertificate = h.getTLSCert @@ -369,8 +369,8 @@ func runHTTP(ctx context.Context, h *serviceHandler, httpPort int, handler http. go func() { go func() { - <-ctx.Done() - _ = server.Close() + <-listenCtx.Done() + _ = server.Shutdown(workerCtx) }() var err error @@ -405,7 +405,7 @@ func runGRPC(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Serv go func() { go func() { <-ctx.Done() - grpcServer.Stop() + grpcServer.GracefulStop() }() err := grpcServer.Serve(listener) diff --git a/pkg/util/https/server.go b/pkg/util/https/server.go index 34a2a471b8..7d472fd8c7 100644 --- a/pkg/util/https/server.go +++ b/pkg/util/https/server.go @@ -33,7 +33,7 @@ const ( // tls is a http server interface to enable easier testing type tls interface { - Close() error + Shutdown(context.Context) error ListenAndServeTLS(certFile, keyFile string) error } @@ -126,7 +126,7 @@ func (s *Server) WatchForCertificateChanges() (func(), error) { func (s *Server) Run(ctx context.Context, _ int) error { go func() { <-ctx.Done() - s.tls.Close() // nolint: errcheck,gosec + _ = s.tls.Shutdown(context.Background()) }() s.logger.WithField("server", s).Infof("https server started") diff --git a/pkg/util/https/server_test.go b/pkg/util/https/server_test.go index 95348e6308..a03739f453 100644 --- a/pkg/util/https/server_test.go +++ b/pkg/util/https/server_test.go @@ -27,7 +27,7 @@ type testServer struct { server *httptest.Server } -func (ts *testServer) Close() error { +func (ts *testServer) Shutdown(_ context.Context) error { ts.server.Close() return nil } diff --git a/pkg/util/httpserver/server.go b/pkg/util/httpserver/server.go index e23e0fb614..2a4811fdc5 100644 --- a/pkg/util/httpserver/server.go +++ b/pkg/util/httpserver/server.go @@ -44,7 +44,7 @@ func (s *Server) Run(ctx context.Context, _ int) error { } go func() { <-ctx.Done() - _ = srv.Close() + _ = srv.Shutdown(context.Background()) }() if err := srv.ListenAndServe(); err != nil { From 9d246a345fd8350ed02916feb9295d0a01ced779 Mon Sep 17 00:00:00 2001 From: Zach Loafman Date: Wed, 26 Jun 2024 00:22:05 +0000 Subject: [PATCH 4/4] Relax deadline slightly (original had none), also delete pod from GetAllocatorClient --- test/e2e/allocator/pod_termination_test.go | 2 +- test/e2e/allochelper/helper_func.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/test/e2e/allocator/pod_termination_test.go b/test/e2e/allocator/pod_termination_test.go index c18a7d38ed..60ade418a8 100644 --- a/test/e2e/allocator/pod_termination_test.go +++ b/test/e2e/allocator/pod_termination_test.go @@ -92,7 +92,7 @@ func TestAllocatorAfterDeleteReplica(t *testing.T) { // Wait and keep making calls till we know the draining time has passed _ = wait.PollUntilContextTimeout(context.Background(), retryInterval, retryTimeout, true, func(ctx context.Context) (bool, error) { - ctx, cancelCtx := context.WithTimeout(ctx, retryInterval) + ctx, cancelCtx := context.WithTimeout(ctx, retryInterval*2) defer cancelCtx() response, err := grpcClient.Allocate(ctx, request) diff --git a/test/e2e/allochelper/helper_func.go b/test/e2e/allochelper/helper_func.go index c668d6dce2..f2fbf0bd7b 100644 --- a/test/e2e/allochelper/helper_func.go +++ b/test/e2e/allochelper/helper_func.go @@ -293,7 +293,9 @@ func DeleteAgonesPod(ctx context.Context, podName string, namespace string, fram return err } -// GetAllocatorClient creates a client and ensure that it can be connected to +// GetAllocatorClient creates an allocator client and ensures that it can be connected to. Returns +// a client that has at least once successfully allocated from a fleet. The fleet used to test +// the client is leaked. func GetAllocatorClient(ctx context.Context, t *testing.T, framework *e2e.Framework) (pb.AllocationServiceClient, error) { logger := e2e.TestLogger(t) ip, port := GetAllocatorEndpoint(ctx, t, framework) @@ -334,13 +336,15 @@ func GetAllocatorClient(ctx context.Context, t *testing.T, framework *e2e.Framew } var response *pb.AllocationResponse - err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { - response, err = grpcClient.Allocate(context.Background(), request) + err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { + response, err = grpcClient.Allocate(ctx, request) if err != nil { - logger.WithError(err).Info("failing Allocate request") + logger.WithError(err).Info("Failed grpc allocation request while waiting for certs to stabilize") return false, nil } ValidateAllocatorResponse(t, response) + err = DeleteAgonesPod(ctx, response.GameServerName, framework.Namespace, framework) + assert.NoError(t, err, "Failed to delete game server pod %s", response.GameServerName) return true, nil }) if err != nil {