Skip to content

Commit

Permalink
agones-{extensions,allocator}: Make servers context aware, add gRPC h…
Browse files Browse the repository at this point in the history
…ealth check

* adds an `httpserver` utility package to handle the `Run` function
that controller/extensions use. Make that context aware using the same
method as https.Run:
https://github.com/googleforgames/agones/blob/dfa414e5e4da37798833bbf8c33919acb5f3c2ea/pkg/util/https/server.go#L127-L130

* also plumbs context-awareness through the allocator
run{Mux,REST,GRPC} functions.

* adds a gRPC health server to the allocator, calls .Shutdown() on it
during graceful termination - this seems to push the client off correctly.

Tested with e2e in a loop.

Towards googleforgames#3853
  • Loading branch information
zmerlynn committed Jun 5, 2024
1 parent c4b3596 commit 1775f9e
Show file tree
Hide file tree
Showing 12 changed files with 500 additions and 140 deletions.
74 changes: 49 additions & 25 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
grpchealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -52,6 +54,7 @@ import (
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/fswatch"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)
Expand Down Expand Up @@ -218,9 +221,6 @@ func main() {
health, closer := setupMetricsRecorder(conf)
defer closer()

// http.DefaultServerMux is used for http connection, not for https
http.Handle("/", health)

kubeClient, agonesClient, err := getClients(conf)
if err != nil {
logger.WithError(err).Fatal("could not create clients")
Expand All @@ -229,8 +229,9 @@ func main() {
// This will test the connection to agones on each readiness probe
// so if one of the allocator pod can't reach Kubernetes it will be removed
// from the Kubernetes service.
ctx, cancelCtx := context.WithCancel(context.Background())
listenCtx, cancelListenCtx := context.WithCancel(context.Background())
podReady = true
grpcHealth := grpchealth.NewServer() // only used for gRPC, ignored o/w
health.AddReadinessCheck("allocator-agones-client", func() error {
if !podReady {
return errors.New("asked to shut down, failed readiness check")
Expand All @@ -245,16 +246,14 @@ func main() {
signals.NewSigTermHandler(func() {
logger.Info("Pod shutdown has been requested, failing readiness check")
podReady = false
grpcHealth.Shutdown()
time.Sleep(conf.ReadinessShutdownDuration)
cancelCtx()
logger.Infof("Readiness shutdown duration has passed, context cancelled")
time.Sleep(1 * time.Second) // allow a brief time for cleanup, but force exit if main doesn't
os.Exit(0)
cancelListenCtx()
})

grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode)

h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)
h := newServiceHandler(context.Background(), kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)

if !h.tlsDisabled {
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
Expand Down Expand Up @@ -294,51 +293,57 @@ func main() {

// If grpc and http use the same port then use a mux.
if conf.GRPCPort == conf.HTTPPort {
runMux(h, conf.HTTPPort)
runMux(listenCtx, h, grpcHealth, conf.HTTPPort)
} else {
// Otherwise, run each on a dedicated port.
if validPort(conf.HTTPPort) {
runREST(h, conf.HTTPPort)
runREST(listenCtx, h, conf.HTTPPort)
}
if validPort(conf.GRPCPort) {
runGRPC(h, conf.GRPCPort)
runGRPC(listenCtx, h, grpcHealth, conf.GRPCPort)
}
}

// Finally listen on 8080 (http) and block the main goroutine
// this is used to serve /live and /ready handlers for Kubernetes probes.
err = http.ListenAndServe(":8080", http.DefaultServeMux)
logger.WithError(err).Fatal("allocation service crashed")
// Finally listen on 8080 (http), used to serve /live and /ready handlers for Kubernetes probes.
healthserver := httpserver.Server{Logger: logger}
healthserver.Handle("/", health)
go func() { _ = healthserver.Run(listenCtx, 0) }()

<-listenCtx.Done()
logger.Infof("Listen context cancelled")
time.Sleep(5 * time.Second) // allow a brief time for cleanup/shutdown
logger.Info("Shut down allocator")
}

func validPort(port int) bool {
const maxPort = 65535
return port >= 0 && port < maxPort
}

func runMux(h *serviceHandler, httpPort int) {
func runMux(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, httpPort int) {
logger.Infof("Running the mux handler on port %d", httpPort)
grpcServer := grpc.NewServer(h.getMuxServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)
grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth)

mux := runtime.NewServerMux()
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}

runHTTP(h, httpPort, grpcHandlerFunc(grpcServer, mux))
runHTTP(ctx, h, httpPort, grpcHandlerFunc(grpcServer, mux))
}

func runREST(h *serviceHandler, httpPort int) {
func runREST(ctx context.Context, h *serviceHandler, httpPort int) {
logger.WithField("port", httpPort).Info("Running the rest handler")
mux := runtime.NewServerMux()
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}
runHTTP(h, httpPort, mux)
runHTTP(ctx, h, httpPort, mux)
}

func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
func runHTTP(ctx context.Context, h *serviceHandler, httpPort int, handler http.Handler) {
cfg := &tls.Config{}
if !h.tlsDisabled {
cfg.GetCertificate = h.getTLSCert
Expand All @@ -356,21 +361,29 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
}

go func() {
go func() {
<-ctx.Done()
_ = server.Close()
}()

var err error
if !h.tlsDisabled {
err = server.ListenAndServeTLS("", "")
} else {
err = server.ListenAndServe()
}

if err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("HTTP/HTTPS server closed")
os.Exit(0)
} else {
logger.WithError(err).Fatal("Unable to start HTTP/HTTPS listener")
os.Exit(1)
}
}()
}

func runGRPC(h *serviceHandler, grpcPort int) {
func runGRPC(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, grpcPort int) {
logger.WithField("port", grpcPort).Info("Running the grpc handler on port")
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
if err != nil {
Expand All @@ -380,11 +393,22 @@ func runGRPC(h *serviceHandler, grpcPort int) {

grpcServer := grpc.NewServer(h.getGRPCServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)
grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth)

go func() {
go func() {
<-ctx.Done()
grpcServer.Stop()
}()

err := grpcServer.Serve(listener)
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
if err != nil {
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
} else {
logger.Info("allocation server closed")
os.Exit(0)
}
}()
}

Expand Down
27 changes: 2 additions & 25 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -54,6 +53,7 @@ import (
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/gameserversets"
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)
Expand Down Expand Up @@ -170,7 +170,7 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

server := &httpServer{}
server := &httpserver.Server{Logger: logger}
var rs []runner
var health healthcheck.Handler

Expand Down Expand Up @@ -541,10 +541,6 @@ type runner interface {
Run(ctx context.Context, workers int) error
}

type httpServer struct {
http.ServeMux
}

func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, doLeaderElection bool, kubeClient *kubernetes.Clientset, namespace string, start func(_ context.Context)) {
if !doLeaderElection {
start(ctx)
Expand Down Expand Up @@ -594,22 +590,3 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E
},
})
}

func (h *httpServer) Run(_ context.Context, _ int) error {
logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: h,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
27 changes: 2 additions & 25 deletions cmd/extensions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package main
import (
"context"
"io"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -46,6 +45,7 @@ import (
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/apiserver"
"agones.dev/agones/pkg/util/https"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
"agones.dev/agones/pkg/util/webhooks"
Expand Down Expand Up @@ -150,7 +150,7 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

server := &httpServer{}
server := &httpserver.Server{Logger: logger}
var health healthcheck.Handler

// Stackdriver metrics
Expand Down Expand Up @@ -340,26 +340,3 @@ type config struct {
type runner interface {
Run(ctx context.Context, workers int) error
}

type httpServer struct {
http.ServeMux
}

func (h *httpServer) Run(_ context.Context, _ int) error {
logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: h,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
contrib.go.opencensus.io/exporter/stackdriver v0.8.0
fortio.org/fortio v1.3.1
github.com/ahmetb/gen-crd-api-reference-docs v0.3.0
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/go-openapi/spec v0.19.5
github.com/google/go-cmp v0.5.9
Expand All @@ -23,7 +24,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cast v1.3.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.8.2
Expand Down Expand Up @@ -60,7 +60,6 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down Expand Up @@ -92,6 +91,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
Expand Down
55 changes: 55 additions & 0 deletions pkg/util/httpserver/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"context"
"net/http"

"agones.dev/agones/pkg/util/runtime"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// Server is a HTTPs server that conforms to the runner interface
// we use in /cmd/controller.
type Server struct {
http.ServeMux

Logger *logrus.Entry
}

// Run runs an http server on port :8080.
func (s *Server) Run(ctx context.Context, _ int) error {
s.Logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: s,
}
go func() {
<-ctx.Done()
_ = srv.Close()
}()

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
s.Logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(s.Logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
Loading

0 comments on commit 1775f9e

Please sign in to comment.