Skip to content

Commit

Permalink
Merge pull request #9 from alcounit/develop
Browse files Browse the repository at this point in the history
Fixed unexpected pod deletion. Added logging for pod termination.
  • Loading branch information
alcounit authored Apr 26, 2021
2 parents e7728e6 + 18b4ede commit e3b174e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 57 deletions.
53 changes: 24 additions & 29 deletions cmd/seleniferous/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -42,28 +43,36 @@ func command() *cobra.Command {
logger := logrus.New()
logger.Formatter = &logrus.JSONFormatter{}

logger.Infof("starting seleniferous %s", buildVersion)

hostname, err := os.Hostname()
if err != nil {
logger.Fatalf("can't get container hostname: %v", err)
}

logger.Infof("starting seleniferous %s", buildVersion)
logger.Infof("pod hostname %s", hostname)

client, err := buildClusterClient()
if err != nil {
logger.Fatalf("failed to build kubernetes client: %v", err)
}

ctx := context.Background()
_, err = client.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
if err != nil {
logger.Fatalf("failed to get namespace: %s: %v", namespace, err)
logger.Info("kubernetes client created")

deleteFunc := func() {
context := context.Background()
client.CoreV1().Pods(namespace).Delete(context, hostname, metav1.DeleteOptions{
GracePeriodSeconds: pointer.Int64Ptr(15),
})
defer logger.Infof("deleting pod %s", hostname)
}
defer deleteFunc()

logger.Info("kubernetes client created")
quit := make(chan error, 1)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)

storage := seleniferous.NewStorage()

app := seleniferous.New(&seleniferous.Config{
BrowserPort: browserPort,
ProxyPath: proxyPath,
Expand All @@ -74,6 +83,7 @@ func command() *cobra.Command {
Storage: storage,
Logger: logger,
Client: client,
Quit: quit,
})

router := mux.NewRouter()
Expand All @@ -88,25 +98,14 @@ func command() *cobra.Command {
}
w.WriteHeader(http.StatusOK)
})

srv := &http.Server{
Addr: net.JoinHostPort("", listhenPort),
Handler: router,
}

stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)

e := make(chan error)

cancelFunc := func() {
context := context.Background()
client.CoreV1().Pods(namespace).Delete(context, hostname, metav1.DeleteOptions{
GracePeriodSeconds: pointer.Int64Ptr(15),
})
}

go func() {
e <- srv.ListenAndServe()
quit <- srv.ListenAndServe()
}()

go func() {
Expand All @@ -118,7 +117,7 @@ func command() *cobra.Command {
case <-timeout:
shuttingDown = true
logger.Warn("session wait timeout exceeded")
cancelFunc()
quit <- errors.New("new session request timeout")
break loop
case <-ticker:
if storage.IsEmpty() {
Expand All @@ -130,14 +129,10 @@ func command() *cobra.Command {
}()

select {
case err := <-e:
logger.Fatalf("failed to start: %v", err)
case <-stop:
if !shuttingDown {
logger.Warn("unexpected stop signal received")
defer cancelFunc()
}
logger.Warn("stopping seleniferous")
case err := <-quit:
logger.Infof("stopping seleniferous: %v", err)
case sig := <-sigs:
logger.Warnf("stopping seleniferous: %s", sig.String())
}

ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
Expand Down
50 changes: 23 additions & 27 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
)

var (
Expand Down Expand Up @@ -90,35 +88,20 @@ func (app *App) HandleSession(w http.ResponseWriter, r *http.Request) {
})

done := make(chan func())
cancel := func() {}

go func() {
(<-done)()
}()

cancel := func() {}

defer func() {
done <- cancel
}()

cancelFunc := func() {
context := context.Background()
app.client.CoreV1().Pods(app.namespace).Delete(context, app.hostname, metav1.DeleteOptions{
GracePeriodSeconds: pointer.Int64Ptr(15),
})
}

(&httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Scheme = "http"
r.URL.Host, r.URL.Path = net.JoinHostPort(app.hostname, app.browserPort), app.proxyPath

go func() {
<-r.Context().Done()

cancel = cancelFunc
}()

logger.Info("new session request")
},
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
Expand All @@ -135,19 +118,25 @@ func (app *App) HandleSession(w http.ResponseWriter, r *http.Request) {
if !ok {
value, ok := msg["value"]
if !ok {
cancel = cancelFunc
cancel = func() {
app.quit <- errors.New("failed to extract sessionId from response")
}
logger.Errorf("unable to extract sessionId from response")
return errors.New("selenium protocol")
}
valueMap, ok := value.(map[string]interface{})
if !ok {
cancel = cancelFunc
cancel = func() {
app.quit <- errors.New("failed to extract sessionId from response")
}
logger.Errorf("unable to extract sessionId from response")
return errors.New("selenium protocol")
}
sessionId, ok = valueMap["sessionId"].(string)
if !ok {
cancel = cancelFunc
cancel = func() {
app.quit <- errors.New("failed to extract sessionId from response")
}
logger.Errorf("unable to extract sessionId from response")
return errors.New("selenium protocol")
}
Expand All @@ -170,20 +159,24 @@ func (app *App) HandleSession(w http.ResponseWriter, r *http.Request) {
ID: sessionId,
OnTimeout: onTimeout(app.idleTimeout, func() {
logger.Warnf("session timed out: %s, after %.2fs", sessionId, app.idleTimeout.Seconds())
cancelFunc()
app.quit <- fmt.Errorf("session time out after %.2fs", app.idleTimeout.Seconds())

}),
CancelFunc: cancelFunc,
}
app.bucket.put(app.hostname, service)
logger.Infof("new session request completed: %s", sessionId)

return nil
}
cancel = cancelFunc
cancel = func() {
app.quit <- fmt.Errorf("failed to parse response body: %v", err)
}
logger.Errorf("unable to parse response body: %v", err)
return errors.New("response body parse error")
}
cancel = cancelFunc
cancel = func() {
app.quit <- fmt.Errorf("failed to read response body: %v", err)
}
logger.Errorf("unable to read response body: %v", err)
return errors.New("response body read error")
},
Expand Down Expand Up @@ -237,14 +230,17 @@ func (app *App) HandleProxy(w http.ResponseWriter, r *http.Request) {
}

if r.Method == http.MethodDelete && len(fragments) == 5 {
cancel = sess.CancelFunc
cancel = func() {
app.quit <- errors.New("session deleted")
}
logger.Warnf("session %s delete request", id)
} else {
sess.OnTimeout = onTimeout(app.idleTimeout, func() {
logger.Infof("session timed out: %s, after %.2fs", id, app.idleTimeout.Seconds())
logger.Infof("session timeout: %s, after %.2fs", id, app.idleTimeout.Seconds())
err := req{r}.buildURL(r.Host, id).delete()
if err != nil {
logger.Warnf("session %s delete request failed: %v", id, err)
app.quit <- errors.New("failed to perform delete request")
}
})

Expand Down
3 changes: 3 additions & 0 deletions seleniferous.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
Storage *Storage
Logger *logrus.Logger
Client *kubernetes.Clientset
Quit chan error
}

//App ...
Expand All @@ -31,6 +32,7 @@ type App struct {
bucket *Storage
logger *logrus.Logger
client *kubernetes.Clientset
quit chan error
}

//New ...
Expand All @@ -45,5 +47,6 @@ func New(conf *Config) *App {
bucket: conf.Storage,
logger: conf.Logger,
client: conf.Client,
quit: conf.Quit,
}
}
1 change: 0 additions & 1 deletion storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ type session struct {
ID string
URL *url.URL
OnTimeout chan struct{}
CancelFunc func()
}

type Storage struct {
Expand Down

0 comments on commit e3b174e

Please sign in to comment.