From 37a6ae0370daeb020a841174c9faba0fa3f9395e Mon Sep 17 00:00:00 2001 From: Ivan Krutov Date: Thu, 1 Jun 2017 12:56:40 +0300 Subject: [PATCH] Added sse tests --- main.go | 62 ++++++++++++------------- main_test.go | 61 +++++++++++++++++++------ selenoid/selenoid.go | 14 +++--- selenoid/selenoid_test.go | 53 ++++++++++------------ sse/sse.go | 27 +++++++++-- sse/sse_test.go | 95 +++++++++++++++++++++++++++++++++++++++ ws.html | 53 ---------------------- 7 files changed, 228 insertions(+), 137 deletions(-) create mode 100644 sse/sse_test.go delete mode 100644 ws.html diff --git a/main.go b/main.go index dba4125d..b15af275 100644 --- a/main.go +++ b/main.go @@ -3,16 +3,16 @@ package main import ( "context" "flag" + "fmt" + "github.com/aerokube/selenoid-ui/selenoid" + "github.com/aerokube/selenoid-ui/sse" "log" "net/http" "os" "os/signal" + "regexp" "syscall" "time" - "github.com/aerokube/selenoid-ui/selenoid" - "github.com/aerokube/selenoid-ui/sse" - "fmt" - "regexp" ) //go:generate go-bindata-assetfs data/... @@ -68,37 +68,39 @@ func init() { } } +func tick(broker sse.Broker, selenoidUri string, period time.Duration, stop chan os.Signal) { + ticker := time.NewTicker(period) + for { + ctx, cancel := context.WithCancel(context.Background()) + select { + case <-ticker.C: + { + if broker.HasClients() { + res, err := selenoid.Status(ctx, selenoidUri) + if err != nil { + log.Printf("can't get status (%s)\n", err) + broker.Notify([]byte(`{ "errors": [{"msg": "can't get status"}] }`)) + continue + } + broker.Notify(res) + } + } + case <-stop: + { + cancel() + ticker.Stop() + os.Exit(0) + } + } + } +} + func main() { broker := sse.NewSseBroker() stop := make(chan os.Signal) signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT) - go func() { - ticker := time.NewTicker(period) - for { - ctx, cancel := context.WithCancel(context.Background()) - select { - case <-ticker.C: - { - if broker.HasClients() { - res, err := selenoid.Status(ctx, selenoidUri) - if err != nil { - log.Printf("can't get status (%s)\n", err) - broker.Notifier <- []byte(`{ "errors": [{"msg": "can't get status"}] }`) - continue - } - broker.Notifier <- res - } - } - case <-stop: - { - cancel() - ticker.Stop() - os.Exit(0) - } - } - } - }() + go tick(broker, selenoidUri, period, stop) log.Printf("Listening on %s\n", listen) log.Fatal(http.ListenAndServe(listen, mux(broker))) diff --git a/main_test.go b/main_test.go index 768205e1..83d2adff 100644 --- a/main_test.go +++ b/main_test.go @@ -1,26 +1,23 @@ package main import ( - "testing" + "encoding/json" . "github.com/aandryashin/matchers" . "github.com/aandryashin/matchers/httpresp" - "fmt" - "strings" - "net/http/httptest" + "github.com/aerokube/selenoid-ui/selenoid" "github.com/aerokube/selenoid-ui/sse" "net/http" + "net/http/httptest" + "os" + "testing" + "time" + "strings" + "fmt" ) -var ( - srv *httptest.Server - broker = sse.NewSseBroker() -) - -func init() { - srv = httptest.NewServer(mux(broker)) -} - func TestRootLoads(t *testing.T) { + broker := sse.NewSseBroker() + srv := httptest.NewServer(mux(broker)) resp, err := http.Get(srv.URL + "/") AssertThat(t, err, Is{nil}) AssertThat(t, resp, Code{200}) @@ -55,3 +52,41 @@ func (m NotContains) Match(i interface{}) bool { func (m NotContains) String() string { return fmt.Sprintf("not contains %v", m.V) } + +type MockBroker struct { + messages chan string +} + +func (mb *MockBroker) HasClients() bool { + return true +} + +func (mb *MockBroker) Notify(data []byte) { + mb.messages <- string(data) +} + +func (mb *MockBroker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(http.StatusOK) +} + +func TestTick(t *testing.T) { + srv := httptest.NewServer(selenoidApi()) + broker := &MockBroker{messages: make(chan string, 10)} + stop := make(chan os.Signal) + go tick(broker, srv.URL, 10*time.Millisecond, stop) + time.Sleep(50 * time.Millisecond) + close(stop) + AssertThat(t, len(broker.messages) > 0, Is{true}) +} + +func selenoidApi() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/status", mockStatus) + return mux +} + +func mockStatus(w http.ResponseWriter, _ *http.Request) { + data, _ := json.MarshalIndent(selenoid.State{}, "", " ") + w.WriteHeader(http.StatusOK) + w.Write(data) +} diff --git a/selenoid/selenoid.go b/selenoid/selenoid.go index 5bad8f9c..e31e18d1 100644 --- a/selenoid/selenoid.go +++ b/selenoid/selenoid.go @@ -1,10 +1,10 @@ package selenoid import ( - "net/http" "context" - "time" "encoding/json" + "net/http" + "time" ) /* -------------- * @@ -58,11 +58,11 @@ type sessionInfo struct { // result - processed selenoid state type result struct { - State State `json:"state"` - Origin string `json:"origin"` - Browsers map[string]int `json:"browsers"` - Sessions map[string]sessionInfo `json:"sessions"` - Errors []interface{} `json:"errors"` + State State `json:"state"` + Origin string `json:"origin"` + Browsers map[string]int `json:"browsers"` + Sessions map[string]sessionInfo `json:"sessions"` + Errors []interface{} `json:"errors"` } func httpDo(ctx context.Context, req *http.Request, handle func(*http.Response, error) error) error { diff --git a/selenoid/selenoid_test.go b/selenoid/selenoid_test.go index 58c7fe13..b2006873 100644 --- a/selenoid/selenoid_test.go +++ b/selenoid/selenoid_test.go @@ -1,51 +1,27 @@ package selenoid import ( - "testing" + "context" "encoding/json" . "github.com/aandryashin/matchers" - "net/http/httptest" "net/http" - "context" -) - -var ( - srv *httptest.Server + "net/http/httptest" + "testing" ) -func init() { - srv = httptest.NewServer(mockSelenoid()) -} - -func mockSelenoid() http.Handler { +func selenoidApi() http.Handler { mux := http.NewServeMux() mux.HandleFunc(statusPath, mockStatus) return mux } func mockStatus(w http.ResponseWriter, _ *http.Request) { - data, _ := json.MarshalIndent(getTestState(), "", " ") + data, _ := json.MarshalIndent(selenoidState(), "", " ") w.WriteHeader(http.StatusOK) w.Write(data) } -func TestToUI(t *testing.T) { - ui := toUI(getTestState(), "http://localhost") - data, err := json.MarshalIndent(ui, "", " ") - AssertThat(t, err, Is{nil}) - AssertThat(t, data, Is{Not{nil}}) - AssertThat(t, ui.Browsers["firefox"], Is{1}) - AssertThat(t, ui.Browsers["chrome"], Is{1}) - AssertThat(t, ui.Browsers["opera"], Is{0}) -} - -func TestStatus(t *testing.T) { - data, err := Status(context.Background(), srv.URL) - AssertThat(t, err, Is{nil}) - AssertThat(t, data, Is{Not{nil}}) -} - -func getTestState() State { +func selenoidState() State { var state State json.Unmarshal([]byte(`{ "total": 20, @@ -88,3 +64,20 @@ func getTestState() State { }`), &state) return state } + +func TestToUI(t *testing.T) { + ui := toUI(selenoidState(), "http://localhost") + data, err := json.MarshalIndent(ui, "", " ") + AssertThat(t, err, Is{nil}) + AssertThat(t, data, Is{Not{nil}}) + AssertThat(t, ui.Browsers["firefox"], Is{1}) + AssertThat(t, ui.Browsers["chrome"], Is{1}) + AssertThat(t, ui.Browsers["opera"], Is{0}) +} + +func TestStatus(t *testing.T) { + srv := httptest.NewServer(selenoidApi()) + data, err := Status(context.Background(), srv.URL) + AssertThat(t, err, Is{nil}) + AssertThat(t, data, Not{nil}) +} diff --git a/sse/sse.go b/sse/sse.go index f98bf5c5..53aa5a69 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -1,9 +1,10 @@ package sse import ( - "net/http" "fmt" "log" + "net/http" + "sync" "time" ) @@ -11,9 +12,15 @@ import ( // a slow client or a client that closed after `range clients` started. const patience time.Duration = time.Second * 1 +type Broker interface { + http.Handler + Notify(data []byte) + HasClients() bool +} + type SseBroker struct { // Events are pushed to this channel - Notifier chan []byte + notifier chan []byte // New client connections newClients chan chan []byte @@ -23,11 +30,13 @@ type SseBroker struct { // Client connections registry clients map[chan []byte]bool + + lock sync.RWMutex } func NewSseBroker() (broker *SseBroker) { broker = &SseBroker{ - Notifier: make(chan []byte, 1), + notifier: make(chan []byte, 1), newClients: make(chan chan []byte), closingClients: make(chan chan []byte), clients: make(map[chan []byte]bool), @@ -72,7 +81,13 @@ func (sse *SseBroker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } +func (sse *SseBroker) Notify(data []byte) { + sse.notifier <- data +} + func (sse *SseBroker) HasClients() bool { + sse.lock.RLock() + defer sse.lock.RUnlock() return len(sse.clients) > 0 } @@ -81,15 +96,19 @@ func (broker *SseBroker) listen() { select { case s := <-broker.newClients: { + broker.lock.Lock() broker.clients[s] = true + broker.lock.Unlock() log.Printf("Client added. %d registered clients", len(broker.clients)) } case s := <-broker.closingClients: { + broker.lock.Lock() delete(broker.clients, s) + broker.lock.Unlock() log.Printf("Removed client. %d registered clients", len(broker.clients)) } - case event := <-broker.Notifier: + case event := <-broker.notifier: { for clientMessageChan := range broker.clients { select { diff --git a/sse/sse_test.go b/sse/sse_test.go new file mode 100644 index 00000000..da2c6a7b --- /dev/null +++ b/sse/sse_test.go @@ -0,0 +1,95 @@ +package sse + +import ( + "bufio" + "fmt" + . "github.com/aandryashin/matchers" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +var ( + srv *httptest.Server + broker = NewSseBroker() +) + +func init() { + mux := http.NewServeMux() + mux.Handle("/events", broker) + srv = httptest.NewServer(mux) +} + +func TestEvents(t *testing.T) { + const testData = "test-data" + ch := make(chan string) + errors := make(chan error) + go waitForMessage(srv.URL+"/events", ch, errors) + stop := make(chan struct{}) + connected := make(chan struct{}) + go waitForConnection(broker, connected, stop) + + select { + case <-connected: + { + broker.Notify([]byte(testData)) + + select { + case text := <-ch: + { + AssertThat(t, strings.TrimSpace(text), EqualTo{fmt.Sprintf("data: %s", testData)}) + } + case err := <-errors: + { + t.Fatalf("Failed to receive message: %v", err) + } + } + } + case <-time.After(100 * time.Millisecond): + { + close(stop) + t.Fatal("Test timed out") + } + } +} + +func waitForConnection(broker *SseBroker, connected chan struct{}, stop chan struct{}) { + for { + select { + case <-time.After(10 * time.Millisecond): + if broker.HasClients() { + connected <- struct{}{} + } + case <-stop: + return + } + } +} + +func waitForMessage(url string, ch chan string, errors chan error) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + errors <- err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + errors <- err + } + defer resp.Body.Close() + br := bufio.NewReader(resp.Body) + const delim = '\n' + for { + data, err := br.ReadBytes(delim) + if err != nil { + break + } + + if err == io.EOF { + break + } + ch <- string(data) + } +} diff --git a/ws.html b/ws.html deleted file mode 100644 index b470e29b..00000000 --- a/ws.html +++ /dev/null @@ -1,53 +0,0 @@ - - - Camel Twitter WebSocket Example - - - - -
- - -