Skip to content

Commit

Permalink
chore: coding style
Browse files Browse the repository at this point in the history
  • Loading branch information
kevwan committed Apr 18, 2024
1 parent 004b867 commit 328f46c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 60 deletions.
37 changes: 17 additions & 20 deletions dq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ const (

type (
Producer interface {
atWithWrapper(body []byte, at time.Time) (string, error)
At(body []byte, at time.Time) (string, error)
Close() error
delayWithWrapper(body []byte, delay time.Duration) (string, error)
Delay(body []byte, delay time.Duration) (string, error)
Revoke(ids string) error

at(body []byte, at time.Time) (string, error)
delay(body []byte, delay time.Duration) (string, error)
}

producerCluster struct {
Expand Down Expand Up @@ -57,9 +58,7 @@ func NewProducer(beanstalks []Beanstalk) Producer {

func (p *producerCluster) At(body []byte, at time.Time) (string, error) {
wrapped := wrap(body, at)
return p.insert(func(node Producer) (string, error) {
return node.atWithWrapper(wrapped, at)
})
return p.at(wrapped, at)
}

func (p *producerCluster) Close() error {
Expand All @@ -74,9 +73,7 @@ func (p *producerCluster) Close() error {

func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) {
wrapped := wrap(body, time.Now().Add(delay))
return p.insert(func(node Producer) (string, error) {
return node.delayWithWrapper(wrapped, delay)
})
return p.delay(wrapped, delay)
}

func (p *producerCluster) Revoke(ids string) error {
Expand All @@ -98,10 +95,22 @@ func (p *producerCluster) Revoke(ids string) error {
return be.Err()
}

func (p *producerCluster) at(body []byte, at time.Time) (string, error) {
return p.insert(func(node Producer) (string, error) {
return node.at(body, at)
})
}

func (p *producerCluster) cloneNodes() []Producer {
return append([]Producer(nil), p.nodes...)
}

func (p *producerCluster) delay(body []byte, delay time.Duration) (string, error) {
return p.insert(func(node Producer) (string, error) {
return node.delay(body, delay)
})
}

func (p *producerCluster) getWriteNodes() []Producer {
if len(p.nodes) <= replicaNodes {
return p.nodes
Expand Down Expand Up @@ -156,15 +165,3 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string

return "", be.Err()
}

func (p *producerCluster) atWithWrapper(body []byte, at time.Time) (string, error) {
return p.insert(func(node Producer) (string, error) {
return node.atWithWrapper(body, at)
})
}

func (p *producerCluster) delayWithWrapper(body []byte, delay time.Duration) (string, error) {
return p.insert(func(node Producer) (string, error) {
return node.delayWithWrapper(body, delay)
})
}
80 changes: 40 additions & 40 deletions dq/producernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,56 @@ func NewProducerNode(endpoint, tube string) Producer {
}

func (p *producerNode) At(body []byte, at time.Time) (string, error) {
return p.atWithWrapper(wrap(body, at), at)
return p.at(wrap(body, at), at)
}

func (p *producerNode) atWithWrapper(body []byte, at time.Time) (string, error) {
func (p *producerNode) Close() error {
return p.conn.Close()
}

func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
return p.delay(wrap(body, time.Now().Add(delay)), delay)
}

func (p *producerNode) Revoke(jointId string) error {
ids := strings.Split(jointId, idSep)
for _, id := range ids {
fields := strings.Split(id, "/")
if len(fields) < 3 {
continue
}
if fields[0] != p.endpoint || fields[1] != p.tube {
continue
}

conn, err := p.conn.get()
if err != nil {
return err
}

n, err := strconv.ParseUint(fields[2], 10, 64)
if err != nil {
return err
}

return conn.Delete(n)
}

// if not in this beanstalk, ignore
return nil
}

func (p *producerNode) at(body []byte, at time.Time) (string, error) {
now := time.Now()
if at.Before(now) {
return "", ErrTimeBeforeNow
}

duration := at.Sub(now)
return p.delayWithWrapper(body, duration)
return p.delay(body, duration)
}

func (p *producerNode) Close() error {
return p.conn.Close()
}

func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
return p.delayWithWrapper(wrap(body, time.Now().Add(delay)), delay)
}

func (p *producerNode) delayWithWrapper(body []byte, delay time.Duration) (string, error) {
func (p *producerNode) delay(body []byte, delay time.Duration) (string, error) {
conn, err := p.conn.get()
if err != nil {
return "", err
Expand Down Expand Up @@ -90,31 +118,3 @@ func (p *producerNode) delayWithWrapper(body []byte, delay time.Duration) (strin

return "", err
}

func (p *producerNode) Revoke(jointId string) error {
ids := strings.Split(jointId, idSep)
for _, id := range ids {
fields := strings.Split(id, "/")
if len(fields) < 3 {
continue
}
if fields[0] != p.endpoint || fields[1] != p.tube {
continue
}

conn, err := p.conn.get()
if err != nil {
return err
}

n, err := strconv.ParseUint(fields[2], 10, 64)
if err != nil {
return err
}

return conn.Delete(n)
}

// if not in this beanstalk, ignore
return nil
}

0 comments on commit 328f46c

Please sign in to comment.