-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatcher.go
144 lines (122 loc) · 2.62 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package gopool
import (
"fmt"
"sync"
"sync/atomic"
)
// Lifecycle states kept in Dispatcher.status.
const (
	// dispatcherStatusNone is the status a Dispatcher is created with.
	dispatcherStatusNone = iota
	// dispatcherStatusRunning is stored by NewDispatcher; AddHashTask only
	// accepts tasks in this state.
	dispatcherStatusRunning
	// dispatcherStatusStopped is stored by GraceQuit.
	dispatcherStatusStopped
)
// Dispatcher is a consistent dispatcher: AddHashTask hands each task to the
// worker selected by the task's hash modulo the number of workers.
type Dispatcher struct {
	config *PoolConfig     // pool settings, also passed to every worker
	wg     *sync.WaitGroup // one count per launched worker; GraceQuit waits on it

	// quit carries the shutdown request to the dispatcher goroutine; started
	// is signalled once that goroutine has launched all workers.
	quit, started chan struct{}

	status uint32 // dispatcher state (one of the dispatcherStatus* values)

	batchExe BatchExecutor // executor given to the batchers (batch mode)
	exe      Executor      // executor given to the workers (non-batch mode)
	workers  []Worker      // filled by start in non-batch mode
	batchers []Batcher     // filled by start in batch mode
	isBatch  bool          // whether tasks are processed in batches
}
// NewDispatcher builds a Dispatcher from the given options, launches its
// workers and returns once they have all been started.
//
// Defaults are a batch size of 10 and 2 workers. Nil options are skipped. A
// non-positive worker count falls back to the default, because a dispatcher
// without workers cannot route any task. The per-worker queue capacity is
// raised to at least the worker count.
//
// It panics if the options configure neither an executor nor a batch
// executor. When a batch executor is configured the dispatcher runs in batch
// mode.
func NewDispatcher(options ...Option) *Dispatcher {
	const (
		defaultBatchSize = 10
		defaultWorkers   = 2
	)

	config := &PoolConfig{
		batchSize: defaultBatchSize,
		workers:   defaultWorkers,
	}
	for _, opt := range options {
		if opt == nil {
			// Calling a nil option would panic with a nil function call.
			continue
		}
		opt(config)
	}

	// Fail before allocating anything if there is nothing to execute tasks.
	if config.exe == nil && config.batchExe == nil {
		panic("not set exe or batch exe")
	}

	// Zero or negative workers would leave the dispatcher with nothing to
	// route to (hash % 0 panics), so fall back to the default.
	if config.workers <= 0 {
		config.workers = defaultWorkers
	}
	if config.chanSize < config.workers {
		config.chanSize = config.workers
	}

	c := &Dispatcher{
		config:   config,
		wg:       new(sync.WaitGroup),
		quit:     make(chan struct{}),
		started:  make(chan struct{}),
		status:   dispatcherStatusNone,
		exe:      config.exe,
		batchExe: config.batchExe,
		isBatch:  config.batchExe != nil,
	}

	atomic.StoreUint32(&c.status, dispatcherStatusRunning)
	go c.start()
	<-c.started // wait until the dispatcher goroutine has launched the workers
	return c
}
// start is the dispatcher goroutine. It creates c.config.workers workers
// (batchers in batch mode), launches one goroutine per worker, signals
// c.started, and then blocks until a quit signal arrives on c.quit.
//
// On quit it calls quit() on every worker, marking each one done on c.wg as
// soon as its quit() returns, and then exits. It handles exactly one quit
// signal: looping back to wait for another would leak this goroutine and
// would call wg.Done() a second time, driving the WaitGroup counter negative.
func (c *Dispatcher) start() {
	if c.isBatch {
		// Build the whole slice before launching any goroutine so that later
		// appends cannot move an element a running goroutine refers to.
		for i := 0; i < c.config.workers; i++ {
			c.batchers = append(c.batchers, Batcher{
				config:    c.config,
				taskQueue: make(chan interface{}, c.config.chanSize),
				executor:  c.batchExe,
				id:        i,
			})
		}
		for i := range c.batchers {
			c.wg.Add(1)
			go c.batchers[i].start()
		}
	} else {
		for i := 0; i < c.config.workers; i++ {
			c.workers = append(c.workers, Worker{
				config:    c.config,
				taskQueue: make(chan interface{}, c.config.chanSize),
				executor:  c.exe,
				finshed:   make(chan struct{}),
				id:        i,
			})
		}
		for i := range c.workers {
			c.wg.Add(1)
			go c.workers[i].start()
		}
	}

	// Report that every worker has been launched.
	c.started <- struct{}{}

	// Block until shutdown is requested, then stop the workers exactly once.
	<-c.quit
	if c.isBatch {
		for i := range c.batchers {
			c.batchers[i].quit()
			c.wg.Done()
		}
		return
	}
	for i := range c.workers {
		c.workers[i].quit()
		c.wg.Done()
	}
}
// AddHashTask hands task to the worker (or batcher, in batch mode) selected
// by h modulo the number of workers, so tasks with the same hash go to the
// same worker.
//
// It returns an error when the dispatcher is not running or has no workers;
// otherwise it returns the result of the selected worker's addTask.
func (c *Dispatcher) AddHashTask(h uint64, task Task) error {
	// status is written with atomic stores, so it has to be read atomically
	// as well to avoid a data race with GraceQuit.
	if atomic.LoadUint32(&c.status) != dispatcherStatusRunning {
		return fmt.Errorf("hash dispatcher is stopped")
	}

	if c.isBatch {
		n := uint64(len(c.batchers))
		if n == 0 {
			// h % 0 would panic with an integer divide by zero.
			return fmt.Errorf("hash dispatcher has no batch workers")
		}
		return c.batchers[h%n].addTask(task)
	}

	n := uint64(len(c.workers))
	if n == 0 {
		// h % 0 would panic with an integer divide by zero.
		return fmt.Errorf("hash dispatcher has no workers")
	}
	return c.workers[h%n].addTask(task)
}
// GraceQuit marks the dispatcher as stopped, sends the quit signal to the
// dispatcher goroutine and blocks until the WaitGroup tracking the workers
// has drained.
//
// It is safe to call more than once: only the call that flips the status to
// stopped sends the quit signal, every other call just waits. Sending the
// signal twice would make the workers be quit and counted as done twice.
func (c *Dispatcher) GraceQuit() {
	if atomic.SwapUint32(&c.status, dispatcherStatusStopped) != dispatcherStatusStopped {
		c.quit <- struct{}{}
	}
	c.wg.Wait()
}