Skip to content

Commit

Permalink
feat: support get the pos of message in queue
Browse files Browse the repository at this point in the history
  • Loading branch information
asjdf committed Sep 11, 2024
1 parent 1d31dc9 commit 233ee73
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 10 deletions.
2 changes: 1 addition & 1 deletion example/batch_consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"syscall"
"time"

"github.com/adjust/rmq/v5"
"github.com/05sec/rmq/v5"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion example/cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"
"time"

"github.com/adjust/rmq/v5"
"github.com/05sec/rmq/v5"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion example/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"syscall"
"time"

"github.com/adjust/rmq/v5"
"github.com/05sec/rmq/v5"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion example/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"net/http"

"github.com/adjust/rmq/v5"
"github.com/05sec/rmq/v5"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion example/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"time"

"github.com/adjust/rmq/v5"
"github.com/05sec/rmq/v5"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion example/purger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"log"

"github.com/adjust/rmq/v5"
"github.com/05sec/rmq/v5"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion example/returner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"
"math"

"github.com/adjust/rmq/v5"
"github.com/05sec/rmq/v5"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/adjust/rmq/v5
module github.com/05sec/rmq/v5

go 1.17

Expand Down
2 changes: 1 addition & 1 deletion header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"
"testing"

"github.com/adjust/rmq/v5"
"github.com/05sec/rmq/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down
52 changes: 52 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,21 @@ const (
purgeBatchSize = int64(100)
)

type DeliveryState int

const (
PayloadStateUnknown DeliveryState = iota
PayloadStateReady
PayloadStateUnacked
PayloadStateAcked
PayloadStateRejected
)

type Queue interface {
Publish(payload ...string) error
PublishBytes(payload ...[]byte) error
Locate(payload string) (DeliveryState, int64, error)
LocateBytes(payload []byte) (DeliveryState, int64, error)
SetPushQueue(pushQueue Queue)
Remove(payload string, count int64, removeFromRejected bool) error
RemoveBytes(payload []byte, count int64, removeFromRejected bool) error
Expand All @@ -33,6 +45,8 @@ type Queue interface {
Destroy() (readyCount, rejectedCount int64, err error)
Drain(count int64) ([]string, error)

ReadyKey() string

// internals
// used in cleaner
closeInStaleConnection() error
Expand All @@ -43,6 +57,8 @@ type Queue interface {
getConsumers() ([]string, error)
}

var _ Queue = (*redisQueue)(nil)

type redisQueue struct {
name string
connectionName string
Expand Down Expand Up @@ -105,6 +121,10 @@ func (queue *redisQueue) String() string {
return fmt.Sprintf("[%s conn:%s]", queue.name, queue.connectionName)
}

func (queue *redisQueue) ReadyKey() string {
return queue.readyKey
}

// Publish adds a delivery with the given payload to the queue
// returns how many deliveries are in the queue afterwards
func (queue *redisQueue) Publish(payload ...string) error {
Expand All @@ -121,6 +141,38 @@ func (queue *redisQueue) PublishBytes(payload ...[]byte) error {
return queue.Publish(stringifiedBytes...)
}

func (queue *redisQueue) Locate(payload string) (DeliveryState, int64, error) {
index, err := queue.redisClient.LPos(queue.readyKey, payload)
if err != nil {
return PayloadStateUnknown, 0, err
}
if index != -1 {
return PayloadStateReady, index, nil
}

index, err = queue.redisClient.LPos(queue.unackedKey, payload)
if err != nil {
return PayloadStateUnknown, 0, err
}
if index != -1 {
return PayloadStateUnacked, index, nil
}

index, err = queue.redisClient.LPos(queue.rejectedKey, payload)
if err != nil {
return PayloadStateUnknown, 0, err
}
if index != -1 {
return PayloadStateRejected, index, nil
}

return PayloadStateAcked, -1, nil
}

func (queue *redisQueue) LocateBytes(payload []byte) (DeliveryState, int64, error) {
return queue.Locate(string(payload))
}

// Remove elements with specific value from the queue (WARN: this operation is pretty slow with O(N+M) complexity where N is length of the queue and M is number of removed elements)
func (queue *redisQueue) Remove(payload string, count int64, removeFromRejected bool) error {
_, err := queue.redisClient.LRem(queue.readyKey, count, payload)
Expand Down
5 changes: 4 additions & 1 deletion redis_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package rmq

import "time"
import (
"time"
)

type RedisClient interface {
// simple keys
Expand All @@ -11,6 +13,7 @@ type RedisClient interface {
// lists
LPush(key string, value ...string) (total int64, err error)
LLen(key string) (affected int64, err error)
LPos(key string, value string) (index int64, err error)
LRem(key string, count int64, value string) (affected int64, err error)
LTrim(key string, start, stop int64) error
RPopLPush(source, destination string) (value string, err error)
Expand Down
6 changes: 6 additions & 0 deletions redis_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/redis/go-redis/v9"
)

var _ RedisClient = &RedisWrapper{}

type RedisWrapper struct {
rawClient redis.Cmdable
}
Expand All @@ -32,6 +34,10 @@ func (wrapper RedisWrapper) LLen(key string) (affected int64, err error) {
return wrapper.rawClient.LLen(context.TODO(), key).Result()
}

func (wrapper RedisWrapper) LPos(key string, value string) (index int64, err error) {
return wrapper.rawClient.LPos(context.TODO(), key, value, redis.LPosArgs{}).Result()
}

func (wrapper RedisWrapper) LRem(key string, count int64, value string) (affected int64, err error) {
return wrapper.rawClient.LRem(context.TODO(), key, int64(count), value).Result()
}
Expand Down
8 changes: 8 additions & 0 deletions test_redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/redis/go-redis/v9"
)

var _ RedisClient = &TestRedisClient{}

// TestRedisClient is a mock for redis
type TestRedisClient struct {
store sync.Map
Expand Down Expand Up @@ -167,6 +169,12 @@ func (client *TestRedisClient) LLen(key string) (affected int64, err error) {
return int64(len(list)), nil
}

// LPos returns the index of the first element matching element in the list stored at key.
func (client *TestRedisClient) LPos(key string, value string) (index int64, err error) {
//TODO implement me
panic("implement me")
}

// LRem removes the first count occurrences of elements equal to
// value from the list stored at key. The count argument influences
// the operation in the following ways:
Expand Down

0 comments on commit 233ee73

Please sign in to comment.