s3jobresponse.go
package main

import "strings"
// S3JobDetail represents our stored job format, which is a little different from
// what we get from the job history server.
type S3JobDetail struct {
ID string `json:"job_id"`
Name string `json:"job_name"`
User string `json:"user"`
StartTime int64 `json:"submit_date"`
FinishTime int64 `json:"finish_date"`
State string `json:"outcome"`
Conf map[string]string `json:"job_properties"`
MapTasks []task `json:"map_tasks"`
ReduceTasks []task `json:"reduce_tasks"`
MapCounters map[string]int `json:"map_counters"`
ReduceCounters map[string]int `json:"reduce_counters"`
MapsTotal int `json:"total_maps"`
ReducesTotal int `json:"total_reduces"`
}
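// task holds the launch and finish timestamps and the final status of a single
// map or reduce task.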
type task struct {
StartTime int64 `json:"launch_date"`
EndTime int64 `json:"finish_date"`
Status string `json:"task_status"`
}
// s3responseToJob translates the s3 response data to a job object
func s3responseToJob(data *S3JobDetail) *job {
	flowID := data.Conf["cascading.flow.id"]
return &job{
Details: s3jobdetailToJobDetail(data),
conf: s3responseToJobConf(data),
Tasks: s3responseToTasks(data),
Counters: s3responseToCounters(data),
FlowID: &flowID,
}
}
// getCounterName maps a stored counter name to the qualified name used by the
// job history server: filesystem byte counters get the "FileSystemCounter."
// prefix, and shuffle/record counters get the "TaskCounter." prefix.
func getCounterName(s3name string) string {
if strings.Contains(s3name, "BYTES_READ") || strings.Contains(s3name, "BYTES_WRITTEN") {
return "FileSystemCounter." + s3name
} else if s3name == "REDUCE_SHUFFLE_BYTES" || strings.Contains(s3name, "PUT_RECORDS") {
return "TaskCounter." + s3name
}
return s3name
}
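// s3responseToCounters merges the stored map and reduce counters into a single
// slice of counters, one entry per counter name.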
func s3responseToCounters(s *S3JobDetail) []counter {
	counters := make([]counter, 0)
	for key := range s.ReduceCounters {
		counters = append(counters, counter{
			Name:   getCounterName(key),
			Total:  s.MapCounters[key] + s.ReduceCounters[key],
			Map:    s.MapCounters[key],
			Reduce: s.ReduceCounters[key],
		})
	}
	for key := range s.MapCounters {
		// Skip names already added above so counters present in both maps
		// are not emitted twice.
		if _, seen := s.ReduceCounters[key]; seen {
			continue
		}
		counters = append(counters, counter{
			Name:   getCounterName(key),
			Total:  s.MapCounters[key] + s.ReduceCounters[key],
			Map:    s.MapCounters[key],
			Reduce: s.ReduceCounters[key],
		})
	}
	return counters
}
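// s3responseToTasks converts the stored map and reduce task lists into
// [start, finish] timestamp pairs.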
func s3responseToTasks(s *S3JobDetail) tasks {
tasks := tasks{Map: make([][]int64, len(s.MapTasks)), Reduce: make([][]int64, len(s.ReduceTasks))}
for i, task := range s.MapTasks {
tasks.Map[i] = []int64{task.StartTime, task.EndTime}
}
for i, task := range s.ReduceTasks {
tasks.Reduce[i] = []int64{task.StartTime, task.EndTime}
}
return tasks
}
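// s3responseToJobConf wraps the stored job properties, pulling out the input
// and output directories.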
func s3responseToJobConf(s *S3JobDetail) conf {
return conf{
Flags: s.Conf,
Input: s.Conf["mapreduce.input.fileinputformat.inputdir"],
Output: s.Conf["mapreduce.output.fileoutputformat.outputdir"],
}
}
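// filter returns the tasks in vs for which f returns true.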
func filter(vs []task, f func(task) bool) []task {
vsf := make([]task, 0)
for _, v := range vs {
if f(v) {
vsf = append(vsf, v)
}
}
return vsf
}
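// s3jobdetailToJobDetail maps the stored job detail onto the common jobDetail
// struct. Stored jobs have already finished, so progress is reported as 100%
// with no pending or running tasks.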
func s3jobdetailToJobDetail(s *S3JobDetail) jobDetail {
state := s.State
if state == "SUCCESS" {
state = "SUCCEEDED" // for consistency with job history server
}
return jobDetail{
ID: s.ID,
Name: s.Name,
User: s.User,
State: state,
StartTime: s.StartTime,
FinishTime: s.FinishTime,
MapsTotal: s.MapsTotal,
MapProgress: 100,
MapsPending: 0,
MapsRunning: 0,
MapsCompleted: len(filter(s.MapTasks, func(t task) bool { return t.Status == "SUCCESS" })),
MapsFailed: len(filter(s.MapTasks, func(t task) bool { return t.Status == "FAILED" })),
MapsKilled: len(filter(s.MapTasks, func(t task) bool { return t.Status == "KILLED" })),
MapsTotalTime: int64(s.MapCounters["CPU_MILLISECONDS"]),
ReducesTotal: s.ReducesTotal,
ReduceProgress: 100,
ReducesPending: 0,
ReducesRunning: 0,
ReducesCompleted: len(filter(s.ReduceTasks, func(t task) bool { return t.Status == "SUCCESS" })),
ReducesFailed: len(filter(s.ReduceTasks, func(t task) bool { return t.Status == "FAILED" })),
ReducesKilled: len(filter(s.ReduceTasks, func(t task) bool { return t.Status == "KILLED" })),
ReducesTotalTime: int64(s.ReduceCounters["CPU_MILLISECONDS"]),
}
}