Skip to content

Commit

Permalink
agones-{extensions,allocator}: Make servers context aware (#3845)
Browse files Browse the repository at this point in the history
* agones-{extensions,allocator}: Make servers context aware, add gRPC health check

* adds an `httpserver` utility package to handle the `Run` function
that controller/extensions use. Make that context aware using the same
method as https.Run:
https://github.com/googleforgames/agones/blob/dfa414e5e4da37798833bbf8c33919acb5f3c2ea/pkg/util/https/server.go#L127-L130

* also plumbs context-awareness through the allocator
run{Mux,REST,GRPC} functions.

* adds a gRPC health server to the allocator, calls .Shutdown() on it
during graceful termination - this seems to push the client off correctly.

Tested with e2e in a loop.

Towards #3853

* Move from context.Background()

* Use Shutdown/GracefulStop

* Relax deadline slightly (original had none), also delete pod from GetAllocatorClient
  • Loading branch information
zmerlynn authored Jul 12, 2024
1 parent d5027db commit a74a86f
Show file tree
Hide file tree
Showing 14 changed files with 522 additions and 149 deletions.
81 changes: 56 additions & 25 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
grpchealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -52,6 +54,7 @@ import (
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/fswatch"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)
Expand Down Expand Up @@ -218,19 +221,18 @@ func main() {
health, closer := setupMetricsRecorder(conf)
defer closer()

// http.DefaultServerMux is used for http connection, not for https
http.Handle("/", health)

kubeClient, agonesClient, err := getClients(conf)
if err != nil {
logger.WithError(err).Fatal("could not create clients")
}

listenCtx, cancelListenCtx := context.WithCancel(context.Background())

// This will test the connection to agones on each readiness probe
// so if one of the allocator pod can't reach Kubernetes it will be removed
// from the Kubernetes service.
ctx, cancelCtx := context.WithCancel(context.Background())
podReady = true
grpcHealth := grpchealth.NewServer() // only used for gRPC, ignored o/w
health.AddReadinessCheck("allocator-agones-client", func() error {
if !podReady {
return errors.New("asked to shut down, failed readiness check")
Expand All @@ -245,16 +247,15 @@ func main() {
signals.NewSigTermHandler(func() {
logger.Info("Pod shutdown has been requested, failing readiness check")
podReady = false
grpcHealth.Shutdown()
time.Sleep(conf.ReadinessShutdownDuration)
cancelCtx()
logger.Infof("Readiness shutdown duration has passed, context cancelled")
time.Sleep(1 * time.Second) // allow a brief time for cleanup, but force exit if main doesn't
os.Exit(0)
cancelListenCtx()
})

grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode)

h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)
workerCtx, cancelWorkerCtx := context.WithCancel(context.Background())
h := newServiceHandler(workerCtx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)

if !h.tlsDisabled {
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
Expand Down Expand Up @@ -294,51 +295,62 @@ func main() {

// If grpc and http use the same port then use a mux.
if conf.GRPCPort == conf.HTTPPort {
runMux(h, conf.HTTPPort)
runMux(listenCtx, workerCtx, h, grpcHealth, conf.HTTPPort)
} else {
// Otherwise, run each on a dedicated port.
if validPort(conf.HTTPPort) {
runREST(h, conf.HTTPPort)
runREST(listenCtx, workerCtx, h, conf.HTTPPort)
}
if validPort(conf.GRPCPort) {
runGRPC(h, conf.GRPCPort)
runGRPC(listenCtx, h, grpcHealth, conf.GRPCPort)
}
}

// Finally listen on 8080 (http) and block the main goroutine
// this is used to serve /live and /ready handlers for Kubernetes probes.
err = http.ListenAndServe(":8080", http.DefaultServeMux)
logger.WithError(err).Fatal("allocation service crashed")
// Finally listen on 8080 (http), used to serve /live and /ready handlers for Kubernetes probes.
healthserver := httpserver.Server{Logger: logger}
healthserver.Handle("/", health)
go func() { _ = healthserver.Run(listenCtx, 0) }()

// TODO: This is messy. Contexts are the wrong way to handle this - we should be using shutdown,
// and a cascading graceful shutdown instead of multiple contexts and sleeps.
<-listenCtx.Done()
logger.Infof("Listen context cancelled")
time.Sleep(5 * time.Second)
cancelWorkerCtx()
logger.Infof("Worker context cancelled")
time.Sleep(1 * time.Second)
logger.Info("Shut down allocator")
}

func validPort(port int) bool {
const maxPort = 65535
return port >= 0 && port < maxPort
}

func runMux(h *serviceHandler, httpPort int) {
func runMux(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, httpPort int) {
logger.Infof("Running the mux handler on port %d", httpPort)
grpcServer := grpc.NewServer(h.getMuxServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)
grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth)

mux := runtime.NewServerMux()
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}

runHTTP(h, httpPort, grpcHandlerFunc(grpcServer, mux))
runHTTP(listenCtx, workerCtx, h, httpPort, grpcHandlerFunc(grpcServer, mux))
}

func runREST(h *serviceHandler, httpPort int) {
func runREST(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, httpPort int) {
logger.WithField("port", httpPort).Info("Running the rest handler")
mux := runtime.NewServerMux()
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}
runHTTP(h, httpPort, mux)
runHTTP(listenCtx, workerCtx, h, httpPort, mux)
}

func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
func runHTTP(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, httpPort int, handler http.Handler) {
cfg := &tls.Config{}
if !h.tlsDisabled {
cfg.GetCertificate = h.getTLSCert
Expand All @@ -356,21 +368,29 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
}

go func() {
go func() {
<-listenCtx.Done()
_ = server.Shutdown(workerCtx)
}()

var err error
if !h.tlsDisabled {
err = server.ListenAndServeTLS("", "")
} else {
err = server.ListenAndServe()
}

if err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("HTTP/HTTPS server closed")
os.Exit(0)
} else {
logger.WithError(err).Fatal("Unable to start HTTP/HTTPS listener")
os.Exit(1)
}
}()
}

func runGRPC(h *serviceHandler, grpcPort int) {
func runGRPC(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, grpcPort int) {
logger.WithField("port", grpcPort).Info("Running the grpc handler on port")
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
if err != nil {
Expand All @@ -380,11 +400,22 @@ func runGRPC(h *serviceHandler, grpcPort int) {

grpcServer := grpc.NewServer(h.getGRPCServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)
grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth)

go func() {
go func() {
<-ctx.Done()
grpcServer.GracefulStop()
}()

err := grpcServer.Serve(listener)
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
if err != nil {
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
} else {
logger.Info("allocation server closed")
os.Exit(0)
}
}()
}

Expand Down
27 changes: 2 additions & 25 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -54,6 +53,7 @@ import (
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/gameserversets"
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)
Expand Down Expand Up @@ -171,7 +171,7 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

server := &httpServer{}
server := &httpserver.Server{Logger: logger}
var rs []runner
var health healthcheck.Handler

Expand Down Expand Up @@ -547,10 +547,6 @@ type runner interface {
Run(ctx context.Context, workers int) error
}

type httpServer struct {
http.ServeMux
}

func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, doLeaderElection bool, kubeClient *kubernetes.Clientset, namespace string, start func(_ context.Context)) {
if !doLeaderElection {
start(ctx)
Expand Down Expand Up @@ -600,22 +596,3 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E
},
})
}

func (h *httpServer) Run(_ context.Context, _ int) error {
logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: h,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
27 changes: 2 additions & 25 deletions cmd/extensions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package main
import (
"context"
"io"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -46,6 +45,7 @@ import (
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/apiserver"
"agones.dev/agones/pkg/util/https"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
"agones.dev/agones/pkg/util/webhooks"
Expand Down Expand Up @@ -150,7 +150,7 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

server := &httpServer{}
server := &httpserver.Server{Logger: logger}
var health healthcheck.Handler

// Stackdriver metrics
Expand Down Expand Up @@ -340,26 +340,3 @@ type config struct {
type runner interface {
Run(ctx context.Context, workers int) error
}

type httpServer struct {
http.ServeMux
}

func (h *httpServer) Run(_ context.Context, _ int) error {
logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: h,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
contrib.go.opencensus.io/exporter/stackdriver v0.8.0
fortio.org/fortio v1.3.1
github.com/ahmetb/gen-crd-api-reference-docs v0.3.0
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/go-openapi/spec v0.19.5
github.com/google/go-cmp v0.5.9
Expand All @@ -23,7 +24,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cast v1.3.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.8.2
Expand Down Expand Up @@ -60,7 +60,6 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down Expand Up @@ -92,6 +91,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/https/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (

// tls is a http server interface to enable easier testing
type tls interface {
Close() error
Shutdown(context.Context) error
ListenAndServeTLS(certFile, keyFile string) error
}

Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *Server) WatchForCertificateChanges() (func(), error) {
func (s *Server) Run(ctx context.Context, _ int) error {
go func() {
<-ctx.Done()
s.tls.Close() // nolint: errcheck,gosec
_ = s.tls.Shutdown(context.Background())
}()

s.logger.WithField("server", s).Infof("https server started")
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/https/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type testServer struct {
server *httptest.Server
}

func (ts *testServer) Close() error {
func (ts *testServer) Shutdown(_ context.Context) error {
ts.server.Close()
return nil
}
Expand Down
Loading

0 comments on commit a74a86f

Please sign in to comment.