-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathpolyglot.go
147 lines (123 loc) · 3.91 KB
/
polyglot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
"encoding/json"
"encoding/base64"
"fmt"
"strings"
"strconv"
"net/http"
"github.com/julienschmidt/httprouter"
zmq "github.com/pebbe/zmq4"
)
// default handler
func process(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
if request.Method == "POST" {
err := request.ParseForm()
failOnError(err, "Failed to parse form")
request.ParseMultipartForm(10485760)
}
// marshal the HTTP request struct into JSON
reqJson, err := json.Marshal(request)
failOnError(err, "Failed to marshal the request into JSON")
routeId := request.Method + request.URL.Path
// send request
// create ZMQ REQ socket
client, err := zmq.NewSocket(zmq.REQ)
failOnError(err, "Failed to create socket")
defer client.Close()
// set the identity to the route ID eg GET/_/path
client.SetIdentity(routeId)
client.Connect(config.Broker)
poller := zmq.NewPoller()
poller.Add(client, zmq.POLLIN)
retries_left := config.RequestRetries
RETRIES_LOOP:
for retries_left > 0 {
// We send a request, then we work to get a reply
client.SendMessage(reqJson)
for expect_reply := true; expect_reply; {
// Poll socket for a reply, with timeout
sockets, err := poller.Poll(config.Timeout()); if err != nil {
reply(writer, 500, []byte(err.Error()))
retries_left = 0
break RETRIES_LOOP
}
// if there is a reply
if len(sockets) > 0 {
response, err := client.RecvMessage(0); if err != nil {
reply(writer, 500, []byte(err.Error()))
retries_left = 0
break RETRIES_LOOP
}
info(response[0])
status := response[0] // HTTP response code eg 200, 404
headers_json := response[1] // JSON encoded HTTP response headers
body := response[2] // HTTP response body as a string
// // unmarshal header JSON
var headers map[string]string
err = json.Unmarshal([]byte(headers_json), &headers); if err != nil {
reply(writer, 500, []byte(err.Error()))
retries_left = 0
break RETRIES_LOOP
}
// write headers
for k, v := range headers {
writer.Header().Set(k, v)
}
s, err := strconv.Atoi(status); if err != nil {
reply(writer, 500, []byte(err.Error()))
retries_left = 0
break RETRIES_LOOP
}
var data []byte
// get content type
ctype, hasCType := headers["Content-Type"]; if hasCType == true {
if is_text_mime_type(ctype) {
data = []byte(body)
} else {
data, _ = base64.StdEncoding.DecodeString(body)
}
} else {
data = []byte(body) // if not given the content type, assume it's text
}
// write status and body to response
reply(writer, s, data)
retries_left = 0
expect_reply = false
// if there are no replies, try again
} else {
retries_left--
if retries_left == 0 {
reply(writer, 500, []byte("Cannot connect with broker, giving up."))
break
} else {
fmt.Println("Cannot reach broker, retrying...")
// Old socket is confused; close it and open a new one
client.Close()
client, err = zmq.NewSocket(zmq.REQ)
failOnError(err, "Failed to create socket")
defer client.Close()
client.SetIdentity(routeId)
client.Connect(config.Broker)
// Recreate poller for new client
poller = zmq.NewPoller()
poller.Add(client, zmq.POLLIN)
// Send request again, on new socket
client.SendMessage(reqJson)
}
}
}
}
}
func reply(writer http.ResponseWriter, status int, body []byte) {
writer.WriteHeader(status)
writer.Write(body)
}
func is_text_mime_type(ctype string) bool {
if strings.HasPrefix(ctype, "text") ||
strings.HasPrefix(ctype, "application/json") {
return true
} else {
return false
}
}