forked from wellle/rmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
657 lines (559 loc) · 18.4 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
package rmq
import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"time"
)
const (
	// defaultBatchTimeout is one second.
	// NOTE(review): not referenced in the code visible here; presumably the
	// default timeout for batch consumers - confirm.
	defaultBatchTimeout = time.Second
	// purgeBatchSize is the maximum number of elements deleteRedisList trims
	// from a list with a single LTrim call.
	purgeBatchSize int64 = 100
)
// DeliveryState describes where Locate found a payload.
type DeliveryState int

// The states reported by Locate.
const (
	// PayloadStateUnknown is returned together with an error.
	PayloadStateUnknown DeliveryState = iota
	// PayloadStateReady means the payload is in the ready list.
	PayloadStateReady
	// PayloadStateUnacked means the payload is in the unacked list.
	PayloadStateUnacked
	// PayloadStateAcked means the payload was found in none of the lists.
	PayloadStateAcked
	// PayloadStateRejected means the payload is in the rejected list.
	PayloadStateRejected
)
// Queue is the interface of a single rmq queue. It covers publishing
// payloads, consuming them through registered consumers and maintenance
// operations such as purging or returning deliveries.
type Queue interface {
	// Publish pushes the given payloads onto the ready list.
	Publish(payload ...string) error
	// PublishBytes is Publish for byte slice payloads.
	PublishBytes(payload ...[]byte) error

	// Locate reports the state of a payload and its index within the list
	// it was found in.
	Locate(payload string) (DeliveryState, int64, error)
	// LocateBytes is Locate for a byte slice payload.
	LocateBytes(payload []byte) (DeliveryState, int64, error)

	// SetPushQueue sets the queue deliveries are moved to by delivery.Push().
	SetPushQueue(pushQueue Queue)

	// Remove removes occurrences of payload from the ready list and, if
	// removeFromRejected is set, from the rejected list as well.
	Remove(payload string, count int64, removeFromRejected bool) error
	// RemoveBytes is Remove for a byte slice payload.
	RemoveBytes(payload []byte, count int64, removeFromRejected bool) error

	// StartConsuming must be called before consumers can be added.
	StartConsuming(prefetchLimit int64, pollDuration time.Duration) error
	// StopConsuming stops all consumers; the returned channel is closed once
	// they have finished.
	StopConsuming() <-chan struct{}

	// AddConsumer registers a consumer and returns its internal name.
	AddConsumer(tag string, consumer Consumer) (string, error)
	// AddConsumerFunc is AddConsumer for a plain function.
	AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error)
	// AddBatchConsumer registers a consumer that receives batches of up to
	// batchSize deliveries, waiting at most timeout to fill a batch.
	AddBatchConsumer(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error)
	// AddBatchConsumerFunc is AddBatchConsumer for a plain function.
	AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error)

	// PurgeReady deletes all ready deliveries and returns how many there were.
	PurgeReady() (int64, error)
	// PurgeRejected deletes all rejected deliveries and returns how many
	// there were.
	PurgeRejected() (int64, error)
	// ReturnUnacked moves up to max unacked deliveries back to the ready list.
	ReturnUnacked(max int64) (int64, error)
	// ReturnRejected moves up to max rejected deliveries back to the ready list.
	ReturnRejected(max int64) (int64, error)
	// Destroy purges the queue and removes it from the list of queues.
	Destroy() (readyCount, rejectedCount int64, err error)
	// Drain removes and returns up to count payloads from the ready list.
	Drain(count int64) ([]string, error)
	// ReadyKey returns the key of the ready list.
	ReadyKey() string

	// internals

	// closeInStaleConnection is used by the cleaner.
	closeInStaleConnection() error

	// The remaining methods are used for stats.
	readyCount() (int64, error)
	unackedCount() (int64, error)
	rejectedCount() (int64, error)
	getConsumers() ([]string, error)
}
// Compile-time check that *redisQueue satisfies Queue.
var _ Queue = (*redisQueue)(nil)

// redisQueue is the Queue implementation used by this package. All state of
// the queue lives in lists and sets reached through redisClient; the struct
// itself holds the keys of those structures plus the bookkeeping needed to
// start and stop consuming.
type redisQueue struct {
	name           string
	connectionName string

	queuesKey    string // key to list of queues consumed by this connection
	consumersKey string // key to set of consumers using this connection
	unackedKey   string // key to list of currently consuming deliveries
	readyKey     string // key to list of ready deliveries
	rejectedKey  string // key to list of rejected deliveries
	pushKey      string // ready key of the push queue, set by SetPushQueue

	redisClient RedisClient
	errChan     chan<- error // receives consume errors, written without blocking

	// prefetchLimit is the max number of prefetched deliveries; the number
	// of unacked deliveries can go up to prefetchLimit + numConsumers.
	prefetchLimit int64
	pollDuration  time.Duration // base sleep between two consumeBatch calls

	lock sync.Mutex // protects the fields below related to starting and stopping this queue

	consumingStopped chan struct{} // closed by StopConsuming once consuming has stopped
	stopWg           sync.WaitGroup // one count per consumer goroutine
	ackCtx           context.Context // handed to every delivery created by newDelivery
	ackCancel        context.CancelFunc // cancels ackCtx; called by StopConsuming
	deliveryChan     chan Delivery // nil until StartConsuming has been called
}
// newQueue builds a redisQueue for the given queue and connection names.
//
// The key templates are resolved by substituting the first occurrence of the
// phConnection and phQueue placeholders. The consumers and unacked keys are
// per connection and queue; the ready and rejected keys only depend on the
// queue name. The returned queue is not consuming yet: deliveryChan stays nil
// until StartConsuming is called.
func newQueue(
	name, connectionName, queuesKey string,
	consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate string,
	redisClient RedisClient,
	errChan chan<- error,
) *redisQueue {
	// Keys scoped to both the connection and the queue.
	consumersKey := strings.Replace(
		strings.Replace(consumersTemplate, phConnection, connectionName, 1),
		phQueue, name, 1,
	)
	unackedKey := strings.Replace(
		strings.Replace(unackedTemplate, phConnection, connectionName, 1),
		phQueue, name, 1,
	)

	ackCtx, ackCancel := context.WithCancel(context.Background())

	return &redisQueue{
		name:           name,
		connectionName: connectionName,
		queuesKey:      queuesKey,
		consumersKey:   consumersKey,
		unackedKey:     unackedKey,
		// Keys scoped to the queue only.
		readyKey:         strings.Replace(readyTemplate, phQueue, name, 1),
		rejectedKey:      strings.Replace(rejectedTemplate, phQueue, name, 1),
		redisClient:      redisClient,
		errChan:          errChan,
		consumingStopped: make(chan struct{}),
		ackCtx:           ackCtx,
		ackCancel:        ackCancel,
	}
}
// String returns a short description of the queue containing its name and
// the name of its connection.
func (queue *redisQueue) String() string {
	description := fmt.Sprintf("[%s conn:%s]", queue.name, queue.connectionName)
	return description
}
// ReadyKey returns the key of the list holding the ready deliveries of this
// queue.
func (queue *redisQueue) ReadyKey() string {
	return queue.readyKey
}
// Publish adds one delivery per given payload to the ready list of the
// queue. The length reported by LPush is discarded; only the error is
// returned.
func (queue *redisQueue) Publish(payload ...string) error {
	if _, err := queue.redisClient.LPush(queue.readyKey, payload...); err != nil {
		return err
	}
	return nil
}
// PublishBytes converts every byte slice to a string and publishes the
// results with a single Publish call.
func (queue *redisQueue) PublishBytes(payload ...[]byte) error {
	payloads := make([]string, 0, len(payload))
	for _, raw := range payload {
		payloads = append(payloads, string(raw))
	}
	return queue.Publish(payloads...)
}
// Locate looks for payload in the ready, unacked and rejected lists, in that
// order, and returns the state matching the first list containing it together
// with the index LPos reported for it.
//
// A payload found in none of the lists is reported as PayloadStateAcked with
// index -1. If any lookup fails, PayloadStateUnknown, index 0 and the error
// are returned.
func (queue *redisQueue) Locate(payload string) (DeliveryState, int64, error) {
	lists := [...]struct {
		key   string
		state DeliveryState
	}{
		{queue.readyKey, PayloadStateReady},
		{queue.unackedKey, PayloadStateUnacked},
		{queue.rejectedKey, PayloadStateRejected},
	}

	for _, list := range lists {
		index, err := queue.redisClient.LPos(list.key, payload)
		if err != nil {
			return PayloadStateUnknown, 0, err
		}
		if index != -1 { // -1 means "not in this list"
			return list.state, index, nil
		}
	}

	return PayloadStateAcked, -1, nil
}
// LocateBytes converts payload to a string and calls Locate.
func (queue *redisQueue) LocateBytes(payload []byte) (DeliveryState, int64, error) {
	asString := string(payload)
	return queue.Locate(asString)
}
// Remove removes elements with the given payload from the ready list and, if
// removeFromRejected is set, from the rejected list as well. count is passed
// to LRem unchanged for both lists.
//
// Both removals are always attempted. If both fail, the error of the ready
// list removal is returned; otherwise the error of whichever removal failed.
//
// WARN: this operation is pretty slow with O(N+M) complexity where N is the
// length of the queue and M is the number of removed elements.
func (queue *redisQueue) Remove(payload string, count int64, removeFromRejected bool) error {
	_, readyErr := queue.redisClient.LRem(queue.readyKey, count, payload)
	if !removeFromRejected {
		return readyErr
	}

	// Attempt the second removal even if the first one failed, but do not
	// swallow its error.
	_, rejectedErr := queue.redisClient.LRem(queue.rejectedKey, count, payload)
	if readyErr != nil {
		return readyErr
	}
	return rejectedErr
}
// RemoveBytes converts payload to a string and calls Remove with otherwise
// unchanged arguments.
//
// WARN: this operation is pretty slow with O(N+M) complexity where N is the
// length of the queue and M is the number of removed elements.
func (queue *redisQueue) RemoveBytes(payload []byte, count int64, removeFromRejected bool) error {
	asString := string(payload)
	return queue.Remove(asString, count, removeFromRejected)
}
// SetPushQueue sets a push queue. In the consumer function you can call
// delivery.Push(). If a push queue is set the delivery then gets moved from
// the original queue to the push queue. If no push queue is set it's
// equivalent to calling delivery.Reject().
//
// Only the ready key of pushQueue is stored; deliveries created afterwards
// by newDelivery carry it as their push key.
//
// NOTE: panics if pushQueue is not a *redisQueue
func (queue *redisQueue) SetPushQueue(pushQueue Queue) {
	target := pushQueue.(*redisQueue)
	queue.pushKey = target.readyKey
}
// StartConsuming starts consuming into a channel of size prefetchLimit.
// It must be called before consumers can be added!
// pollDuration is the duration the queue sleeps before checking for new
// deliveries.
//
// It returns ErrorAlreadyConsuming when called a second time and
// ErrorConsumingStopped once StopConsuming has been called; a stopped queue
// cannot be started again. Errors from registering the queue name in the
// queues set are returned as is.
func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.Duration) error {
	queue.lock.Lock()
	defer queue.lock.Unlock()

	// A non-nil deliveryChan means consuming has already been started.
	if alreadyConsuming := queue.deliveryChan != nil; alreadyConsuming {
		return ErrorAlreadyConsuming
	}

	select {
	case <-queue.consumingStopped:
		// consuming has been stopped, so it must not be started again
		return ErrorConsumingStopped
	default:
	}

	// add queue to list of queues consumed on this connection
	_, err := queue.redisClient.SAdd(queue.queuesKey, queue.name)
	if err != nil {
		return err
	}

	queue.deliveryChan = make(chan Delivery, prefetchLimit)
	queue.pollDuration = pollDuration
	queue.prefetchLimit = prefetchLimit

	// log.Printf("rmq queue started consuming %s %d %s", queue, prefetchLimit, pollDuration)
	go queue.consume()
	return nil
}
// consume is the fetch loop started by StartConsuming. It calls consumeBatch
// repeatedly, sleeping a jittered pollDuration between two calls, until
// consumeBatch reports ErrorConsumingStopped; it then closes deliveryChan and
// returns. Any other error is offered to errChan as a ConsumeError carrying
// the number of consecutive failures; the error is dropped if errChan is not
// ready to receive.
func (queue *redisQueue) consume() {
	consecutiveErrors := 0 // number of consecutive batch errors
	for {
		err := queue.consumeBatch()
		if err == ErrorConsumingStopped {
			close(queue.deliveryChan)
			return
		}

		if err == nil { // success
			consecutiveErrors = 0
		} else { // redis error
			consecutiveErrors++
			report := &ConsumeError{RedisErr: err, Count: consecutiveErrors}
			select { // try to add error to channel, but don't block
			case queue.errChan <- report:
			default:
			}
		}

		time.Sleep(jitteredDuration(queue.pollDuration))
	}
}
// consumeBatch moves deliveries from the ready list to the unacked list and
// sends them to deliveryChan until the prefetch limit is reached or the ready
// list is empty.
//
// It returns ErrorConsumingStopped if consuming has been stopped (checked
// before every fetch), nil when there was nothing more to fetch, and any
// other error from the redis client or from creating a delivery.
func (queue *redisQueue) consumeBatch() error {
	select {
	case <-queue.consumingStopped:
		return ErrorConsumingStopped
	default:
	}

	// unacked == <deliveries in deliveryChan> + <deliveries in Consume()>
	unacked, err := queue.unackedCount()
	if err != nil {
		return err
	}

	// Fetch only as many deliveries as the prefetch limit still allows; the
	// loop does not run at all when the limit is already reached.
	for free := queue.prefetchLimit - unacked; free > 0; free-- {
		select {
		case <-queue.consumingStopped:
			return ErrorConsumingStopped
		default:
		}

		payload, err := queue.redisClient.RPopLPush(queue.readyKey, queue.unackedKey)
		switch {
		case err == ErrorNotFound: // ready list is empty
			return nil
		case err != nil:
			return err
		}

		delivery, err := queue.newDelivery(payload)
		if err != nil {
			return fmt.Errorf("create new delivery: %w", err)
		}

		queue.deliveryChan <- delivery
	}

	return nil
}
// newDelivery wraps payload in a redisDelivery bound to this queue's keys,
// redis client, error channel and ack context, and splits the payload into
// header and clear payload using ExtractHeaderAndPayload.
//
// If the payload cannot be split, the delivery is rejected and the extraction
// error is returned. If rejecting fails as well, both errors are combined
// into one, wrapping the reject error.
func (queue *redisQueue) newDelivery(payload string) (Delivery, error) {
	delivery := &redisDelivery{
		ctx:         queue.ackCtx,
		payload:     payload,
		unackedKey:  queue.unackedKey,
		rejectedKey: queue.rejectedKey,
		pushKey:     queue.pushKey,
		redisClient: queue.redisClient,
		errChan:     queue.errChan,
	}

	var err error
	delivery.header, delivery.clearPayload, err = ExtractHeaderAndPayload(payload)
	if err != nil {
		// we need to reject a delivery here to move the delivery from the
		// unacked to the rejected list.
		if rejectErr := delivery.Reject(); rejectErr != nil {
			return nil, fmt.Errorf("%s, reject faulty delivery: %w", err, rejectErr)
		}
		return nil, err
	}

	return delivery, nil
}
// StopConsuming can be used to stop all consumers on this queue. It returns a
// channel which can be used to wait for all active consumers to finish their
// current Consume() call. This is useful to implement graceful shutdown.
//
// The call itself returns immediately; the actual stopping happens in a
// separate goroutine which closes the returned channel when it is done.
// Consuming is only stopped once: later calls just get a channel that is
// closed as soon as the queue lock can be acquired.
func (queue *redisQueue) StopConsuming() <-chan struct{} {
	finishedChan := make(chan struct{})

	go func() {
		queue.lock.Lock()
		defer queue.lock.Unlock()
		// Deferred last, so it runs first: finishedChan is closed while the
		// lock is still held.
		defer close(finishedChan)

		select {
		case <-queue.consumingStopped:
			// already stopped, nothing to do
			return
		default:
		}

		close(queue.consumingStopped)
		queue.ackCancel()
		queue.stopWg.Wait()
	}()

	return finishedChan
}
// AddConsumer adds a consumer to the queue and returns its internal name.
// The consumer is registered through addConsumer first; only if that succeeds
// a goroutine is started which passes deliveries to consumer.Consume.
func (queue *redisQueue) AddConsumer(tag string, consumer Consumer) (name string, err error) {
	if name, err = queue.addConsumer(tag); err != nil {
		return "", err
	}

	go queue.consumerConsume(consumer)
	return name, nil
}
// consumerConsume is the loop run by every consumer goroutine. It passes
// deliveries from deliveryChan to consumer.Consume one at a time until
// consuming is stopped or deliveryChan is closed, and marks itself as done in
// stopWg when it returns.
func (queue *redisQueue) consumerConsume(consumer Consumer) {
	defer queue.stopWg.Done()

	for {
		// Check for a stop on its own first, so that it wins over a
		// delivery that is ready at the same time.
		select {
		case <-queue.consumingStopped: // prefer this case
			return
		default:
		}

		select {
		case <-queue.consumingStopped:
			return

		case next, open := <-queue.deliveryChan:
			if !open { // deliveryChan closed
				return
			}
			consumer.Consume(next)
		}
	}
}
// AddConsumerFunc adds a consumer which is defined only by a function. This is
// similar to http.HandlerFunc and useful if your consumers don't need any
// state. The function is handed to AddConsumer unchanged.
func (queue *redisQueue) AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error) {
	name, err := queue.AddConsumer(tag, consumerFunc)
	return name, err
}
// AddBatchConsumer is similar to AddConsumer, but for batches of deliveries.
// timeout limits the amount of time waiting to fill an entire batch.
// The timer is only started when the first message in a batch is received.
//
// It returns the internal name of the consumer, or the error from
// registering it, in which case no goroutine is started.
func (queue *redisQueue) AddBatchConsumer(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error) {
	consumerName, registerErr := queue.addConsumer(tag)
	if registerErr != nil {
		return "", registerErr
	}

	go queue.consumerBatchConsume(batchSize, timeout, consumer)
	return consumerName, nil
}
// AddBatchConsumerFunc is similar to AddConsumerFunc, but for batches of
// deliveries. timeout limits the amount of time waiting to fill an entire
// batch. The timer is only started when the first message in a batch is
// received. The function is handed to AddBatchConsumer unchanged.
func (queue *redisQueue) AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error) {
	return queue.AddBatchConsumer(tag, batchSize, timeout, batchConsumerFunc)
}
// consumerBatchConsume is the loop run by every batch consumer goroutine. It
// waits for a first delivery, lets batchTimeout fill up the batch and passes
// the result to consumer.Consume. The loop ends when consuming is stopped or
// deliveryChan is closed; a batch that was being collected at that point is
// not passed to the consumer. stopWg is marked done on return.
func (queue *redisQueue) consumerBatchConsume(batchSize int64, timeout time.Duration, consumer BatchConsumer) {
	defer queue.stopWg.Done()

	batch := make([]Delivery, 0)
	for {
		select {
		case <-queue.consumingStopped: // prefer this case
			return
		default:
		}

		select {
		case <-queue.consumingStopped:
			return

		case first, open := <-queue.deliveryChan: // Wait for first delivery
			if !open { // deliveryChan closed
				return
			}

			filled, complete := queue.batchTimeout(batchSize, append(batch, first), timeout)
			if !complete {
				return
			}

			consumer.Consume(filled)
			// Reset the batch but keep its backing array: the next batch
			// overwrites the elements of the one just consumed.
			batch = filled[:0]
		}
	}
}
// batchTimeout fills batch with deliveries from deliveryChan until it holds
// at least batchSize deliveries or timeout has elapsed, whichever comes
// first, and returns the batch with ok set to true.
//
// A batch that already holds batchSize deliveries when passed in is returned
// right away, so that no further delivery is taken and the batch cannot grow
// beyond batchSize.
//
// If consuming is stopped or deliveryChan is closed while collecting, the
// batch is aborted and (nil, false) is returned.
func (queue *redisQueue) batchTimeout(batchSize int64, batch []Delivery, timeout time.Duration) (fullBatch []Delivery, ok bool) {
	select {
	case <-queue.consumingStopped: // a stopped queue never submits a batch
		return nil, false
	default:
	}

	// The caller has already added the first delivery, which is enough to
	// complete the batch for small batch sizes.
	if int64(len(batch)) >= batchSize {
		return batch, true
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		select {
		case <-queue.consumingStopped: // prefer this case
			return nil, false
		default:
		}

		select {
		case <-queue.consumingStopped: // consuming stopped: abort batch
			return nil, false

		case <-timer.C: // timeout: submit batch
			return batch, true

		case delivery, open := <-queue.deliveryChan:
			if !open { // deliveryChan closed: abort batch
				return nil, false
			}
			batch = append(batch, delivery)
			if int64(len(batch)) >= batchSize {
				return batch, true // once big enough: submit batch
			}
		}
	}
}
// addConsumer registers a new consumer for this queue and returns its name,
// which is the given tag followed by a dash and a six character random
// string. It fails if the queue is not consuming (see ensureConsuming) or if
// the name cannot be added to the set of consumers. On success stopWg is
// incremented; the consumer goroutine started by the caller is expected to
// call Done on it.
func (queue *redisQueue) addConsumer(tag string) (name string, err error) {
	queue.lock.Lock()
	defer queue.lock.Unlock()

	if err = queue.ensureConsuming(); err != nil {
		return "", err
	}

	consumerName := fmt.Sprintf("%s-%s", tag, RandomString(6))

	// add consumer to list of consumers of this queue
	if _, err = queue.redisClient.SAdd(queue.consumersKey, consumerName); err != nil {
		return "", err
	}

	queue.stopWg.Add(1)
	// log.Printf("rmq queue added consumer %s %s", queue, consumerName)
	return consumerName, nil
}
// PurgeReady removes all ready deliveries from the queue and returns the
// number of purged deliveries. The work is done by deleteRedisList on the
// ready list.
func (queue *redisQueue) PurgeReady() (int64, error) {
	purged, err := queue.deleteRedisList(queue.readyKey)
	return purged, err
}
// PurgeRejected removes all rejected deliveries from the queue and returns
// the number of purged deliveries. The work is done by deleteRedisList on the
// rejected list.
func (queue *redisQueue) PurgeRejected() (int64, error) {
	purged, err := queue.deleteRedisList(queue.rejectedKey)
	return purged, err
}
// deleteRedisList empties the list stored at key and returns the number of
// elements it held when the call started.
//
// The list is trimmed from its tail in batches of at most purgeBatchSize
// elements so that no single command has to delete the whole list, see
// https://www.redisgreen.net/blog/deleting-large-lists
//
// If reading the length or any trim fails, 0 and the error are returned;
// batches trimmed before the failure stay deleted.
func (queue *redisQueue) deleteRedisList(key string) (int64, error) {
	total, err := queue.redisClient.LLen(key)
	if err != nil {
		return 0, err
	}

	// delete elements without blocking; does nothing for an empty list
	for todo := total; todo > 0; todo -= purgeBatchSize {
		// minimum of purgeBatchSize and todo
		batchSize := purgeBatchSize
		if batchSize > todo {
			batchSize = todo
		}

		// remove one batch: keep everything except the last batchSize elements
		if err := queue.redisClient.LTrim(key, 0, -1-batchSize); err != nil {
			return 0, err
		}
	}

	return total, nil
}
// ReturnUnacked tries to return max unacked deliveries back to
// the ready queue and returns the number of returned deliveries.
// The deliveries are moved one by one from the unacked list to the ready
// list; see move for the error behavior.
func (queue *redisQueue) ReturnUnacked(max int64) (count int64, err error) {
	return queue.move(queue.unackedKey, queue.readyKey, max)
}
// ReturnRejected tries to return max rejected deliveries back to
// the ready queue and returns the number of returned deliveries.
// The deliveries are moved one by one from the rejected list to the ready
// list by move.
func (queue *redisQueue) ReturnRejected(max int64) (count int64, err error) {
	count, err = queue.move(queue.rejectedKey, queue.readyKey, max)
	return count, err
}
// move transfers up to max elements, one RPopLPush at a time, from the list
// at key from to the list at key to and returns how many were moved.
//
// It stops early without an error once the source list is empty
// (ErrorNotFound). On any other error it returns 0 and that error; elements
// moved before the failure stay moved. A max of zero or less moves nothing.
func (queue *redisQueue) move(from, to string, max int64) (int64, error) {
	var moved int64
	for moved < max {
		_, err := queue.redisClient.RPopLPush(from, to)
		switch err {
		case nil: // moved one
			moved++
		case ErrorNotFound: // nothing left
			return moved, nil
		default: // error
			return 0, err
		}
	}
	return moved, nil
}
// Drain removes and returns 'count' elements from the queue. In case of an
// error, Drain returns all elements removed until the error occurred and the
// error itself. Every error of the client's RPop ends the drain this way.
//
// A count of zero or less removes nothing and returns an empty slice.
func (queue *redisQueue) Drain(count int64) ([]string, error) {
	if count < 0 {
		count = 0 // a negative capacity would make the allocation below panic
	}

	out := make([]string, 0, count)
	for int64(len(out)) < count {
		val, err := queue.redisClient.RPop(queue.readyKey)
		if err != nil {
			return out, err
		}
		out = append(out, val)
	}
	return out, nil
}
// Destroy purges and removes the queue from the list of queues. It purges the
// ready list, then the rejected list, and finally removes the queue name from
// the queues set. The purged counts are only returned on success; on any
// error both counts are 0. ErrorNotFound is returned if the queue name was
// not in the set.
func (queue *redisQueue) Destroy() (readyCount, rejectedCount int64, err error) {
	if readyCount, err = queue.PurgeReady(); err != nil {
		return 0, 0, err
	}
	if rejectedCount, err = queue.PurgeRejected(); err != nil {
		return 0, 0, err
	}

	// NOTE(review): this uses the package-level queuesKey identifier, not
	// queue.queuesKey; presumably the set of all queues rather than the
	// per-connection one - confirm.
	removed, err := queue.redisClient.SRem(queuesKey, queue.name)
	switch {
	case err != nil:
		return 0, 0, err
	case removed == 0:
		return 0, 0, ErrorNotFound
	}

	return readyCount, rejectedCount, nil
}
// closeInStaleConnection closes the queue in the associated connection by
// removing all related keys: the unacked list, the consumers set and the
// queue's entry in the connection's queues set, in that order. It stops at
// the first error and returns ErrorNotFound if the queue was not part of the
// queues set.
// Not supposed to be called on queues in active sessions.
func (queue *redisQueue) closeInStaleConnection() error {
	for _, key := range []string{queue.unackedKey, queue.consumersKey} {
		if _, err := queue.redisClient.Del(key); err != nil {
			return err
		}
	}

	removed, err := queue.redisClient.SRem(queue.queuesKey, queue.name)
	switch {
	case err != nil:
		return err
	case removed == 0:
		return ErrorNotFound
	default:
		return nil
	}
}
// readyCount returns the length of the ready list.
func (queue *redisQueue) readyCount() (int64, error) {
	length, err := queue.redisClient.LLen(queue.readyKey)
	return length, err
}
// unackedCount returns the length of the unacked list.
func (queue *redisQueue) unackedCount() (int64, error) {
	length, err := queue.redisClient.LLen(queue.unackedKey)
	return length, err
}
// rejectedCount returns the length of the rejected list.
func (queue *redisQueue) rejectedCount() (int64, error) {
	length, err := queue.redisClient.LLen(queue.rejectedKey)
	return length, err
}
// getConsumers returns the members of the consumers set of this queue.
func (queue *redisQueue) getConsumers() ([]string, error) {
	consumers, err := queue.redisClient.SMembers(queue.consumersKey)
	return consumers, err
}
// ensureConsuming reports whether consumers may be added to the queue. It
// returns ErrorNotConsuming if StartConsuming has not been called yet,
// ErrorConsumingStopped if consuming has been stopped, and nil otherwise.
// The caller of this method should be holding the queue.lock mutex.
func (queue *redisQueue) ensureConsuming() error {
	if started := queue.deliveryChan != nil; !started {
		return ErrorNotConsuming
	}

	select {
	case <-queue.consumingStopped:
		return ErrorConsumingStopped
	default:
	}
	return nil
}
// jitteredDuration returns the input duration scaled by a random factor
// between 0.9 and 1.1, i.e. a value within +/-10% of the input.
func jitteredDuration(duration time.Duration) time.Duration {
	const (
		lowest = 0.9 // smallest factor (-10%)
		width  = 0.2 // size of the factor range, up to 1.1 (+10%)
	)
	factor := lowest + rand.Float64()*width
	jittered := float64(duration) * factor
	return time.Duration(jittered)
}