Skip to content

Commit

Permalink
Merge pull request #394 from Hoofffman/weighted_lb
Browse files Browse the repository at this point in the history
Feat/Weighted lb and Meta info
  • Loading branch information
rayzhang0603 authored Apr 7, 2024
2 parents 938ec49 + 4004aeb commit bc604a1
Show file tree
Hide file tree
Showing 24 changed files with 1,455 additions and 27 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
testing:
strategy:
matrix:
go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
go-version: [1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand All @@ -45,10 +45,10 @@ jobs:
name: codecov
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.15
- name: Set up Go 1.16
uses: actions/setup-go@v3
with:
go-version: 1.15.x
go-version: 1.16.x
id: go

- name: Checkout code
Expand Down
20 changes: 19 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
"github.com/weibocom/motan-go/meta"
"github.com/weibocom/motan-go/provider"
"gopkg.in/yaml.v2"
"io/ioutil"
"net"
Expand Down Expand Up @@ -193,6 +195,8 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {
return
}
fmt.Println("init agent context success.")
// initialize meta package
meta.Initialize(a.Context)
a.initParam()
a.SetSanpshotConf()
a.initAgentURL()
Expand Down Expand Up @@ -948,7 +952,8 @@ func (a *Agent) doExportService(url *motan.URL) {
}

type serverAgentMessageHandler struct {
providers *motan.CopyOnWriteMap
providers *motan.CopyOnWriteMap
frameworkProviders *motan.CopyOnWriteMap
}

func (sa *serverAgentMessageHandler) GetName() string {
Expand All @@ -973,6 +978,12 @@ func (sa *serverAgentMessageHandler) GetRuntimeInfo() map[string]interface{} {

func (sa *serverAgentMessageHandler) Initialize() {
sa.providers = motan.NewCopyOnWriteMap()
sa.frameworkProviders = motan.NewCopyOnWriteMap()
sa.initFrameworkServiceProvider()
}

func (sa *serverAgentMessageHandler) initFrameworkServiceProvider() {
sa.frameworkProviders.Store(meta.MetaServiceName, &provider.MetaProvider{})
}

func getServiceKey(group, path string) string {
Expand All @@ -990,6 +1001,13 @@ func (sa *serverAgentMessageHandler) Call(request motan.Request) (res motan.Resp
group = request.GetAttachment(motan.GroupKey)
}
serviceKey := getServiceKey(group, request.GetServiceName())
if mfs := request.GetAttachment(mpro.MFrameworkService); mfs != "" {
if fp, ok := sa.frameworkProviders.Load(request.GetServiceName()); ok {
return fp.(motan.Provider).Call(request)
}
//throw specific exception to avoid triggering forced fusing on the client side。
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 501, ErrMsg: motan.ServiceNotSupport, ErrType: motan.ServiceException})
}
if p := sa.providers.LoadOrNil(serviceKey); p != nil {
p := p.(motan.Provider)
res = p.Call(request)
Expand Down
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,7 @@ func (m *MCContext) GetRefer(service string) interface{} {
// TODO 对client的封装,可以根据idl自动生成代码时支持
return nil
}

func (m *MCContext) GetContext() *motan.Context {
return m.context
}
3 changes: 3 additions & 0 deletions cluster/motanCluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ func (m *MotanCluster) Destroy() {
vlog.Infof("destroy endpoint %s .", e.GetURL().GetIdentity())
e.Destroy()
}
if d, ok := m.LoadBalance.(motan.Destroyable); ok {
d.Destroy()
}
m.closed = true
}
}
Expand Down
9 changes: 7 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,17 @@ func (c *Config) Int64(key string) (int64, error) {

// String returns the string value for a given key.
func (c *Config) String(key string) string {
return c.GetStringWithDefault(key, "")
}

// String returns the string value for a given key.
func (c *Config) GetStringWithDefault(key string, def string) string {
if value, err := c.getData(key); err != nil {
return ""
return def
} else if vv, ok := value.(string); ok {
return vv
}
return ""
return def
}

// GetSection returns map for the given key
Expand Down
15 changes: 15 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,21 @@ const (
DefaultReferVersion = "1.0"
)

// meta info
const (
DefaultMetaPrefix = "META_"
EnvMetaPrefixKey = "envMetaPrefix"
URLRegisterMeta = "registerMeta"
DefaultRegisterMeta = true
MetaCacheExpireSecondKey = "metaCacheExpireSecond"
DynamicMetaKey = "dynamicMeta"
DefaultDynamicMeta = true
WeightRefreshPeriodSecondKey = "weightRefreshPeriodSecond"
WeightMetaSuffixKey = "WEIGHT"
ServiceNotSupport = "service not support"
)


//----------- runtime -------------

const (
Expand Down
1 change: 0 additions & 1 deletion core/globalContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL {
if len(finalFilters) > 0 {
newURL.PutParam(FilterKey, c.FilterSetToStr(finalFilters))
}

newURLs[key] = newURL
}
return newURLs
Expand Down
6 changes: 6 additions & 0 deletions core/motan.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ type LoadBalance interface {
SetWeight(weight string)
}

// WeightLoadBalance : weight loadBalance for cluster
type WeightLoadBalance interface {
LoadBalance
NotifyWeightChange()
}

// DiscoverService : discover service for cluster
type DiscoverService interface {
Subscribe(url *URL, listener NotifyListener)
Expand Down
8 changes: 5 additions & 3 deletions core/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,11 @@ func (u *URL) CanServe(other *URL) bool {
vlog.Errorf("can not serve path, err : p1:%s, p2:%s", u.Path, other.Path)
return false
}
if !IsSame(u.Parameters, other.Parameters, SerializationKey, "") {
vlog.Errorf("can not serve serialization, err : s1:%s, s2:%s", u.Parameters[SerializationKey], other.Parameters[SerializationKey])
return false
if u.Protocol != "motan2" {
if !IsSame(u.Parameters, other.Parameters, SerializationKey, "") {
vlog.Errorf("can not serve serialization, err : s1:%s, s2:%s", u.Parameters[SerializationKey], other.Parameters[SerializationKey])
return false
}
}
// compatible with old version: 0.1
if !(IsSame(u.Parameters, other.Parameters, VersionKey, "0.1") || IsSame(u.Parameters, other.Parameters, VersionKey, DefaultReferVersion)) {
Expand Down
25 changes: 25 additions & 0 deletions core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ func SliceShuffle(slice []string) []string {
return slice
}

func EndpointShuffle(slice []EndPoint) []EndPoint {
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
return slice
}

func ByteSliceShuffle(slice []byte) []byte {
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
return slice
}

func FirstUpper(s string) string {
r := []rune(s)

Expand Down Expand Up @@ -286,3 +304,10 @@ func ClearDirectEnvRegistry() {
directRpc = nil
initDirectEnv = sync.Once{}
}

func GetNonNegative(originValue int64) int64 {
if originValue > 0 {
return originValue
}
return 0x7fffffffffffffff & originValue
}
6 changes: 6 additions & 0 deletions default.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ func GetDefaultManageHandlers() map[string]http.Handler {
defaultManageHandlers["/registry/list"] = dynamicConfigurer
defaultManageHandlers["/registry/info"] = dynamicConfigurer

metaInfo := &MetaInfo{}
defaultManageHandlers["/meta/update"] = metaInfo
defaultManageHandlers["/meta/delete"] = metaInfo
defaultManageHandlers["/meta/get"] = metaInfo
defaultManageHandlers["/meta/getAll"] = metaInfo

hotReload := &HotReload{}
defaultManageHandlers["/reload/clusters"] = hotReload

Expand Down
91 changes: 91 additions & 0 deletions endpoint/mockDynamicEndpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package endpoint

import (
motan "github.com/weibocom/motan-go/core"
mpro "github.com/weibocom/motan-go/protocol"
"strconv"
"sync"
"sync/atomic"
)

type MockDynamicEndpoint struct {
URL *motan.URL
available bool
DynamicWeight int64
StaticWeight int64
Count int64
dynamicMeta sync.Map
}

func (m *MockDynamicEndpoint) GetName() string {
return "mockEndpoint"
}

func (m *MockDynamicEndpoint) GetURL() *motan.URL {
return m.URL
}

func (m *MockDynamicEndpoint) SetURL(url *motan.URL) {
m.URL = url
}

func (m *MockDynamicEndpoint) IsAvailable() bool {
return m.available
}

func (m *MockDynamicEndpoint) SetAvailable(a bool) {
m.available = a
}

func (m *MockDynamicEndpoint) SetProxy(proxy bool) {}

func (m *MockDynamicEndpoint) SetSerialization(s motan.Serialization) {}

func (m *MockDynamicEndpoint) Call(request motan.Request) motan.Response {
if isMetaServiceRequest(request) {
resMap := make(map[string]string)
m.dynamicMeta.Range(func(key, value interface{}) bool {
resMap[key.(string)] = value.(string)
return true
})
atomic.AddInt64(&m.Count, 1)
return &motan.MotanResponse{ProcessTime: 1, Value: resMap}
}
atomic.AddInt64(&m.Count, 1)
return &motan.MotanResponse{ProcessTime: 1, Value: "ok"}
}

func (m *MockDynamicEndpoint) Destroy() {}

func (m *MockDynamicEndpoint) GetRuntimeInfo() map[string]interface{} {
return make(map[string]interface{})
}

func (m *MockDynamicEndpoint) SetWeight(isDynamic bool, weight int64) {
if isDynamic {
m.DynamicWeight = weight
m.dynamicMeta.Store(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(weight)))
} else {
m.StaticWeight = weight
m.URL.PutParam(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(weight)))
}
}

func NewMockDynamicEndpoint(url *motan.URL) *MockDynamicEndpoint {
return &MockDynamicEndpoint{
URL: url,
available: true,
}
}

func NewMockDynamicEndpointWithWeight(url *motan.URL, staticWeight int64) *MockDynamicEndpoint {
res := NewMockDynamicEndpoint(url)
res.StaticWeight = staticWeight
res.URL.PutParam(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(staticWeight)))
return res
}

func isMetaServiceRequest(request motan.Request) bool {
return request != nil && "com.weibo.api.motan.runtime.meta.MetaService" == request.GetServiceName() &&
"getDynamicMeta" == request.GetMethod() && "y" == request.GetAttachment(mpro.MFrameworkService)
}
2 changes: 1 addition & 1 deletion endpoint/motanCommonEndpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func (s *Stream) RemoveFromChannel() bool {

// Call send request to the server.
//
// about return: exception in response will record error count, err will not.
// about return: exception in response will record error Count, err will not.
func (c *Channel) Call(req motan.Request, deadline time.Duration, rc *motan.RPCContext) (motan.Response, error) {
stream, err := c.newStream(req, rc, deadline)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/weibocom/motan-go

go 1.11
go 1.16

require (
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
Expand All @@ -16,15 +16,16 @@ require (
github.com/mitchellh/mapstructure v1.1.2
github.com/opentracing/opentracing-go v1.0.2
github.com/panjf2000/ants/v2 v2.9.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/shirou/gopsutil/v3 v3.21.9
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.2.0
github.com/weibreeze/breeze-go v0.1.1
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/atomic v1.4.0
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
Expand Down
Loading

0 comments on commit bc604a1

Please sign in to comment.