Skip to content

Commit

Permalink
fix: etcd conn
Browse files Browse the repository at this point in the history
  • Loading branch information
icey-yu committed Jan 4, 2025
1 parent cb44916 commit eaddead
Showing 1 changed file with 96 additions and 53 deletions.
149 changes: 96 additions & 53 deletions discovery/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"fmt"
"io"
"net"
"strings"
"sync"
"time"

"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
Expand All @@ -20,6 +23,11 @@ import (

// ZkOption defines a function type for modifying clientv3.Config
type ZkOption func(*clientv3.Config)
type addrConn struct {
conn *grpc.ClientConn
addr string
isConnected bool
}

// SvcDiscoveryRegistryImpl implementation
type SvcDiscoveryRegistryImpl struct {
Expand All @@ -30,11 +38,12 @@ type SvcDiscoveryRegistryImpl struct {
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
rpcRegisterTarget string
watchNames []string

rootDirectory string

mu sync.RWMutex
connMap map[string][]*grpc.ClientConn
connMap map[string][]*addrConn
}

func createNoOpLogger() *zap.Logger {
Expand All @@ -53,7 +62,7 @@ func createNoOpLogger() *zap.Logger {
}

// NewSvcDiscoveryRegistry creates a new service discovery registry implementation
func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) {
func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, watchNames []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
Expand Down Expand Up @@ -81,37 +90,76 @@ func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options .
client: client,
resolver: r,
rootDirectory: rootDirectory,
connMap: make(map[string][]*grpc.ClientConn),
connMap: make(map[string][]*addrConn),
watchNames: watchNames,
}

go s.watchServiceChanges()
s.watchServiceChanges()
return s, nil
}

// initializeConnMap fetches all existing endpoints and populates the local map
func (r *SvcDiscoveryRegistryImpl) initializeConnMap(opts ...grpc.DialOption) error {
fullPrefix := fmt.Sprintf("%s/", r.rootDirectory)
resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
if err != nil {
return err
}
r.connMap = make(map[string][]*grpc.ClientConn)
for _, kv := range resp.Kvs {
prefix, addr := r.splitEndpoint(string(kv.Key))
r.mu.Lock()
defer r.mu.Unlock()
ctx := context.Background()
for _, name := range r.watchNames {
fullPrefix := fmt.Sprintf("%s/%s", r.rootDirectory, name)
resp, err := r.client.Get(ctx, fullPrefix, clientv3.WithPrefix())
if err != nil {
return err
}

dialOpts := append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))
oldList := r.connMap[fullPrefix]

err := r.checkOpts(dialOpts...) // Check opts in include mw.GrpcClient()
if err != nil {
return errs.WrapMsg(err, "checkOpts is failed")
addrMap := make(map[string]*addrConn, len(oldList))
for _, conn := range oldList {
addrMap[conn.addr] = conn
}
newList := make([]*addrConn, 0, len(oldList))
for _, kv := range resp.Kvs {
prefix, addr := r.splitEndpoint(string(kv.Key))
if addr == "" {
continue
}
if _, _, err = net.SplitHostPort(addr); err != nil {
continue
}
if prefix != fullPrefix {
continue
}

conn, err := grpc.DialContext(context.Background(), addr, dialOpts...)
if err != nil {
continue
if conn, ok := addrMap[addr]; ok {
conn.isConnected = true
continue
}

dialOpts := append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))

err := r.checkOpts(dialOpts...) // Check opts in include mw.GrpcClient()
if err != nil {
return errs.WrapMsg(err, "checkOpts is failed")
}

conn, err := grpc.DialContext(context.Background(), addr, dialOpts...)
if err != nil {
continue
}
newList = append(newList, &addrConn{conn: conn, addr: addr, isConnected: false})
}
for _, conn := range oldList {
if conn.isConnected {
conn.isConnected = false
newList = append(newList, conn)
continue
}
if err = conn.conn.Close(); err != nil {
log.ZWarn(ctx, "close conn err", err)
}
}
r.connMap[prefix] = append(r.connMap[prefix], conn)
r.connMap[fullPrefix] = newList
}

return nil
}

Expand Down Expand Up @@ -145,14 +193,14 @@ func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context,
// GetConns returns gRPC client connections for a given service name
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName)
r.mu.RLock()
defer r.mu.RUnlock()
if len(r.connMap) == 0 {
if err := r.initializeConnMap(opts...); err != nil {
return nil, err
}
}
return r.connMap[fullServiceKey], nil
r.mu.RLock()
defer r.mu.RUnlock()
return datautil.Batch(func(t *addrConn) *grpc.ClientConn { return t.conn }, r.connMap[fullServiceKey]), nil
}

// GetConn returns a single gRPC client connection for a given service name
Expand All @@ -178,7 +226,7 @@ func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
r.mu.Lock()
defer r.mu.Unlock()
r.connMap = make(map[string][]*grpc.ClientConn)
r.resetConnMap()
r.dialOptions = append(r.dialOptions, opts...)
}

Expand Down Expand Up @@ -230,32 +278,15 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {

// watchServiceChanges watches for changes in the service directory
func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() {
watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix())
for range watchChan {
r.mu.RLock()
r.initializeConnMap()
r.mu.RUnlock()
}
}

// refreshConnMap fetches the latest endpoints and updates the local map
func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
r.mu.Lock()
defer r.mu.Unlock()

fullPrefix := fmt.Sprintf("%s/", prefix)
resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
if err != nil {
return
}
r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections
for _, kv := range resp.Kvs {
_, addr := r.splitEndpoint(string(kv.Key))
conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...)
if err != nil {
continue
}
r.connMap[prefix] = append(r.connMap[prefix], conn)
for _, s := range r.watchNames {
go func() {
watchChan := r.client.Watch(context.Background(), r.rootDirectory+"/"+s, clientv3.WithPrefix())
for range watchChan {
if err := r.initializeConnMap(); err != nil {
log.ZWarn(context.Background(), "initializeConnMap in watch err", err)
}
}
}()
}
}

Expand Down Expand Up @@ -284,12 +315,12 @@ func (r *SvcDiscoveryRegistryImpl) UnRegister() error {

// Close closes the etcd client connection
func (r *SvcDiscoveryRegistryImpl) Close() {
r.mu.Lock()
defer r.mu.Unlock()
r.resetConnMap()
if r.client != nil {
_ = r.client.Close()
}

r.mu.Lock()
defer r.mu.Unlock()
}

// Check verifies if etcd is running by checking the existence of the root node and optionally creates it with a lease
Expand Down Expand Up @@ -362,3 +393,15 @@ func (r *SvcDiscoveryRegistryImpl) checkOpts(opts ...grpc.DialOption) error {
// return errs.New("missing required grpc.DialOption", "option", "mw.GrpcClient")
return nil
}

func (r *SvcDiscoveryRegistryImpl) resetConnMap() {
ctx := context.Background()
for _, conn := range r.connMap {
for _, c := range conn {
if err := c.conn.Close(); err != nil {
log.ZWarn(ctx, "failed to close conn", err)
}
}
}
r.connMap = make(map[string][]*addrConn)
}

0 comments on commit eaddead

Please sign in to comment.