package main

import (
	"encoding/json"
	"flag"
	"log"
	"net/http"
	"net/http/pprof"
	"os"
	"path/filepath"
	"reflect"
	"strings"
	"time"

	"github.com/zenazn/goji/bind"
	"github.com/zenazn/goji/web"
	"github.com/zenazn/goji/web/middleware"
)

var clusterNames = flag.String("cluster-name", "default", "The user-visible names for the clusters")
var resourceManagerURL = flag.String("resource-manager-url", "http://localhost:8088", "The HTTP URL to access the resource manager.")
var historyServerURL = flag.String("history-server-url", "http://localhost:19888", "The HTTP URL to access the history server.")
var publicResourceManagerURL = flag.String("public-resource-manager-url", "", "The public HTTP URL for linking to the resource manager, if different from --resource-manager-url.")
var publicHistoryServerURL = flag.String("public-history-server-url", "", "The public HTTP URL for linking to the history server, if different from --history-server-url.")
var proxyServerURL = flag.String("proxy-server-url", "", "The HTTP URL to access the proxy server, if separate from the resource manager.")
var namenodeAddress = flag.String("namenode-address", "localhost:9000", "The host:port to access the Namenode metadata service.")
var yarnLogDir = flag.String("yarn-logs-dir", "/tmp/logs", "The HDFS path where YARN stores logs. This is controlled by the hadoop property yarn.nodemanager.remote-app-log-dir.")
var yarnHistoryDir = flag.String("yarn-history-dir", "/tmp/staging/history/done", "The HDFS path where YARN stores finished job history files. This is controlled by the hadoop property mapreduce.jobhistory.done-dir.")
var httpTimeout = flag.Duration("http-timeout", time.Second*2, "The timeout used for connecting to the YARN API. Pass values like: 2s")
var pollInterval = flag.Duration("poll-interval", time.Second*5, "How often to poll the job APIs. Pass values like: 2s")
var enableDebug = flag.Bool("pprof", false, "Enable pprof debugging tools at /debug.")
var s3BucketName = flag.String("s3-bucket", "", "S3 bucket to fetch old jobs from")
var s3Region = flag.String("s3-region", "", "AWS region for the job storage S3 bucket")
var s3JobsPrefix = flag.String("s3-jobs-prefix", "", "S3 key prefix (\"folder\") where jobs are stored")
var s3FlowPrefix = flag.String("s3-flow-prefix", "", "S3 key prefix (\"folder\") where cascading flows are stored")

var jts map[string]*jobTracker
var persistedJobClient PersistedJobClient
var rootPath, staticPath string
var mux *web.Mux

func init() {
	bind.WithFlag()
	mux = web.New()
	mux.Use(middleware.RequestID)
	mux.Use(middleware.Logger)
	mux.Use(middleware.Recoverer)
	mux.Use(middleware.AutomaticOptions)
}

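// index serves the app's index.html page.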
func index(c web.C, w http.ResponseWriter, r *http.Request) {
	http.ServeFile(w, r, filepath.Join(rootPath, "index.html"))
}

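// getJobs writes a JSON list of the jobs currently tracked across all clusters,
// trimmed down to the fields the listing pages need.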
func getJobs(c web.C, w http.ResponseWriter, r *http.Request) {
	// We only need the details for listing pages.
	var jobs []*job
	for clusterName, tracker := range jts {
		for _, j := range tracker.jobs {
			jobs = append(jobs, &job{
				Cluster: tracker.clusterName,
				Details: j.Details,
				conf: conf{
					Input:         j.conf.Input,
					Output:        j.conf.Output,
					ScaldingSteps: j.conf.ScaldingSteps,
					name:          j.conf.name,
				},
			})
		}
		log.Printf("Appending %d jobs for Cluster %s: %s %s\n", len(jobs), clusterName, tracker.hs, tracker.rm)
	}

	jsonBytes, err := json.Marshal(jobs)
	if err != nil {
		log.Println("getJobs error:", err)
		w.WriteHeader(500)
		return
	}
	w.Write(jsonBytes)
}

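// getNumClusters writes the number of tracked clusters as JSON.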
func getNumClusters(c web.C, w http.ResponseWriter, r *http.Request) {
	jsonBytes, err := json.Marshal(len(jts))
	if err != nil {
		log.Println("getNumClusters error:", err)
		w.WriteHeader(500)
		return
	}
	w.Write(jsonBytes)
}

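// getConf writes the configuration of the job named by the :id URL parameter as JSON.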
func getConf(c web.C, w http.ResponseWriter, r *http.Request) {
	id := c.URLParams["id"]
	log.Printf("Getting job conf for %s", id)

	job := getJob(id)
	if job == nil {
		w.WriteHeader(404)
		return
	}

	jsonBytes, err := json.Marshal(jobConf{
		Conf: job.conf,
		ID:   job.Details.ID,
		Name: job.Details.Name,
	})
	if err != nil {
		log.Println("could not marshal:", err)
		w.WriteHeader(500)
		return
	}
	w.Write(jsonBytes)
}

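// getJob looks a job up by ID, first in the in-memory trackers and then in
// long-term storage via the persisted job client.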
func getJob(rawJobID string) *job {
	// check if we have it in memory
	for _, jt := range jts {
		if _, ok := jt.jobs[jobID(rawJobID)]; ok {
			job := jt.getJob(rawJobID)
			jt.reifyJob(job)
			return job
		}
	}

	// check if we have it in long-term storage
	job, _ := persistedJobClient.FetchJob(rawJobID)
	return job
}

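// getJobAPIHandler writes the full details of a single job as JSON, or 404 if the job is unknown.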
func getJobAPIHandler(c web.C, w http.ResponseWriter, r *http.Request) {
	job := getJob(c.URLParams["id"])
	if job == nil {
		w.WriteHeader(404)
		return
	}

	jsonBytes, err := json.Marshal(job)
	if err != nil {
		log.Println("error serializing job:", err)
		w.WriteHeader(500)
		return
	}
	w.Write(jsonBytes)
}

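// getJobIdsAPIHandler writes the IDs of the jobs recorded in long-term storage
// for the flow named by the :flowID URL parameter.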
func getJobIdsAPIHandler(c web.C, w http.ResponseWriter, r *http.Request) {
	jobIds, err := persistedJobClient.FetchFlowJobIds(c.URLParams["flowID"])
	if err != nil {
		log.Println("FetchFlowJobIds error:", err)
		w.WriteHeader(500)
		return
	}

	jsonBytes, err := json.Marshal(jobIds)
	if err != nil {
		log.Println("JSON marshal error:", err)
		w.WriteHeader(500)
		return
	}
	w.Write(jsonBytes)
}

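// killJob asks the tracker that owns the job to kill it on behalf of the job's
// user, returning 404 if no tracker knows about the job.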
func killJob(c web.C, w http.ResponseWriter, r *http.Request) {
	id := c.URLParams["id"]
	app, jobID := hadoopIDs(id)

	for _, jt := range jts {
		if _, ok := jt.jobs[jobID]; ok {
			err := jt.killJob(app, jt.jobs[jobID].Details.User)
			if err != nil {
				log.Println("killJob error: ", err)
				w.WriteHeader(500)
				return
			}
			w.WriteHeader(204)
			return
		}
	}
	w.WriteHeader(404)
}

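// init resolves rootPath and staticPath relative to the location of the binary.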
func init() {
	binPath, err := filepath.Abs(filepath.Dir(os.Args[0]))
	if err != nil {
		log.Fatal(err)
	}
	rootPath = filepath.Join(binPath, "..")
	staticPath = filepath.Join(rootPath, "static")
}

func main() {
	flag.Parse()

	var clusterNames = strings.Split(*clusterNames, ",")
	var resourceManagerURLs = strings.Split(*resourceManagerURL, ",")
	var historyServerURLs = strings.Split(*historyServerURL, ",")
	var publicResourceManagerURLs = strings.Split(*publicResourceManagerURL, ",")
	var publicHistoryServerURLs = strings.Split(*publicHistoryServerURL, ",")
	var proxyServerURLs = strings.Split(*proxyServerURL, ",")
	var namenodeAddresses = strings.Split(*namenodeAddress, ",")

	if *publicResourceManagerURL == "" {
		publicResourceManagerURLs = resourceManagerURLs
	}
	if *publicHistoryServerURL == "" {
		publicHistoryServerURLs = historyServerURLs
	}

	if len(resourceManagerURLs) != len(historyServerURLs) {
		log.Fatal("resource-manager-url and history-server-url are not 1:1")
	}
	if !reflect.DeepEqual(proxyServerURLs, []string{""}) && len(proxyServerURLs) != len(resourceManagerURLs) {
		log.Fatal("proxy-server-url exists and is not 1:1 with resource-manager-url")
	}
	if len(resourceManagerURLs) != len(namenodeAddresses) {
		log.Fatal("resource-manager-url and namenode-address are not 1:1")
	}
	if len(resourceManagerURLs) != len(clusterNames) {
		log.Fatal("cluster-name and resource-manager-url are not 1:1")
	}

	persistedJobClient = NewS3JobClient(*s3Region, *s3BucketName, *s3JobsPrefix, *s3FlowPrefix)

	jts = make(map[string]*jobTracker)
	for i := range resourceManagerURLs {
		var proxyServerURL string
		if reflect.DeepEqual(proxyServerURLs, []string{""}) {
			proxyServerURL = resourceManagerURLs[i]
		} else {
			proxyServerURL = proxyServerURLs[i]
		}
		log.Printf("Creating new JT [%d]: %s %s %s\n", i, resourceManagerURLs[i], historyServerURLs[i], proxyServerURL)
		jts[clusterNames[i]] = newJobTracker(
			clusterNames[i],
			publicResourceManagerURLs[i],
			publicHistoryServerURLs[i],
			newRecentJobClient(
				resourceManagerURLs[i],
				historyServerURLs[i],
				proxyServerURL,
				namenodeAddresses[i],
			),
			&hdfsJobHistoryClient{},
		)
	}

	log.Println("initiating JT loop")
	for clusterName, jt := range jts {
		go jt.Loop()
		if err := jt.testLogsDir(); err != nil {
			log.Printf("WARNING: Could not read yarn logs directory for cluster %s. Error message: `%s`\n", clusterName, err)
			log.Println("\tYou can change the path with --yarn-logs-dir=HDFS_PATH.")
			log.Println("\tTo talk to HDFS, Timberlake needs to be able to access the namenode (--namenode-address) and datanodes.")
		}
	}

	sse := newSSE()
	go sse.Loop()

	static := http.StripPrefix("/static/", http.FileServer(http.Dir(staticPath)))
	log.Println("serving static files from", staticPath)

	mux.Get("/static/*", static)
	mux.Get("/", index)
	mux.Get("/jobs/", getJobs)
	mux.Get("/numClusters/", getNumClusters)
	mux.Get("/sse", sse)
	mux.Get("/jobIds/:flowID", getJobIdsAPIHandler)
	mux.Get("/jobs/:id", getJobAPIHandler)
	mux.Get("/jobs/:id/conf", getConf)
	mux.Post("/jobs/:id/kill", killJob)

	if *enableDebug {
		mux.Get("/debug/pprof/*", pprof.Index)
		mux.Get("/debug/pprof/cmdline", pprof.Cmdline)
		mux.Get("/debug/pprof/profile", pprof.Profile)
		mux.Get("/debug/pprof/symbol", pprof.Symbol)
		mux.Get("/debug/pprof/trace", pprof.Trace)
	}

	for _, jt := range jts {
		go jt.sendUpdates(sse)
	}

	http.Serve(bind.Default(), mux)
}