Skip to content

Commit

Permalink
fix:heartbeat not stop
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhua3 committed May 6, 2024
1 parent f4df51e commit 870352b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
15 changes: 9 additions & 6 deletions endpoint/motanCommonEndpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type MotanCommonEndpoint struct {
channels *ChannelPool
destroyed bool
destroyCh chan struct{}
available bool
available atomic.Value
errorCount uint32
proxy bool
errorCountThreshold int64
Expand Down Expand Up @@ -68,7 +68,7 @@ func (m *MotanCommonEndpoint) GetRuntimeInfo() map[string]interface{} {
}

func (m *MotanCommonEndpoint) setAvailable(available bool) {
m.available = available
m.available.Store(available)
}

func (m *MotanCommonEndpoint) SetSerialization(s motan.Serialization) {
Expand Down Expand Up @@ -306,15 +306,17 @@ func (m *MotanCommonEndpoint) keepalive() {
if atomic.LoadUint32(&m.keepaliveType) == KeepaliveProfile {
m.profile()
} else {
m.heartbeat()
if m.heartbeat() {
return
}
}
case <-m.destroyCh:
return
}
}
}

func (m *MotanCommonEndpoint) heartbeat() {
func (m *MotanCommonEndpoint) heartbeat() bool {
if channel, err := m.channels.Get(); err != nil {
vlog.Infof("[keepalive] failed. url:%s, err:%s", m.url.GetIdentity(), err.Error())
} else {
Expand All @@ -323,10 +325,11 @@ func (m *MotanCommonEndpoint) heartbeat() {
m.setAvailable(true)
m.resetErr()
vlog.Infof("[keepalive] heartbeat success. url: %s", m.url.GetIdentity())
return
return true
}
vlog.Infof("[keepalive] heartbeat failed. url:%s, err:%s", m.url.GetIdentity(), err.Error())
}
return false
}

func (m *MotanCommonEndpoint) profile() {
Expand Down Expand Up @@ -357,7 +360,7 @@ func (m *MotanCommonEndpoint) SetURL(url *motan.URL) {
}

func (m *MotanCommonEndpoint) IsAvailable() bool {
return m.available
return m.available.Load().(bool)
}

type Channel struct {
Expand Down
9 changes: 6 additions & 3 deletions endpoint/motanCommonEndpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestV1RecordErrEmptyThreshold(t *testing.T) {

func TestV1RecordErrWithErrThreshold(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.TimeOutKey, "1100")
url.PutParam(motan.ErrorCountThresholdKey, "5")
url.PutParam(motan.ClientConnectionKey, "1")
url.PutParam(motan.AsyncInitConnection, "false")
Expand All @@ -69,12 +69,15 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) {
for j := 0; j < 10; j++ {
request := &motan.MotanRequest{ServiceName: "test", Method: "test"}
request.Attachment = motan.NewStringMap(0)
request.Attachment.Store("exception", "other_exception")
ep.Call(request)
if j < 4 {
assert.True(t, ep.IsAvailable())
} else {
assert.False(t, ep.IsAvailable())
assert.Equal(t, KeepaliveHeartbeat, atomic.LoadUint32(&ep.keepaliveType))
time.Sleep(ep.keepaliveInterval * 2)
assert.True(t, ep.IsAvailable())
assert.False(t, ep.keepaliveRunning.Load().(bool))
}
}
<-ep.channels.channels
Expand All @@ -90,7 +93,7 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) {

func TestNotFoundProviderCircuitBreaker(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "2000")
url.PutParam(motan.TimeOutKey, "1200")
url.PutParam(motan.ErrorCountThresholdKey, "5")
url.PutParam(motan.ClientConnectionKey, "10")
url.PutParam(motan.AsyncInitConnection, "false")
Expand Down
24 changes: 16 additions & 8 deletions endpoint/motanEndpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,17 @@ func handle(netListen net.Listener) {
}

func handleConnection(conn net.Conn, timeout int) {
reader := bufio.NewReader(conn)
decodeBuf := make([]byte, 100)
msg, err := protocol.Decode(reader, &decodeBuf)
if err != nil {
time.Sleep(time.Millisecond * 1000)
conn.Close()
return
for {
reader := bufio.NewReader(conn)
decodeBuf := make([]byte, 100)
msg, err := protocol.Decode(reader, &decodeBuf)
if err != nil {
time.Sleep(time.Millisecond * 1000)
conn.Close()
return
}
processMsg(msg, conn)
}
processMsg(msg, conn)
}

func processMsg(msg *protocol.Message, conn net.Conn) {
Expand All @@ -352,6 +354,12 @@ func processMsg(msg *protocol.Message, conn net.Conn) {
ErrCode: motan.EProviderNotExist,
ErrMsg: motan.ProviderNotExistPrefix,
ErrType: motan.ServiceException})
case "other_exception":
resp = motan.BuildExceptionResponse(lastRequestID,
&motan.Exception{
ErrCode: motan.EUnkonwnMsg,
ErrMsg: "exception",
ErrType: motan.ServiceException})
default:
resp = &motan.MotanResponse{
RequestID: lastRequestID,
Expand Down

0 comments on commit 870352b

Please sign in to comment.