Skip to content

Commit

Permalink
fix: Duplicated messages issue in dq.NewConsumer's Consume method
Browse files Browse the repository at this point in the history
  • Loading branch information
iwctwbai committed Apr 17, 2024
1 parent 7016060 commit ab6093a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 14 deletions.
16 changes: 14 additions & 2 deletions dq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ const (

type (
Producer interface {
atWithWrapper(body []byte, at time.Time) (string, error)
At(body []byte, at time.Time) (string, error)
Close() error
delayWithWrapper(body []byte, delay time.Duration) (string, error)
Delay(body []byte, delay time.Duration) (string, error)
Revoke(ids string) error
}
Expand Down Expand Up @@ -54,8 +56,9 @@ func NewProducer(beanstalks []Beanstalk) Producer {
}

func (p *producerCluster) At(body []byte, at time.Time) (string, error) {
wrapped := wrap(body, at)
return p.insert(func(node Producer) (string, error) {
return node.At(body, at)
return node.atWithWrapper(wrapped, at)
})
}

Expand All @@ -70,8 +73,9 @@ func (p *producerCluster) Close() error {
}

func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) {
wrapped := wrap(body, time.Now().Add(delay))
return p.insert(func(node Producer) (string, error) {
return node.Delay(body, delay)
return node.delayWithWrapper(wrapped, delay)
})
}

Expand Down Expand Up @@ -152,3 +156,11 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string

return "", be.Err()
}

func (p *producerCluster) atWithWrapper(body []byte, at time.Time) (string, error) {
return p.At(body, at)
}

func (p *producerCluster) delayWithWrapper(body []byte, delay time.Duration) (string, error) {
return p.Delay(body, delay)
}
22 changes: 10 additions & 12 deletions dq/producernode.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dq

import (
"bytes"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -29,27 +28,34 @@ func NewProducerNode(endpoint, tube string) Producer {
}

func (p *producerNode) At(body []byte, at time.Time) (string, error) {
return p.atWithWrapper(wrap(body, at), at)
}

func (p *producerNode) atWithWrapper(body []byte, at time.Time) (string, error) {
now := time.Now()
if at.Before(now) {
return "", ErrTimeBeforeNow
}

duration := at.Sub(now)
return p.Delay(body, duration)
return p.delayWithWrapper(body, duration)
}

func (p *producerNode) Close() error {
return p.conn.Close()
}

func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
return p.delayWithWrapper(wrap(body, time.Now().Add(delay)), delay)
}

func (p *producerNode) delayWithWrapper(body []byte, delay time.Duration) (string, error) {
conn, err := p.conn.get()
if err != nil {
return "", err
}

wrapped := p.wrap(body, time.Now().Add(delay))
id, err := conn.Put(wrapped, PriNormal, delay, defaultTimeToRun)
id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
if err == nil {
return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
}
Expand Down Expand Up @@ -112,11 +118,3 @@ func (p *producerNode) Revoke(jointId string) error {
// if not in this beanstalk, ignore
return nil
}

func (p *producerNode) wrap(body []byte, at time.Time) []byte {
var builder bytes.Buffer
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
builder.WriteByte(timeSep)
builder.Write(body)
return builder.Bytes()
}
15 changes: 15 additions & 0 deletions dq/wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dq

import (
"bytes"
"strconv"
"time"
)

func wrap(body []byte, at time.Time) []byte {
var builder bytes.Buffer
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
builder.WriteByte(timeSep)
builder.Write(body)
return builder.Bytes()
}

0 comments on commit ab6093a

Please sign in to comment.