Skip to content

Commit

Permalink
fix:修复healthcheckLeader偶发卡死问题 (#1289)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Nov 16, 2023
1 parent 05a1a37 commit 2ce2f5a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 46 deletions.
24 changes: 3 additions & 21 deletions plugin/healthchecker/leader/checker_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ type LeaderHealthChecker struct {
self Peer
// s store.Store
s store.Store
// putBatchCtrl 批任务执行器
putBatchCtrl *batchjob.BatchController
// getBatchCtrl 批任务执行器
getBatchCtrl *batchjob.BatchController
// subCtx
subCtx *eventhub.SubscribtionContext
}
Expand Down Expand Up @@ -146,22 +142,6 @@ func (c *LeaderHealthChecker) Initialize(entry *plugin.ConfigEntry) error {
if err := c.s.StartLeaderElection(electionKey); err != nil {
return err
}
c.getBatchCtrl = batchjob.NewBatchController(context.Background(), batchjob.CtrlConfig{
Label: "RecordGetter",
QueueSize: conf.Batch.QueueSize,
WaitTime: conf.Batch.WaitTime,
MaxBatchCount: conf.Batch.MaxBatchCount,
Concurrency: conf.Batch.Concurrency,
Handler: c.handleSendGetRecords,
})
c.putBatchCtrl = batchjob.NewBatchController(context.Background(), batchjob.CtrlConfig{
Label: "RecordPutter",
QueueSize: conf.Batch.QueueSize,
WaitTime: conf.Batch.WaitTime,
MaxBatchCount: conf.Batch.MaxBatchCount,
Concurrency: conf.Batch.Concurrency,
Handler: c.handleSendPutRecords,
})
registerMetrics()
return nil
}
Expand Down Expand Up @@ -235,6 +215,7 @@ func (c *LeaderHealthChecker) becomeFollower(e store.LeaderChangeEvent, leaderVe
remoteLeader := NewRemotePeerFunc()
remoteLeader.Initialize(*c.conf)
if err := remoteLeader.Serve(context.Background(), c, e.LeaderHost, uint32(utils.LocalPort)); err != nil {
_ = remoteLeader.Close()
plog.Error("[HealthCheck][Leader] follower run serve, do retry", zap.Error(err))
go func(e store.LeaderChangeEvent, leaderVersion int64) {
time.Sleep(time.Second)
Expand Down Expand Up @@ -267,7 +248,8 @@ func (c *LeaderHealthChecker) Type() plugin.HealthCheckType {

// Report process heartbeat info report
func (c *LeaderHealthChecker) Report(ctx context.Context, request *plugin.ReportRequest) error {
if isSendFromPeer(ctx) {
if !c.isLeader() && isSendFromPeer(ctx) {
plog.Error("[Health Check][Leader] follower checker receive other follower request")
return ErrorRedirectOnlyOnce
}

Expand Down
50 changes: 26 additions & 24 deletions plugin/healthchecker/leader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ type RemotePeer struct {
port uint32
// Conn grpc connection
conns []*grpc.ClientConn
// putBatchCtrl 批任务执行器
putBatchCtrl *batchjob.BatchController
// getBatchCtrl 批任务执行器
getBatchCtrl *batchjob.BatchController
// Puters 批量心跳发送, 由于一个 stream 对于 server 是一个 goroutine,为了加快 follower 发往 leader 的效率
// 这里采用多个 Putter Client 创建多个 Stream
puters []*beatSender
Expand Down Expand Up @@ -185,12 +181,8 @@ func (p *RemotePeer) Serve(_ context.Context, checker *LeaderHealthChecker,
_ = p.Close()
return err
}
p.puters = append(p.puters, &beatSender{
sender: puter,
})
p.puters = append(p.puters, newBeatSender(ctx, p, puter))
}
p.getBatchCtrl = checker.getBatchCtrl
p.putBatchCtrl = checker.putBatchCtrl
p.Cache = newRemoteBeatRecordCache(p.GetFunc, p.PutFunc, p.DelFunc)
return nil
}
Expand All @@ -201,26 +193,14 @@ func (p *RemotePeer) Host() string {

// Get get records
func (p *RemotePeer) Get(key string) (*ReadBeatRecord, error) {
future := p.getBatchCtrl.SubmitWithTimeout(&PeerTask{
Key: key,
Peer: p,
}, time.Second)
resp, err := future.DoneTimeout(time.Second)
if err != nil {
return nil, err
}
ret := resp.(map[string]*ReadBeatRecord)
ret := p.Cache.Get(key)
return ret[key], nil
}

// Put put records
func (p *RemotePeer) Put(record WriteBeatRecord) error {
future := p.putBatchCtrl.SubmitWithTimeout(&PeerTask{
Record: &record,
Peer: p,
}, time.Second)
_, err := future.DoneTimeout(time.Second)
return err
p.Cache.Put(record)
return nil
}

// Del del records
Expand Down Expand Up @@ -343,6 +323,28 @@ type beatSender struct {
sender apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient
}

func newBeatSender(ctx context.Context, p *RemotePeer, sender apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient) *beatSender {
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
plog.Info("[HealthCheck][Leader] cancel receive put record result", zap.String("host", p.Host()),
zap.Uint32("port", p.port))
return
default:
if _, err := sender.Recv(); err != nil {
plog.Error("[HealthCheck][Leader] receive put record result", zap.String("host", p.Host()),
zap.Uint32("port", p.port), zap.Error(err))
}
}
}
}(ctx)

return &beatSender{
sender: sender,
}
}

func (s *beatSender) Send(req *apiservice.HeartbeatsRequest) error {
s.lock.Lock()
defer s.lock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.17.7
v1.17.8

0 comments on commit 2ce2f5a

Please sign in to comment.