diff --git a/dq/producer.go b/dq/producer.go index d147db7..d2905b9 100644 --- a/dq/producer.go +++ b/dq/producer.go @@ -158,9 +158,13 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string } func (p *producerCluster) atWithWrapper(body []byte, at time.Time) (string, error) { - return p.At(body, at) + return p.insert(func(node Producer) (string, error) { + return node.atWithWrapper(body, at) + }) } func (p *producerCluster) delayWithWrapper(body []byte, delay time.Duration) (string, error) { - return p.Delay(body, delay) + return p.insert(func(node Producer) (string, error) { + return node.delayWithWrapper(body, delay) + }) }