-
Notifications
You must be signed in to change notification settings - Fork 4
/
pipeline.go
109 lines (91 loc) · 3.29 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//go:build windows || darwin
// +build windows darwin
package main
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
b "github.com/teamnsrg/mida/base"
"github.com/teamnsrg/mida/log"
"github.com/teamnsrg/mida/monitor"
"github.com/teamnsrg/mida/sanitize"
"github.com/teamnsrg/mida/storage"
"os"
"os/exec"
"sync"
)
// InitPipeline is the main MIDA pipeline, used whenever MIDA uses a browser to visit websites.
// It consists of five main stages: RawTask stage1, RawTask Sanitize, Site Visit, stage4, and Results Storage.
func InitPipeline(cmd *cobra.Command, args []string) {
rawTaskChan := make(chan *b.RawTask) // channel connecting stages 1 and 2
sanitizedTaskChan := make(chan *b.TaskWrapper) // channel connecting stages 2 and 3
rawResultChan := make(chan *b.RawResult) // channel connecting stages 3 and 4
finalResultChan := make(chan *b.FinalResult) // channel connection stages 4 and 5
monitorChan := make(chan *b.TaskSummary)
var crawlerWG sync.WaitGroup // Tracks active crawler workers
var postprocesserWG sync.WaitGroup // Tracks active postprocessers
var storageWG sync.WaitGroup // Tracks active storage workers
var pipelineWG sync.WaitGroup // Tracks tasks currently in pipeline
// Start our virtual display, if needed
xvfb, err := cmd.Flags().GetBool("xvfb")
if err != nil {
log.Log.Error(err)
return
}
xvfbCommand := exec.Command("Xvfb", ":99", "-screen", "0", "1920x1080x16")
if xvfb {
log.Log.Error("virtual display (Xvfb) is only available on linux")
return
}
// Start goroutine that runs the Prometheus monitoring HTTP server
if viper.GetBool("monitor") {
go monitor.RunPrometheusClient(monitorChan, viper.GetInt("prom_port"))
}
// Start goroutine(s) that handles crawl results storage
numStorers := viper.GetInt("storers")
storageWG.Add(numStorers)
for i := 0; i < numStorers; i++ {
go stage5(finalResultChan, monitorChan, &storageWG, &pipelineWG)
}
// Start goroutine that handles crawl results sanitization
numPostprocessers := viper.GetInt("postprocessers")
postprocesserWG.Add(numPostprocessers)
for i := 0; i < numPostprocessers; i++ {
go stage4(rawResultChan, finalResultChan, &postprocesserWG)
}
// Start site visitors(s) which take sanitized tasks as arguments
numCrawlers := viper.GetInt("crawlers")
crawlerWG.Add(numCrawlers)
for i := 0; i < numCrawlers; i++ {
go stage3(sanitizedTaskChan, rawResultChan, &crawlerWG)
}
// Start goroutine which sanitizes input tasks
go stage2(rawTaskChan, sanitizedTaskChan, &pipelineWG)
// Start the goroutine responsible for getting our tasks
go stage1(rawTaskChan, cmd, args)
// Wait for all of our crawlers to finish, and then allow them to exit
crawlerWG.Wait()
close(rawResultChan)
// Wait for postprocessing to finish
postprocesserWG.Wait()
close(finalResultChan)
// Wait for all of our storers to exit.
storageWG.Wait()
// Close connections to databases or storage servers
err = storage.CleanupConnections()
if err != nil {
log.Log.Error(err)
}
// Cleanup any remaining temporary files before we exit
err = os.RemoveAll(sanitize.ExpandPath(viper.GetString("tempdir")))
if err != nil {
log.Log.Error(err)
}
// Shut down our xvfb server, if running
if xvfb {
err = xvfbCommand.Process.Kill()
if err != nil {
log.Log.Error(err)
}
}
return
}