Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Parallel polling (#22)
Browse files Browse the repository at this point in the history
Implemented parallel polling
  • Loading branch information
aandryashin authored and vania-pooh committed May 27, 2018
1 parent 9d272dc commit 711b592
Show file tree
Hide file tree
Showing 4 changed files with 391 additions and 89 deletions.
11 changes: 1 addition & 10 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

111 changes: 69 additions & 42 deletions ggr-ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,26 @@ import (
type Status map[string]interface{}

var paths = struct {
Status string
VNC string
Logs string
Ping string
Status string
VNC string
Logs string
Ping string
Metrics string
}{
Status: "/status",
VNC: "/vnc/",
Logs: "/logs/",
Ping: "/ping",
Status: "/status",
VNC: "/vnc/",
Logs: "/logs/",
Ping: "/ping",
Metrics: "/metrics",
}

func mux() http.Handler {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc(paths.Status, status)
mux.Handle(paths.VNC, websocket.Handler(proxyWS(paths.VNC)))
mux.Handle(paths.Logs, websocket.Handler(proxyWS(paths.Logs)))
mux.HandleFunc(paths.Ping, ping)
mux.Handle(paths.Metrics, promhttp.Handler())
return mux
}

Expand All @@ -45,44 +47,70 @@ func status(w http.ResponseWriter, r *http.Request) {
_, remote := util.RequestInfo(r)
ch := make(chan struct{}, limit)
rslt := make(chan Status)
for _, u := range hosts {
ch <- struct{}{}
go func(u string) {
defer func() {
<-ch
}()
r, err := http.NewRequest(http.MethodGet, u+paths.Status, nil)
if err != nil {
rslt <- nil
log.Printf("[STATUS] [Failed to fetch status from %s: %v] [%s]", u, err, remote)
return
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
resp, err := http.DefaultClient.Do(r.WithContext(ctx))
if err != nil {
rslt <- nil
log.Printf("[STATUS] [Failed to fetch status from %s: %v] [%s]", u, err, remote)
done := make(chan Status)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func(ctx context.Context) {
for _, u := range hosts {
select {
case ch <- struct{}{}:
go func(ctx context.Context, u string) {
defer func() {
<-ch
}()
r, err := http.NewRequest(http.MethodGet, u+paths.Status, nil)
if err != nil {
rslt <- nil
log.Printf("[STATUS] [Failed to fetch status: %v] [%s]", err, remote)
return
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
resp, err := http.DefaultClient.Do(r.WithContext(ctx))
if err != nil {
rslt <- nil
log.Printf("[STATUS] [Failed to fetch status: %v] [%s]", err, remote)
return
}
defer resp.Body.Close()
m := make(map[string]interface{})
err = json.NewDecoder(resp.Body).Decode(&m)
if err != nil {
rslt <- nil
log.Printf("[STATUS] [Failed to parse response: %v] [%s]", err, remote)
return
}
rslt <- m
}(ctx, u)
case <-ctx.Done():
return
}
defer resp.Body.Close()
m := make(map[string]interface{})
err = json.NewDecoder(resp.Body).Decode(&m)
if err != nil {
rslt <- nil
log.Printf("[STATUS] [Failed to parse response from %s: %v] [%s]", u, err, remote)
}
}(ctx)
go func(ctx context.Context) {
s := make(Status)
loop:
for sum, _ := range hosts {
select {
case m := <-rslt:
if m != nil {
s.Add(sum, m)
}
case <-time.After(responseTime):
break loop
case <-ctx.Done():
return
}
rslt <- m
}(u)
}
s := make(Status)
for sum, _ := range hosts {
if m := <-rslt; m != nil {
s.Add(sum, m)
}
done <- s
}(ctx)
select {
case <-w.(http.CloseNotifier).CloseNotify():
cancel()
case s := <-done:
w.Header().Add("Content-Type", "application/json")
json.NewEncoder(w).Encode(s)
}
json.NewEncoder(w).Encode(s)
}

func (cur Status) Add(sum string, m map[string]interface{}) {
Expand Down Expand Up @@ -113,7 +141,6 @@ func (cur Status) Add(sum string, m map[string]interface{}) {
Status(cur[k].(map[string]interface{})).Add(sum, v.(map[string]interface{}))
}
}

}

func proxyWS(p string) func(wsconn *websocket.Conn) {
Expand Down
Loading

0 comments on commit 711b592

Please sign in to comment.