Skip to content

Commit

Permalink
PS-827: Enable harvester to use dynamic prefix for keys (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
wimspaargaren authored Oct 8, 2021
1 parent 6aaf505 commit 84469b7
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 15 deletions.
21 changes: 14 additions & 7 deletions harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (h *harvester) Harvest(ctx context.Context) error {
}

type consulConfig struct {
addr, dataCenter, token string
timeout time.Duration
addr, dataCenter, token, folderPrefix string
timeout time.Duration
}

// Builder of a harvester instance.
Expand Down Expand Up @@ -101,14 +101,21 @@ func (b *Builder) WithConsulSeed(addr, dataCenter, token string, timeout time.Du
// WithConsulMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config
// and monitors every field found tagged with ConsulLogger.
func (b *Builder) WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) *Builder {
return b.WithConsulFolderPrefixMonitor(addr, dataCenter, token, "", timeout)
}

// WithConsulFolderPrefixMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config
// and monitors every field found tagged with ConsulLogger.
func (b *Builder) WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder {
if b.err != nil {
return b
}
b.monitorConsulCfg = &consulConfig{
addr: addr,
dataCenter: dataCenter,
token: token,
timeout: timeout,
addr: addr,
dataCenter: dataCenter,
token: token,
folderPrefix: folderPrefix,
timeout: timeout,
}
return b
}
Expand Down Expand Up @@ -254,7 +261,7 @@ func (b *Builder) setupConsulMonitoring(cfg *config.Config) (*consul.Watcher, er
continue
}
log.Debugf(`automatically monitoring consul key "%s"`, consulKey)
items = append(items, consul.NewKeyItem(consulKey))
items = append(items, consul.NewKeyItemWithPrefix(consulKey, b.monitorConsulCfg.folderPrefix))
}
return consul.New(b.monitorConsulCfg.addr, b.monitorConsulCfg.dataCenter, b.monitorConsulCfg.token,
b.monitorConsulCfg.timeout, items...)
Expand Down
39 changes: 39 additions & 0 deletions harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package harvester

import (
"context"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -88,6 +89,7 @@ func TestCreateWithConsulAndRedis(t *testing.T) {
got, err := New(tt.args.cfg).
WithConsulSeed(tt.args.consulAddress, "", "", 0).
WithConsulMonitor(tt.args.consulAddress, "", "", 0).
WithConsulFolderPrefixMonitor(tt.args.consulAddress, "", "", "", 0).
WithRedisSeed(tt.args.seedRedisClient).
WithRedisMonitor(tt.args.monitorRedisClient, tt.args.monitoringPollInterval).
Create()
Expand Down Expand Up @@ -128,6 +130,43 @@ func TestWithNotification(t *testing.T) {
}
}

func TestWithConsulFolderPrefixMonitor(t *testing.T) {
tests := []struct {
Name string
InputFolderPrefix string
ExpectedKeyLocation string
}{
{
Name: "Setup Consul with folder prefix",
InputFolderPrefix: "folder/prefix",
ExpectedKeyLocation: "folder/prefix/key1",
},
{
Name: "Setup Consul with empty folder prefix",
ExpectedKeyLocation: "key1",
},
{
Name: "Setup Consul with folder prefix trailing /",
InputFolderPrefix: "folder/prefix/",
ExpectedKeyLocation: "folder/prefix/key1",
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
builder := New(testConfig{})
builder.WithConsulFolderPrefixMonitor("addr", "data-center", "token", test.InputFolderPrefix, time.Second*42)

assert.Equal(t, "addr", builder.monitorConsulCfg.addr)
assert.Equal(t, "data-center", builder.monitorConsulCfg.dataCenter)
assert.Equal(t, "token", builder.monitorConsulCfg.token)
assert.Equal(t, time.Second*42, builder.monitorConsulCfg.timeout)
assert.Equal(t, test.ExpectedKeyLocation, filepath.Join(builder.monitorConsulCfg.folderPrefix, "key1"))
})
}

}

func TestCreate_NoConsulOrRedis(t *testing.T) {
cfg := &testConfigNoConsul{}
got, err := New(cfg).Create()
Expand Down
19 changes: 13 additions & 6 deletions monitor/consul/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package consul
import (
"context"
"errors"
"path"
"time"

"github.com/beatlabs/harvester/change"
Expand All @@ -15,15 +16,21 @@ import (

// Item definition.
type Item struct {
tp string
key string
tp string
key string
prefix string
}

// NewKeyItem creates a new key watch item for the watcher.
func NewKeyItem(key string) Item {
return Item{tp: "key", key: key}
}

// NewKeyItemWithPrefix creates a new key item for a given key and prefix.
func NewKeyItemWithPrefix(key, prefix string) Item {
return Item{tp: "key", key: key, prefix: prefix}
}

// NewPrefixItem creates a prefix key watch item for the watcher.
func NewPrefixItem(key string) Item {
return Item{tp: "keyprefix", key: key}
Expand Down Expand Up @@ -72,7 +79,7 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error {
var err error
switch i.tp {
case "key":
pl, err = w.createKeyPlan(i.key, ch)
pl, err = w.createKeyPlanWithPrefix(i.key, i.prefix, ch)
case "keyprefix":
pl, err = w.createKeyPrefixPlan(i.key, ch)
}
Expand Down Expand Up @@ -100,8 +107,8 @@ func (w *Watcher) Watch(ctx context.Context, ch chan<- []*change.Change) error {
return nil
}

func (w *Watcher) createKeyPlan(key string, ch chan<- []*change.Change) (*watch.Plan, error) {
pl, err := w.getPlan("key", key)
func (w *Watcher) createKeyPlanWithPrefix(key, prefix string, ch chan<- []*change.Change) (*watch.Plan, error) {
pl, err := w.getPlan("key", path.Join(prefix, key))
if err != nil {
return nil, err
}
Expand All @@ -113,7 +120,7 @@ func (w *Watcher) createKeyPlan(key string, ch chan<- []*change.Change) (*watch.
if !ok {
log.Errorf("data is not kv pair: %v", data)
} else {
ch <- []*change.Change{change.New(config.SourceConsul, pair.Key, string(pair.Value), pair.ModifyIndex)}
ch <- []*change.Change{change.New(config.SourceConsul, key, string(pair.Value), pair.ModifyIndex)}
}
}
log.Debugf("plan for key %s created", key)
Expand Down
10 changes: 8 additions & 2 deletions monitor/consul/watcher_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestMain(m *testing.M) {

func TestWatch(t *testing.T) {
ch := make(chan []*change.Change)
w, err := New(addr, "", "", 0, NewKeyItem("key1"), NewPrefixItem("prefix1"))
w, err := New(addr, "", "", 0, NewKeyItemWithPrefix("key4", "consul/folder"), NewKeyItemWithPrefix("key1", ""), NewPrefixItem("prefix"))
require.NoError(t, err)
require.NotNil(t, w)
ctx, cnl := context.WithCancel(context.Background())
Expand All @@ -61,6 +61,8 @@ func TestWatch(t *testing.T) {
assert.Equal(t, "3", cng.Value())
case "key1":
assert.Equal(t, "1", cng.Value())
case "key4":
assert.Equal(t, "42", cng.Value())
default:
assert.Fail(t, "key invalid", cng.Key())
}
Expand All @@ -82,7 +84,11 @@ func cleanup(consul *api.Client) error {
}

func setup(consul *api.Client) error {
_, err := consul.KV().Put(&api.KVPair{Key: "key1", Value: []byte("1")}, nil)
_, err := consul.KV().Put(&api.KVPair{Key: "consul/folder/key4", Value: []byte("42")}, nil)
if err != nil {
return err
}
_, err = consul.KV().Put(&api.KVPair{Key: "key1", Value: []byte("1")}, nil)
if err != nil {
return err
}
Expand Down

0 comments on commit 84469b7

Please sign in to comment.