Commit

check hole
CharlesCheung96 committed Dec 24, 2024
1 parent a2662a0 commit 5c1e5c7
Showing 4 changed files with 116 additions and 35 deletions.
57 changes: 52 additions & 5 deletions maintainer/maintainer_controller_test.go
@@ -702,6 +702,7 @@ func TestDynamicMergeTableBasic(t *testing.T) {

totalTables := 10
victim := rand.Intn(totalTables) + 1
var holeSpan *heartbeatpb.TableSpan
for i := 1; i <= totalTables; i++ {
totalSpan := spanz.TableIDToComparableSpan(int64(i))
partialSpans := []*heartbeatpb.TableSpan{
@@ -713,6 +714,7 @@ func TestDynamicMergeTableBasic(t *testing.T) {
// victim has a hole, so it should not be merged
k := i % 3
old := partialSpans
holeSpan = old[k]
partialSpans = old[:k]
partialSpans = append(partialSpans, old[k+1:]...)
}
@@ -727,21 +729,28 @@ func TestDynamicMergeTableBasic(t *testing.T) {
s.replicationDB.AddReplicatingSpan(spanReplica)
}
}

expected := (totalTables - 1) * 3
victimExpected := 2
replicas := s.replicationDB.GetReplicating()
require.Equal(t, totalTables*3-1, s.replicationDB.GetReplicatingSize())
require.Equal(t, expected+victimExpected, s.replicationDB.GetReplicatingSize())

scheduler := s.schedulerController.GetScheduler(scheduler.SplitScheduler)
for i := 0; i < replica.DefaultScoreThreshold; i++ {
scheduler.Execute()
}
scheduler.Execute()
require.Equal(t, 0, s.replicationDB.GetReplicatingSize())
require.Equal(t, totalTables*3-1, s.replicationDB.GetSchedulingSize())
require.Equal(t, totalTables*3-1, s.operatorController.OperatorSize())
scheduler.Execute() // dummy execute does not take effect
require.Equal(t, victimExpected, s.replicationDB.GetReplicatingSize())
require.Equal(t, expected, s.replicationDB.GetSchedulingSize())
require.Equal(t, expected, s.operatorController.OperatorSize())

primarys := make(map[int64]pkgOpearator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus])
for _, task := range replicas {
op := s.operatorController.GetOperator(task.ID)
if op == nil {
require.Equal(t, int64(victim), task.Span.GetTableID())
continue
}
op.Schedule()
op.Check(task.GetNodeID(), &heartbeatpb.TableSpanStatus{
ID: op.ID().ToPB(),
@@ -760,6 +769,44 @@ func TestDynamicMergeTableBasic(t *testing.T) {
op.PostFinish()
}

require.Equal(t, totalTables-1, s.replicationDB.GetAbsentSize())

// merge the hole
dispatcherID := common.NewDispatcherID()
spanReplica := replica.NewWorkingReplicaSet(cfID, dispatcherID, tsoClient, 1, holeSpan, &heartbeatpb.TableSpanStatus{
ID: dispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 10,
EventSizePerSecond: 0,
}, node.ID(fmt.Sprintf("node%d", 0)))
s.replicationDB.AddReplicatingSpan(spanReplica)
replicas = s.replicationDB.GetReplicating()
require.Equal(t, 3, len(replicas))
for i := 0; i < replica.DefaultScoreThreshold; i++ {
scheduler.Execute()
}
require.Equal(t, 0, s.replicationDB.GetReplicatingSize())
require.Equal(t, 30, s.operatorController.OperatorSize())
primarys = make(map[int64]pkgOpearator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus])
for _, task := range replicas {
op := s.operatorController.GetOperator(task.ID)
op.Schedule()
op.Check(task.GetNodeID(), &heartbeatpb.TableSpanStatus{
ID: op.ID().ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Stopped,
CheckpointTs: 10,
})
if op.IsFinished() {
op.PostFinish()
} else {
primarys[task.Span.GetTableID()] = op
}
}
for _, op := range primarys {
finished := op.IsFinished()
require.True(t, finished)
op.PostFinish()
}
require.Equal(t, totalTables, s.replicationDB.GetAbsentSize())
}
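
The victim table above ends up with only two replicating spans because index k is cut out of partialSpans and stashed in holeSpan; that missing middle range is the hole that is expected to block merging. A small standalone illustration of that slice surgery, using plain strings instead of ticdc's span types (illustrative only, not test code):

package main

import "fmt"

func main() {
	// Three contiguous spans per table, mirroring the test's partialSpans.
	spans := []string{"[a,f)", "[f,m)", "[m,z)"}
	k := 1 // index of the span carved out to create the hole

	hole := spans[k]
	// Copy before appending so the original slice is left intact.
	withHole := append(append([]string{}, spans[:k]...), spans[k+1:]...)

	fmt.Println(hole)     // [f,m)
	fmt.Println(withHole) // [[a,f) [m,z)] -- table coverage is no longer contiguous
}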

4 changes: 4 additions & 0 deletions maintainer/operator/operator_controller.go
@@ -303,6 +303,10 @@ func (oc *Controller) NewSplitOperator(
return NewSplitDispatcherOperator(oc.replicationDB, replicaSet, originNode, splitSpans)
}

// AddMergeSplitOperator adds a merge split operator to the controller.
// 1. Merge Operator: len(affectedReplicaSets) > 1, len(splitSpans) == 1
// 2. Split Operator: len(affectedReplicaSets) == 1, len(splitSpans) > 1
// 3. MergeAndSplit Operator: len(affectedReplicaSets) > 1, len(splitSpans) > 1
func (oc *Controller) AddMergeSplitOperator(
affectedReplicaSets []*replica.SpanReplication,
splitSpans []*heartbeatpb.TableSpan,
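
The doc comment on AddMergeSplitOperator distinguishes the three operator shapes purely by the lengths of its two arguments. A minimal sketch of that classification, assuming an illustrative OpKind type and classify helper that are not part of the ticdc codebase:

package main

import "fmt"

// OpKind is an illustrative enum; ticdc uses its own operator types.
type OpKind int

const (
	OpKindInvalid OpKind = iota
	OpKindMerge
	OpKindSplit
	OpKindMergeAndSplit
)

// classify mirrors the three cases listed in the AddMergeSplitOperator comment.
func classify(numAffectedReplicaSets, numSplitSpans int) OpKind {
	switch {
	case numAffectedReplicaSets > 1 && numSplitSpans == 1:
		return OpKindMerge
	case numAffectedReplicaSets == 1 && numSplitSpans > 1:
		return OpKindSplit
	case numAffectedReplicaSets > 1 && numSplitSpans > 1:
		return OpKindMergeAndSplit
	default:
		return OpKindInvalid
	}
}

func main() {
	fmt.Println(classify(3, 1)) // merge: several replica sets collapse into one span
	fmt.Println(classify(1, 4)) // split: one replica set fans out into several spans
	fmt.Println(classify(3, 4)) // merge and split
}

This appears to be why a single AddMergeSplitOperator entry point can serve all three cases.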
15 changes: 15 additions & 0 deletions maintainer/replica/checker.go
@@ -55,6 +55,21 @@ type CheckResult struct {
Replications []*SpanReplication
}

func (c CheckResult) String() string {
opStr := ""
switch c.OpType {
case OpSplit:
opStr = "split"
case OpMerge:
opStr = "merge"
case OpMergeAndSplit:
opStr = "merge and split"
default:
panic("unknown op type")
}
return fmt.Sprintf("OpType: %s, ReplicationSize: %d", opStr, len(c.Replications))
}

func getNewGroupChecker(
cfID common.ChangeFeedID, enableTableAcrossNodes bool,
) func(replica.GroupID) replica.GroupChecker[common.DispatcherID, *SpanReplication] {
75 changes: 45 additions & 30 deletions maintainer/scheduler.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/scheduler"
pkgReplica "github.com/pingcap/ticdc/pkg/scheduler/replica"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils"
"github.com/pingcap/tiflow/pkg/spanz"
"go.uber.org/zap"
)
@@ -76,7 +77,7 @@ func newSplitScheduler(
db: db,
nodeManager: nodeManager,
batchSize: batchSize,
maxCheckTime: time.Second * 5,
maxCheckTime: time.Second * 500,
checkInterval: checkInterval,
}
}
@@ -123,29 +124,17 @@ func (s *splitScheduler) doCheck(ret pkgReplica.GroupCheckResult, start time.Tim
return checkedIndex, true
}
ret := checkResults[checkedIndex]
totalSpan, valid := s.valid(ret)
if !valid {
continue
}

switch ret.OpType {
case replica.OpMerge:
s.opController.AddMergeSplitOperator(ret.Replications, []*heartbeatpb.TableSpan{totalSpan})
case replica.OpSplit:
span := ret.Replications[0]
if s.db.GetTaskByID(span.ID) == nil {
continue
}
spans := s.splitter.SplitSpans(context.Background(), span.Span, len(s.nodeManager.GetAliveNodes()), 0)
if len(spans) > 1 {
log.Info("split span",
zap.String("changefeed", s.changefeedID.Name()),
zap.String("dispatcher", span.ID.String()),
zap.Int("span szie", len(spans)))
// s.opController.AddOperator(operator.NewSplitDispatcherOperator(s.db, span, span.GetNodeID(), spans))
s.opController.AddMergeSplitOperator(ret.Replications, spans)
}
fallthrough
case replica.OpMergeAndSplit:
// check hole
span := spanz.TableIDToComparableSpan(ret.Replications[0].Span.TableID)
totalSpan := &heartbeatpb.TableSpan{
TableID: span.TableID,
StartKey: span.StartKey,
EndKey: span.EndKey,
}
spans := s.splitter.SplitSpans(context.Background(), totalSpan, len(s.nodeManager.GetAliveNodes()), 0)
if len(spans) > 1 {
log.Info("split span",
@@ -154,16 +143,42 @@ func (s *splitScheduler) doCheck(ret pkgReplica.GroupCheckResult, start time.Tim
zap.Int("span szie", len(spans)))
s.opController.AddMergeSplitOperator(ret.Replications, spans)
}
case replica.OpMerge:
newSpan := spanz.TableIDToComparableSpan(ret.Replications[0].Span.TableID)
s.opController.AddMergeSplitOperator(ret.Replications, []*heartbeatpb.TableSpan{
{
TableID: newSpan.TableID,
StartKey: newSpan.StartKey,
EndKey: newSpan.EndKey,
},
})
}
}
return checkedIndex, false
}

func (s *splitScheduler) valid(c replica.CheckResult) (*heartbeatpb.TableSpan, bool) {
if c.OpType == replica.OpSplit && len(c.Replications) != 1 {
log.Panic("split operation should have only one replication",
zap.String("changefeed", s.changefeedID.Name()),
zap.Int64("tableId", c.Replications[0].Span.TableID),
zap.Stringer("checkResult", c))
}
if len(c.Replications) <= 1 {
log.Panic("invalid replication size",
zap.String("changefeed", s.changefeedID.Name()),
zap.Int64("tableId", c.Replications[0].Span.TableID),
zap.Stringer("checkResult", c))
}

span := spanz.TableIDToComparableSpan(c.Replications[0].Span.TableID)
totalSpan := &heartbeatpb.TableSpan{
TableID: span.TableID,
StartKey: span.StartKey,
EndKey: span.EndKey,
}
if c.OpType == replica.OpMerge || c.OpType == replica.OpMergeAndSplit {
spanMap := utils.NewBtreeMap[*heartbeatpb.TableSpan, *replica.SpanReplication](heartbeatpb.LessTableSpan)
for _, r := range c.Replications {
spanMap.ReplaceOrInsert(r.Span, r)
}
holes := split.FindHoles(spanMap, totalSpan)
log.Warn("skip merge operation since there are holes",
zap.String("changefeed", s.changefeedID.Name()),
zap.Int64("tableId", c.Replications[0].Span.TableID),
zap.Int("holes", len(holes)), zap.Stringer("checkResult", c))
return totalSpan, len(holes) == 0
}
return totalSpan, true
}
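
The merge path is gated on key-range coverage: the affected replications must tile the table's total span with no gaps, which is what the split.FindHoles call above checks before a merge is allowed. A standalone sketch of that idea, assuming spans are plain byte-slice ranges already sorted by start key (an illustration, not the actual FindHoles implementation):

package main

import (
	"bytes"
	"fmt"
)

// span is a simplified stand-in for heartbeatpb.TableSpan.
type span struct {
	StartKey, EndKey []byte
}

// findHoles returns the ranges of total left uncovered by spans, which are
// assumed to be sorted by StartKey and non-overlapping.
func findHoles(spans []span, total span) []span {
	var holes []span
	cursor := total.StartKey
	for _, s := range spans {
		if bytes.Compare(cursor, s.StartKey) < 0 {
			// Coverage jumps forward: [cursor, s.StartKey) is a hole.
			holes = append(holes, span{StartKey: cursor, EndKey: s.StartKey})
		}
		cursor = s.EndKey
	}
	if bytes.Compare(cursor, total.EndKey) < 0 {
		holes = append(holes, span{StartKey: cursor, EndKey: total.EndKey})
	}
	return holes
}

func main() {
	total := span{StartKey: []byte("a"), EndKey: []byte("z")}
	covered := []span{
		{StartKey: []byte("a"), EndKey: []byte("f")},
		// [f, m) is missing: a merge over these spans would leave a hole.
		{StartKey: []byte("m"), EndKey: []byte("z")},
	}
	fmt.Println(len(findHoles(covered, total))) // 1
}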
