-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
129 lines (111 loc) · 3.43 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main
import (
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
const (
	// pingPeriod is the interval at which ping frames are sent to the peer.
	pingPeriod = 10 * time.Second

	// pongWait is how far each read deadline is pushed into the future, i.e.
	// how long the reader waits for the peer's next frame (such as the pong
	// answering a ping). It is pingPeriod plus 10%, so it is always greater
	// than pingPeriod.
	pongWait = pingPeriod + pingPeriod/10

	// maxMessageSize is the read limit applied to messages from the peer.
	maxMessageSize = 512
)
// upgrader turns incoming HTTP requests into websocket connections, using
// 1024-byte read and write buffers.
//
// NOTE(review): CheckOrigin accepts every request regardless of its Origin
// header, so any site can open a socket against this server. Confirm that
// this is intended before exposing the endpoint publicly.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	// Origin checking is effectively disabled: every request may upgrade.
	CheckOrigin: func(*http.Request) bool { return true },
}
// Subscriber is a middleman between one websocket connection and the hub.
// Each Subscriber is served by a reader goroutine (websocketReader) and a
// writer goroutine (websocketWriter).
type Subscriber struct {
connection *websocket.Conn // The websocket connection.
send chan Message // Buffered channel of messages to be written to the connection (i.e. the websocket).
roomId string // Room this subscriber belongs to; stamped onto every message it sends.
id string // Sender id used in join/leave messages; serveWs reads it from the "sender_id" query parameter.
}
// websocketReader pumps messages from the websocket connection to the hub,
// i.e. from client to server.
//
// It first broadcasts a "joining" message for the subscriber, then forwards
// every JSON message read from the connection to h.broadcast. When reading
// fails for any reason (peer closed, read deadline exceeded, oversized or
// malformed message) the loop ends and the deferred cleanup broadcasts a
// "leaving" message, unregisters the subscriber and closes the connection.
func (subscriber *Subscriber) websocketReader() {
	conn := subscriber.connection

	// Cleanup runs on every exit path from the read loop below.
	defer func() {
		h.broadcast <- Message{
			LeavingChat: true,
			RoomId:      subscriber.roomId,
			SenderId:    subscriber.id,
			SentAt:      time.Now(),
		}
		h.unregister <- subscriber
		conn.Close()
	}()

	conn.SetReadLimit(maxMessageSize)

	// Each pong received from the peer pushes the read deadline out by
	// pongWait. The same step is applied once up front so that the
	// connection has a deadline before the first pong arrives.
	extendReadDeadline := func(string) error {
		conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	}
	extendReadDeadline("")
	conn.SetPongHandler(extendReadDeadline)

	// Announce the new participant.
	h.broadcast <- Message{
		JoiningChat: true,
		RoomId:      subscriber.roomId,
		SenderId:    subscriber.id,
		SentAt:      time.Now(),
	}

	for {
		// A fresh value per iteration so that fields from a previous
		// message never leak into the next one.
		var incoming Message
		if err := conn.ReadJSON(&incoming); err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Printf("error: %v", err)
			}
			return
		}

		// Room and timestamp are assigned by the server, overriding
		// whatever the client put in those fields.
		incoming.RoomId = subscriber.roomId
		incoming.SentAt = time.Now()
		h.broadcast <- incoming
	}
}
// websocketWriter pumps messages from the hub to the websocket connection,
// i.e. from server to client, and sends a ping to the peer every pingPeriod.
//
// It returns, stopping the ticker and closing the connection, when the send
// channel is closed or when any write to the connection fails.
func (subscriber *Subscriber) websocketWriter() {
	// The ticker drives the periodic ping messages sent to the peer.
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		subscriber.connection.Close()
	}()

	for {
		select {
		case message, ok := <-subscriber.send:
			// The write deadline is an absolute point in time that applies
			// to every subsequent write, so it must be refreshed before
			// each one. Otherwise a data frame is sent with whatever
			// deadline the last ping left behind (possibly already
			// expired, which fails the write at once) or with no deadline
			// at all before the first ping.
			if err := subscriber.connection.SetWriteDeadline(time.Now().Add(pingPeriod)); err != nil {
				return
			}
			if !ok {
				// The send channel was closed (presumably by the hub), so
				// tell the peer we are going away. This is best effort:
				// the connection is closed right after, so a failure here
				// is deliberately ignored.
				_ = subscriber.connection.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			if err := subscriber.connection.WriteJSON(message); err != nil {
				return
			}
		case <-ticker.C:
			// Refresh the write deadline, then send the ping.
			if err := subscriber.connection.SetWriteDeadline(time.Now().Add(pingPeriod)); err != nil {
				return
			}
			if err := subscriber.connection.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				return
			}
		}
	}
}
// serveWs handles a websocket request from a peer: it upgrades the HTTP
// request, registers a new Subscriber for roomId with the hub, and starts the
// subscriber's reader and writer goroutines. The subscriber id is taken from
// the "sender_id" query parameter. If the upgrade fails, the error is logged
// and nothing is registered.
func serveWs(w http.ResponseWriter, r *http.Request, roomId string) {
	conn, upgradeErr := upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		log.Println(upgradeErr.Error())
		return
	}

	client := &Subscriber{
		send:       make(chan Message, 256),
		connection: conn,
		roomId:     roomId,
		id:         r.URL.Query().Get("sender_id"),
	}
	h.register <- client

	// Two goroutines per client:
	//   - the reader moves messages from the websocket to the hub;
	//   - the writer moves messages from the send channel to the websocket.
	go client.websocketReader()
	go client.websocketWriter()
}