diff --git a/pkg/api/api.go b/pkg/api/api.go index 4843792..5d36754 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -19,10 +19,48 @@ import ( "github.com/opensergo/opensergo-go/pkg/transport/subscribe" ) +type ClientOptions struct { + connectRetryTimes uint +} + +type ClientOption func(*ClientOptions) + +func (clientOptions *ClientOptions) ApplyClientOptions(opts ...ClientOption) { + if len(opts) > 0 { + for _, opt := range opts { + opt(clientOptions) + } + } +} + +func NewDefaultClientOptions() *ClientOptions { + return &ClientOptions{ + connectRetryTimes: 3, + } +} + +func (opts *ClientOptions) ConnectRetryTimes() uint { + return opts.connectRetryTimes +} + +func WithConnectRetryTimes(connectRetryTimes uint) ClientOption { + return func(opts *ClientOptions) { + opts.connectRetryTimes = connectRetryTimes + } +} + // SubscribeOptions represents the options of OpenSergo data subscription. type SubscribeOptions struct { - Subscribers []subscribe.Subscriber - Attachments map[string]interface{} + subscribers []subscribe.Subscriber + attachments map[string]interface{} +} + +func (opts *SubscribeOptions) Subscribers() []subscribe.Subscriber { + return opts.subscribers +} + +func (opts *SubscribeOptions) Attachments() map[string]interface{} { + return opts.attachments } type SubscribeOption func(*SubscribeOptions) @@ -30,20 +68,20 @@ type SubscribeOption func(*SubscribeOptions) // WithSubscriber provides a subscriber. func WithSubscriber(subscriber subscribe.Subscriber) SubscribeOption { return func(opts *SubscribeOptions) { - if opts.Subscribers == nil { - opts.Subscribers = make([]subscribe.Subscriber, 0) + if opts.subscribers == nil { + opts.subscribers = make([]subscribe.Subscriber, 0) } - opts.Subscribers = append(opts.Subscribers, subscriber) + opts.subscribers = append(opts.subscribers, subscriber) } } // WithAttachment provides an attachment (key-value pair). func WithAttachment(key string, value interface{}) SubscribeOption { return func(opts *SubscribeOptions) { - if opts.Attachments == nil { - opts.Attachments = make(map[string]interface{}) + if opts.attachments == nil { + opts.attachments = make(map[string]interface{}) } - opts.Attachments[key] = value + opts.attachments[key] = value } } @@ -51,6 +89,8 @@ func WithAttachment(key string, value interface{}) SubscribeOption { type OpenSergoClient interface { // Start the client. Start() error + // Close the client. + Close() error // SubscribeConfig subscribes data for given subscribe target. SubscribeConfig(key model.SubscribeKey, opts ...SubscribeOption) error // UnsubscribeConfig unsubscribes data for given subscribe target. diff --git a/pkg/client/opensergo_client.go b/pkg/client/opensergo_client.go index f2807a7..1e95002 100644 --- a/pkg/client/opensergo_client.go +++ b/pkg/client/opensergo_client.go @@ -33,8 +33,10 @@ import ( // OpenSergoClient is the client to communicate with opensergo-control-plane. type OpenSergoClient struct { - host string - port uint32 + host string + port uint32 + clientOptions *api.ClientOptions + transportServiceClient transportPb.OpenSergoUniversalTransportServiceClient subscribeConfigStreamPtr atomic.Value // type of value is *client.subscribeConfigStream @@ -48,7 +50,10 @@ type OpenSergoClient struct { } // NewOpenSergoClient returns an instance of OpenSergoClient, and init some properties. -func NewOpenSergoClient(host string, port uint32) (*OpenSergoClient, error) { +func NewOpenSergoClient(host string, port uint32, opts ...api.ClientOption) (*OpenSergoClient, error) { + clientOptions := api.NewDefaultClientOptions() + clientOptions.ApplyClientOptions(opts...) + address := host + ":" + strconv.FormatUint(uint64(port), 10) clientConn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -59,6 +64,7 @@ func NewOpenSergoClient(host string, port uint32) (*OpenSergoClient, error) { openSergoClient := &OpenSergoClient{ host: host, port: port, + clientOptions: clientOptions, transportServiceClient: transportServiceClient, subscribeDataCache: &subscribe.SubscribeDataCache{}, subscriberRegistry: &subscribe.SubscriberRegistry{}, @@ -156,8 +162,8 @@ func (c *OpenSergoClient) SubscribeConfig(subscribeKey model.SubscribeKey, opts } // Register subscribers. - if len(options.Subscribers) > 0 { - for _, subscriber := range options.Subscribers { + if len(options.Subscribers()) > 0 { + for _, subscriber := range options.Subscribers() { c.subscriberRegistry.RegisterSubscriber(subscribeKey, subscriber) } } diff --git a/samples/main/main.go b/samples/main/main.go index 251262e..7885c77 100644 --- a/samples/main/main.go +++ b/samples/main/main.go @@ -15,9 +15,10 @@ package main import ( + "log" + "github.com/opensergo/opensergo-go/pkg/client" "github.com/opensergo/opensergo-go/pkg/common/logging" - "log" "github.com/opensergo/opensergo-go/pkg/api" "github.com/opensergo/opensergo-go/pkg/configkind" @@ -45,7 +46,7 @@ func StartAndSubscribeOpenSergoConfig() error { // logging.NewFileLogger("./opensergo-universal-transport-service.log", logging.InfoLevel, logging.JsonFormat, true) // Create a OpenSergoClient. - openSergoClient, err := client.NewOpenSergoClient("127.0.0.1", 10246) + openSergoClient, err := client.NewOpenSergoClient("127.0.0.1", 10246, api.WithConnectRetryTimes(5)) if err != nil { return err }