Skip to content

Commit

Permalink
reuse changefeed map when scheduling the changefeed
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy committed Aug 13, 2024
1 parent b7726d6 commit 8b7bb72
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ type coordinator struct {

lastSaveTime time.Time
lastTickTime time.Time
scheduledChangefeeds map[model.ChangeFeedID]*changefeed
scheduledChangefeeds utils.Map[scheduler.InferiorID, scheduler.Inferior]
}

func NewCoordinator(capture *common.NodeInfo, version int64) server.Coordinator {
c := &coordinator{
version: version,
nodeInfo: capture,
scheduledChangefeeds: make(map[model.ChangeFeedID]*changefeed),
scheduledChangefeeds: utils.NewBtreeMap[scheduler.InferiorID, scheduler.Inferior](),
lastTickTime: time.Now(),
}
id := scheduler.ChangefeedID(model.DefaultChangeFeedID("coordinator"))
Expand Down Expand Up @@ -186,7 +186,6 @@ func (c *coordinator) scheduleMaintainer(state *orchestrator.GlobalReactorState)
if !c.supervisor.CheckAllCaptureInitialized() {
return nil, nil
}
allChangefeeds := utils.NewBtreeMap[scheduler.InferiorID, scheduler.Inferior]()
// check all changefeeds.
for id, cfState := range state.Changefeeds {
if cfState.Info == nil {
Expand All @@ -199,20 +198,18 @@ func (c *coordinator) scheduleMaintainer(state *orchestrator.GlobalReactorState)
}
if shouldRunChangefeed(cfState.Info.State) {
// todo use real changefeed instance here
cf, ok := c.scheduledChangefeeds[id]
ok := c.scheduledChangefeeds.Has(scheduler.ChangefeedID(id))
if !ok {
cf = &changefeed{}
c.scheduledChangefeeds.ReplaceOrInsert(scheduler.ChangefeedID(id), &changefeed{})
}
allChangefeeds.ReplaceOrInsert(scheduler.ChangefeedID(id), cf)
} else {
// changefeed is stopped
allChangefeeds.Delete(scheduler.ChangefeedID(id))
delete(c.scheduledChangefeeds, id)
c.scheduledChangefeeds.Delete(scheduler.ChangefeedID(id))
}
}
c.supervisor.MarkNeedAddInferior()
c.supervisor.MarkNeedRemoveInferior()
return c.supervisor.Schedule(allChangefeeds)
return c.supervisor.Schedule(c.scheduledChangefeeds)
}

func (c *coordinator) newBootstrapMessage(captureID model.CaptureID) rpc.Message {
Expand All @@ -226,17 +223,19 @@ func (c *coordinator) newChangefeed(id scheduler.InferiorID) scheduler.Inferior
cfID := model.ChangeFeedID(id.(scheduler.ChangefeedID))
cfInfo := c.lastState.Changefeeds[cfID]
cf := newChangefeed(c, cfID, cfInfo.Info, cfInfo.Status.CheckpointTs)
c.scheduledChangefeeds[cfInfo.ID] = cf
c.scheduledChangefeeds.ReplaceOrInsert(scheduler.ChangefeedID(cfInfo.ID), cf)
return cf
}

func (c *coordinator) saveChangefeedStatus() {
if time.Since(c.lastSaveTime) > time.Millisecond*500 {
for id, cf := range c.scheduledChangefeeds {
c.scheduledChangefeeds.Ascend(func(key scheduler.InferiorID, value scheduler.Inferior) bool {
id := model.ChangeFeedID(key.(scheduler.ChangefeedID))
cfState, ok := c.lastState.Changefeeds[id]
if !ok {
continue
return true
}
cf := value.(*changefeed)
if !shouldRunChangefeed(model.FeedState(cf.State.FeedState)) {
cfState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
info.State = model.FeedState(cf.State.FeedState)
Expand Down Expand Up @@ -274,7 +273,8 @@ func (c *coordinator) saveChangefeedStatus() {
saveErrorFn(err)
}
}
}
return true
})
c.lastSaveTime = time.Now()
}
}
Expand Down

0 comments on commit 8b7bb72

Please sign in to comment.