diff --git a/dq/producer.go b/dq/producer.go index 3aff57b..d147db7 100644 --- a/dq/producer.go +++ b/dq/producer.go @@ -19,8 +19,10 @@ const ( type ( Producer interface { + atWithWrapper(body []byte, at time.Time) (string, error) At(body []byte, at time.Time) (string, error) Close() error + delayWithWrapper(body []byte, delay time.Duration) (string, error) Delay(body []byte, delay time.Duration) (string, error) Revoke(ids string) error } @@ -54,8 +56,9 @@ func NewProducer(beanstalks []Beanstalk) Producer { } func (p *producerCluster) At(body []byte, at time.Time) (string, error) { + wrapped := wrap(body, at) return p.insert(func(node Producer) (string, error) { - return node.At(body, at) + return node.atWithWrapper(wrapped, at) }) } @@ -70,8 +73,9 @@ func (p *producerCluster) Close() error { } func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) { + wrapped := wrap(body, time.Now().Add(delay)) return p.insert(func(node Producer) (string, error) { - return node.Delay(body, delay) + return node.delayWithWrapper(wrapped, delay) }) } @@ -152,3 +156,11 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string return "", be.Err() } + +func (p *producerCluster) atWithWrapper(body []byte, at time.Time) (string, error) { + return p.At(body, at) +} + +func (p *producerCluster) delayWithWrapper(body []byte, delay time.Duration) (string, error) { + return p.Delay(body, delay) +} diff --git a/dq/producernode.go b/dq/producernode.go index ef2faa5..0ebd30c 100644 --- a/dq/producernode.go +++ b/dq/producernode.go @@ -1,7 +1,6 @@ package dq import ( - "bytes" "errors" "fmt" "strconv" @@ -29,13 +28,17 @@ func NewProducerNode(endpoint, tube string) Producer { } func (p *producerNode) At(body []byte, at time.Time) (string, error) { + return p.atWithWrapper(wrap(body, at), at) +} + +func (p *producerNode) atWithWrapper(body []byte, at time.Time) (string, error) { now := time.Now() if at.Before(now) { return "", ErrTimeBeforeNow } duration := at.Sub(now) - return p.Delay(body, duration) + return p.delayWithWrapper(body, duration) } func (p *producerNode) Close() error { @@ -43,13 +46,16 @@ func (p *producerNode) Close() error { } func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) { + return p.delayWithWrapper(wrap(body, time.Now().Add(delay)), delay) +} + +func (p *producerNode) delayWithWrapper(body []byte, delay time.Duration) (string, error) { conn, err := p.conn.get() if err != nil { return "", err } - wrapped := p.wrap(body, time.Now().Add(delay)) - id, err := conn.Put(wrapped, PriNormal, delay, defaultTimeToRun) + id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun) if err == nil { return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil } @@ -112,11 +118,3 @@ func (p *producerNode) Revoke(jointId string) error { // if not in this beanstalk, ignore return nil } - -func (p *producerNode) wrap(body []byte, at time.Time) []byte { - var builder bytes.Buffer - builder.WriteString(strconv.FormatInt(at.UnixNano(), 10)) - builder.WriteByte(timeSep) - builder.Write(body) - return builder.Bytes() -} diff --git a/dq/wrapper.go b/dq/wrapper.go new file mode 100644 index 0000000..ec18dba --- /dev/null +++ b/dq/wrapper.go @@ -0,0 +1,15 @@ +package dq + +import ( + "bytes" + "strconv" + "time" +) + +func wrap(body []byte, at time.Time) []byte { + var builder bytes.Buffer + builder.WriteString(strconv.FormatInt(at.UnixNano(), 10)) + builder.WriteByte(timeSep) + builder.Write(body) + return builder.Bytes() +}