diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 156e96424..fe083bb3e 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -116,11 +116,6 @@ func (c *coordinator) Tick( metrics.CoordinatorCounter.Add(float64(now.Sub(c.lastTickTime)) / float64(time.Second)) c.lastTickTime = now - // Owner should update GC safepoint before initializing changefeed, so - // changefeed can remove its "ticdc-creating" service GC safepoint during - // initializing. - // - // See more gc doc. if err := c.updateGCSafepoint(ctx, state); err != nil { return nil, errors.Trace(err) } diff --git a/scheduler/state_machine.go b/scheduler/state_machine.go index 28567ca94..1e14fd491 100644 --- a/scheduler/state_machine.go +++ b/scheduler/state_machine.go @@ -173,7 +173,7 @@ func NewStateMachine( // proceeding further scheduling. log.Warn("found a stopping server during initializing", zap.Any("ID", sm.ID), - zap.Any("statemachine", sm), + zap.Any("statemachine", sm.ID), zap.Any("status", status)) err := sm.setCapture(captureID, RoleUndetermined) if err != nil { @@ -186,7 +186,6 @@ func NewStateMachine( default: log.Warn("unknown inferior state", zap.Any("ID", sm.ID), - zap.Any("statemachine", sm), zap.Any("status", status)) } } @@ -195,7 +194,7 @@ func NewStateMachine( if len(sm.Primary) != 0 { sm.State = SchedulerStatusWorking log.Info("initialize a working state state machine", - zap.Any("statemachine", sm)) + zap.Any("statemachine", sm.ID)) } // Move inferior or add inferior is in-progress. if sm.hasRole(RoleSecondary) { @@ -266,7 +265,7 @@ func (s *StateMachine) promoteSecondary(captureID model.CaptureID) error { if s.Primary == captureID { log.Warn("server is already promoted as the primary", zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil } role, ok := s.Servers[captureID] @@ -295,7 +294,7 @@ func (s *StateMachine) inconsistentError( fields = append(fields, []zap.Field{ zap.String("captureID", captureID), zap.Any("state", input), - zap.Any("statemachine", s), + zap.Any("statemachine", s.ID), }...) log.L().WithOptions(zap.AddCallerSkip(1)).Error(msg, fields...) return errors.New("inconsistent error: " + msg) @@ -307,7 +306,7 @@ func (s *StateMachine) multiplePrimaryError( fields = append(fields, []zap.Field{ zap.String("captureID", captureID), zap.Any("state", input), - zap.Any("statemachine", s), + zap.Any("statemachine", s.ID), }...) log.L().WithOptions(zap.AddCallerSkip(1)).Error(msg, fields...) return errors.New("inconsistent error: " + msg) @@ -442,7 +441,7 @@ func (s *StateMachine) pollOnAbsent( log.Warn("ignore input, unexpected state", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, false, nil } @@ -482,7 +481,7 @@ func (s *StateMachine) pollOnPrepare( log.Info("primary is stopped during Prepare", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) s.clearPrimary() return nil, false, nil } @@ -514,7 +513,7 @@ func (s *StateMachine) pollOnPrepare( log.Warn("ignore input, unexpected state", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, false, nil } @@ -538,7 +537,7 @@ func (s *StateMachine) pollOnCommit( log.Info("there are unknown captures during commit", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, false, nil } // No primary, promote secondary to primary. @@ -551,7 +550,7 @@ func (s *StateMachine) pollOnCommit( log.Info("promote secondary, no primary", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) } // Secondary has been promoted, retry add inferior request. if s.Primary == captureID && !s.hasRole(RoleSecondary) { @@ -568,7 +567,7 @@ func (s *StateMachine) pollOnCommit( log.Info("primary is stopped during Commit", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) s.State = SchedulerStatusAbsent return nil, true, nil } @@ -581,7 +580,7 @@ func (s *StateMachine) pollOnCommit( log.Info("state promote secondary", zap.Any("status", input), zap.String("secondary", captureID), - zap.Any("statemachine", s), + zap.Any("statemachine", s.ID), zap.String("original", original)) return s.Inferior.NewAddInferiorMessage(s.Primary, false), false, nil } else if s.isInRole(captureID, RoleSecondary) { @@ -591,7 +590,7 @@ func (s *StateMachine) pollOnCommit( log.Info("secondary is stopped during Commit", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) err := s.clearCapture(captureID, RoleSecondary) if err != nil { return nil, false, errors.Trace(err) @@ -605,7 +604,7 @@ func (s *StateMachine) pollOnCommit( log.Info("server is stopped during Commit", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) err := s.clearCapture(captureID, RoleUndetermined) return nil, false, errors.Trace(err) } @@ -632,7 +631,7 @@ func (s *StateMachine) pollOnCommit( // the primary, Stopping or Stopped. s.State = SchedulerStatusWorking log.Info("state transition from commit to working", - zap.String("changefeedID", s.ID.String()), + zap.String("statemachine", s.ID.String()), zap.String("inferiorID", input.GetInferiorID().String())) return nil, true, nil } @@ -647,7 +646,7 @@ func (s *StateMachine) pollOnCommit( log.Info("server is stopping during Commit", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, false, nil } @@ -656,7 +655,7 @@ func (s *StateMachine) pollOnCommit( log.Warn("ignore input, unexpected state", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, false, nil } @@ -721,7 +720,7 @@ func (s *StateMachine) pollOnRemoving( log.Warn("remove server with error", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s), + zap.Any("statemachine", s.ID), zap.Error(err)) } return nil, false, nil @@ -732,7 +731,7 @@ func (s *StateMachine) pollOnRemoving( log.Warn("ignore input, unexpected state", zap.Any("status", input), zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, false, nil } @@ -749,7 +748,7 @@ func (s *StateMachine) HandleAddInferior( if s.State != SchedulerStatusAbsent { log.Warn("add inferior is ignored", zap.String("captureID", captureID), - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, nil } err := s.setCapture(captureID, RoleSecondary) @@ -778,7 +777,7 @@ func (s *StateMachine) HandleMoveInferior( // Ignore move inferior if it has been removed already. if s.HasRemoved() { log.Warn("move inferior is ignored", - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, nil } // Ignore move inferior if @@ -786,7 +785,7 @@ func (s *StateMachine) HandleMoveInferior( // 2) the dest server is the primary. if s.State != SchedulerStatusWorking || s.Primary == dest { log.Warn("move inferior is ignored", - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, nil } oldState := s.State @@ -795,14 +794,14 @@ func (s *StateMachine) HandleMoveInferior( if err != nil { log.Info("move inferior is failed", zap.Stringer("new", s.State), - zap.Any("statemachine", s), + zap.Any("statemachine", s.ID), zap.Stringer("old", oldState), zap.Error(err)) return nil, errors.Trace(err) } log.Info("state transition, move inferior", zap.Stringer("new", s.State), - zap.Any("statemachine", s), + zap.Any("statemachine", s.ID), zap.Stringer("old", oldState)) status := s.Inferior.NewInferiorStatus(heartbeatpb.ComponentState_Absent) return s.poll(status, dest) @@ -812,19 +811,19 @@ func (s *StateMachine) HandleRemoveInferior() ([]rpc.Message, error) { // Ignore remove inferior if it has been removed already. if s.HasRemoved() { log.Warn("remove inferior is ignored", - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, nil } // Ignore remove inferior if it's not in Working state. if s.State == SchedulerStatusRemoving { log.Warn("remove inferior is ignored", - zap.Any("statemachine", s)) + zap.Any("statemachine", s.ID)) return nil, nil } oldState := s.State s.State = SchedulerStatusRemoving log.Info("state transition, remove inferiror", - zap.Any("statemachine", s), + zap.Any("statemachine", s.ID), zap.Stringer("old", oldState)) // fake status to trigger a stop message status := s.Inferior.NewInferiorStatus(heartbeatpb.ComponentState_Working) @@ -847,7 +846,7 @@ func (s *StateMachine) HandleCaptureShutdown( oldState := s.State msgs, err := s.poll(status, captureID) log.Info("state transition, server shutdown", - zap.Any("statemachine", s), + zap.Any("statemachine", s.ID), zap.Stringer("old", oldState), zap.Stringer("new", s.State)) return msgs, true, errors.Trace(err)