Skip to content

Commit

Permalink
refine some log
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy committed Aug 14, 2024
1 parent fed9530 commit 3e670b4
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 34 deletions.
5 changes: 0 additions & 5 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,6 @@ func (c *coordinator) Tick(
metrics.CoordinatorCounter.Add(float64(now.Sub(c.lastTickTime)) / float64(time.Second))
c.lastTickTime = now

// Owner should update GC safepoint before initializing changefeed, so
// changefeed can remove its "ticdc-creating" service GC safepoint during
// initializing.
//
// See more gc doc.
if err := c.updateGCSafepoint(ctx, state); err != nil {
return nil, errors.Trace(err)
}
Expand Down
57 changes: 28 additions & 29 deletions scheduler/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func NewStateMachine(
// proceeding further scheduling.
log.Warn("found a stopping server during initializing",
zap.Any("ID", sm.ID),
zap.Any("statemachine", sm),
zap.Any("statemachine", sm.ID),
zap.Any("status", status))
err := sm.setCapture(captureID, RoleUndetermined)
if err != nil {
Expand All @@ -186,7 +186,6 @@ func NewStateMachine(
default:
log.Warn("unknown inferior state",
zap.Any("ID", sm.ID),
zap.Any("statemachine", sm),
zap.Any("status", status))
}
}
Expand All @@ -195,7 +194,7 @@ func NewStateMachine(
if len(sm.Primary) != 0 {
sm.State = SchedulerStatusWorking
log.Info("initialize a working state state machine",
zap.Any("statemachine", sm))
zap.Any("statemachine", sm.ID))
}
// Move inferior or add inferior is in-progress.
if sm.hasRole(RoleSecondary) {
Expand Down Expand Up @@ -266,7 +265,7 @@ func (s *StateMachine) promoteSecondary(captureID model.CaptureID) error {
if s.Primary == captureID {
log.Warn("server is already promoted as the primary",
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil
}
role, ok := s.Servers[captureID]
Expand Down Expand Up @@ -295,7 +294,7 @@ func (s *StateMachine) inconsistentError(
fields = append(fields, []zap.Field{
zap.String("captureID", captureID),
zap.Any("state", input),
zap.Any("statemachine", s),
zap.Any("statemachine", s.ID),
}...)
log.L().WithOptions(zap.AddCallerSkip(1)).Error(msg, fields...)
return errors.New("inconsistent error: " + msg)
Expand All @@ -307,7 +306,7 @@ func (s *StateMachine) multiplePrimaryError(
fields = append(fields, []zap.Field{
zap.String("captureID", captureID),
zap.Any("state", input),
zap.Any("statemachine", s),
zap.Any("statemachine", s.ID),
}...)
log.L().WithOptions(zap.AddCallerSkip(1)).Error(msg, fields...)
return errors.New("inconsistent error: " + msg)
Expand Down Expand Up @@ -442,7 +441,7 @@ func (s *StateMachine) pollOnAbsent(
log.Warn("ignore input, unexpected state",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, false, nil
}

Expand Down Expand Up @@ -482,7 +481,7 @@ func (s *StateMachine) pollOnPrepare(
log.Info("primary is stopped during Prepare",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
s.clearPrimary()
return nil, false, nil
}
Expand Down Expand Up @@ -514,7 +513,7 @@ func (s *StateMachine) pollOnPrepare(
log.Warn("ignore input, unexpected state",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, false, nil
}

Expand All @@ -538,7 +537,7 @@ func (s *StateMachine) pollOnCommit(
log.Info("there are unknown captures during commit",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, false, nil
}
// No primary, promote secondary to primary.
Expand All @@ -551,7 +550,7 @@ func (s *StateMachine) pollOnCommit(
log.Info("promote secondary, no primary",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
}
// Secondary has been promoted, retry add inferior request.
if s.Primary == captureID && !s.hasRole(RoleSecondary) {
Expand All @@ -568,7 +567,7 @@ func (s *StateMachine) pollOnCommit(
log.Info("primary is stopped during Commit",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
s.State = SchedulerStatusAbsent
return nil, true, nil
}
Expand All @@ -581,7 +580,7 @@ func (s *StateMachine) pollOnCommit(
log.Info("state promote secondary",
zap.Any("status", input),
zap.String("secondary", captureID),
zap.Any("statemachine", s),
zap.Any("statemachine", s.ID),
zap.String("original", original))
return s.Inferior.NewAddInferiorMessage(s.Primary, false), false, nil
} else if s.isInRole(captureID, RoleSecondary) {
Expand All @@ -591,7 +590,7 @@ func (s *StateMachine) pollOnCommit(
log.Info("secondary is stopped during Commit",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
err := s.clearCapture(captureID, RoleSecondary)
if err != nil {
return nil, false, errors.Trace(err)
Expand All @@ -605,7 +604,7 @@ func (s *StateMachine) pollOnCommit(
log.Info("server is stopped during Commit",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
err := s.clearCapture(captureID, RoleUndetermined)
return nil, false, errors.Trace(err)
}
Expand All @@ -632,7 +631,7 @@ func (s *StateMachine) pollOnCommit(
// the primary, Stopping or Stopped.
s.State = SchedulerStatusWorking
log.Info("state transition from commit to working",
zap.String("changefeedID", s.ID.String()),
zap.String("statemachine", s.ID.String()),
zap.String("inferiorID", input.GetInferiorID().String()))
return nil, true, nil
}
Expand All @@ -647,7 +646,7 @@ func (s *StateMachine) pollOnCommit(
log.Info("server is stopping during Commit",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, false, nil
}

Expand All @@ -656,7 +655,7 @@ func (s *StateMachine) pollOnCommit(
log.Warn("ignore input, unexpected state",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, false, nil
}

Expand Down Expand Up @@ -721,7 +720,7 @@ func (s *StateMachine) pollOnRemoving(
log.Warn("remove server with error",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s),
zap.Any("statemachine", s.ID),
zap.Error(err))
}
return nil, false, nil
Expand All @@ -732,7 +731,7 @@ func (s *StateMachine) pollOnRemoving(
log.Warn("ignore input, unexpected state",
zap.Any("status", input),
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, false, nil
}

Expand All @@ -749,7 +748,7 @@ func (s *StateMachine) HandleAddInferior(
if s.State != SchedulerStatusAbsent {
log.Warn("add inferior is ignored",
zap.String("captureID", captureID),
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, nil
}
err := s.setCapture(captureID, RoleSecondary)
Expand Down Expand Up @@ -778,15 +777,15 @@ func (s *StateMachine) HandleMoveInferior(
// Ignore move inferior if it has been removed already.
if s.HasRemoved() {
log.Warn("move inferior is ignored",
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, nil
}
// Ignore move inferior if
// 1) it's not in Working state or
// 2) the dest server is the primary.
if s.State != SchedulerStatusWorking || s.Primary == dest {
log.Warn("move inferior is ignored",
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, nil
}
oldState := s.State
Expand All @@ -795,14 +794,14 @@ func (s *StateMachine) HandleMoveInferior(
if err != nil {
log.Info("move inferior is failed",
zap.Stringer("new", s.State),
zap.Any("statemachine", s),
zap.Any("statemachine", s.ID),
zap.Stringer("old", oldState),
zap.Error(err))
return nil, errors.Trace(err)
}
log.Info("state transition, move inferior",
zap.Stringer("new", s.State),
zap.Any("statemachine", s),
zap.Any("statemachine", s.ID),
zap.Stringer("old", oldState))
status := s.Inferior.NewInferiorStatus(heartbeatpb.ComponentState_Absent)
return s.poll(status, dest)
Expand All @@ -812,19 +811,19 @@ func (s *StateMachine) HandleRemoveInferior() ([]rpc.Message, error) {
// Ignore remove inferior if it has been removed already.
if s.HasRemoved() {
log.Warn("remove inferior is ignored",
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, nil
}
// Ignore remove inferior if it's not in Working state.
if s.State == SchedulerStatusRemoving {
log.Warn("remove inferior is ignored",
zap.Any("statemachine", s))
zap.Any("statemachine", s.ID))
return nil, nil
}
oldState := s.State
s.State = SchedulerStatusRemoving
log.Info("state transition, remove inferiror",
zap.Any("statemachine", s),
zap.Any("statemachine", s.ID),
zap.Stringer("old", oldState))
// fake status to trigger a stop message
status := s.Inferior.NewInferiorStatus(heartbeatpb.ComponentState_Working)
Expand All @@ -847,7 +846,7 @@ func (s *StateMachine) HandleCaptureShutdown(
oldState := s.State
msgs, err := s.poll(status, captureID)
log.Info("state transition, server shutdown",
zap.Any("statemachine", s),
zap.Any("statemachine", s.ID),
zap.Stringer("old", oldState),
zap.Stringer("new", s.State))
return msgs, true, errors.Trace(err)
Expand Down

0 comments on commit 3e670b4

Please sign in to comment.