Skip to content

Commit

Permalink
agones-{extensions,allocator}: Make servers context aware
Browse files Browse the repository at this point in the history
I ran the HA tests on #3843 overnight in a loop and still noticed a
very low grade flake in the allocator, so I decided to go ahead and
clean up the issues I noticed while working on the previous PR:

* adds an `httpserver` utility package to handle the `Run` function
that controller/extensions use. Make that context aware using the same
method as https.Run:
https://github.com/googleforgames/agones/blob/dfa414e5e4da37798833bbf8c33919acb5f3c2ea/pkg/util/https/server.go#L127-L130
(note that I think this ^ is why the extensions flake disappeared
after I added a pause).

* also plumbs context-awareness through the allocator
run{Mux,REST,GRPC} functions, which is I suspect hy we're seeing a
low-grade flake in allocator still. (I think the delay I added
previously has sufficient to drive it off, too, but this PR should be
more better.)
  • Loading branch information
zmerlynn committed May 31, 2024
1 parent dfa414e commit 9387e7b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 69 deletions.
57 changes: 38 additions & 19 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/fswatch"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)
Expand Down Expand Up @@ -218,9 +219,6 @@ func main() {
health, closer := setupMetricsRecorder(conf)
defer closer()

// http.DefaultServerMux is used for http connection, not for https
http.Handle("/", health)

kubeClient, agonesClient, err := getClients(conf)
if err != nil {
logger.WithError(err).Fatal("could not create clients")
Expand Down Expand Up @@ -294,29 +292,32 @@ func main() {

// If grpc and http use the same port then use a mux.
if conf.GRPCPort == conf.HTTPPort {
runMux(h, conf.HTTPPort)
runMux(ctx, h, conf.HTTPPort)
} else {
// Otherwise, run each on a dedicated port.
if validPort(conf.HTTPPort) {
runREST(h, conf.HTTPPort)
runREST(ctx, h, conf.HTTPPort)
}
if validPort(conf.GRPCPort) {
runGRPC(h, conf.GRPCPort)
runGRPC(ctx, h, conf.GRPCPort)
}
}

// Finally listen on 8080 (http) and block the main goroutine
// this is used to serve /live and /ready handlers for Kubernetes probes.
err = http.ListenAndServe(":8080", http.DefaultServeMux)
logger.WithError(err).Fatal("allocation service crashed")
// Finally listen on 8080 (http), used to serve /live and /ready handlers for Kubernetes probes.
healthserver := httpserver.Server{Logger: logger}
healthserver.Handle("/", health)
go func() { _ = healthserver.Run(ctx, 0) }()

<-ctx.Done()
logger.Info("Shut down allocator")
}

func validPort(port int) bool {
const maxPort = 65535
return port >= 0 && port < maxPort
}

func runMux(h *serviceHandler, httpPort int) {
func runMux(ctx context.Context, h *serviceHandler, httpPort int) {
logger.Infof("Running the mux handler on port %d", httpPort)
grpcServer := grpc.NewServer(h.getMuxServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)
Expand All @@ -326,19 +327,19 @@ func runMux(h *serviceHandler, httpPort int) {
panic(err)
}

runHTTP(h, httpPort, grpcHandlerFunc(grpcServer, mux))
runHTTP(ctx, h, httpPort, grpcHandlerFunc(grpcServer, mux))
}

func runREST(h *serviceHandler, httpPort int) {
func runREST(ctx context.Context, h *serviceHandler, httpPort int) {
logger.WithField("port", httpPort).Info("Running the rest handler")
mux := runtime.NewServerMux()
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}
runHTTP(h, httpPort, mux)
runHTTP(ctx, h, httpPort, mux)
}

func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
func runHTTP(ctx context.Context, h *serviceHandler, httpPort int, handler http.Handler) {
cfg := &tls.Config{}
if !h.tlsDisabled {
cfg.GetCertificate = h.getTLSCert
Expand All @@ -356,21 +357,29 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
}

go func() {
go func() {
<-ctx.Done()
_ = server.Close()
}()

var err error
if !h.tlsDisabled {
err = server.ListenAndServeTLS("", "")
} else {
err = server.ListenAndServe()
}

if err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("HTTP/HTTPS server closed")
os.Exit(0)
} else {
logger.WithError(err).Fatal("Unable to start HTTP/HTTPS listener")
os.Exit(1)
}
}()
}

func runGRPC(h *serviceHandler, grpcPort int) {
func runGRPC(ctx context.Context, h *serviceHandler, grpcPort int) {
logger.WithField("port", grpcPort).Info("Running the grpc handler on port")
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
if err != nil {
Expand All @@ -382,9 +391,19 @@ func runGRPC(h *serviceHandler, grpcPort int) {
pb.RegisterAllocationServiceServer(grpcServer, h)

go func() {
go func() {
<-ctx.Done()
grpcServer.Stop()
}()

err := grpcServer.Serve(listener)
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
if err != nil {
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
} else {
logger.Info("allocation server closed")
os.Exit(0)
}
}()
}

Expand Down
27 changes: 2 additions & 25 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -54,6 +53,7 @@ import (
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/gameserversets"
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)
Expand Down Expand Up @@ -170,7 +170,7 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

server := &httpServer{}
server := &httpserver.Server{Logger: logger}
var rs []runner
var health healthcheck.Handler

Expand Down Expand Up @@ -541,10 +541,6 @@ type runner interface {
Run(ctx context.Context, workers int) error
}

type httpServer struct {
http.ServeMux
}

func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, doLeaderElection bool, kubeClient *kubernetes.Clientset, namespace string, start func(_ context.Context)) {
if !doLeaderElection {
start(ctx)
Expand Down Expand Up @@ -594,22 +590,3 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E
},
})
}

func (h *httpServer) Run(_ context.Context, _ int) error {
logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: h,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
27 changes: 2 additions & 25 deletions cmd/extensions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package main
import (
"context"
"io"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -46,6 +45,7 @@ import (
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/apiserver"
"agones.dev/agones/pkg/util/https"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
"agones.dev/agones/pkg/util/webhooks"
Expand Down Expand Up @@ -150,7 +150,7 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

server := &httpServer{}
server := &httpserver.Server{Logger: logger}
var health healthcheck.Handler

// Stackdriver metrics
Expand Down Expand Up @@ -340,26 +340,3 @@ type config struct {
type runner interface {
Run(ctx context.Context, workers int) error
}

type httpServer struct {
http.ServeMux
}

func (h *httpServer) Run(_ context.Context, _ int) error {
logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: h,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
53 changes: 53 additions & 0 deletions pkg/util/httpserver/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"context"
"net/http"

"agones.dev/agones/pkg/util/runtime"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type Server struct {
http.ServeMux

Logger *logrus.Entry
}

// Run runs an http server on port :8080.
func (s *Server) Run(ctx context.Context, _ int) error {
s.Logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: s,
}
go func() {
<-ctx.Done()
_ = srv.Close()
}()

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
s.Logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(s.Logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}

0 comments on commit 9387e7b

Please sign in to comment.