Skip to content

Commit

Permalink
Merge branch 'master' of github.com:flowbehappy/tigate
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy committed Aug 16, 2024
2 parents be4f25c + bb6ec4a commit 8d8687a
Show file tree
Hide file tree
Showing 20 changed files with 282 additions and 1,053 deletions.
10 changes: 2 additions & 8 deletions coordinator/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/flowbehappy/tigate/heartbeatpb"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/pkg/rpc"
"github.com/flowbehappy/tigate/scheduler"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -106,26 +105,21 @@ func (c *changefeed) NewInferiorStatus(status heartbeatpb.ComponentState) schedu
}}
}

func (c *changefeed) IsAlive() bool {
return true
}

func (c *changefeed) NewAddInferiorMessage(server model.CaptureID, secondary bool) rpc.Message {
func (c *changefeed) NewAddInferiorMessage(server model.CaptureID) *messaging.TargetMessage {
return messaging.NewTargetMessage(messaging.ServerId(server),
messaging.MaintainerManagerTopic,
&heartbeatpb.DispatchMaintainerRequest{
AddMaintainers: []*heartbeatpb.AddMaintainerRequest{
{
Id: c.ID.ID,
IsSecondary: secondary,
CheckpointTs: c.checkpointTs,
Config: c.configBytes,
},
},
})
}

func (c *changefeed) NewRemoveInferiorMessage(server model.CaptureID) rpc.Message {
func (c *changefeed) NewRemoveInferiorMessage(server model.CaptureID) *messaging.TargetMessage {
cf, ok := c.coordinator.lastState.Changefeeds[c.ID]
cascade := !ok || cf == nil || !shouldRunChangefeed(cf.Info.State)
return messaging.NewTargetMessage(messaging.ServerId(server),
Expand Down
17 changes: 7 additions & 10 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/flowbehappy/tigate/pkg/common/server"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/pkg/metrics"
"github.com/flowbehappy/tigate/pkg/rpc"
"github.com/flowbehappy/tigate/scheduler"
"github.com/flowbehappy/tigate/utils"
"github.com/pingcap/log"
Expand Down Expand Up @@ -87,6 +86,7 @@ func NewCoordinator(capture *common.NodeInfo,
id,
c.newChangefeed, c.newBootstrapMessage,
scheduler.NewBasicScheduler(id),
scheduler.NewBalanceScheduler(time.Minute, 1000),
)

// receive messages
Expand Down Expand Up @@ -193,17 +193,17 @@ func shouldRunChangefeed(state model.FeedState) bool {
func (c *coordinator) AsyncStop() {
}

func (c *coordinator) sendMessages(msgs []rpc.Message) {
func (c *coordinator) sendMessages(msgs []*messaging.TargetMessage) {
for _, msg := range msgs {
err := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).SendCommand(msg.(*messaging.TargetMessage))
err := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).SendCommand(msg)
if err != nil {
log.Error("failed to send coordinator request", zap.Any("msg", msg), zap.Error(err))
continue
}
}
}

func (c *coordinator) scheduleMaintainer(state *orchestrator.GlobalReactorState) ([]rpc.Message, error) {
func (c *coordinator) scheduleMaintainer(state *orchestrator.GlobalReactorState) ([]*messaging.TargetMessage, error) {
if !c.supervisor.CheckAllCaptureInitialized() {
return nil, nil
}
Expand Down Expand Up @@ -233,7 +233,8 @@ func (c *coordinator) scheduleMaintainer(state *orchestrator.GlobalReactorState)
return c.supervisor.Schedule(c.scheduledChangefeeds)
}

func (c *coordinator) newBootstrapMessage(captureID model.CaptureID) rpc.Message {
func (c *coordinator) newBootstrapMessage(captureID model.CaptureID) *messaging.TargetMessage {
log.Info("send coordinator bootstrap request", zap.String("to", captureID))
return messaging.NewTargetMessage(
messaging.ServerId(captureID),
messaging.MaintainerManagerTopic,
Expand Down Expand Up @@ -419,17 +420,14 @@ func (c *coordinator) calculateGCSafepoint(state *orchestrator.GlobalReactorStat
func (c *coordinator) printStatus() {
if time.Since(c.lastCheckTime) > time.Second*10 {
workingTask := 0
prepareTask := 0
absentTask := 0
commitTask := 0
removingTask := 0
c.supervisor.StateMachines.Ascend(func(key scheduler.InferiorID, value *scheduler.StateMachine) bool {
switch value.State {
case scheduler.SchedulerStatusAbsent:
absentTask++
case scheduler.SchedulerStatusPrepare:
prepareTask++
case scheduler.SchedulerStatusCommit:
case scheduler.SchedulerStatusCommiting:
commitTask++
case scheduler.SchedulerStatusWorking:
workingTask++
Expand All @@ -440,7 +438,6 @@ func (c *coordinator) printStatus() {
})
log.Info("changefeed status",
zap.Int("absent", absentTask),
zap.Int("prepare", prepareTask),
zap.Int("commit", commitTask),
zap.Int("working", workingTask),
zap.Int("removing", removingTask),
Expand Down
2 changes: 0 additions & 2 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func (m *mockMaintainerManager) onDispatchMaintainerRequest(
cf = &Maintainer{config: cfConfig}
m.maintainers.Store(cfID, cf)
}
cf.(*Maintainer).isSecondary.Store(req.IsSecondary)
}

for _, req := range request.RemoveMaintainers {
Expand Down Expand Up @@ -220,7 +219,6 @@ type Maintainer struct {
lastReportTime time.Time
removing atomic.Bool
cascadeRemoving atomic.Bool
isSecondary atomic.Bool

config *model.ChangeFeedInfo
}
Expand Down
10 changes: 1 addition & 9 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,7 @@ func (c *HeartBeatCollector) handleDispatcherRequestMessages(req *heartbeatpb.Sc
scheduleAction := req.ScheduleAction
config := req.Config
if scheduleAction == heartbeatpb.ScheduleAction_Create {
// TODO: 后续需要优化这段逻辑,perpared 这种调度状态需要多发 message 回去
if !req.IsSecondary {
eventDispatcherManager.NewTableEventDispatcher(&common.TableSpan{TableSpan: config.Span}, config.StartTs)
} else {
// eventDispatcherManager.GetTableSpanStatusesChan() <- &heartbeatpb.TableSpanStatus{
// Span: config.Span,
// ComponentStatus: heartbeatpb.ComponentState_Prepared,
// }
}
eventDispatcherManager.NewTableEventDispatcher(&common.TableSpan{TableSpan: config.Span}, config.StartTs)
} else if scheduleAction == heartbeatpb.ScheduleAction_Remove {
eventDispatcherManager.RemoveTableEventDispatcher(&common.TableSpan{TableSpan: config.Span})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ func (m *DispatcherManagerManager) handleAddDispatcherManager(from messaging.Ser
eventDispatcherManager.SetMaintainerID(from)
}
response := &heartbeatpb.MaintainerBootstrapResponse{
Statuses: make([]*heartbeatpb.TableSpanStatus, 0, eventDispatcherManager.GetDispatcherMap().Len()),
ChangefeedID: maintainerBootstrapRequest.ChangefeedID,
Statuses: make([]*heartbeatpb.TableSpanStatus, 0, eventDispatcherManager.GetDispatcherMap().Len()),
}
eventDispatcherManager.GetDispatcherMap().ForEach(func(tableSpan *common.TableSpan, tableEventDispatcher *dispatcher.TableEventDispatcher) {
response.Statuses = append(response.Statuses, &heartbeatpb.TableSpanStatus{
Expand Down
Loading

0 comments on commit 8d8687a

Please sign in to comment.