Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: routinegroup & etcd watch goroutine leak #4514

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 68 additions & 11 deletions core/discov/internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@
return c.monitor(key, l, exactMatch)
}

func (r *Registry) UnMonitor(endpoints []string, key string, l UpdateListener) {
c, exists := r.getCluster(endpoints)
if !exists {
return
}

Check warning on line 66 in core/discov/internal/registry.go

View check run for this annotation

Codecov / codecov/patch

core/discov/internal/registry.go#L65-L66

Added lines #L65 - L66 were not covered by tests

c.unMonitor(key, l)
}

func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
clusterKey := getClusterKey(endpoints)
r.lock.RLock()
Expand Down Expand Up @@ -88,6 +97,8 @@
done chan lang.PlaceholderType
lock sync.RWMutex
exactMatch bool
watchCtx map[string]context.CancelFunc
watchFlag map[string]bool
}

func newCluster(endpoints []string) *cluster {
Expand All @@ -98,6 +109,8 @@
listeners: make(map[string][]UpdateListener),
watchGroup: threading.NewRoutineGroup(),
done: make(chan lang.PlaceholderType),
watchCtx: make(map[string]context.CancelFunc),
watchFlag: make(map[string]bool),
}
}

Expand Down Expand Up @@ -260,19 +273,42 @@
c.exactMatch = exactMatch
c.lock.Unlock()

cli, err := c.getClient()
if err != nil {
return err
}
if !c.watchFlag[key] {
cli, err := c.getClient()
if err != nil {
return err
}

rev := c.load(cli, key)
c.watchGroup.Run(func() {
c.watch(cli, key, rev)
})
rev := c.load(cli, key)
c.watchGroup.Run(func() {
c.watch(cli, key, rev)
})

Check warning on line 285 in core/discov/internal/registry.go

View check run for this annotation

Codecov / codecov/patch

core/discov/internal/registry.go#L282-L285

Added lines #L282 - L285 were not covered by tests
}

return nil
}

func (c *cluster) unMonitor(key string, l UpdateListener) {
c.lock.Lock()
defer c.lock.Unlock()

listeners := c.listeners[key]
for i, listener := range listeners {
if listener == l {
c.listeners[key] = append(listeners[:i], listeners[i+1:]...)
break
}
}

if len(c.listeners[key]) == 0 && c.watchFlag[key] {
if cancel, ok := c.watchCtx[key]; ok {
cancel()
delete(c.watchCtx, key)
}
c.watchFlag[key] = false
}
}

func (c *cluster) newClient() (EtcdClient, error) {
cli, err := NewClient(c.endpoints)
if err != nil {
Expand All @@ -294,6 +330,7 @@
for k := range c.listeners {
keys = append(keys, k)
}
c.clearWatch()

Check warning on line 333 in core/discov/internal/registry.go

View check run for this annotation

Codecov / codecov/patch

core/discov/internal/registry.go#L333

Added line #L333 was not covered by tests
c.lock.Unlock()

for _, key := range keys {
Expand All @@ -306,8 +343,9 @@
}

func (c *cluster) watch(cli EtcdClient, key string, rev int64) {
ctx := c.addWatch(key, cli)
for {
err := c.watchStream(cli, key, rev)
err := c.watchStream(cli, key, rev, ctx)
if err == nil {
return
}
Expand All @@ -322,7 +360,7 @@
}
}

func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error {
func (c *cluster) watchStream(cli EtcdClient, key string, rev int64, ctx context.Context) error {
var (
rch clientv3.WatchChan
ops []clientv3.OpOption
Expand All @@ -336,7 +374,7 @@
ops = append(ops, clientv3.WithRev(rev+1))
}

rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), watchKey, ops...)
rch = cli.Watch(clientv3.WithRequireLeader(ctx), watchKey, ops...)

for {
select {
Expand All @@ -354,6 +392,8 @@
c.handleWatchEvents(key, wresp.Events)
case <-c.done:
return nil
case <-ctx.Done():
return nil
}
}
}
Expand All @@ -366,6 +406,23 @@
watcher.watch(cli.ActiveConnection())
}

func (c *cluster) addWatch(key string, cli EtcdClient) context.Context {
ctx, cancel := context.WithCancel(cli.Ctx())
c.lock.Lock()
c.watchCtx[key] = cancel
c.watchFlag[key] = true
c.lock.Unlock()
return ctx
}

func (c *cluster) clearWatch() {
for _, cancel := range c.watchCtx {
cancel()
}
c.watchCtx = make(map[string]context.CancelFunc)
c.watchFlag = make(map[string]bool)
}

// DialClient dials an etcd cluster with given endpoints.
func DialClient(endpoints []string) (EtcdClient, error) {
cfg := clientv3.Config{
Expand Down
65 changes: 62 additions & 3 deletions core/discov/internal/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ func TestCluster_Watch(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
c := &cluster{
listeners: make(map[string][]UpdateListener),
values: make(map[string]map[string]string),
listeners: make(map[string][]UpdateListener),
watchCtx: make(map[string]context.CancelFunc),
watchFlag: make(map[string]bool),
}
listener := NewMockUpdateListener(ctrl)
c.listeners["any"] = []UpdateListener{listener}
Expand Down Expand Up @@ -211,7 +213,7 @@ func TestClusterWatch_RespFailures(t *testing.T) {
ch := make(chan clientv3.WatchResponse)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
c := new(cluster)
c := newCluster([]string{})
c.done = make(chan lang.PlaceholderType)
go func() {
ch <- resp
Expand All @@ -231,7 +233,7 @@ func TestClusterWatch_CloseChan(t *testing.T) {
ch := make(chan clientv3.WatchResponse)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
c := new(cluster)
c := newCluster([]string{})
c.done = make(chan lang.PlaceholderType)
go func() {
close(ch)
Expand All @@ -240,6 +242,37 @@ func TestClusterWatch_CloseChan(t *testing.T) {
c.watch(cli, "any", 0)
}

func TestClusterWatch_CtxCancel(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
cli := NewMockEtcdClient(ctrl)
restore := setMockClient(cli)
defer restore()
ch := make(chan clientv3.WatchResponse)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
ctx, cancelFunc := context.WithCancel(context.Background())
cli.EXPECT().Ctx().Return(ctx).AnyTimes()
c := newCluster([]string{})
c.done = make(chan lang.PlaceholderType)
go func() {
cancelFunc()
close(ch)
}()
c.watch(cli, "any", 0)
}

func TestCluster_ClearWatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c := &cluster{
watchCtx: map[string]context.CancelFunc{"foo": cancel},
watchFlag: map[string]bool{"foo": true},
}
c.clearWatch()
assert.Equal(t, ctx.Err(), context.Canceled)
assert.Equal(t, 0, len(c.watchCtx))
assert.Equal(t, 0, len(c.watchFlag))
}

func TestValueOnlyContext(t *testing.T) {
ctx := contextx.ValueOnlyFrom(context.Background())
ctx.Done()
Expand Down Expand Up @@ -286,12 +319,38 @@ func TestRegistry_Monitor(t *testing.T) {
"bar": "baz",
},
},
watchCtx: map[string]context.CancelFunc{},
watchFlag: map[string]bool{},
},
}
GetRegistry().lock.Unlock()
assert.Error(t, GetRegistry().Monitor(endpoints, "foo", new(mockListener), false))
}

func TestRegistry_UnMonitor(t *testing.T) {
svr, err := mockserver.StartMockServers(1)
assert.NoError(t, err)
svr.StartAt(0)

endpoints := []string{svr.Servers[0].Address}
l := new(mockListener)
GetRegistry().lock.Lock()
GetRegistry().clusters = map[string]*cluster{
getClusterKey(endpoints): {
listeners: map[string][]UpdateListener{"foo": {l}},
values: map[string]map[string]string{
"foo": {
"bar": "baz",
},
},
watchFlag: map[string]bool{"foo": true},
watchCtx: map[string]context.CancelFunc{"foo": func() {}},
},
}
GetRegistry().lock.Unlock()
GetRegistry().UnMonitor(endpoints, "foo", l)
}

type mockListener struct {
}

Expand Down
7 changes: 7 additions & 0 deletions core/discov/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
endpoints []string
exclusive bool
exactMatch bool
key string
items *container
}
)
Expand All @@ -29,6 +30,7 @@
func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
sub := &Subscriber{
endpoints: endpoints,
key: key,

Check warning on line 33 in core/discov/subscriber.go

View check run for this annotation

Codecov / codecov/patch

core/discov/subscriber.go#L33

Added line #L33 was not covered by tests
}
for _, opt := range opts {
opt(sub)
Expand All @@ -52,6 +54,11 @@
return s.items.getValues()
}

// Close the subscriber created watch goroutine and remove listener
func (s *Subscriber) Close() {
internal.GetRegistry().UnMonitor(s.endpoints, s.key, s.items)
}

// Exclusive means that key value can only be 1:1,
// which means later added value will remove the keys associated with the same value previously.
func Exclusive() SubOption {
Expand Down
12 changes: 12 additions & 0 deletions core/discov/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,18 @@ func TestSubscriber(t *testing.T) {
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
}

func TestSubscriberClos(t *testing.T) {
l := newContainer(false)
sub := &Subscriber{
endpoints: []string{"localhost:2379"},
key: "foo",
items: l,
}
_ = internal.GetRegistry().Monitor(sub.endpoints, sub.key, l, false)
sub.Close()
assert.Empty(t, sub.items.listeners)
}

func TestWithSubEtcdAccount(t *testing.T) {
endpoints := []string{"localhost:2379"}
user := stringx.Rand()
Expand Down
2 changes: 1 addition & 1 deletion zrpc/resolver/internal/discovbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
sub.AddListener(update)
update()

return &nopResolver{cc: cc}, nil
return &nopResolver{cc: cc, closeFunc: func() { sub.Close() }}, nil

Check warning on line 41 in zrpc/resolver/internal/discovbuilder.go

View check run for this annotation

Codecov / codecov/patch

zrpc/resolver/internal/discovbuilder.go#L41

Added line #L41 was not covered by tests
}

func (b *discovBuilder) Scheme() string {
Expand Down
6 changes: 5 additions & 1 deletion zrpc/resolver/internal/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ func register() {
}

type nopResolver struct {
cc resolver.ClientConn
cc resolver.ClientConn
closeFunc func()
}

func (r *nopResolver) Close() {
if r.closeFunc != nil {
r.closeFunc()
}
}

func (r *nopResolver) ResolveNow(_ resolver.ResolveNowOptions) {
Expand Down
11 changes: 11 additions & 0 deletions zrpc/resolver/internal/resolver_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"github.com/zeromicro/go-zero/core/discov"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -18,6 +19,16 @@ func TestNopResolver(t *testing.T) {
})
}

func TestNopResolverClose(t *testing.T) {
assert.NotPanics(t, func() {
sub := &discov.Subscriber{}
r := nopResolver{
closeFunc: sub.Close,
}
r.Close()
})
}

type mockedClientConn struct {
state resolver.State
err error
Expand Down
Loading