-
Notifications
You must be signed in to change notification settings - Fork 247
/
glow.go
133 lines (112 loc) · 5.22 KB
/
glow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"bufio"
"log"
"os"
"runtime"
"strconv"
"sync"
"github.com/chrislusf/glow/netchan"
kingpin "gopkg.in/alecthomas/kingpin.v2"
a "github.com/chrislusf/glow/agent"
r "github.com/chrislusf/glow/netchan/receiver"
s "github.com/chrislusf/glow/netchan/sender"
m "github.com/chrislusf/glow/resource/service_discovery/master"
"github.com/chrislusf/glow/util"
)
var (
app = kingpin.New("glow", "A command-line net channel.")
master = app.Command("master", "Start a master process")
masterAddress = master.Flag("address", "listening address host:port").Default(":8930").String()
masterCerts = netchan.CertFiles{}
agent = app.Command("agent", "Channel Agent")
agentOption = &a.AgentServerOption{
Dir: agent.Flag("dir", "agent folder to store computed data").Default(os.TempDir()).String(),
Host: agent.Flag("host", "agent listening host address. Required in 2-way SSL mode.").Default("").String(),
Port: agent.Flag("port", "agent listening port").Default("8931").Int(),
Master: agent.Flag("master", "master address").Default("localhost:8930").String(),
DataCenter: agent.Flag("dataCenter", "data center name").Default("defaultDataCenter").String(),
Rack: agent.Flag("rack", "rack name").Default("defaultRack").String(),
MaxExecutor: agent.Flag("max.executors", "upper limit of executors").Default(strconv.Itoa(runtime.NumCPU())).Int(),
CPULevel: agent.Flag("cpu.level", "relative computing power of single cpu core").Default("1").Int(),
MemoryMB: agent.Flag("memory", "memory size in MB").Default("1024").Int64(),
CleanRestart: agent.Flag("clean.restart", "clean up previous dataset files").Default("true").Bool(),
CertFiles: netchan.CertFiles{},
}
sender = app.Command("send", "Send data to a channel")
sendToChanName = sender.Flag("to", "Name of a channel").Required().String()
sendFile = sender.Flag("file", "file to post.").ExistingFile()
senderAgentAddress = sender.Flag("agent", "agent host:port").Default("localhost:8931").String()
senderCerts = netchan.CertFiles{}
// sendDelimiter = sender.Flag("delimiter", "Verbose mode.").Short('d').String()
receiver = app.Command("receive", "Receive data from a channel")
receiveFromChanName = receiver.Flag("from", "Name of a source channel").Required().String()
receiverMaster = receiver.Flag("master", "ip:port format").Default("localhost:8930").String()
receiverCerts = netchan.CertFiles{}
)
func main() {
master.Flag("cert.file", "A PEM eoncoded certificate file").Default("").StringVar(&masterCerts.CertFile)
master.Flag("key.file", "A PEM encoded private key file.").Default("").StringVar(&masterCerts.KeyFile)
master.Flag("ca.file", "A PEM eoncoded CA's certificate file.").Default("").StringVar(&masterCerts.CaFile)
agent.Flag("cert.file", "A PEM eoncoded certificate file").Default("").StringVar(&agentOption.CertFiles.CertFile)
agent.Flag("key.file", "A PEM encoded private key file.").Default("").StringVar(&agentOption.CertFiles.KeyFile)
agent.Flag("ca.file", "A PEM eoncoded CA's certificate file.").Default("").StringVar(&agentOption.CertFiles.CaFile)
sender.Flag("cert.file", "A PEM eoncoded certificate file").Default("").StringVar(&senderCerts.CertFile)
sender.Flag("key.file", "A PEM encoded private key file.").Default("").StringVar(&senderCerts.KeyFile)
sender.Flag("ca.file", "A PEM eoncoded CA's certificate file.").Default("").StringVar(&senderCerts.CaFile)
receiver.Flag("cert.file", "A PEM eoncoded certificate file").Default("").StringVar(&receiverCerts.CertFile)
receiver.Flag("key.file", "A PEM encoded private key file.").Default("").StringVar(&receiverCerts.KeyFile)
receiver.Flag("ca.file", "A PEM eoncoded CA's certificate file.").Default("").StringVar(&receiverCerts.CaFile)
switch kingpin.MustParse(app.Parse(os.Args[1:])) {
case master.FullCommand():
println("listening on", *masterAddress)
m.RunMaster(masterCerts.MakeTLSConfig(), *masterAddress)
case sender.FullCommand():
tlsConfig := senderCerts.MakeTLSConfig()
util.SetupHttpClient(tlsConfig)
var wg sync.WaitGroup
sendChan, err := s.NewDirectSendChannel(tlsConfig, *sendToChanName, *senderAgentAddress, &wg)
if err != nil {
panic(err)
}
file := os.Stdin
if *sendFile != "" {
file, err = os.Open(*sendFile)
if err != nil {
log.Fatal(err)
}
defer file.Close()
}
counter := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
sendChan <- scanner.Bytes()
counter++
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
close(sendChan)
wg.Wait()
case receiver.FullCommand():
tlsConfig := receiverCerts.MakeTLSConfig()
util.SetupHttpClient(tlsConfig)
target := r.FindTarget(*receiveFromChanName, *receiverMaster)
rc := r.NewReceiveChannel(tlsConfig, *receiveFromChanName, 0)
recvChan, err := rc.GetDirectChannel(target, 128)
if err != nil {
panic(err)
}
for m := range recvChan {
println(string(m))
}
case agent.FullCommand():
if agentOption.CertFiles.IsEnabled() {
if *agentOption.Host == "" {
log.Fatalf("Usage Note: --host option is needed in 2-way SSL mode and must match CN in the certificate.")
}
}
agentServer := a.NewAgentServer(agentOption)
agentServer.Run()
}
}