-
Notifications
You must be signed in to change notification settings - Fork 1
/
dispatcher.go
305 lines (244 loc) · 7.31 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package flowprocess
import (
sj "github.com/guyannanfei25/go-simplejson"
"fmt"
"sync"
"sync/atomic"
"time"
"log"
"os"
)
// 配置文件从ini改为json,json能够更好的处理复杂的层级关系
// http://www.guyannanfei25.site/2017/05/14/misc-talk/
type Dispatcher interface {
Init(conf *sj.Json) error
SetName(name string)
GetName() string
// 只有需要级联的dispatch才需要Register接口
// 注册下游dispatcher
DownRegister(d Dispatcher)
// 注册上游dispatcher
UpRegister(d Dispatcher)
// Dispatch前预处理函数
SetPreFunc(preFunctor Functor)
// Dispatch后处理函数
SetSufFuc(sufFunctor Functor)
// 开始运行,在注册好dispatcher并设置好预处理函数之后
Start()
// 是否正在运行
IsRunning() bool
Dispatch(item interface{})
Ack(result interface{}) error
// 自定义logger
SetLogger(l logger, lvl LogLevel)
getLogger() (logger, LogLevel)
// 一些定时任务,比如打印状态,更新缓存。。。
Tick()
Close()
}
// 函数对象
// 针对prefunc 返回错误则会终止后续的处理
// 可以定制返回结果,后续操作继续操作
type Functor func(interface{}) (interface{}, error)
// type Item struct {
// }
// 仅做中转,由具体的用户定制的dispatcher去转换
// 自定义类型外部需要导入
// type Item interface{}
// 返回结果,用户定制,比如统计成功率
// type Result interface{}
type DefaultDispatcher struct {
name string
// 保存配置,以便用户需要获取相应参数
Conf *sj.Json
// 改为map用于去重
downDispatchers map[Dispatcher]int
upDispatchers map[Dispatcher]int
msgChan chan interface{} // *item
msgMaxSize int // 最大缓存大小
concurrency int // 并发执行数
chanCount uint32 // 当前协程数
running bool // 是否正在运行
// 预处理函数
preFunctor Functor
// 最后处理函数
sufFunctor Functor
wg sync.WaitGroup
// 自定义logger
logger logger
logLvl LogLevel
logGuard sync.RWMutex
}
func (d *DefaultDispatcher) Init(conf *sj.Json) error {
d.Conf = conf
d.concurrency = conf.Get("concurrency").MustInt(1)
if d.concurrency < 1 {
d.concurrency = 1
}
d.msgMaxSize = conf.Get("msgMaxSize").MustInt(0)
if d.msgMaxSize < 0 {
d.msgMaxSize = 0 // 无缓存
}
d.msgChan = make(chan interface{}, d.msgMaxSize)
d.name = conf.Get("name").MustString("DefaultName")
d.downDispatchers = make(map[Dispatcher]int)
d.upDispatchers = make(map[Dispatcher]int)
d.chanCount = 0
d.preFunctor = nil
d.sufFunctor = nil
d.running = false
d.logger = log.New(os.Stderr, "", log.Flags())
d.logLvl = LogLevelInfo
return nil
}
func (d *DefaultDispatcher) Start() {
if d.running {
d.logf(LogLevelInfo, " is already running!\n")
return
}
for i := 0; i < d.concurrency; i++ {
go d.process(i)
}
// Make sure down and up dispatcher running
// 由上一级 dispatcher 启动所有下一级
for down, _ := range d.downDispatchers {
down.Start()
// to ensure downregister start success
for {
if !down.IsRunning() {
time.Sleep(time.Second)
} else {
break
}
}
}
d.running = true
d.logf(LogLevelInfo, " start running!\n")
}
func (d *DefaultDispatcher) IsRunning() bool {
return d.running
}
func (d *DefaultDispatcher) SetName(name string) {
d.name = name
}
func (d *DefaultDispatcher) GetName() string {
return d.name
}
func (d *DefaultDispatcher) SetLogger(l logger, lvl LogLevel) {
d.logGuard.Lock()
defer d.logGuard.Unlock()
d.logger = l
d.logLvl = lvl
}
func (d *DefaultDispatcher) getLogger() (logger, LogLevel) {
d.logGuard.Lock()
defer d.logGuard.Unlock()
return d.logger, d.logLvl
}
func (d *DefaultDispatcher) DownRegister(down Dispatcher) {
d.downDispatchers[down] = 1
down.UpRegister(d)
}
func (d *DefaultDispatcher) UpRegister(up Dispatcher) {
d.upDispatchers[up] = 1
}
func (d *DefaultDispatcher) SetPreFunc (preFunctor Functor) {
d.preFunctor = preFunctor
}
func (d *DefaultDispatcher) SetSufFuc (sufFunctor Functor) {
d.sufFunctor = sufFunctor
}
func (d *DefaultDispatcher) process(id int) {
d.wg.Add(1)
defer d.wg.Done()
atomic.AddUint32(&d.chanCount, 1)
d.logf(LogLevelInfo, " %dth process starting...\n", id)
PROCESS_MAIN:
for {
select {
case item, ok := <- d.msgChan:
if !ok {
break PROCESS_MAIN
}
// 经过用户业务处理之后的返回值,可以简简单单返回item自身
var ret interface{}
var err error
// dispatch之前预处理函数
// 当需要中断处理,即不需要后续的dispatcher处理则需要返回错误
if d.preFunctor != nil {
if ret, err = d.preFunctor(item); err != nil {
continue
}
} else {
// 如果没有设置用户自己业务,则仅仅做转发使用
ret = item
}
for sub, _ := range d.downDispatchers {
sub.Dispatch(ret)
}
// dispatch之后处理函数
if d.sufFunctor != nil {
d.sufFunctor(item)
}
}
}
d.logf(LogLevelInfo, " %dth process quiting...\n", id)
atomic.AddUint32(&d.chanCount, ^uint32(0))
}
func (d *DefaultDispatcher) Dispatch(item interface{}) {
d.msgChan <- item
}
func (d *DefaultDispatcher) Ack(result interface{}) error {
// TODO:是否合理???
// DefaultDispatcher不应该调用上游Dispatcher,应由继承者来实现
// for up, _ := range d.upDispatchers {
// up.Ack()
// }
return nil
}
// 一些定时任务,比如打印状态,更新缓存。。。
// 使用时建议使用一个单独协程,
// 由首个dispatcher调用,触犯后面dispatcher链
// framework := new(DefaultDispatcher)
// go func(){
// tick := time.NewTicker(time.Duration(20) * time.Second)
// for {
// <- tick.C
// framework.Tick()
// }
// }()
// defer tick.Stop()
func (d *DefaultDispatcher) Tick() {
for sub, _ := range d.downDispatchers {
sub.Tick()
}
}
func (d *DefaultDispatcher) logf(lvl LogLevel, line string, args ...interface{}) {
logger, logLvl := d.getLogger()
if logger == nil {
return
}
if logLvl > lvl {
return
}
logger.Output(2, fmt.Sprintf("[%-7s %s] %s", lvl, d.GetName(), fmt.Sprintf(line, args...)))
}
func (d *DefaultDispatcher) Close() {
close(d.msgChan)
d.wg.Wait()
d.running = false
// 关闭下游dispatcher
for sub, _ := range d.downDispatchers {
if sub.IsRunning() {
sub.Close()
}
for {
if sub.IsRunning() {
time.Sleep(time.Second)
} else {
break
}
}
}
d.logf(LogLevelInfo, " exit Success\n")
}