From 870352b3afd4ec08425fa4fd3e11818f7822fc9e Mon Sep 17 00:00:00 2001 From: wuhua3 Date: Mon, 6 May 2024 17:21:06 +0800 Subject: [PATCH] fix:heartbeat not stop --- endpoint/motanCommonEndpoint.go | 15 +++++++++------ endpoint/motanCommonEndpoint_test.go | 9 ++++++--- endpoint/motanEndpoint_test.go | 24 ++++++++++++++++-------- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/endpoint/motanCommonEndpoint.go b/endpoint/motanCommonEndpoint.go index 8f6a87a0..be0bfa91 100644 --- a/endpoint/motanCommonEndpoint.go +++ b/endpoint/motanCommonEndpoint.go @@ -37,7 +37,7 @@ type MotanCommonEndpoint struct { channels *ChannelPool destroyed bool destroyCh chan struct{} - available bool + available atomic.Value errorCount uint32 proxy bool errorCountThreshold int64 @@ -68,7 +68,7 @@ func (m *MotanCommonEndpoint) GetRuntimeInfo() map[string]interface{} { } func (m *MotanCommonEndpoint) setAvailable(available bool) { - m.available = available + m.available.Store(available) } func (m *MotanCommonEndpoint) SetSerialization(s motan.Serialization) { @@ -306,7 +306,9 @@ func (m *MotanCommonEndpoint) keepalive() { if atomic.LoadUint32(&m.keepaliveType) == KeepaliveProfile { m.profile() } else { - m.heartbeat() + if m.heartbeat() { + return + } } case <-m.destroyCh: return @@ -314,7 +316,7 @@ func (m *MotanCommonEndpoint) keepalive() { } } -func (m *MotanCommonEndpoint) heartbeat() { +func (m *MotanCommonEndpoint) heartbeat() bool { if channel, err := m.channels.Get(); err != nil { vlog.Infof("[keepalive] failed. url:%s, err:%s", m.url.GetIdentity(), err.Error()) } else { @@ -323,10 +325,11 @@ func (m *MotanCommonEndpoint) heartbeat() { m.setAvailable(true) m.resetErr() vlog.Infof("[keepalive] heartbeat success. url: %s", m.url.GetIdentity()) - return + return true } vlog.Infof("[keepalive] heartbeat failed. url:%s, err:%s", m.url.GetIdentity(), err.Error()) } + return false } func (m *MotanCommonEndpoint) profile() { @@ -357,7 +360,7 @@ func (m *MotanCommonEndpoint) SetURL(url *motan.URL) { } func (m *MotanCommonEndpoint) IsAvailable() bool { - return m.available + return m.available.Load().(bool) } type Channel struct { diff --git a/endpoint/motanCommonEndpoint_test.go b/endpoint/motanCommonEndpoint_test.go index 47d97540..f36da532 100644 --- a/endpoint/motanCommonEndpoint_test.go +++ b/endpoint/motanCommonEndpoint_test.go @@ -56,7 +56,7 @@ func TestV1RecordErrEmptyThreshold(t *testing.T) { func TestV1RecordErrWithErrThreshold(t *testing.T) { url := &motan.URL{Port: 8989, Protocol: "motan"} - url.PutParam(motan.TimeOutKey, "100") + url.PutParam(motan.TimeOutKey, "1100") url.PutParam(motan.ErrorCountThresholdKey, "5") url.PutParam(motan.ClientConnectionKey, "1") url.PutParam(motan.AsyncInitConnection, "false") @@ -69,12 +69,15 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) { for j := 0; j < 10; j++ { request := &motan.MotanRequest{ServiceName: "test", Method: "test"} request.Attachment = motan.NewStringMap(0) + request.Attachment.Store("exception", "other_exception") ep.Call(request) if j < 4 { assert.True(t, ep.IsAvailable()) } else { - assert.False(t, ep.IsAvailable()) assert.Equal(t, KeepaliveHeartbeat, atomic.LoadUint32(&ep.keepaliveType)) + time.Sleep(ep.keepaliveInterval * 2) + assert.True(t, ep.IsAvailable()) + assert.False(t, ep.keepaliveRunning.Load().(bool)) } } <-ep.channels.channels @@ -90,7 +93,7 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) { func TestNotFoundProviderCircuitBreaker(t *testing.T) { url := &motan.URL{Port: 8989, Protocol: "motan"} - url.PutParam(motan.TimeOutKey, "2000") + url.PutParam(motan.TimeOutKey, "1200") url.PutParam(motan.ErrorCountThresholdKey, "5") url.PutParam(motan.ClientConnectionKey, "10") url.PutParam(motan.AsyncInitConnection, "false") diff --git a/endpoint/motanEndpoint_test.go b/endpoint/motanEndpoint_test.go index 3897a9fa..809facf8 100644 --- a/endpoint/motanEndpoint_test.go +++ b/endpoint/motanEndpoint_test.go @@ -319,15 +319,17 @@ func handle(netListen net.Listener) { } func handleConnection(conn net.Conn, timeout int) { - reader := bufio.NewReader(conn) - decodeBuf := make([]byte, 100) - msg, err := protocol.Decode(reader, &decodeBuf) - if err != nil { - time.Sleep(time.Millisecond * 1000) - conn.Close() - return + for { + reader := bufio.NewReader(conn) + decodeBuf := make([]byte, 100) + msg, err := protocol.Decode(reader, &decodeBuf) + if err != nil { + time.Sleep(time.Millisecond * 1000) + conn.Close() + return + } + processMsg(msg, conn) } - processMsg(msg, conn) } func processMsg(msg *protocol.Message, conn net.Conn) { @@ -352,6 +354,12 @@ func processMsg(msg *protocol.Message, conn net.Conn) { ErrCode: motan.EProviderNotExist, ErrMsg: motan.ProviderNotExistPrefix, ErrType: motan.ServiceException}) + case "other_exception": + resp = motan.BuildExceptionResponse(lastRequestID, + &motan.Exception{ + ErrCode: motan.EUnkonwnMsg, + ErrMsg: "exception", + ErrType: motan.ServiceException}) default: resp = &motan.MotanResponse{ RequestID: lastRequestID,