-
Notifications
You must be signed in to change notification settings - Fork 51
/
client.go
148 lines (118 loc) · 3.23 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package sdk
import (
"context"
"github.com/apex/log"
"github.com/cenkalti/backoff/v4"
grpc2 "github.com/crawlab-team/crawlab-grpc"
"github.com/crawlab-team/crawlab-sdk/entity"
"github.com/crawlab-team/crawlab-sdk/interfaces"
"github.com/crawlab-team/go-trace"
"google.golang.org/grpc"
"os"
"time"
)
// C is the package-level Client instance. GetClient returns it when it is
// non-nil and assigns it after a new client has been initialized successfully.
var C *Client
// Client bundles the gRPC connection settings, the underlying connection and
// the service clients that are created on that connection.
type Client struct {
// settings
// address is the gRPC server address; connect dials address.String().
address *entity.Address
// timeout is the deadline applied to the dial context in connect.
timeout time.Duration
// internals
// conn is assigned by connect and passed to every service-client
// constructor in register.
conn *grpc.ClientConn
// dependencies
// The service clients below are assigned by register, each created on conn.
ModelDelegateClient grpc2.ModelDelegateClient
ModelBaseServiceClient grpc2.ModelBaseServiceClient
NodeClient grpc2.NodeServiceClient
TaskClient grpc2.TaskServiceClient
PluginClient grpc2.PluginServiceClient
}
// GetModelDelegateClient returns the ModelDelegateClient field, which register
// assigns from the client's connection.
func (c *Client) GetModelDelegateClient() grpc2.ModelDelegateClient {
return c.ModelDelegateClient
}
// GetModelBaseServiceClient returns the ModelBaseServiceClient field, which
// register assigns from the client's connection.
func (c *Client) GetModelBaseServiceClient() grpc2.ModelBaseServiceClient {
return c.ModelBaseServiceClient
}
// GetNodeClient returns the NodeClient field, which register assigns from the
// client's connection.
func (c *Client) GetNodeClient() grpc2.NodeServiceClient {
return c.NodeClient
}
// GetTaskClient returns the TaskClient field, which register assigns from the
// client's connection.
func (c *Client) GetTaskClient() grpc2.TaskServiceClient {
return c.TaskClient
}
// GetPluginClient returns the PluginClient field, which register assigns from
// the client's connection.
func (c *Client) GetPluginClient() grpc2.PluginServiceClient {
return c.PluginClient
}
// init connects to the gRPC server and then creates the service clients.
//
// The connect step is run through backoff.RetryNotify with an exponential
// backoff policy; every failed attempt is logged together with the delay
// before the next one. An error returned by RetryNotify is passed through
// trace.TraceError; an error from register is returned unchanged.
func (c *Client) init() (err error) {
	// log each failed attempt before the next retry
	onRetry := func(cause error, wait time.Duration) {
		log.Errorf("init client error: %v, re-attempt in %.1f seconds", cause, wait.Seconds())
	}

	// connect
	if err = backoff.RetryNotify(c.connect, backoff.NewExponentialBackOff(), onRetry); err != nil {
		return trace.TraceError(err)
	}

	// register
	return c.register()
}
// connect dials the gRPC server at c.address and stores the result in c.conn.
//
// The dial is blocking (grpc.WithBlock) and runs under a context whose
// timeout is c.timeout. It uses grpc.WithInsecure and chains the auth-token
// unary and stream interceptors. A dial error is returned through
// trace.TraceError.
func (c *Client) connect() (err error) {
	// grpc server address
	target := c.address.String()

	// timeout context
	dialCtx, stop := context.WithTimeout(context.Background(), c.timeout)
	defer stop()

	// TODO: configure dial options
	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithChainUnaryInterceptor(GetAuthTokenUnaryChainInterceptor()),
		grpc.WithChainStreamInterceptor(GetAuthTokenStreamChainInterceptor()),
	}

	// connection: c.conn receives whatever DialContext returns, also on failure
	if c.conn, err = grpc.DialContext(dialCtx, target, dialOpts...); err != nil {
		return trace.TraceError(err)
	}
	return nil
}
// register creates one service client per gRPC service on c.conn and stores
// each in the corresponding exported field. It always returns nil.
func (c *Client) register() (err error) {
	// every service client is created on the same connection
	conn := c.conn

	c.ModelDelegateClient = grpc2.NewModelDelegateClient(conn)       // model delegate
	c.ModelBaseServiceClient = grpc2.NewModelBaseServiceClient(conn) // model base service
	c.NodeClient = grpc2.NewNodeServiceClient(conn)                  // node
	c.TaskClient = grpc2.NewTaskServiceClient(conn)                  // task
	c.PluginClient = grpc2.NewPluginServiceClient(conn)              // plugin

	return nil
}
// GetClient returns the package-level client C, creating and initializing it
// when C is still nil.
//
// When a client has to be created, the server address is parsed from the
// CRAWLAB_GRPC_ADDRESS environment variable; if parsing fails, the address is
// built from CRAWLAB_GRPC_ADDRESS_HOST and CRAWLAB_GRPC_ADDRESS_PORT instead.
// The client starts with a 10-second timeout, opts are applied in order, and
// init is called. GetClient panics if init returns an error.
//
// When C is already set it is returned as is and opts are not applied.
// This function performs no locking around C.
func GetClient(opts ...ClientOption) interfaces.Client {
	// reuse the existing instance
	if C != nil {
		return C
	}

	// address: full address string first, separate host/port as fallback
	addr, parseErr := entity.NewAddressFromString(os.Getenv("CRAWLAB_GRPC_ADDRESS"))
	if parseErr != nil {
		addr = entity.NewAddress(&entity.AddressOptions{
			Host: os.Getenv("CRAWLAB_GRPC_ADDRESS_HOST"),
			Port: os.Getenv("CRAWLAB_GRPC_ADDRESS_PORT"),
		})
	}

	// client with default settings
	cli := &Client{
		timeout: 10 * time.Second,
		address: addr,
	}

	// apply options on top of the defaults
	for i := range opts {
		opts[i](cli)
	}

	// initialize; C is only published after init succeeds
	if initErr := cli.init(); initErr != nil {
		panic(initErr)
	}

	C = cli
	return cli
}