From 18b4edee5effcec9225d6d72f850ee7d3c53b908 Mon Sep 17 00:00:00 2001 From: Danylo Kuvshynov Date: Mon, 26 Apr 2021 23:30:27 +0300 Subject: [PATCH] Fix unexpected pod deletion. Improvements in correct termination. --- cmd/seleniferous/main.go | 49 ++++++++++++++++++--------------------- handlers.go | 50 ++++++++++++++++++---------------------- seleniferous.go | 3 +++ storage.go | 1 - 4 files changed, 48 insertions(+), 55 deletions(-) diff --git a/cmd/seleniferous/main.go b/cmd/seleniferous/main.go index b5f9a00..35c0598 100644 --- a/cmd/seleniferous/main.go +++ b/cmd/seleniferous/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "net" "net/http" @@ -42,28 +43,36 @@ func command() *cobra.Command { logger := logrus.New() logger.Formatter = &logrus.JSONFormatter{} + logger.Infof("starting seleniferous %s", buildVersion) + hostname, err := os.Hostname() if err != nil { logger.Fatalf("can't get container hostname: %v", err) } - logger.Infof("starting seleniferous %s", buildVersion) + logger.Infof("pod hostname %s", hostname) client, err := buildClusterClient() if err != nil { logger.Fatalf("failed to build kubernetes client: %v", err) } - ctx := context.Background() - _, err = client.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) - if err != nil { - logger.Fatalf("failed to get namespace: %s: %v", namespace, err) + logger.Info("kubernetes client created") + + deleteFunc := func() { + context := context.Background() + client.CoreV1().Pods(namespace).Delete(context, hostname, metav1.DeleteOptions{ + GracePeriodSeconds: pointer.Int64Ptr(15), + }) + defer logger.Infof("deleting pod %s", hostname) } + defer deleteFunc() - logger.Info("kubernetes client created") + quit := make(chan error, 1) + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) storage := seleniferous.NewStorage() - app := seleniferous.New(&seleniferous.Config{ BrowserPort: browserPort, ProxyPath: proxyPath, @@ -74,6 +83,7 @@ func command() *cobra.Command { Storage: storage, Logger: logger, Client: client, + Quit: quit, }) router := mux.NewRouter() @@ -88,25 +98,14 @@ func command() *cobra.Command { } w.WriteHeader(http.StatusOK) }) + srv := &http.Server{ Addr: net.JoinHostPort("", listhenPort), Handler: router, } - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) - - e := make(chan error) - - cancelFunc := func() { - context := context.Background() - client.CoreV1().Pods(namespace).Delete(context, hostname, metav1.DeleteOptions{ - GracePeriodSeconds: pointer.Int64Ptr(15), - }) - } - go func() { - e <- srv.ListenAndServe() + quit <- srv.ListenAndServe() }() go func() { @@ -118,7 +117,7 @@ func command() *cobra.Command { case <-timeout: shuttingDown = true logger.Warn("session wait timeout exceeded") - cancelFunc() + quit <- errors.New("new session request timeout") break loop case <-ticker: if storage.IsEmpty() { @@ -130,13 +129,9 @@ func command() *cobra.Command { }() select { - case err := <-e: - logger.Fatalf("failed to start: %v", err) + case err := <-quit: + logger.Infof("stopping seleniferous: %v", err) case sig := <-sigs: - if !shuttingDown { - logger.Warn("unexpected stop signal received") - defer cancelFunc() - } logger.Warnf("stopping seleniferous: %s", sig.String()) } diff --git a/handlers.go b/handlers.go index deee2dd..9d0f31b 100644 --- a/handlers.go +++ b/handlers.go @@ -18,8 +18,6 @@ import ( "github.com/google/uuid" "github.com/gorilla/mux" "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/utils/pointer" ) var ( @@ -90,35 +88,20 @@ func (app *App) HandleSession(w http.ResponseWriter, r *http.Request) { }) done := make(chan func()) + cancel := func() {} go func() { (<-done)() }() - cancel := func() {} - defer func() { done <- cancel }() - cancelFunc := func() { - context := context.Background() - app.client.CoreV1().Pods(app.namespace).Delete(context, app.hostname, metav1.DeleteOptions{ - GracePeriodSeconds: pointer.Int64Ptr(15), - }) - } - (&httputil.ReverseProxy{ Director: func(r *http.Request) { r.URL.Scheme = "http" r.URL.Host, r.URL.Path = net.JoinHostPort(app.hostname, app.browserPort), app.proxyPath - - go func() { - <-r.Context().Done() - - cancel = cancelFunc - }() - logger.Info("new session request") }, ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { @@ -135,19 +118,25 @@ func (app *App) HandleSession(w http.ResponseWriter, r *http.Request) { if !ok { value, ok := msg["value"] if !ok { - cancel = cancelFunc + cancel = func() { + app.quit <- errors.New("failed to extract sessionId from response") + } logger.Errorf("unable to extract sessionId from response") return errors.New("selenium protocol") } valueMap, ok := value.(map[string]interface{}) if !ok { - cancel = cancelFunc + cancel = func() { + app.quit <- errors.New("failed to extract sessionId from response") + } logger.Errorf("unable to extract sessionId from response") return errors.New("selenium protocol") } sessionId, ok = valueMap["sessionId"].(string) if !ok { - cancel = cancelFunc + cancel = func() { + app.quit <- errors.New("failed to extract sessionId from response") + } logger.Errorf("unable to extract sessionId from response") return errors.New("selenium protocol") } @@ -170,20 +159,24 @@ func (app *App) HandleSession(w http.ResponseWriter, r *http.Request) { ID: sessionId, OnTimeout: onTimeout(app.idleTimeout, func() { logger.Warnf("session timed out: %s, after %.2fs", sessionId, app.idleTimeout.Seconds()) - cancelFunc() + app.quit <- fmt.Errorf("session time out after %.2fs", app.idleTimeout.Seconds()) + }), - CancelFunc: cancelFunc, } app.bucket.put(app.hostname, service) logger.Infof("new session request completed: %s", sessionId) return nil } - cancel = cancelFunc + cancel = func() { + app.quit <- fmt.Errorf("failed to parse response body: %v", err) + } logger.Errorf("unable to parse response body: %v", err) return errors.New("response body parse error") } - cancel = cancelFunc + cancel = func() { + app.quit <- fmt.Errorf("failed to read response body: %v", err) + } logger.Errorf("unable to read response body: %v", err) return errors.New("response body read error") }, @@ -237,14 +230,17 @@ func (app *App) HandleProxy(w http.ResponseWriter, r *http.Request) { } if r.Method == http.MethodDelete && len(fragments) == 5 { - cancel = sess.CancelFunc + cancel = func() { + app.quit <- errors.New("session deleted") + } logger.Warnf("session %s delete request", id) } else { sess.OnTimeout = onTimeout(app.idleTimeout, func() { - logger.Infof("session timed out: %s, after %.2fs", id, app.idleTimeout.Seconds()) + logger.Infof("session timeout: %s, after %.2fs", id, app.idleTimeout.Seconds()) err := req{r}.buildURL(r.Host, id).delete() if err != nil { logger.Warnf("session %s delete request failed: %v", id, err) + app.quit <- errors.New("failed to perform delete request") } }) diff --git a/seleniferous.go b/seleniferous.go index 9a25ed0..d6aff67 100644 --- a/seleniferous.go +++ b/seleniferous.go @@ -18,6 +18,7 @@ type Config struct { Storage *Storage Logger *logrus.Logger Client *kubernetes.Clientset + Quit chan error } //App ... @@ -31,6 +32,7 @@ type App struct { bucket *Storage logger *logrus.Logger client *kubernetes.Clientset + quit chan error } //New ... @@ -45,5 +47,6 @@ func New(conf *Config) *App { bucket: conf.Storage, logger: conf.Logger, client: conf.Client, + quit: conf.Quit, } } diff --git a/storage.go b/storage.go index eaf1bf9..02dce86 100644 --- a/storage.go +++ b/storage.go @@ -9,7 +9,6 @@ type session struct { ID string URL *url.URL OnTimeout chan struct{} - CancelFunc func() } type Storage struct {