diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go index 97b1e39..d307b27 100644 --- a/discovery/etcd/etcd.go +++ b/discovery/etcd/etcd.go @@ -4,11 +4,14 @@ import ( "context" "fmt" "io" + "net" "strings" "sync" "time" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints" "go.etcd.io/etcd/client/v3/naming/resolver" @@ -20,6 +23,11 @@ import ( // ZkOption defines a function type for modifying clientv3.Config type ZkOption func(*clientv3.Config) +type addrConn struct { + conn *grpc.ClientConn + addr string + isConnected bool +} // SvcDiscoveryRegistryImpl implementation type SvcDiscoveryRegistryImpl struct { @@ -30,11 +38,12 @@ type SvcDiscoveryRegistryImpl struct { endpointMgr endpoints.Manager leaseID clientv3.LeaseID rpcRegisterTarget string + watchNames []string rootDirectory string mu sync.RWMutex - connMap map[string][]*grpc.ClientConn + connMap map[string][]*addrConn } func createNoOpLogger() *zap.Logger { @@ -53,7 +62,7 @@ func createNoOpLogger() *zap.Logger { } // NewSvcDiscoveryRegistry creates a new service discovery registry implementation -func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) { +func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, watchNames []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) { cfg := clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, @@ -81,37 +90,76 @@ func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options . client: client, resolver: r, rootDirectory: rootDirectory, - connMap: make(map[string][]*grpc.ClientConn), + connMap: make(map[string][]*addrConn), + watchNames: watchNames, } - go s.watchServiceChanges() + s.watchServiceChanges() return s, nil } // initializeConnMap fetches all existing endpoints and populates the local map func (r *SvcDiscoveryRegistryImpl) initializeConnMap(opts ...grpc.DialOption) error { - fullPrefix := fmt.Sprintf("%s/", r.rootDirectory) - resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix()) - if err != nil { - return err - } - r.connMap = make(map[string][]*grpc.ClientConn) - for _, kv := range resp.Kvs { - prefix, addr := r.splitEndpoint(string(kv.Key)) + r.mu.Lock() + defer r.mu.Unlock() + ctx := context.Background() + for _, name := range r.watchNames { + fullPrefix := fmt.Sprintf("%s/%s", r.rootDirectory, name) + resp, err := r.client.Get(ctx, fullPrefix, clientv3.WithPrefix()) + if err != nil { + return err + } - dialOpts := append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver)) + oldList := r.connMap[fullPrefix] - err := r.checkOpts(dialOpts...) // Check opts in include mw.GrpcClient() - if err != nil { - return errs.WrapMsg(err, "checkOpts is failed") + addrMap := make(map[string]*addrConn, len(oldList)) + for _, conn := range oldList { + addrMap[conn.addr] = conn } + newList := make([]*addrConn, 0, len(oldList)) + for _, kv := range resp.Kvs { + prefix, addr := r.splitEndpoint(string(kv.Key)) + if addr == "" { + continue + } + if _, _, err = net.SplitHostPort(addr); err != nil { + continue + } + if prefix != fullPrefix { + continue + } - conn, err := grpc.DialContext(context.Background(), addr, dialOpts...) - if err != nil { - continue + if conn, ok := addrMap[addr]; ok { + conn.isConnected = true + continue + } + + dialOpts := append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver)) + + err := r.checkOpts(dialOpts...) // Check opts in include mw.GrpcClient() + if err != nil { + return errs.WrapMsg(err, "checkOpts is failed") + } + + conn, err := grpc.DialContext(context.Background(), addr, dialOpts...) + if err != nil { + continue + } + newList = append(newList, &addrConn{conn: conn, addr: addr, isConnected: false}) + } + for _, conn := range oldList { + if conn.isConnected { + conn.isConnected = false + newList = append(newList, conn) + continue + } + if err = conn.conn.Close(); err != nil { + log.ZWarn(ctx, "close conn err", err) + } } - r.connMap[prefix] = append(r.connMap[prefix], conn) + r.connMap[fullPrefix] = newList } + return nil } @@ -145,14 +193,14 @@ func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, // GetConns returns gRPC client connections for a given service name func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName) - r.mu.RLock() - defer r.mu.RUnlock() if len(r.connMap) == 0 { if err := r.initializeConnMap(opts...); err != nil { return nil, err } } - return r.connMap[fullServiceKey], nil + r.mu.RLock() + defer r.mu.RUnlock() + return datautil.Batch(func(t *addrConn) *grpc.ClientConn { return t.conn }, r.connMap[fullServiceKey]), nil } // GetConn returns a single gRPC client connection for a given service name @@ -178,7 +226,7 @@ func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string { func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) { r.mu.Lock() defer r.mu.Unlock() - r.connMap = make(map[string][]*grpc.ClientConn) + r.resetConnMap() r.dialOptions = append(r.dialOptions, opts...) } @@ -230,32 +278,15 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { // watchServiceChanges watches for changes in the service directory func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() { - watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix()) - for range watchChan { - r.mu.RLock() - r.initializeConnMap() - r.mu.RUnlock() - } -} - -// refreshConnMap fetches the latest endpoints and updates the local map -func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) { - r.mu.Lock() - defer r.mu.Unlock() - - fullPrefix := fmt.Sprintf("%s/", prefix) - resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix()) - if err != nil { - return - } - r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections - for _, kv := range resp.Kvs { - _, addr := r.splitEndpoint(string(kv.Key)) - conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...) - if err != nil { - continue - } - r.connMap[prefix] = append(r.connMap[prefix], conn) + for _, s := range r.watchNames { + go func() { + watchChan := r.client.Watch(context.Background(), r.rootDirectory+"/"+s, clientv3.WithPrefix()) + for range watchChan { + if err := r.initializeConnMap(); err != nil { + log.ZWarn(context.Background(), "initializeConnMap in watch err", err) + } + } + }() } } @@ -284,12 +315,12 @@ func (r *SvcDiscoveryRegistryImpl) UnRegister() error { // Close closes the etcd client connection func (r *SvcDiscoveryRegistryImpl) Close() { + r.mu.Lock() + defer r.mu.Unlock() + r.resetConnMap() if r.client != nil { _ = r.client.Close() } - - r.mu.Lock() - defer r.mu.Unlock() } // Check verifies if etcd is running by checking the existence of the root node and optionally creates it with a lease @@ -362,3 +393,15 @@ func (r *SvcDiscoveryRegistryImpl) checkOpts(opts ...grpc.DialOption) error { // return errs.New("missing required grpc.DialOption", "option", "mw.GrpcClient") return nil } + +func (r *SvcDiscoveryRegistryImpl) resetConnMap() { + ctx := context.Background() + for _, conn := range r.connMap { + for _, c := range conn { + if err := c.conn.Close(); err != nil { + log.ZWarn(ctx, "failed to close conn", err) + } + } + } + r.connMap = make(map[string][]*addrConn) +}