Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix memory leak of grpc resolver #4490

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions core/discov/internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"errors"
"fmt"
"io"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -59,6 +60,17 @@
return c.monitor(key, l, exactMatch)
}

// Unmonitor cancel monitoring of given endpoints and keys, and remove the listener.
func (r *Registry) Unmonitor(endpoints []string, key string, l UpdateListener) {
c, exists := r.getCluster(endpoints)
// if not exists, return.
if !exists {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not exist, getCluster will create a cluster and save it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have considered this issue originally. I have written a new method to obtain the cluster, but usually the cluster has already been created when the Monitor is called first, so I deleted this method. To take a step back, generally speaking, when we use etcd, the endpoints are often the same, so even if a new cluster is created, it will have little impact.

}

Check warning on line 69 in core/discov/internal/registry.go

View check run for this annotation

Codecov / codecov/patch

core/discov/internal/registry.go#L68-L69

Added lines #L68 - L69 were not covered by tests

c.unmonitor(key, l)
}

func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
clusterKey := getClusterKey(endpoints)
r.lock.RLock()
Expand Down Expand Up @@ -273,6 +285,14 @@
return nil
}

func (c *cluster) unmonitor(key string, l UpdateListener) {
c.lock.Lock()
defer c.lock.Unlock()
c.listeners[key] = slices.DeleteFunc(c.listeners[key], func(listener UpdateListener) bool {
return l == listener
})
}

func (c *cluster) newClient() (EtcdClient, error) {
cli, err := NewClient(c.endpoints)
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions core/discov/internal/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,31 @@ func TestRegistry_Monitor(t *testing.T) {
assert.Error(t, GetRegistry().Monitor(endpoints, "foo", new(mockListener), false))
}

func TestRegistry_Unmonitor(t *testing.T) {
svr, err := mockserver.StartMockServers(1)
assert.NoError(t, err)
svr.StartAt(0)

endpoints := []string{svr.Servers[0].Address}
GetRegistry().lock.Lock()
GetRegistry().clusters = map[string]*cluster{
getClusterKey(endpoints): {
listeners: map[string][]UpdateListener{},
values: map[string]map[string]string{
"foo": {
"bar": "baz",
},
},
},
}
GetRegistry().lock.Unlock()
l := new(mockListener)
assert.Error(t, GetRegistry().Monitor(endpoints, "foo", l, false))
assert.Equal(t, 1, len(GetRegistry().clusters[getClusterKey(endpoints)].listeners["foo"]))
GetRegistry().Unmonitor(endpoints, "foo", l)
assert.Equal(t, 0, len(GetRegistry().clusters[getClusterKey(endpoints)].listeners["foo"]))
}

type mockListener struct {
}

Expand Down
35 changes: 21 additions & 14 deletions core/discov/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"sync"
"sync/atomic"

"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
Expand All @@ -16,6 +17,7 @@
// A Subscriber is used to subscribe the given key on an etcd cluster.
Subscriber struct {
endpoints []string
key string
exclusive bool
exactMatch bool
items *container
Expand Down Expand Up @@ -52,6 +54,11 @@
return s.items.getValues()
}

// Close s.
func (s *Subscriber) Close() {
internal.GetRegistry().Unmonitor(s.endpoints, s.key, s.items)

Check warning on line 59 in core/discov/subscriber.go

View check run for this annotation

Codecov / codecov/patch

core/discov/subscriber.go#L58-L59

Added lines #L58 - L59 were not covered by tests
}

// Exclusive means that key value can only be 1:1,
// which means later added value will remove the keys associated with the same value previously.
func Exclusive() SubOption {
Expand Down Expand Up @@ -83,7 +90,7 @@

type container struct {
exclusive bool
values map[string][]string
values map[string]*collection.Set
mapping map[string]string
snapshot atomic.Value
dirty *syncx.AtomicBool
Expand All @@ -94,7 +101,7 @@
func newContainer(exclusive bool) *container {
return &container{
exclusive: exclusive,
values: make(map[string][]string),
values: make(map[string]*collection.Set),
mapping: make(map[string]string),
dirty: syncx.ForAtomicBool(true),
}
Expand All @@ -116,15 +123,21 @@
defer c.lock.Unlock()

c.dirty.Set(true)
keys := c.values[value]
if c.values[value] == nil {
c.values[value] = collection.NewSet()
}
keys := c.values[value].KeysStr()
previous := append([]string(nil), keys...)
early := len(keys) > 0
if c.exclusive && early {
for _, each := range keys {
c.doRemoveKey(each)
}
if c.values[value] == nil {
c.values[value] = collection.NewSet()
}
}
c.values[value] = append(c.values[value], key)
c.values[value].AddStr(key)
c.mapping[key] = value

if early {
Expand All @@ -147,18 +160,12 @@
}

delete(c.mapping, key)
keys := c.values[server]
remain := keys[:0]

for _, k := range keys {
if k != key {
remain = append(remain, k)
}
if c.values[server] == nil {
return

Check warning on line 164 in core/discov/subscriber.go

View check run for this annotation

Codecov / codecov/patch

core/discov/subscriber.go#L164

Added line #L164 was not covered by tests
}
c.values[server].Remove(key)

if len(remain) > 0 {
c.values[server] = remain
} else {
if c.values[server].Count() == 0 {
delete(c.values, server)
}
}
Expand Down
2 changes: 1 addition & 1 deletion zrpc/resolver/internal/discovbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
sub.AddListener(update)
update()

return &nopResolver{cc: cc}, nil
return &nopResolver{cc: cc, closeFunc: func() { sub.Close() }}, nil

Check warning on line 41 in zrpc/resolver/internal/discovbuilder.go

View check run for this annotation

Codecov / codecov/patch

zrpc/resolver/internal/discovbuilder.go#L41

Added line #L41 was not covered by tests
}

func (b *discovBuilder) Scheme() string {
Expand Down
6 changes: 5 additions & 1 deletion zrpc/resolver/internal/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ func register() {
}

type nopResolver struct {
cc resolver.ClientConn
cc resolver.ClientConn
closeFunc func()
}

func (r *nopResolver) Close() {
if r.closeFunc != nil {
r.closeFunc()
}
}

func (r *nopResolver) ResolveNow(_ resolver.ResolveNowOptions) {
Expand Down
14 changes: 14 additions & 0 deletions zrpc/resolver/internal/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ func TestNopResolver(t *testing.T) {
})
}

func TestNopResolver_Close(t *testing.T) {
var isChanged bool
r := nopResolver{}
r.Close()
assert.False(t, isChanged)
r = nopResolver{
closeFunc: func() {
isChanged = true
},
}
r.Close()
assert.True(t, isChanged)
}

type mockedClientConn struct {
state resolver.State
err error
Expand Down
Loading