-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathwebsocket.go
105 lines (90 loc) · 2.48 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
AUTHOR
Grant Street Group <[email protected]>
COPYRIGHT AND LICENSE
This software is Copyright (c) 2019 by Grant Street Group.
This is free software, licensed under:
MIT License
*/
package exasol
import (
"fmt"
"math/rand"
"net/url"
"reflect"
"regexp"
"strconv"
"time"
)
// wsIPRangeRE matches a host written as an IPv4 range over the last octet,
// e.g. "10.0.0.11..14": four dotted numbers, then ".." and the final value
// of the last octet. Compiled once instead of on every connect.
var wsIPRangeRE = regexp.MustCompile(`^(\d+)\.(\d+)\.(\d+)\.(\d+)\.\.(\d+)$`)

// wsConnect connects to the host configured in c.Conf.Host.
//
// A plain host is handed straight to wsConnectHost. When the host is an IP
// range ("a.b.c.X..Y"), every address from a.b.c.X through a.b.c.Y is tried
// in random order until one connects; if none does, the error from the last
// attempt is returned.
//
// A range whose bounds are not valid octets (0-255), or whose start is
// greater than its end, is rejected with an error. Previously such a range
// could produce an empty candidate list and return nil without ever
// connecting.
func (c *Conn) wsConnect() (err error) {
	host := c.Conf.Host

	m := wsIPRangeRE.FindStringSubmatch(host)
	if m == nil {
		// Not a range: connect to the host as given.
		return c.wsConnectHost(host)
	}

	// The regexp guarantees digits only, so the only possible parse failure
	// is a value that does not fit in an octet (bitSize 8).
	first, err := strconv.ParseUint(m[4], 10, 8)
	if err != nil {
		return fmt.Errorf("invalid IP range %q: %s", host, err)
	}
	last, err := strconv.ParseUint(m[5], 10, 8)
	if err != nil {
		return fmt.Errorf("invalid IP range %q: %s", host, err)
	}
	if first > last {
		return fmt.Errorf("invalid IP range %q: start %d is greater than end %d", host, first, last)
	}

	ips := make([]string, 0, int(last-first)+1)
	for n := first; n <= last; n++ {
		ips = append(ips, fmt.Sprintf("%s.%s.%s.%d", m[1], m[2], m[3], n))
	}

	// Shuffle with a private generator so that connecting does not reseed
	// the process-wide math/rand source as a side effect.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	rng.Shuffle(len(ips), func(i, j int) { ips[i], ips[j] = ips[j], ips[i] })

	// ips is never empty here (first <= last), so err is always set by at
	// least one attempt before it is returned.
	for _, ip := range ips {
		if err = c.wsConnectHost(ip); err == nil {
			return nil
		}
	}
	return err
}
// wsConnectHost builds the websocket URL for host and the configured port
// and asks the websocket handler to connect to it. The scheme is "wss" when
// a TLS config is set on the connection and "ws" otherwise. The TLS config
// and connect timeout from c.Conf are passed through to the handler.
func (c *Conn) wsConnectHost(host string) error {
	target := url.URL{
		Scheme: "ws",
		Host:   fmt.Sprintf("%s:%d", host, c.Conf.Port),
	}

	tlsCfg := c.Conf.TLSConfig
	if tlsCfg != nil {
		target.Scheme = "wss"
	}

	c.log.Debugf("Connecting to %s", target.String())
	return c.wsh.Connect(target, tlsCfg, c.Conf.ConnectTimeout)
}
// send writes request to the server and then reads the reply into response.
// Both arguments are pointers to structs representing the API JSON; the
// response struct is filled in place. It is asyncSend followed immediately
// by a call to the receiver function that asyncSend returns.
func (c *Conn) send(request, response interface{}) error {
	recv, err := c.asyncSend(request)
	if err == nil {
		return recv(response)
	}
	return err
}
// wsAbnormalClosureRE matches read errors whose text contains
// "abnormal closure"; asyncSend reports those as a terminated statement.
// Compiled once instead of on every failed read.
var wsAbnormalClosureRE = regexp.MustCompile(`abnormal closure`)

// asyncSend writes request as JSON through the websocket handler and
// returns a receiver function for the reply.
//
// The receiver reads the reply JSON into response, which must be a struct or
// a pointer to one. It then inspects the struct by reflection: a "Status"
// field other than "ok" yields an error carrying the "Text" field of the
// "Exception" member.
//
// Errors:
//   - a write failure is returned from asyncSend itself via c.errorf;
//   - a read failure is returned from the receiver, with "abnormal closure"
//     errors reported as "Server terminated statement";
//   - a response that is not a struct, or a non-"ok" reply with no usable
//     Exception text, is returned as an error rather than causing a reflect
//     panic as it did before.
func (c *Conn) asyncSend(request interface{}) (func(interface{}) error, error) {
	if err := c.wsh.WriteJSON(request); err != nil {
		return nil, c.errorf("WebSocket API Error sending: %s", err)
	}

	return func(response interface{}) error {
		// err is local to the receiver so that calling it never writes to
		// a variable shared with the enclosing call.
		if err := c.wsh.ReadJSON(response); err != nil {
			if wsAbnormalClosureRE.MatchString(err.Error()) {
				return fmt.Errorf("Server terminated statement")
			}
			return fmt.Errorf("WebSocket API Error recving: %s", err)
		}

		r := reflect.Indirect(reflect.ValueOf(response))
		if r.Kind() != reflect.Struct {
			// FieldByName panics on anything but a struct.
			return fmt.Errorf("WebSocket API Error: response must be a struct or pointer to struct, got %T", response)
		}

		status := r.FieldByName("Status").String()
		if status == "ok" {
			return nil
		}

		// Exception may be absent or a nil pointer; Indirect then yields an
		// invalid Value whose Kind is not Struct, so check before going on.
		if exc := reflect.Indirect(r.FieldByName("Exception")); exc.Kind() == reflect.Struct {
			if text := exc.FieldByName("Text"); text.IsValid() {
				return fmt.Errorf("Server Error: %s", text.String())
			}
		}
		return fmt.Errorf("Server Error: status %q with no exception text", status)
	}, nil
}