This repository has been archived by the owner on Mar 14, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
/
sse.go
79 lines (68 loc) · 1.65 KB
/
sse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main
import (
"bytes"
"fmt"
"log"
"net/http"
)
type sse struct {
events chan []byte
addClient chan chan []byte
removeClient chan chan []byte
clients map[chan []byte]bool
}
func newSSE() *sse {
return &sse{
events: make(chan []byte, 0),
addClient: make(chan chan []byte, 0),
removeClient: make(chan chan []byte, 0),
clients: make(map[chan []byte]bool, 0),
}
}
func (sse *sse) Loop() {
for {
select {
case s := <-sse.addClient:
sse.clients[s] = true
log.Println("Added sse client.", len(sse.clients))
case s := <-sse.removeClient:
delete(sse.clients, s)
log.Println("Removed sse client.", len(sse.clients))
case event := <-sse.events:
for client := range sse.clients {
client <- event
}
}
}
}
var ssecounter = 0
func (sse *sse) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "text/event-stream")
w.Header().Add("Cache-Control", "no-cache")
w.Header().Add("Connection", "keep-alive")
w.(http.Flusher).Flush()
ssecounter++
id := ssecounter
header := r.Header["User-Agent"]
events := make(chan []byte)
sse.addClient <- events
defer func() {
sse.removeClient <- events
}()
go func() {
<-w.(http.CloseNotifier).CloseNotify()
sse.removeClient <- events
close(events)
}()
newline := []byte("\n")
prefix := []byte("\ndata: ")
for event := range events {
// When we see a newline we need to add the prefix again.
event = bytes.Replace(event, newline, prefix, -1)
if _, err := fmt.Fprintf(w, "data: %s\n\n", event); err != nil {
log.Println("Error writing to SSE", id, header, err)
continue
}
w.(http.Flusher).Flush()
}
}