From 2ce2f5a840a2930f3ad7b1fc3c092f2b82196a15 Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Thu, 16 Nov 2023 18:05:09 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8DhealthcheckLeader?= =?UTF-8?q?=E5=81=B6=E5=8F=91=E5=8D=A1=E6=AD=BB=E9=97=AE=E9=A2=98=20(#1289?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin/healthchecker/leader/checker_leader.go | 24 ++------- plugin/healthchecker/leader/peer.go | 50 ++++++++++--------- version | 2 +- 3 files changed, 30 insertions(+), 46 deletions(-) diff --git a/plugin/healthchecker/leader/checker_leader.go b/plugin/healthchecker/leader/checker_leader.go index 381aca21b..97e474e42 100644 --- a/plugin/healthchecker/leader/checker_leader.go +++ b/plugin/healthchecker/leader/checker_leader.go @@ -105,10 +105,6 @@ type LeaderHealthChecker struct { self Peer // s store.Store s store.Store - // putBatchCtrl 批任务执行器 - putBatchCtrl *batchjob.BatchController - // getBatchCtrl 批任务执行器 - getBatchCtrl *batchjob.BatchController // subCtx subCtx *eventhub.SubscribtionContext } @@ -146,22 +142,6 @@ func (c *LeaderHealthChecker) Initialize(entry *plugin.ConfigEntry) error { if err := c.s.StartLeaderElection(electionKey); err != nil { return err } - c.getBatchCtrl = batchjob.NewBatchController(context.Background(), batchjob.CtrlConfig{ - Label: "RecordGetter", - QueueSize: conf.Batch.QueueSize, - WaitTime: conf.Batch.WaitTime, - MaxBatchCount: conf.Batch.MaxBatchCount, - Concurrency: conf.Batch.Concurrency, - Handler: c.handleSendGetRecords, - }) - c.putBatchCtrl = batchjob.NewBatchController(context.Background(), batchjob.CtrlConfig{ - Label: "RecordPutter", - QueueSize: conf.Batch.QueueSize, - WaitTime: conf.Batch.WaitTime, - MaxBatchCount: conf.Batch.MaxBatchCount, - Concurrency: conf.Batch.Concurrency, - Handler: c.handleSendPutRecords, - }) registerMetrics() return nil } @@ -235,6 +215,7 @@ func (c *LeaderHealthChecker) becomeFollower(e store.LeaderChangeEvent, leaderVe remoteLeader := NewRemotePeerFunc() remoteLeader.Initialize(*c.conf) if err := remoteLeader.Serve(context.Background(), c, e.LeaderHost, uint32(utils.LocalPort)); err != nil { + _ = remoteLeader.Close() plog.Error("[HealthCheck][Leader] follower run serve, do retry", zap.Error(err)) go func(e store.LeaderChangeEvent, leaderVersion int64) { time.Sleep(time.Second) @@ -267,7 +248,8 @@ func (c *LeaderHealthChecker) Type() plugin.HealthCheckType { // Report process heartbeat info report func (c *LeaderHealthChecker) Report(ctx context.Context, request *plugin.ReportRequest) error { - if isSendFromPeer(ctx) { + if !c.isLeader() && isSendFromPeer(ctx) { + plog.Error("[Health Check][Leader] follower checker receive other follower request") return ErrorRedirectOnlyOnce } diff --git a/plugin/healthchecker/leader/peer.go b/plugin/healthchecker/leader/peer.go index dd7e31847..563694bc6 100644 --- a/plugin/healthchecker/leader/peer.go +++ b/plugin/healthchecker/leader/peer.go @@ -132,10 +132,6 @@ type RemotePeer struct { port uint32 // Conn grpc connection conns []*grpc.ClientConn - // putBatchCtrl 批任务执行器 - putBatchCtrl *batchjob.BatchController - // getBatchCtrl 批任务执行器 - getBatchCtrl *batchjob.BatchController // Puters 批量心跳发送, 由于一个 stream 对于 server 是一个 goroutine,为了加快 follower 发往 leader 的效率 // 这里采用多个 Putter Client 创建多个 Stream puters []*beatSender @@ -185,12 +181,8 @@ func (p *RemotePeer) Serve(_ context.Context, checker *LeaderHealthChecker, _ = p.Close() return err } - p.puters = append(p.puters, &beatSender{ - sender: puter, - }) + p.puters = append(p.puters, newBeatSender(ctx, p, puter)) } - p.getBatchCtrl = checker.getBatchCtrl - p.putBatchCtrl = checker.putBatchCtrl p.Cache = newRemoteBeatRecordCache(p.GetFunc, p.PutFunc, p.DelFunc) return nil } @@ -201,26 +193,14 @@ func (p *RemotePeer) Host() string { // Get get records func (p *RemotePeer) Get(key string) (*ReadBeatRecord, error) { - future := p.getBatchCtrl.SubmitWithTimeout(&PeerTask{ - Key: key, - Peer: p, - }, time.Second) - resp, err := future.DoneTimeout(time.Second) - if err != nil { - return nil, err - } - ret := resp.(map[string]*ReadBeatRecord) + ret := p.Cache.Get(key) return ret[key], nil } // Put put records func (p *RemotePeer) Put(record WriteBeatRecord) error { - future := p.putBatchCtrl.SubmitWithTimeout(&PeerTask{ - Record: &record, - Peer: p, - }, time.Second) - _, err := future.DoneTimeout(time.Second) - return err + p.Cache.Put(record) + return nil } // Del del records @@ -343,6 +323,28 @@ type beatSender struct { sender apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient } +func newBeatSender(ctx context.Context, p *RemotePeer, sender apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient) *beatSender { + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + plog.Info("[HealthCheck][Leader] cancel receive put record result", zap.String("host", p.Host()), + zap.Uint32("port", p.port)) + return + default: + if _, err := sender.Recv(); err != nil { + plog.Error("[HealthCheck][Leader] receive put record result", zap.String("host", p.Host()), + zap.Uint32("port", p.port), zap.Error(err)) + } + } + } + }(ctx) + + return &beatSender{ + sender: sender, + } +} + func (s *beatSender) Send(req *apiservice.HeartbeatsRequest) error { s.lock.Lock() defer s.lock.Unlock() diff --git a/version b/version index 3d8c1fb76..9fe35360a 100644 --- a/version +++ b/version @@ -1 +1 @@ -v1.17.7 +v1.17.8