diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index a947bf034..156e96424 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -15,7 +15,6 @@ package coordinator
 
 import (
 	"context"
-	"fmt"
 	"math"
 	"sync"
 	"time"
@@ -263,6 +262,9 @@ func (c *coordinator) saveChangefeedStatus() {
 			return true
 		}
 		cf := value.(*changefeed)
+		if cf.State == nil {
+			return true
+		}
 		if !shouldRunChangefeed(model.FeedState(cf.State.FeedState)) {
 			cfState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
 				info.State = model.FeedState(cf.State.FeedState)
@@ -426,7 +428,6 @@ func (c *coordinator) printStatus() {
 	absentTask := 0
 	commitTask := 0
 	removingTask := 0
-	var taskDistribution string
 	c.supervisor.StateMachines.Ascend(func(key scheduler.InferiorID, value *scheduler.StateMachine) bool {
 		switch value.State {
 		case scheduler.SchedulerStatusAbsent:
@@ -440,11 +441,9 @@ func (c *coordinator) printStatus() {
 		case scheduler.SchedulerStatusRemoving:
 			removingTask++
 		}
-		taskDistribution = fmt.Sprintf("%s, %s==>%s", taskDistribution, value.ID.String(), value.Primary)
 		return true
 	})
 	log.Info("changefeed status",
-		zap.String("distribution", taskDistribution),
 		zap.Int("absent", absentTask),
 		zap.Int("prepare", prepareTask),
 		zap.Int("commit", commitTask),
diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go
index 30bd49a65..fe25f3432 100644
--- a/coordinator/coordinator_test.go
+++ b/coordinator/coordinator_test.go
@@ -14,17 +14,330 @@
 package coordinator
 
 import (
+	"context"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"net/http"
+	"net/http/pprof"
+	"strconv"
+	"sync"
 	"testing"
+	"time"
 
+	"github.com/flowbehappy/tigate/heartbeatpb"
+	"github.com/flowbehappy/tigate/pkg/common"
+	appcontext "github.com/flowbehappy/tigate/pkg/common/context"
+	"github.com/flowbehappy/tigate/pkg/config"
+	"github.com/flowbehappy/tigate/pkg/messaging"
+	"github.com/golang/mock/gomock"
+	"github.com/google/uuid"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
+	config2 "github.com/pingcap/tiflow/pkg/config"
+	mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock"
+	"github.com/pingcap/tiflow/pkg/orchestrator"
+	"github.com/pingcap/tiflow/pkg/pdutil"
+	pd "github.com/tikv/pd/client"
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
 )
 
-type CFID model.ChangeFeedID
+// mockPdClient embeds pd.Client and overrides only UpdateServiceGCSafePoint.
+type mockPdClient struct {
+	pd.Client
+}
+
+// UpdateServiceGCSafePoint returns the requested safe point and a nil error.
+func (m *mockPdClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
+	return safePoint, nil
+}
+
+// mockMaintainerManager is a test double that keeps maintainers in a sync.Map.
+type mockMaintainerManager struct {
+	mc                 messaging.MessageCenter
+	msgCh              chan *messaging.TargetMessage
+	coordinatorVersion int64
+	coordinatorID      messaging.ServerId
+	maintainers        sync.Map
+}
+
+// NewMaintainerManager builds the mock and registers it on MaintainerManagerTopic.
+func NewMaintainerManager() *mockMaintainerManager {
+	mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
+	m := &mockMaintainerManager{
+		mc:          mc,
+		maintainers: sync.Map{},
+		msgCh:       make(chan *messaging.TargetMessage, 1024),
+	}
+	mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)
+	return m
+}
+
+// Run handles queued messages and sends heartbeats every 500ms until ctx is done.
+func (m *mockMaintainerManager) Run(ctx context.Context) error {
+	tick := time.NewTicker(time.Millisecond * 500)
+	defer tick.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case msg := <-m.msgCh:
+			m.handleMessage(msg)
+		case <-tick.C:
+			// periodically report maintainer statuses to the coordinator
+			m.sendHeartbeat()
+		}
+	}
+}
+
+// handleMessage dispatches one coordinator message. When coordinatorVersion > 0,
+// removals of untracked maintainers are answered at once with an Absent status.
+func (m *mockMaintainerManager) handleMessage(msg *messaging.TargetMessage) {
+	switch msg.Type {
+	case messaging.TypeCoordinatorBootstrapRequest:
+		m.onCoordinatorBootstrapRequest(msg)
+	case messaging.TypeDispatchMaintainerRequest:
+		absent := m.onDispatchMaintainerRequest(msg)
+		if m.coordinatorVersion > 0 {
+			response := &heartbeatpb.MaintainerHeartbeat{}
+			for _, id := range absent {
+				response.Statuses = append(response.Statuses, &heartbeatpb.MaintainerStatus{
+					ChangefeedID: id,
+					State:        heartbeatpb.ComponentState_Absent,
+				})
+			}
+			if len(response.Statuses) != 0 {
+				m.sendMessages(response)
+			}
+		}
+	}
+}
+
+// sendMessages sends msg to the recorded coordinator; a send failure is only logged.
+func (m *mockMaintainerManager) sendMessages(msg *heartbeatpb.MaintainerHeartbeat) {
+	target := messaging.NewTargetMessage(
+		m.coordinatorID,
+		messaging.CoordinatorTopic,
+		msg,
+	)
+	err := m.mc.SendCommand(target)
+	if err != nil {
+		log.Warn("send command failed", zap.Error(err))
+	}
+}
+
+// recvMessages queues coordinator requests on msgCh; other types hit log.Panic.
+func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
+	switch msg.Type {
+	// receive message from coordinator
+	case messaging.TypeDispatchMaintainerRequest:
+		fallthrough
+	case messaging.TypeCoordinatorBootstrapRequest:
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case m.msgCh <- msg:
+		}
+		return nil
+	default:
+		log.Panic("unknown message type", zap.Any("message", msg.Message))
+	}
+	return nil
+}
+
+// onCoordinatorBootstrapRequest records the sender's id and version, unless the
+// current version is greater, and replies with a CoordinatorBootstrapResponse.
+func (m *mockMaintainerManager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) {
+	req := msg.Message.(*heartbeatpb.CoordinatorBootstrapRequest)
+	if m.coordinatorVersion > req.Version {
+		log.Warn("ignore invalid coordinator version",
+			zap.Int64("version", req.Version))
+		return
+	}
+	m.coordinatorID = msg.From
+	m.coordinatorVersion = req.Version
+
+	response := &heartbeatpb.CoordinatorBootstrapResponse{}
+	err := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).SendCommand(messaging.NewTargetMessage(
+		m.coordinatorID,
+		messaging.CoordinatorTopic,
+		response,
+	))
+	if err != nil {
+		log.Warn("send command failed", zap.Error(err))
+	}
+	log.Info("New coordinator online",
+		zap.Int64("version", m.coordinatorVersion))
+}
+
+// onDispatchMaintainerRequest applies add/remove requests and returns the ids of
+// maintainers that were asked to be removed but are not tracked here.
+func (m *mockMaintainerManager) onDispatchMaintainerRequest(
+	msg *messaging.TargetMessage,
+) []string {
+	request := msg.Message.(*heartbeatpb.DispatchMaintainerRequest)
+	if m.coordinatorID != msg.From {
+		log.Warn("ignore invalid coordinator id",
+			zap.Any("request", request),
+			zap.Any("coordinator", msg.From))
+		return nil
+	}
+	absent := make([]string, 0)
+	for _, req := range request.AddMaintainers {
+		cfID := model.DefaultChangeFeedID(req.GetId())
+		cf, ok := m.maintainers.Load(cfID)
+		if !ok {
+			cfConfig := &model.ChangeFeedInfo{}
+			err := json.Unmarshal(req.Config, cfConfig)
+			if err != nil {
+				log.Panic("decode changefeed fail", zap.Error(err))
+			}
+			cf = &Maintainer{config: cfConfig}
+			m.maintainers.Store(cfID, cf)
+		}
+		cf.(*Maintainer).isSecondary.Store(req.IsSecondary)
+	}
+
+	for _, req := range request.RemoveMaintainers {
+		cfID := model.DefaultChangeFeedID(req.GetId())
+		cf, ok := m.maintainers.Load(cfID)
+		if !ok {
+			log.Warn("ignore remove maintainer request, "+
+				"since the maintainer not found",
+				zap.String("changefeed", cfID.String()),
+				zap.Any("request", req))
+			absent = append(absent, req.GetId())
+			continue
+		}
+		cf.(*Maintainer).removing.Store(true)
+		cf.(*Maintainer).cascadeRemoving.Store(req.Cascade)
+	}
+	return absent
+}
+
+// sendHeartbeat reports maintainers whose status changed or whose last report is
+// older than two seconds; it does nothing unless coordinatorVersion > 0.
+func (m *mockMaintainerManager) sendHeartbeat() {
+	if m.coordinatorVersion > 0 {
+		response := &heartbeatpb.MaintainerHeartbeat{}
+		m.maintainers.Range(func(key, value interface{}) bool {
+			cfMaintainer := value.(*Maintainer)
+			if cfMaintainer.statusChanged.Load() || time.Since(cfMaintainer.lastReportTime) > time.Second*2 {
+				response.Statuses = append(response.Statuses, cfMaintainer.GetMaintainerStatus())
+				cfMaintainer.statusChanged.Store(false)
+				cfMaintainer.lastReportTime = time.Now()
+			}
+			return true
+		})
+		if len(response.Statuses) != 0 {
+			m.sendMessages(response)
+		}
+	}
+}
+
+// Maintainer is the per-changefeed state tracked by mockMaintainerManager.
+type Maintainer struct {
+	statusChanged   atomic.Bool
+	lastReportTime  time.Time
+	removing        atomic.Bool
+	cascadeRemoving atomic.Bool
+	isSecondary     atomic.Bool
+
+	config *model.ChangeFeedInfo
+}
 
-func (c CFID) Less(t any) bool {
-	cf := t.(CFID)
-	return c.ID < cf.ID
+// GetMaintainerStatus always reports the changefeed as Working.
+func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus {
+	return &heartbeatpb.MaintainerStatus{
+		ChangefeedID: m.config.ID,
+		State:        heartbeatpb.ComponentState_Working,
+	}
 }
 
-func TestCoordinatorRun(t *testing.T) {
+// TestCoordinatorScheduling ticks a coordinator every 50ms over cfSize changefeeds
+// (default 100, or one positional argument); it runs until Tick fails or ctx ends.
+func TestCoordinatorScheduling(t *testing.T) {
+	mux := http.NewServeMux()
+	mux.HandleFunc("/debug/pprof/", pprof.Index)
+	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+	go func() {
+		// t.Fatal may only be called from the goroutine running the test,
+		// so a failed pprof listener is logged instead.
+		if err := http.ListenAndServe(":8300", mux); err != nil {
+			log.Warn("pprof server exited", zap.Error(err))
+		}
+	}()
+
+	ctx := context.Background()
+	node := &common.NodeInfo{ID: uuid.New().String()}
+	etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t))
+	etcdClient.EXPECT().GetGCServiceID().Return("default").AnyTimes()
+	appcontext.SetService(appcontext.MessageCenter, messaging.NewMessageCenter(ctx,
+		messaging.ServerId(node.ID), 100, config.NewDefaultMessageCenterConfig()))
+	m := NewMaintainerManager()
+	go m.Run(ctx)
+
+	cr := NewCoordinator(node, &mockPdClient{}, pdutil.NewClock4Test(), etcdClient, 100)
+	var metadata orchestrator.ReactorState
+
+	cfs := map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState{}
+
+	if !flag.Parsed() {
+		flag.Parse()
+	}
+
+	argList := flag.Args()
+	if len(argList) > 1 {
+		t.Fatal("unexpected args", argList)
+	}
+	cfSize := 100
+	if len(argList) == 1 {
+		size, err := strconv.Atoi(argList[0])
+		if err != nil {
+			t.Fatal("invalid changefeed count", argList[0], err)
+		}
+		cfSize = size
+	}
+
+	for i := 0; i < cfSize; i++ {
+		cfID := model.DefaultChangeFeedID(fmt.Sprintf("%d", i))
+		cfs[cfID] = &orchestrator.ChangefeedReactorState{
+			ID: cfID,
+			Info: &model.ChangeFeedInfo{
+				ID:        cfID.ID,
+				Namespace: cfID.Namespace,
+				Config:    config2.GetDefaultReplicaConfig(),
+				State:     model.StateNormal,
+			},
+			Status: &model.ChangeFeedStatus{CheckpointTs: 10, MinTableBarrierTs: 10},
+		}
+	}
+	metadata = &orchestrator.GlobalReactorState{
+		Captures: map[model.CaptureID]*model.CaptureInfo{
+			node.ID: {
+				ID:            node.ID,
+				AdvertiseAddr: "127.0.0.1:8300",
+			},
+		},
+		Changefeeds: cfs,
+	}
+
+	tick := time.NewTicker(time.Millisecond * 50)
+	defer tick.Stop()
+	var err error
+	for {
+		select {
+		case <-tick.C:
+			metadata, err = cr.Tick(ctx, metadata)
+			if err != nil {
+				t.Fatal(err)
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
 }
diff --git a/downstream_performance.go b/downstream_performance.go
index ef8cbb731..952e2e619 100644
--- a/downstream_performance.go
+++ b/downstream_performance.go
@@ -16,7 +16,6 @@ import (
 	appcontext "github.com/flowbehappy/tigate/pkg/common/context"
 	"github.com/flowbehappy/tigate/pkg/config"
 	"github.com/flowbehappy/tigate/pkg/messaging"
-	"github.com/flowbehappy/tigate/server/watcher"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"go.uber.org/zap"
@@ -27,7 +26,7 @@ const dispatcherCount = 1000
 const databaseCount = 20
 
 func initContext(serverId messaging.ServerId) {
-	appcontext.SetService(appcontext.MessageCenter, messaging.NewMessageCenter(context.Background(), serverId, watcher.TempEpoch, config.NewDefaultMessageCenterConfig()))
+	appcontext.SetService(appcontext.MessageCenter, messaging.NewMessageCenter(context.Background(), serverId, 100, config.NewDefaultMessageCenterConfig()))
 	appcontext.SetService(appcontext.EventCollector, eventcollector.NewEventCollector(100*1024*1024*1024, serverId)) // 100GB for demo
 	appcontext.SetService(appcontext.HeartbeatCollector, dispatchermanager.NewHeartBeatCollector(serverId))
 }
@@ -88,7 +87,7 @@ func main() {
 	managerMap := make(map[int]*dispatchermanager.EventDispatcherManager)
 
 	for db_index := 0; db_index < databaseCount; db_index++ {
-		changefeedConfig := model.ChangefeedConfig{
+		changefeedConfig := config.ChangefeedConfig{
 			SinkURI: "tidb://root:@127.0.0.1:4000",
 		}
 		changefeedID := model.DefaultChangeFeedID("test" + strconv.Itoa(db_index))
diff --git a/scheduler/state_machine.go b/scheduler/state_machine.go
index aa481e61e..28567ca94 100644
--- a/scheduler/state_machine.go
+++ b/scheduler/state_machine.go
@@ -214,7 +214,8 @@ func NewStateMachine(
 		sm.State = SchedulerStatusRemoving
 	}
 	log.Info("initialize state machine",
-		zap.Any("statemachine", sm))
+		zap.Any("id", sm.ID),
+		zap.Any("state", sm.State))
 	return sm, nil
 }
 