diff --git a/maintainer/barrier.go b/maintainer/barrier.go new file mode 100644 index 000000000..9ecf4bc16 --- /dev/null +++ b/maintainer/barrier.go @@ -0,0 +1,264 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package maintainer + +import ( + "github.com/flowbehappy/tigate/heartbeatpb" + "github.com/flowbehappy/tigate/pkg/common" + "github.com/flowbehappy/tigate/pkg/messaging" + "github.com/flowbehappy/tigate/scheduler" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" +) + +type Barrier struct { + blockedTs map[uint64]*BlockEvent + scheduler *Scheduler + // if maintainer is down, the barrier will be re-built, so we can use the dispatcher as the key + blockedDispatcher map[common.DispatcherID]*BlockEvent +} + +func NewBarrier(scheduler *Scheduler) *Barrier { + return &Barrier{ + blockedTs: make(map[uint64]*BlockEvent), + blockedDispatcher: make(map[common.DispatcherID]*BlockEvent), + scheduler: scheduler, + } +} + +func (b *Barrier) HandleStatus(from messaging.ServerId, + request *heartbeatpb.HeartBeatRequest) ([]*messaging.TargetMessage, error) { + var msgs []*messaging.TargetMessage + var dispatcherStatus []*heartbeatpb.DispatcherStatus + for _, status := range request.Statuses { + msgsList, resp, err := b.handleOneStatus(request.ChangefeedID, status) + if err != nil { + return nil, err + } + if resp != nil { + dispatcherStatus = append(dispatcherStatus, resp) + } + if msgsList != nil { + msgs = append(msgs, msgsList...) 
+		}
+	}
+	// send ack messages to the dispatchers
+	if len(dispatcherStatus) > 0 {
+		msgs = append(msgs, messaging.NewSingleTargetMessage(from,
+			messaging.HeartbeatCollectorTopic,
+			&heartbeatpb.HeartBeatResponse{
+				ChangefeedID:       request.ChangefeedID,
+				DispatcherStatuses: dispatcherStatus,
+			}))
+	}
+	return msgs, nil
+}
+
+func (b *Barrier) Resend() []*messaging.TargetMessage {
+	// todo: add resend logic
+	return nil
+}
+
+func (b *Barrier) handleOneStatus(changefeedID string, status *heartbeatpb.TableSpanStatus) ([]*messaging.TargetMessage, *heartbeatpb.DispatcherStatus, error) {
+	dispatcherID := common.NewDispatcherIDFromPB(status.ID)
+	var (
+		msgs             []*messaging.TargetMessage
+		dispatcherStatus *heartbeatpb.DispatcherStatus
+		err              error
+	)
+	if status.State == nil {
+		msgs, err = b.handleNoStateHeartbeat(dispatcherID, status.CheckpointTs)
+		return msgs, dispatcherStatus, err
+	} else {
+		msgs, dispatcherStatus, err = b.handleStateHeartbeat(changefeedID, dispatcherID, status)
+	}
+	return msgs, dispatcherStatus, err
+}
+
+func (b *Barrier) handleNoStateHeartbeat(dispatcherID common.DispatcherID, checkpointTs uint64) ([]*messaging.TargetMessage, error) {
+	event, ok := b.blockedDispatcher[dispatcherID]
+	// no block event found
+	if !ok {
+		return nil, nil
+	}
+	var (
+		err  error
+		msgs []*messaging.TargetMessage
+	)
+	// there is a block event and the dispatcher advanced its checkpoint ts,
+	// which means we have sent a pass or write action to it
+	if checkpointTs > event.commitTs {
+		// the writer already synced the ddl to downstream
+		if event.selectedDispatcher == dispatcherID {
+			// schedule new and removed tasks
+			msgs, err = event.scheduleBlockEvent()
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			msgs = append(msgs, event.sendPassAction()...)
+		}
+
+		// the checkpoint ts is advanced, clear the map so we do not need to resend messages anymore
+		delete(b.blockedDispatcher, dispatcherID)
+		// all blocked dispatchers have advanced their checkpoint ts
+		if len(b.blockedDispatcher) == 0 {
+			delete(b.blockedTs, event.commitTs)
+		}
+	}
+	return msgs, nil
+}
+
+func (b *Barrier) handleStateHeartbeat(changefeedID string,
+	dispatcherID common.DispatcherID,
+	status *heartbeatpb.TableSpanStatus) ([]*messaging.TargetMessage, *heartbeatpb.DispatcherStatus, error) {
+	var (
+		msgs             []*messaging.TargetMessage
+		dispatcherStatus *heartbeatpb.DispatcherStatus
+		err              error
+	)
+	blockState := status.State
+	if blockState.IsBlocked {
+		event, ok := b.blockedTs[blockState.BlockTs]
+		ack := &heartbeatpb.DispatcherStatus{
+			ID:  status.ID,
+			Ack: &heartbeatpb.ACK{CommitTs: status.CheckpointTs}}
+		if !ok {
+			event = NewBlockEvent(changefeedID, b.scheduler, blockState)
+			b.blockedTs[blockState.BlockTs] = event
+		}
+		_, ok = b.blockedDispatcher[dispatcherID]
+		if !ok {
+			b.blockedDispatcher[dispatcherID] = event
+		}
+
+		event.blockedDispatcherMap[dispatcherID] = true
+		// all dispatchers have reported the block event, select one to write the ddl
+		if !event.selected && event.allDispatcherReported() {
+			ack.Action = &heartbeatpb.DispatcherAction{
+				Action:   heartbeatpb.Action_Write,
+				CommitTs: status.CheckpointTs,
+			}
+			event.selectedDispatcher = common.NewDispatcherIDFromPB(dispatcherID.ToPB())
+			event.selected = true
+		}
+		dispatcherStatus = ack
+	} else {
+		// a non-blocked event must be sent by the table trigger event dispatcher,
+		// and the ddl is already synced to downstream, e.g. create table, drop table
+		dispatcherStatus = &heartbeatpb.DispatcherStatus{
+			ID:  status.ID,
+			Ack: &heartbeatpb.ACK{CommitTs: status.CheckpointTs},
+		}
+		msgs, err = NewBlockEvent(changefeedID, b.scheduler, blockState).scheduleBlockEvent()
+		if err != nil {
+			return nil, nil, errors.Trace(err)
+		}
+	}
+	return msgs, dispatcherStatus, err
+}
+
+type BlockEvent struct {
+	cfID               string
+	commitTs           uint64
+	scheduler          *Scheduler
+	selected           bool
+	selectedDispatcher common.DispatcherID
+	// todo: support a big set of dispatchers, e.g. sync point, create database
+	blockedDispatcherMap map[common.DispatcherID]bool
+	newTableSpans        []*heartbeatpb.TableSpan
+	dropDispatcherIDs    []*heartbeatpb.DispatcherID
+}
+
+func NewBlockEvent(cfID string, scheduler *Scheduler,
+	status *heartbeatpb.State) *BlockEvent {
+	event := &BlockEvent{
+		scheduler:            scheduler,
+		selected:             false,
+		cfID:                 cfID,
+		commitTs:             status.BlockTs,
+		newTableSpans:        status.NeedAddedTableSpan,
+		dropDispatcherIDs:    status.NeedDroppedDispatcherIDs,
+		blockedDispatcherMap: make(map[common.DispatcherID]bool),
+	}
+	for _, block := range status.BlockDispatcherIDs {
+		event.blockedDispatcherMap[common.NewDispatcherIDFromPB(block)] = false
+	}
+	return event
+}
+
+func (b *BlockEvent) scheduleBlockEvent() ([]*messaging.TargetMessage, error) {
+	var msgs []*messaging.TargetMessage
+	for _, removed := range b.dropDispatcherIDs {
+		msg, err := b.scheduler.RemoveTask(common.NewDispatcherIDFromPB(removed))
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		if msg != nil {
+			msgs = append(msgs, msg)
+		}
+	}
+	for _, add := range b.newTableSpans {
+		tableSpan := &common.TableSpan{TableSpan: add}
+		dispatcherID := common.NewDispatcherID()
+		replicaSet := NewReplicaSet(model.DefaultChangeFeedID(b.cfID),
+			dispatcherID, tableSpan, b.commitTs).(*ReplicaSet)
+		stm, err := scheduler.NewStateMachine(dispatcherID, nil, replicaSet)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		b.scheduler.AddNewTask(stm)
+	}
+	// todo: trigger a schedule event to support filter, rename table or rename database
+	msgList, err := b.scheduler.Schedule()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	msgs = append(msgs, msgList...)
+	return msgs, nil
+}
+
+func (b *BlockEvent) allDispatcherReported() bool {
+	for _, value := range b.blockedDispatcherMap {
+		if !value {
+			return false
+		}
+	}
+	return true
+}
+
+func (b *BlockEvent) sendPassAction() []*messaging.TargetMessage {
+	var msgs = make([]*messaging.TargetMessage, 0, len(b.blockedDispatcherMap))
+	// send the pass action to all blocked dispatchers
+	for dispatcherID := range b.blockedDispatcherMap {
+		// skip the writer dispatcher, we already sent the write action to it
+		if b.selectedDispatcher == dispatcherID {
+			continue
+		}
+		stm := b.scheduler.GetTask(dispatcherID)
+		if stm != nil {
+			// todo: group DispatcherStatus by server
+			msgs = append(msgs, messaging.NewSingleTargetMessage(messaging.ServerId(stm.Primary),
+				messaging.HeartbeatCollectorTopic,
+				&heartbeatpb.HeartBeatResponse{DispatcherStatuses: []*heartbeatpb.DispatcherStatus{
+					{
+						Action: &heartbeatpb.DispatcherAction{
+							Action:   heartbeatpb.Action_Pass,
+							CommitTs: b.commitTs,
+						},
+					},
+				}}))
+		}
+	}
+	return msgs
+}
diff --git a/maintainer/barrier_test.go b/maintainer/barrier_test.go
new file mode 100644
index 000000000..7e3f6df64
--- /dev/null
+++ b/maintainer/barrier_test.go
@@ -0,0 +1,212 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package maintainer + +import ( + "testing" + + "github.com/flowbehappy/tigate/heartbeatpb" + "github.com/flowbehappy/tigate/pkg/common" + "github.com/flowbehappy/tigate/scheduler" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/stretchr/testify/require" +) + +func TestBlock(t *testing.T) { + sche := NewScheduler("test", 1000, 0) + sche.AddNewNode("node1") + sche.AddNewNode("node2") + var blockedDispatcherIDS []*heartbeatpb.DispatcherID + for id := 0; id < 3; id++ { + span := spanz.TableIDToComparableSpan(int64(id)) + tableSpan := &common.TableSpan{TableSpan: &heartbeatpb.TableSpan{ + TableID: uint64(id), + StartKey: span.StartKey, + EndKey: span.EndKey, + }} + dispatcherID := common.NewDispatcherID() + blockedDispatcherIDS = append(blockedDispatcherIDS, dispatcherID.ToPB()) + replicaSet := NewReplicaSet(model.DefaultChangeFeedID("test"), dispatcherID, tableSpan, 0) + stm, _ := scheduler.NewStateMachine(dispatcherID, nil, replicaSet) + sche.working[dispatcherID] = stm + stm.Primary = "node1" + sche.nodeTasks["node1"][dispatcherID] = stm + } + + var selectDispatcherID = common.NewDispatcherIDFromPB(blockedDispatcherIDS[2]) + sche.nodeTasks["node2"][selectDispatcherID] = sche.nodeTasks["node1"][selectDispatcherID] + delete(sche.nodeTasks["node1"], selectDispatcherID) + + newSpan := &heartbeatpb.TableSpan{TableID: 10} + barrier := NewBarrier(sche) + + // first node block request + msgs, err := barrier.HandleStatus("node1", &heartbeatpb.HeartBeatRequest{ + ChangefeedID: "test", + Statuses: []*heartbeatpb.TableSpanStatus{ + { + ID: blockedDispatcherIDS[0], + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockDispatcherIDs: blockedDispatcherIDS, + NeedDroppedDispatcherIDs: []*heartbeatpb.DispatcherID{selectDispatcherID.ToPB()}, + NeedAddedTableSpan: []*heartbeatpb.TableSpan{newSpan}, + }, + CheckpointTs: 9, + }, + { + ID: blockedDispatcherIDS[1], + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockDispatcherIDs: blockedDispatcherIDS, + NeedDroppedDispatcherIDs: []*heartbeatpb.DispatcherID{selectDispatcherID.ToPB()}, + NeedAddedTableSpan: []*heartbeatpb.TableSpan{newSpan}, + }, + CheckpointTs: 9, + }, + }, + }) + require.NoError(t, err) + require.Len(t, msgs, 1) + resp := msgs[0].Message[0].(*heartbeatpb.HeartBeatResponse) + require.Len(t, resp.DispatcherStatuses, 2) + + // other node block request + msgs, err = barrier.HandleStatus("node2", &heartbeatpb.HeartBeatRequest{ + ChangefeedID: "test", + Statuses: []*heartbeatpb.TableSpanStatus{ + { + ID: selectDispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockDispatcherIDs: blockedDispatcherIDS, + NeedDroppedDispatcherIDs: []*heartbeatpb.DispatcherID{selectDispatcherID.ToPB()}, + NeedAddedTableSpan: []*heartbeatpb.TableSpan{newSpan}, + }, + CheckpointTs: 9, + }, + }, + }) + require.NoError(t, err) + require.Len(t, msgs, 1) + require.Equal(t, barrier.blockedDispatcher[selectDispatcherID], barrier.blockedTs[10]) + event := barrier.blockedTs[10] + require.Equal(t, uint64(10), event.commitTs) + require.True(t, event.selectedDispatcher == selectDispatcherID) + require.True(t, 
event.blockedDispatcherMap[selectDispatcherID]) + require.True(t, event.allDispatcherReported()) + + // repeated status + _, _ = barrier.HandleStatus("node1", &heartbeatpb.HeartBeatRequest{ + ChangefeedID: "test", + Statuses: []*heartbeatpb.TableSpanStatus{ + { + ID: blockedDispatcherIDS[0], + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockDispatcherIDs: blockedDispatcherIDS, + NeedDroppedDispatcherIDs: []*heartbeatpb.DispatcherID{selectDispatcherID.ToPB()}, + NeedAddedTableSpan: []*heartbeatpb.TableSpan{newSpan}, + }, + CheckpointTs: 9, + }, + { + ID: blockedDispatcherIDS[1], + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockDispatcherIDs: blockedDispatcherIDS, + NeedDroppedDispatcherIDs: []*heartbeatpb.DispatcherID{selectDispatcherID.ToPB()}, + NeedAddedTableSpan: []*heartbeatpb.TableSpan{newSpan}, + }, + CheckpointTs: 9, + }, + }, + }) + require.Equal(t, uint64(10), event.commitTs) + require.True(t, event.selectedDispatcher == selectDispatcherID) + require.True(t, event.blockedDispatcherMap[selectDispatcherID]) + require.True(t, event.allDispatcherReported()) + + // selected node write done + msgs, err = barrier.HandleStatus("node2", &heartbeatpb.HeartBeatRequest{ + ChangefeedID: "test", + Statuses: []*heartbeatpb.TableSpanStatus{ + { + ID: blockedDispatcherIDS[2], + CheckpointTs: 11, + }, + }, + }) + require.NoError(t, err) + //two schedule messages and 2 pass action message + require.Len(t, msgs, 4) + require.Len(t, barrier.blockedTs, 1) + require.Len(t, barrier.blockedDispatcher, 2) + msgs, err = barrier.HandleStatus("node1", &heartbeatpb.HeartBeatRequest{ + ChangefeedID: "test", + Statuses: []*heartbeatpb.TableSpanStatus{ + { + ID: blockedDispatcherIDS[0], + CheckpointTs: 19, + }, + { + ID: blockedDispatcherIDS[1], + CheckpointTs: 13, + }, + }, + }) + require.Len(t, barrier.blockedTs, 0) + require.Len(t, barrier.blockedDispatcher, 0) + require.Len(t, msgs, 0) +} + +func TestNonBlocked(t *testing.T) { + sche := NewScheduler("test", 1000, 0) + sche.AddNewNode("node1") + barrier := NewBarrier(sche) + + var blockedDispatcherIDS []*heartbeatpb.DispatcherID + for id := 0; id < 3; id++ { + blockedDispatcherIDS = append(blockedDispatcherIDS, common.NewDispatcherID().ToPB()) + } + msgs, err := barrier.HandleStatus("node1", &heartbeatpb.HeartBeatRequest{ + ChangefeedID: "test", + Statuses: []*heartbeatpb.TableSpanStatus{ + { + ID: blockedDispatcherIDS[0], + State: &heartbeatpb.State{ + IsBlocked: false, + BlockTs: 10, + BlockDispatcherIDs: blockedDispatcherIDS, + NeedAddedTableSpan: []*heartbeatpb.TableSpan{ + {TableID: 1}, {TableID: 2}, + }, + }, + CheckpointTs: 9, + }, + }, + }) + require.NoError(t, err) + // 1 ack and two scheduling messages + require.Len(t, msgs, 3) + require.Len(t, barrier.blockedTs, 0) + require.Len(t, barrier.blockedDispatcher, 0) + require.Len(t, barrier.scheduler.committing, 2) +} diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 8ec1ba002..7e005324b 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -79,6 +79,7 @@ type Maintainer struct { lastReportTime time.Time scheduler *Scheduler + barrier *Barrier removing bool cascadeRemoving bool @@ -141,6 +142,7 @@ func NewMaintainer(cfID model.ChangeFeedID, tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace, cfID.ID), } m.bootstrapper = NewBootstrapper(m.id.ID, m.getNewBootstrapFn()) + m.barrier = NewBarrier(m.scheduler) log.Info("maintainer is created", zap.String("id", cfID.String())) 
 	metrics.MaintainerGauge.WithLabelValues(cfID.Namespace, cfID.ID).Inc()
 	return m
 
@@ -469,6 +471,14 @@ func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) error {
 		return errors.Trace(err)
 	}
 	m.sendMessages(msgs)
+	msgs, err = m.barrier.HandleStatus(msg.From, req)
+	if err != nil {
+		log.Error("handle barrier status failed",
+			zap.String("changefeed", m.id.ID),
+			zap.Error(err))
+		return errors.Trace(err)
+	}
+	m.sendMessages(msgs)
 	if req.Warning != nil {
 		m.errLock.Lock()
 		m.runningWarnings[msg.From] = req.Warning
diff --git a/maintainer/scheduler.go b/maintainer/scheduler.go
index 17a36ef29..8deac584f 100644
--- a/maintainer/scheduler.go
+++ b/maintainer/scheduler.go
@@ -129,7 +129,28 @@ func (s *Scheduler) FinishBootstrap(workingMap utils.Map[*common.TableSpan, *sch
 	s.tempTasks = nil
 }
 
-func (s *Scheduler) RemoveTask(stm *scheduler.StateMachine) (*messaging.TargetMessage, error) {
+// GetTask queries a task by dispatcherID, returns nil if the task is not found
+func (s *Scheduler) GetTask(dispatcherID common.DispatcherID) *scheduler.StateMachine {
+	var stm *scheduler.StateMachine
+	var ok bool
+	for _, m := range s.totalMaps {
+		stm, ok = m[dispatcherID]
+		if ok {
+			break
+		}
+	}
+	return stm
+}
+
+// RemoveTask removes a task by dispatcherID
+func (s *Scheduler) RemoveTask(dispatcherID common.DispatcherID) (*messaging.TargetMessage, error) {
+	stm := s.GetTask(dispatcherID)
+	if stm == nil {
+		log.Warn("dispatcher is not found",
+			zap.String("changefeed", s.changefeedID),
+			zap.Any("dispatcherID", dispatcherID))
+		return nil, nil
+	}
 	oldState := stm.State
 	oldPrimary := stm.Primary
 	msg, err := stm.HandleRemoveInferior()
@@ -219,7 +240,9 @@ func (s *Scheduler) Schedule() ([]*messaging.TargetMessage, error) {
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		msgs = append(msgs, msg)
+		if msg != nil {
+			msgs = append(msgs, msg)
+		}
 		s.committing[key] = value
 		s.nodeTasks[item.Node][key] = value
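
Reviewer note (illustrative only): the protocol implemented above is that every dispatcher hitting a blocking DDL reports `IsBlocked` at the DDL's commit ts; the barrier acks each report, and once all dispatchers listed in `BlockDispatcherIDs` have reported, the last reporter is selected to write the DDL (`Action_Write`). When the writer's checkpoint ts later passes the commit ts, the barrier schedules the added/dropped table spans and sends `Action_Pass` to the rest. The sketch below is a self-contained toy model of that selection flow, not the tigate API; every identifier in it (`blockEvent`, `report`, `advance`, the string dispatcher IDs) is invented for illustration.

```go
// barrier_sketch.go — a minimal, self-contained model of the write/pass
// selection performed by maintainer.Barrier. All names here are invented;
// the real code exchanges heartbeatpb messages instead of strings.
package main

import "fmt"

// blockEvent loosely mirrors maintainer.BlockEvent: one entry per blocked commit ts.
type blockEvent struct {
	commitTs uint64
	reported map[string]bool // dispatcher id -> has it reported the block
	selected string          // the dispatcher chosen to write the DDL downstream
}

func (e *blockEvent) allReported() bool {
	for _, ok := range e.reported {
		if !ok {
			return false
		}
	}
	return true
}

// report records one blocked heartbeat; it returns true when this dispatcher
// is selected as the writer, i.e. every blocked dispatcher has now reported.
func (e *blockEvent) report(dispatcher string) bool {
	e.reported[dispatcher] = true
	if e.selected == "" && e.allReported() {
		e.selected = dispatcher
		return true
	}
	return false
}

// advance handles a checkpoint that moved past the blocked commit ts: once the
// writer is done, every other blocked dispatcher should receive a pass action.
func (e *blockEvent) advance(dispatcher string, checkpointTs uint64) []string {
	if checkpointTs <= e.commitTs || dispatcher != e.selected {
		return nil
	}
	var passTo []string
	for id := range e.reported {
		if id != e.selected {
			passTo = append(passTo, id)
		}
	}
	return passTo
}

func main() {
	event := &blockEvent{
		commitTs: 10,
		reported: map[string]bool{"d1": false, "d2": false, "d3": false},
	}
	for _, d := range []string{"d1", "d2", "d3"} {
		if event.report(d) {
			fmt.Println(d, "selected as writer") // d3, the last reporter
		}
	}
	// the writer finished the DDL, so its checkpoint ts moved past the commit ts
	fmt.Println("send pass action to:", event.advance("d3", 11)) // d1 and d2, order may vary
}
```

This mirrors why TestBlock expects the write action only after the third dispatcher reports the block at ts 10, and the pass/scheduling messages only after the selected dispatcher's checkpoint ts (11) exceeds that commit ts.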