-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathtasks.go
238 lines (205 loc) · 5.92 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"sync"
)
// A Task performs some part of the RHMAP System Dump Tool.
type Task func() error
// Logger is a minimal interface implemented by log.Logger.
type Logger interface {
Printf(format string, v ...interface{})
}
// RunAllDumpTasks runs all tasks known to the dump tool using concurrent
// workers and returns task errors. Dump output goes to path. Progress is
// communicated by writing to out.
func RunAllDumpTasks(runner Runner, path string, workers int, out io.Writer) []error {
var errs []error
tasks := GetAllDumpTasks(runner, path)
results := make(chan error)
// Start worker goroutines to run tasks concurrently.
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for task := range tasks {
results <- task()
}
}()
}
// Wait for all workers to terminate, then close the results channel to
// communicate that no more results will be sent.
go func() {
wg.Wait()
close(results)
}()
// Loop through the task execution results and log errors.
for err := range results {
if err != nil {
errs = append(errs, err)
}
fmt.Fprint(out, ".")
}
fmt.Fprintln(out)
return errs
}
// GetAllDumpTasks returns a channel of all tasks known to the dump tool. It returns
// immediately and sends tasks to the channel in a separate goroutine. The
// channel is closed after all tasks are sent.
// FIXME: GetAllDumpTasks should not need to know about basepath.
func GetAllDumpTasks(runner Runner, basepath string) <-chan Task {
tasks := make(chan Task)
go func() {
defer close(tasks)
projects, err := GetProjects(runner)
if err != nil {
tasks <- NewError(err)
return
}
if len(projects) == 0 {
tasks <- NewError(errors.New("no projects visible to the currently logged in user"))
return
}
var wg sync.WaitGroup
// Add tasks to fetch OpenShift metadata.
wg.Add(1)
go func() {
defer wg.Done()
GetOpenShiftMetadataTasks(tasks, runner, projects)
}()
// Add tasks to fetch resource definitions.
wg.Add(1)
go func() {
defer wg.Done()
resources := []string{
"deploymentconfigs", "pods", "services",
"events", "persistentvolumeclaims", "configmaps",
}
GetResourceDefinitionTasks(tasks, runner, projects, resources)
// For cluster-scoped resources we need only one task to
// fetch all definitions, instead of one per project.
clusterScoped := []string{"persistentvolumes", "nodes"}
for _, resource := range clusterScoped {
tasks <- ResourceDefinition(runner, "", resource)
}
}()
// Add tasks to fetch logs.
wg.Add(1)
go func() {
defer wg.Done()
// We should only care about logs for pods, because they
// cover all other possible types.
resourcesWithLogs := []string{"pods"}
// FIXME: we should not be accessing a flag value
// (global) here, instead take maxLines as an argument.
GetFetchLogsTasks(tasks, runner, projects, resourcesWithLogs, *maxLogLines)
}()
// Add tasks to fetch Nagios data.
wg.Add(1)
go func() {
defer wg.Done()
GetNagiosTasks(tasks, runner, projects)
}()
wg.Add(1)
go func() {
defer wg.Done()
GetMillicoreConfigTasks(tasks, runner, projects, getResourceNamesBySubstr)
}()
wg.Add(1)
go func() {
defer wg.Done()
tasks <- GetOcAdmDiagnosticsTask(runner)
}()
wg.Wait()
}()
return tasks
}
// RunAllAnalysisTasks runs all tasks known to the analysis tool using concurrent workers.
func RunAllAnalysisTasks(runner Runner, path string, workers int) AnalysisResult {
analysisResults := make(chan AnalysisResult)
tasks := GetAllAnalysisTasks(runner, path, analysisResults)
results := make(chan error)
// Start worker goroutines to run tasks concurrently.
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for task := range tasks {
results <- task()
}
}()
}
// Listen to the checkResults channel and write all the results into
// the analysis.json file
var writeWait sync.WaitGroup
writeWait.Add(1)
var analysisResult AnalysisResult
go func() {
defer writeWait.Done()
filepath := filepath.Join(path, "analysis.json")
err := os.MkdirAll(path, 0770)
if err != nil {
results <- err
return
}
for result := range analysisResults {
analysisResult.Platform = append(analysisResult.Platform, result.Platform...)
analysisResult.Projects = append(analysisResult.Projects, result.Projects...)
output, err := json.MarshalIndent(analysisResult, "", " ")
if err != nil {
results <- err
}
ioutil.WriteFile(filepath, []byte(output), 0644)
}
}()
// Wait for all workers to terminate, then close the results channel to
// communicate that no more results will be sent.
go func() {
wg.Wait()
close(analysisResults)
close(results)
}()
// Loop through the task execution results and log errors.
for err := range results {
if err != nil {
// TODO: there should be a way to identify which task
// had an error.
fmt.Fprintln(os.Stderr)
log.Printf("Task error: %v", err)
continue
}
fmt.Fprint(os.Stderr, ".")
}
fmt.Fprintln(os.Stderr)
writeWait.Wait()
return analysisResult
}
// GetAllAnalysisTasks returns a channel of all the analysis tasks known to the dump tool. It returns
// immediately and sends tasks to the channel in a separate goroutine. The channel is closed after
// all tasks are sent.
// FIXME: GetAllAnalysisTasks should not need to know about basepath.
func GetAllAnalysisTasks(runner Runner, basepath string, results chan<- AnalysisResult) <-chan Task {
tasks := make(chan Task)
go func() {
defer close(tasks)
projects, err := GetProjects(runner)
if err != nil {
tasks <- NewError(err)
return
}
GetAnalysisTasks(tasks, basepath, projects, results)
}()
return tasks
}
// NewError returns a Task that always return the given error.
func NewError(err error) Task {
return func() error { return err }
}