Skip to content

Commit

Permalink
Merge pull request #60 from ch3nnn/master
Browse files Browse the repository at this point in the history
fix: producerNode.Delay func not wrap()
  • Loading branch information
kevwan authored Apr 8, 2024
2 parents 28fb062 + 4d3c427 commit 15d2b23
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 38 deletions.
26 changes: 19 additions & 7 deletions dq/consumernode.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dq

import (
"errors"
"time"

"github.com/beanstalkd/go-beanstalk"
Expand Down Expand Up @@ -59,14 +60,25 @@ func (c *consumerNode) consumeEvents(consume Consume) {
}

// the error can only be beanstalk.NameError or beanstalk.ConnError
switch cerr := err.(type) {
case beanstalk.ConnError:
switch cerr.Err {
case beanstalk.ErrTimeout:
var cerr beanstalk.ConnError
switch {
case errors.As(err, &cerr):
switch {
case errors.Is(cerr.Err, beanstalk.ErrTimeout):
// timeout error on timeout, just continue the loop
case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
case
errors.Is(cerr.Err, beanstalk.ErrBadChar),
errors.Is(cerr.Err, beanstalk.ErrBadFormat),
errors.Is(cerr.Err, beanstalk.ErrBuried),
errors.Is(cerr.Err, beanstalk.ErrDeadline),
errors.Is(cerr.Err, beanstalk.ErrDraining),
errors.Is(cerr.Err, beanstalk.ErrEmpty),
errors.Is(cerr.Err, beanstalk.ErrInternal),
errors.Is(cerr.Err, beanstalk.ErrJobTooBig),
errors.Is(cerr.Err, beanstalk.ErrNoCRLF),
errors.Is(cerr.Err, beanstalk.ErrNotFound),
errors.Is(cerr.Err, beanstalk.ErrNotIgnored),
errors.Is(cerr.Err, beanstalk.ErrTooLong):
// won't reset
logx.Error(err)
default:
Expand Down
16 changes: 2 additions & 14 deletions dq/producer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package dq

import (
"bytes"
"log"
"math/rand"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -56,9 +54,8 @@ func NewProducer(beanstalks []Beanstalk) Producer {
}

func (p *producerCluster) At(body []byte, at time.Time) (string, error) {
wrapped := p.wrap(body, at)
return p.insert(func(node Producer) (string, error) {
return node.At(wrapped, at)
return node.At(body, at)
})
}

Expand All @@ -73,9 +70,8 @@ func (p *producerCluster) Close() error {
}

func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) {
wrapped := p.wrap(body, time.Now().Add(delay))
return p.insert(func(node Producer) (string, error) {
return node.Delay(wrapped, delay)
return node.Delay(body, delay)
})
}

Expand Down Expand Up @@ -156,11 +152,3 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string

return "", be.Err()
}

func (p *producerCluster) wrap(body []byte, at time.Time) []byte {
var builder bytes.Buffer
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
builder.WriteByte(timeSep)
builder.Write(body)
return builder.Bytes()
}
38 changes: 31 additions & 7 deletions dq/producernode.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package dq

import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/beanstalkd/go-beanstalk"
"github.com/zeromicro/go-zero/core/logx"
)

var ErrTimeBeforeNow = errors.New("can't schedule task to past time")
Expand Down Expand Up @@ -46,24 +48,38 @@ func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
return "", err
}

id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
wrapped := p.wrap(body, time.Now().Add(delay))
id, err := conn.Put(wrapped, PriNormal, delay, defaultTimeToRun)
if err == nil {
return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
}

// the error can only be beanstalk.NameError or beanstalk.ConnError
// just return when the error is beanstalk.NameError, don't reset
switch cerr := err.(type) {
case beanstalk.ConnError:
switch cerr.Err {
case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
var cerr beanstalk.ConnError
switch {
case errors.As(err, &cerr):
switch {
case
errors.Is(cerr.Err, beanstalk.ErrBadChar),
errors.Is(cerr.Err, beanstalk.ErrBadFormat),
errors.Is(cerr.Err, beanstalk.ErrBuried),
errors.Is(cerr.Err, beanstalk.ErrDeadline),
errors.Is(cerr.Err, beanstalk.ErrDraining),
errors.Is(cerr.Err, beanstalk.ErrEmpty),
errors.Is(cerr.Err, beanstalk.ErrInternal),
errors.Is(cerr.Err, beanstalk.ErrJobTooBig),
errors.Is(cerr.Err, beanstalk.ErrNoCRLF),
errors.Is(cerr.Err, beanstalk.ErrNotFound),
errors.Is(cerr.Err, beanstalk.ErrNotIgnored),
errors.Is(cerr.Err, beanstalk.ErrTooLong):
// won't reset
default:
// beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors
p.conn.reset()
}
default:
logx.Error(err)
}

return "", err
Expand Down Expand Up @@ -96,3 +112,11 @@ func (p *producerNode) Revoke(jointId string) error {
// if not in this beanstalk, ignore
return nil
}

func (p *producerNode) wrap(body []byte, at time.Time) []byte {
var builder bytes.Buffer
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
builder.WriteByte(timeSep)
builder.Write(body)
return builder.Bytes()
}
28 changes: 28 additions & 0 deletions example/dq/producer/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"fmt"
"strconv"
"time"

"github.com/zeromicro/go-queue/dq"
)

func main() {

Check failure on line 11 in example/dq/producer/cluster.go

View workflow job for this annotation

GitHub Actions / Build

other declaration of main
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11300",
Tube: "tube",
},
})
for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
if err != nil {
fmt.Println(err)
}
}
}
12 changes: 2 additions & 10 deletions example/dq/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,8 @@ import (
)

func main() {

Check failure on line 11 in example/dq/producer/producer.go

View workflow job for this annotation

GitHub Actions / Build

main redeclared in this block
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11300",
Tube: "tube",
},
})
producer := dq.NewProducerNode("localhost:11300", "tube")

for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
if err != nil {
Expand Down

0 comments on commit 15d2b23

Please sign in to comment.