Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go sdk support user define app labels #750

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions clients/config_client/config_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package config_client
import (
"context"
"errors"
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/util"
"testing"

Expand All @@ -45,6 +46,7 @@ var clientConfigWithOptions = constant.NewClientConfig(
constant.WithOpenKMS(true),
constant.WithKMSVersion(constant.KMSv1),
constant.WithRegionId("cn-hangzhou"),
constant.WithAppConnLabels(map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}),
)

var clientTLsConfigWithOptions = constant.NewClientConfig(
Expand Down Expand Up @@ -310,6 +312,8 @@ func TestListen(t *testing.T) {
DataId: localConfigTest.DataId,
Group: localConfigTest.Group,
OnChange: func(namespace, group, dataId, data string) {
fmt.Printf("receive content : %s\n", data)

},
})
assert.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion clients/config_client/config_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (cp *ConfigProxy) createRpcClient(ctx context.Context, taskId string, clien
"taskId": taskId,
}

iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg)
iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg, cp.clientConfig.AppConnLabels)
rpcClient := iRpcClient.GetRpcClient()
if rpcClient.IsInitialized() {
rpcClient.RegisterServerRequestHandler(func() rpc_request.IRequest {
Expand Down
2 changes: 1 addition & 1 deletion clients/naming_client/naming_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var clientConfigTest = *constant.NewClientConfig(
constant.WithNotLoadCacheAtStart(true),
)

var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos"))
var serverConfigTest = *constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848, constant.WithContextPath("/nacos"))

type MockNamingProxy struct {
}
Expand Down
2 changes: 1 addition & 1 deletion clients/naming_client/naming_grpc/naming_grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewNamingGrpcProxy(ctx context.Context, clientCfg constant.ClientConfig, na
constant.LABEL_MODULE: constant.LABEL_MODULE_NAMING,
}

iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg)
iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg, clientCfg.AppConnLabels)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions common/constant/client_config_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,9 @@ func WithTLS(tlsCfg TLSConfig) ClientOption {
config.TLSCfg = tlsCfg
}
}

func WithAppConnLabels(appConnLabels map[string]string) ClientOption {
return func(config *ClientConfig) {
config.AppConnLabels = appConnLabels
}
}
1 change: 1 addition & 0 deletions common/constant/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type ClientConfig struct {
EndpointContextPath string // the address server endpoint contextPath
EndpointQueryParams string // the address server endpoint query params
ClusterName string // the address server clusterName
AppConnLabels map[string]string // app conn labels
}

type ClientLogSamplingConfig struct {
Expand Down
104 changes: 103 additions & 1 deletion common/remote/rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package rpc

import (
"context"
"fmt"
"math"
"os"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -147,24 +150,123 @@ func getClient(clientName string) IRpcClient {
return clientMap[clientName]
}

func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig) (IRpcClient, error) {
func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig, appConnLabels map[string]string) (IRpcClient, error) {
cMux.Lock()
defer cMux.Unlock()
if _, ok := clientMap[clientName]; !ok {
logger.Infof("init rpc client for name ", clientName)
var rpcClient IRpcClient
if GRPC == connectionType {
rpcClient = NewGrpcClient(ctx, clientName, nacosServer, tlsConfig)
}
if rpcClient == nil {
return nil, errors.New("unsupported connection type")
}

logger.Infof("get app conn labels from client config ", appConnLabels)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

日志需要格式化的占位符,才能正确的格式打印出来appConnlabels

appConnLabelsEnv := getAppLabelsFromEnv()
logger.Infof("get app conn labels from env ", appConnLabelsEnv)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上


appConnLabelsFinal := mergerAppLabels(appConnLabels, appConnLabelsEnv)
logger.Infof("final app conn labels : ", appConnLabelsFinal)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上


appConnLabelsFinal = addPrefixForEachKey(appConnLabelsFinal, "app_")
if appConnLabelsFinal != nil && len(appConnLabelsFinal) != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

只判断len(appConnLabelsFinal) != 0 就可以了

rpcClient.putAllLabels(appConnLabelsFinal)
}

rpcClient.putAllLabels(labels)
clientMap[clientName] = rpcClient
return rpcClient, nil
}
return clientMap[clientName], nil
}

func mergerAppLabels(appLabelsAppointed map[string]string, appLabelsEnv map[string]string) map[string]string {
preferred := strings.ToLower(os.Getenv("nacos_app_conn_labels_preferred"))

var preferFirst bool
if preferred != "env" {
preferFirst = true
} else {
preferFirst = false
}
return mergeMaps(appLabelsAppointed, appLabelsEnv, preferFirst)
}

func mergeMaps(map1, map2 map[string]string, preferFirst bool) map[string]string {
result := make(map[string]string)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

初始化建议给容量值,如果不确定容量大小,建议设置8


for k, v := range map1 {
result[k] = v
}

for k, v := range map2 {
if preferFirst && map1[k] != "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map 通常判断是否有值,用 _,ok:=map1[k]

continue
}
result[k] = v
}

return result
}

func getAppLabelsFromEnv() map[string]string {
configMap := make(map[string]string)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

容量


// nacos_config_gray_label
grayLabel := os.Getenv("nacos_config_gray_label")
if grayLabel != "" {
configMap["nacos_config_gray_label"] = grayLabel
}

// nacos_app_conn_labels
connLabels := os.Getenv("nacos_app_conn_labels")
if connLabels != "" {
labelsMap := parseLabels(connLabels)
for k, v := range labelsMap {
configMap[k] = v
}
}

return configMap
}

func parseLabels(rawLabels string) map[string]string {
if strings.TrimSpace(rawLabels) == "" {
return make(map[string]string, 2)
}

resultMap := make(map[string]string, 2)
labels := strings.Split(rawLabels, ",")
for _, label := range labels {
if strings.TrimSpace(label) != "" {
kv := strings.Split(label, "=")
if len(kv) == 2 {
resultMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
} else {
fmt.Println("unknown label format:", label)
}
}
}
return resultMap
}

func addPrefixForEachKey(m map[string]string, prefix string) map[string]string {
if m == nil || len(m) == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断len(m) == 0 就可以

return m
}

newMap := make(map[string]string, len(m))
for k, v := range m {
if strings.TrimSpace(k) != "" {
newKey := prefix + k
newMap[newKey] = v
}
}
return newMap
}

func (r *RpcClient) Start() {
if ok := atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(INITIALIZED), (int32)(STARTING)); !ok {
return
Expand Down
Loading