diff --git a/core/configcenter/configurator.go b/core/configcenter/configurator.go index 809dedff8b65..cfd02a806866 100644 --- a/core/configcenter/configurator.go +++ b/core/configcenter/configurator.go @@ -15,8 +15,8 @@ import ( ) var ( - errorEmptyConfig = errors.New("empty config value") - errorMissUnmarshalerType = errors.New("miss unmarshaler type") + errEmptyConfig = errors.New("empty config value") + errMissingUnmarshalerType = errors.New("missing unmarshaler type") ) // Configurator is the interface for configuration center. @@ -32,19 +32,17 @@ type ( Config struct { // Type is the value type, yaml, json or toml. Type string `json:",default=yaml,options=[yaml,json,toml]"` - // Log indicates whether to log the configuration. - Log bool `json:",default=ture"` + // Log is the flag to control logging. + Log bool `json:",default=true"` } configCenter[T any] struct { conf Config unmarshaler LoaderFn - - subscriber subscriber.Subscriber - - listeners []func() - lock sync.Mutex - snapshot atomic.Value + subscriber subscriber.Subscriber + listeners []func() + lock sync.Mutex + snapshot atomic.Value } value[T any] struct { @@ -61,7 +59,6 @@ var _ Configurator[any] = (*configCenter[any])(nil) func MustNewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) Configurator[T] { cc, err := NewConfigCenter[T](c, subscriber) logx.Must(err) - return cc } @@ -76,9 +73,6 @@ func NewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) (Configu conf: c, unmarshaler: unmarshaler, subscriber: subscriber, - listeners: nil, - lock: sync.Mutex{}, - snapshot: atomic.Value{}, } if err := cc.loadConfig(); err != nil { @@ -105,10 +99,10 @@ func (c *configCenter[T]) AddListener(listener func()) { // GetConfig return structured config. func (c *configCenter[T]) GetConfig() (T, error) { - var r T v := c.value() - if v == nil || len(v.data) < 1 { - return r, errorEmptyConfig + if v == nil || len(v.data) == 0 { + var empty T + return empty, errEmptyConfig } return v.marshalData, v.err @@ -141,7 +135,9 @@ func (c *configCenter[T]) loadConfig() error { } func (c *configCenter[T]) onChange() { - _ = c.loadConfig() + if err := c.loadConfig(); err != nil { + return + } c.lock.Lock() listeners := make([]func(), len(c.listeners)) @@ -165,40 +161,39 @@ func (c *configCenter[T]) genValue(data string) *value[T] { v := &value[T]{ data: data, } - if len(data) <= 0 { + if len(data) == 0 { return v } t := reflect.TypeOf(v.marshalData) // if the type is nil, it means that the user has not set the type of the configuration. if t == nil { - v.err = errorMissUnmarshalerType + v.err = errMissingUnmarshalerType return v } t = mapping.Deref(t) - switch t.Kind() { case reflect.Struct, reflect.Array, reflect.Slice: - err := c.unmarshaler([]byte(data), &v.marshalData) - if err != nil { + if err := c.unmarshaler([]byte(data), &v.marshalData); err != nil { v.err = err if c.conf.Log { - logx.Errorf("ConfigCenter unmarshal configuration failed, err: %+v, content [%s]", err.Error(), data) + logx.Errorf("ConfigCenter unmarshal configuration failed, err: %+v, content [%s]", + err.Error(), data) } } case reflect.String: if str, ok := any(data).(T); ok { v.marshalData = str } else { - v.err = errorMissUnmarshalerType + v.err = errMissingUnmarshalerType } default: if c.conf.Log { logx.Errorf("ConfigCenter unmarshal configuration missing unmarshaler for type: %s, content [%s]", t.Kind(), data) } - v.err = errorMissUnmarshalerType + v.err = errMissingUnmarshalerType } return v diff --git a/core/configcenter/configurator_test.go b/core/configcenter/configurator_test.go index 703afe7f3c16..7bec36fe33fa 100644 --- a/core/configcenter/configurator_test.go +++ b/core/configcenter/configurator_test.go @@ -150,7 +150,7 @@ func TestConfigCenter_AddListener(t *testing.T) { func TestConfigCenter_genValue(t *testing.T) { t.Run("data is empty", func(t *testing.T) { c := &configCenter[string]{ - unmarshaler: defaultRegistry.unmarshalers["json"], + unmarshaler: registry.unmarshalers["json"], conf: Config{Log: true}, } v := c.genValue("") @@ -159,25 +159,25 @@ func TestConfigCenter_genValue(t *testing.T) { t.Run("invalid template type", func(t *testing.T) { c := &configCenter[any]{ - unmarshaler: defaultRegistry.unmarshalers["json"], + unmarshaler: registry.unmarshalers["json"], conf: Config{Log: true}, } v := c.genValue("xxxx") - assert.Equal(t, errorMissUnmarshalerType, v.err) + assert.Equal(t, errMissingUnmarshalerType, v.err) }) t.Run("unsupported template type", func(t *testing.T) { c := &configCenter[int]{ - unmarshaler: defaultRegistry.unmarshalers["json"], + unmarshaler: registry.unmarshalers["json"], conf: Config{Log: true}, } v := c.genValue("1") - assert.Equal(t, errorMissUnmarshalerType, v.err) + assert.Equal(t, errMissingUnmarshalerType, v.err) }) t.Run("supported template string type", func(t *testing.T) { c := &configCenter[string]{ - unmarshaler: defaultRegistry.unmarshalers["json"], + unmarshaler: registry.unmarshalers["json"], conf: Config{Log: true}, } v := c.genValue("12345") @@ -189,7 +189,7 @@ func TestConfigCenter_genValue(t *testing.T) { c := &configCenter[struct { Name string `json:"name"` }]{ - unmarshaler: defaultRegistry.unmarshalers["json"], + unmarshaler: registry.unmarshalers["json"], conf: Config{Log: true}, } v := c.genValue(`{"name":"new name}`) @@ -201,7 +201,7 @@ func TestConfigCenter_genValue(t *testing.T) { c := &configCenter[struct { Name string `json:"name"` }]{ - unmarshaler: defaultRegistry.unmarshalers["json"], + unmarshaler: registry.unmarshalers["json"], conf: Config{Log: true}, } v := c.genValue(`{"name":"new name"}`) diff --git a/core/configcenter/subscriber/etcd.go b/core/configcenter/subscriber/etcd.go index 942dd532366b..f9a11efccbd6 100644 --- a/core/configcenter/subscriber/etcd.go +++ b/core/configcenter/subscriber/etcd.go @@ -12,30 +12,19 @@ type ( } // EtcdConf is the configuration for etcd. - EtcdConf discov.EtcdConf + EtcdConf = discov.EtcdConf ) // MustNewEtcdSubscriber returns an etcd Subscriber, exits on errors. func MustNewEtcdSubscriber(conf EtcdConf) Subscriber { s, err := NewEtcdSubscriber(conf) logx.Must(err) - return s } // NewEtcdSubscriber returns an etcd Subscriber. func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) { - var opts = []discov.SubOption{ - discov.WithDisablePrefix(), - } - if len(conf.User) != 0 { - opts = append(opts, discov.WithSubEtcdAccount(conf.User, conf.Pass)) - } - if len(conf.CertFile) != 0 || len(conf.CertKeyFile) != 0 || len(conf.CACertFile) != 0 { - opts = append(opts, - discov.WithSubEtcdTLS(conf.CertFile, conf.CertKeyFile, conf.CACertFile, conf.InsecureSkipVerify)) - } - + opts := buildSubOptions(conf) s, err := discov.NewSubscriber(conf.Hosts, conf.Key, opts...) if err != nil { return nil, err @@ -44,6 +33,23 @@ func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) { return &etcdSubscriber{Subscriber: s}, nil } +// buildSubOptions constructs the options for creating a new etcd subscriber. +func buildSubOptions(conf EtcdConf) []discov.SubOption { + opts := []discov.SubOption{ + discov.WithExactMatch(), + } + + if len(conf.User) > 0 { + opts = append(opts, discov.WithSubEtcdAccount(conf.User, conf.Pass)) + } + if len(conf.CertFile) > 0 || len(conf.CertKeyFile) > 0 || len(conf.CACertFile) > 0 { + opts = append(opts, discov.WithSubEtcdTLS(conf.CertFile, conf.CertKeyFile, + conf.CACertFile, conf.InsecureSkipVerify)) + } + + return opts +} + // AddListener adds a listener to the subscriber. func (s *etcdSubscriber) AddListener(listener func()) error { s.Subscriber.AddListener(listener) @@ -53,8 +59,9 @@ func (s *etcdSubscriber) AddListener(listener func()) error { // Value returns the value of the subscriber. func (s *etcdSubscriber) Value() (string, error) { vs := s.Subscriber.Values() - if len(vs) != 0 { + if len(vs) > 0 { return vs[len(vs)-1], nil } + return "", nil } diff --git a/core/configcenter/unmarshaler.go b/core/configcenter/unmarshaler.go index c8b68e312018..dc9fa507448f 100644 --- a/core/configcenter/unmarshaler.go +++ b/core/configcenter/unmarshaler.go @@ -6,41 +6,36 @@ import ( "github.com/zeromicro/go-zero/core/conf" ) +var registry = &unmarshalerRegistry{ + unmarshalers: map[string]LoaderFn{ + "json": conf.LoadFromJsonBytes, + "toml": conf.LoadFromTomlBytes, + "yaml": conf.LoadFromYamlBytes, + }, +} + type ( + // LoaderFn is the function type for loading configuration. + LoaderFn func([]byte, any) error + // unmarshalerRegistry is the registry for unmarshalers. unmarshalerRegistry struct { unmarshalers map[string]LoaderFn - - mu sync.RWMutex + mu sync.RWMutex } - - // LoaderFn is the function type for loading configuration. - LoaderFn func([]byte, any) error ) -var defaultRegistry *unmarshalerRegistry - -func init() { - defaultRegistry = &unmarshalerRegistry{ - unmarshalers: map[string]LoaderFn{ - "json": conf.LoadFromJsonBytes, - "toml": conf.LoadFromTomlBytes, - "yaml": conf.LoadFromYamlBytes, - }, - } -} - // RegisterUnmarshaler registers an unmarshaler. func RegisterUnmarshaler(name string, fn LoaderFn) { - defaultRegistry.mu.Lock() - defaultRegistry.unmarshalers[name] = fn - defaultRegistry.mu.Unlock() + registry.mu.Lock() + defer registry.mu.Unlock() + registry.unmarshalers[name] = fn } // Unmarshaler returns the unmarshaler by name. func Unmarshaler(name string) (LoaderFn, bool) { - defaultRegistry.mu.RLock() - fn, ok := defaultRegistry.unmarshalers[name] - defaultRegistry.mu.RUnlock() + registry.mu.RLock() + defer registry.mu.RUnlock() + fn, ok := registry.unmarshalers[name] return fn, ok } diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index f4652f3ac47a..9fcb5aabbde3 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -46,7 +46,7 @@ func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) { } // Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener. -func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, disablePrefix bool) error { +func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, exactMatch bool) error { c, exists := r.getCluster(endpoints) // if exists, the existing values should be updated to the listener. if exists { @@ -56,7 +56,7 @@ func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, dis } } - return c.monitor(key, l, disablePrefix) + return c.monitor(key, l, exactMatch) } func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) { @@ -80,14 +80,14 @@ func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) { } type cluster struct { - endpoints []string - key string - values map[string]map[string]string - listeners map[string][]UpdateListener - watchGroup *threading.RoutineGroup - done chan lang.PlaceholderType - lock sync.RWMutex - disablePrefix bool + endpoints []string + key string + values map[string]map[string]string + listeners map[string][]UpdateListener + watchGroup *threading.RoutineGroup + done chan lang.PlaceholderType + lock sync.RWMutex + exactMatch bool } func newCluster(endpoints []string) *cluster { @@ -226,7 +226,7 @@ func (c *cluster) load(cli EtcdClient, key string) int64 { for { var err error ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout) - if c.disablePrefix { + if c.exactMatch { resp, err = cli.Get(ctx, key) } else { resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix()) @@ -254,10 +254,10 @@ func (c *cluster) load(cli EtcdClient, key string) int64 { return resp.Header.Revision } -func (c *cluster) monitor(key string, l UpdateListener, disablePrefix bool) error { +func (c *cluster) monitor(key string, l UpdateListener, exactMatch bool) error { c.lock.Lock() c.listeners[key] = append(c.listeners[key], l) - c.disablePrefix = disablePrefix + c.exactMatch = exactMatch c.lock.Unlock() cli, err := c.getClient() @@ -328,7 +328,7 @@ func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error { ops []clientv3.OpOption watchKey = key ) - if !c.disablePrefix { + if !c.exactMatch { watchKey = makeKeyPrefix(key) ops = append(ops, clientv3.WithPrefix()) } diff --git a/core/discov/subscriber.go b/core/discov/subscriber.go index e0dd5de0b18a..08f89a601ff7 100644 --- a/core/discov/subscriber.go +++ b/core/discov/subscriber.go @@ -15,10 +15,10 @@ type ( // A Subscriber is used to subscribe the given key on an etcd cluster. Subscriber struct { - endpoints []string - exclusive bool - disablePrefix bool - items *container + endpoints []string + exclusive bool + exactMatch bool + items *container } ) @@ -35,7 +35,7 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib } sub.items = newContainer(sub.exclusive) - if err := internal.GetRegistry().Monitor(endpoints, key, sub.items, sub.disablePrefix); err != nil { + if err := internal.GetRegistry().Monitor(endpoints, key, sub.items, sub.exactMatch); err != nil { return nil, err } @@ -60,10 +60,10 @@ func Exclusive() SubOption { } } -// WithDisablePrefix turn off querying using key prefixes. -func WithDisablePrefix() SubOption { +// WithExactMatch turn off querying using key prefixes. +func WithExactMatch() SubOption { return func(sub *Subscriber) { - sub.disablePrefix = true + sub.exactMatch = true } }