Skip to content

Commit

Permalink
Fix Redis monitoring version (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
aziule authored Jan 18, 2022
1 parent c880c51 commit e0c5e44
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 20 deletions.
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ services:
- '8500:8500'
- '8600:8600'
- '8600:8600/udp'

redis:
container_name: harvester_redis_dev
image: redis:alpine
ports:
- '6379:6379'
26 changes: 25 additions & 1 deletion monitor/redis/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package redis

import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"time"

Expand All @@ -16,6 +18,8 @@ import (
type Watcher struct {
client redis.UniversalClient
keys []string
versions []uint64
hashes []string
pollInterval time.Duration
}

Expand All @@ -34,6 +38,8 @@ func New(client redis.UniversalClient, pollInterval time.Duration, keys []string
return &Watcher{
client: client,
keys: keys,
versions: make([]uint64, len(keys)),
hashes: make([]string, len(keys)),
pollInterval: pollInterval,
}, nil
}
Expand Down Expand Up @@ -77,8 +83,26 @@ func (w *Watcher) getValues(ctx context.Context, ch chan<- []*change.Change) {
continue
}

changes = append(changes, change.New(config.SourceRedis, key, values[i].(string), 0))
value := values[i].(string)
hash := w.hash(value)
if hash == w.hashes[i] {
continue
}

w.versions[i]++
w.hashes[i] = hash

changes = append(changes, change.New(config.SourceRedis, key, value, w.versions[i]))
}

if len(changes) == 0 {
return
}

ch <- changes
}

func (w *Watcher) hash(value string) string {
hash := md5.Sum([]byte(value))
return hex.EncodeToString(hash[:])
}
69 changes: 50 additions & 19 deletions monitor/redis/watcher_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build integration
// +build integration

package redis
Expand All @@ -8,6 +9,7 @@ import (
"time"

"github.com/beatlabs/harvester/change"
"github.com/beatlabs/harvester/config"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -27,40 +29,57 @@ func TestWatch(t *testing.T) {
val3 = "value3"
)

set(t, client, key1, val1)
set(t, client, key2, val2)
set(t, client, key3, val3)
defer func() {
del(t, client, key1)
del(t, client, key2)
del(t, client, key3)
}()

ch := make(chan []*change.Change)
w, err := New(client, 10*time.Millisecond, []string{key1, key2, key3})
require.NoError(t, err)
require.NotNil(t, w)

ctx, cnl := context.WithCancel(context.Background())
defer cnl()

// Initial values, set even before watching - not all keys have a value
set(t, client, key1, val1)
set(t, client, key2, val1)

// Start watching
ch := make(chan []*change.Change, 100)
defer close(ch)
err = w.Watch(ctx, ch)
require.NoError(t, err)

for i := 0; i < 2; i++ {
cc := <-ch
for _, cng := range cc {
switch cng.Key() {
case key1:
assert.Equal(t, val1, cng.Value())
case key2:
assert.Equal(t, val2, cng.Value())
case key3:
assert.Equal(t, val3, cng.Value())
default:
assert.Fail(t, "key invalid", cng.Key())
}
assert.True(t, cng.Version() == 0)
}
// First values update
time.Sleep(1 * time.Second)
set(t, client, key1, val1) // Same value
set(t, client, key2, val2)
set(t, client, key3, val1) // First value for this key

// Second values update
time.Sleep(1 * time.Second)
set(t, client, key1, val1) // Same value
set(t, client, key2, val1) // Second value - same as the initial value
set(t, client, key3, val3)

time.Sleep(1 * time.Second)

found := transformChangesToSlices(ch)
expected := []*change.Change{
// Initial values
change.New(config.SourceRedis, key1, val1, 1),
change.New(config.SourceRedis, key2, val1, 1),
// First update
change.New(config.SourceRedis, key2, val2, 2),
change.New(config.SourceRedis, key3, val1, 1),
// Second update
change.New(config.SourceRedis, key2, val1, 3),
change.New(config.SourceRedis, key3, val3, 2),
}

assert.Equal(t, expected, found)
}

func set(t *testing.T, client redis.UniversalClient, key string, val string) {
Expand All @@ -74,3 +93,15 @@ func del(t *testing.T, client redis.UniversalClient, key string) {
require.NoError(t, err)
require.Equal(t, int64(1), delResult)
}

func transformChangesToSlices(ch chan []*change.Change) []*change.Change {
changes := make([]*change.Change, 0)
for {
select {
case cc := <-ch:
changes = append(changes, cc...)
default:
return changes
}
}
}
83 changes: 83 additions & 0 deletions monitor/redis/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package redis

import (
"context"
"sync"
"testing"
"time"

"github.com/beatlabs/harvester/change"
"github.com/beatlabs/harvester/config"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -65,3 +67,84 @@ func TestWatcher_Watch(t *testing.T) {
})
}
}

func TestWatcher_Versioning(t *testing.T) {
client := (&clientStub{t: t}).
WithValues("val1.1", "val2.1", "val3.1"). // Initial values
WithValues("val1.1", "val2.2", "val3.2"). // Only keys 2 and 3 are updated
WithValues("val1.1", "val2.1", "val3.2") // Only 2 is updated, to its previous value

expected := [][]*change.Change{
{
change.New(config.SourceRedis, "key1", "val1.1", 1),
change.New(config.SourceRedis, "key2", "val2.1", 1),
change.New(config.SourceRedis, "key3", "val3.1", 1),
},
{
change.New(config.SourceRedis, "key2", "val2.2", 2),
change.New(config.SourceRedis, "key3", "val3.2", 2),
},
{
change.New(config.SourceRedis, "key2", "val2.1", 3),
},
}

w, err := New(client, 1*time.Millisecond, []string{"key1", "key2", "key3"})
require.NoError(t, err)
assert.Equal(t, []uint64{0, 0, 0}, w.versions)
assert.Equal(t, []string{"", "", ""}, w.hashes)

ctx, cancel := context.WithCancel(context.Background())

ch := make(chan []*change.Change, 10)
err = w.Watch(ctx, ch)
assert.NoError(t, err)

time.Sleep(100 * time.Millisecond)

cancel()

found := make([][]*change.Change, 0)

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
for {
select {
case cc := <-ch:
if len(cc) == 0 {
break
}
found = append(found, cc)
default:
wg.Done()
return
}
}
}()
wg.Wait()

assert.Equal(t, expected, found)
}

type clientStub struct {
t *testing.T
*redis.Client

cmds []*redis.SliceCmd
}

func (c *clientStub) WithValues(values ...interface{}) *clientStub {
c.cmds = append(c.cmds, redis.NewSliceResult(values, nil))
return c
}

func (c *clientStub) MGet(_ context.Context, keys ...string) *redis.SliceCmd {
if len(c.cmds) == 0 {
return redis.NewSliceResult(make([]interface{}, len(keys)), nil)
}
shifted := c.cmds[0]
c.cmds = c.cmds[1:]
return shifted
}

0 comments on commit e0c5e44

Please sign in to comment.