Skip to content

Commit

Permalink
refine state_machine.go
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy committed Aug 16, 2024
1 parent a580196 commit c7535dd
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 54 deletions.
1 change: 1 addition & 0 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (c *coordinator) scheduleMaintainer(state *orchestrator.GlobalReactorState)
}

func (c *coordinator) newBootstrapMessage(captureID model.CaptureID) *messaging.TargetMessage {
log.Info("send coordinator bootstrap request", zap.String("to", captureID))
return messaging.NewTargetMessage(
messaging.ServerId(captureID),
messaging.MaintainerManagerTopic,
Expand Down
3 changes: 3 additions & 0 deletions maintainer/maintainer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (m *Manager) RecvMessages(ctx context.Context, msg *messaging.TargetMessage
case <-ctx.Done():
return ctx.Err()
case m.msgCh <- msg:
default:
log.Warn("msg chan is full")
}
return nil
// receive bootstrap response message from dispatcher manager manager
Expand Down Expand Up @@ -249,6 +251,7 @@ func (m *Manager) sendHeartbeat() {
func (m *Manager) handleMessage(msg *messaging.TargetMessage) {
switch msg.Type {
case messaging.TypeCoordinatorBootstrapRequest:
log.Info("received coordinator bootstrap request", zap.String("from", msg.From.String()))
m.onCoordinatorBootstrapRequest(msg)
case messaging.TypeDispatchMaintainerRequest:
absent := m.onDispatchMaintainerRequest(msg)
Expand Down
2 changes: 2 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (s *Supervisor) Schedule(allInferiors utils.Map[InferiorID, Inferior]) ([]*
if !s.CheckAllCaptureInitialized() {
log.Info("skip scheduling since not all captures are initialized",
zap.String("id", s.ID.String()),
zap.Bool("initialized", s.initialized),
zap.Int("size", len(s.captures)),
zap.Int("totalInferiors", allInferiors.Len()),
zap.Int("totalStateMachines", s.StateMachines.Len()),
zap.Int("maxTaskConcurrency", s.maxTaskConcurrency),
Expand Down
73 changes: 28 additions & 45 deletions scheduler/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ import (
type SchedulerStatus int

const (
SchedulerStatusUnknown SchedulerStatus = iota
SchedulerStatusAbsent
SchedulerStatusAbsent = iota
SchedulerStatusCommiting
SchedulerStatusWorking
SchedulerStatusRemoving
Expand All @@ -55,32 +54,6 @@ func (r SchedulerStatus) String() string {
}
}

// Role is the role of a server.
type Role int

const (
// RolePrimary primary role.
RolePrimary = 1
// RoleSecondary secondary role.
RoleSecondary = 2
// RoleUndetermined means that we don't know its state, it may be
// working, stopping or stopped.
RoleUndetermined = 3
)

func (r Role) String() string {
switch r {
case RolePrimary:
return "Primary"
case RoleSecondary:
return "Secondary"
case RoleUndetermined:
return "Undetermined"
default:
return fmt.Sprintf("Unknown %d", r)
}
}

type StateMachine struct {
ID InferiorID
State SchedulerStatus
Expand Down Expand Up @@ -175,8 +148,8 @@ func (s *StateMachine) multiplePrimaryError(
return errors.New("inconsistent error: " + msg)
}

// poll transit state based on input and the current state.
func (s *StateMachine) poll(
// HandleInferiorStatus transit state based on input and the current state.
func (s *StateMachine) HandleInferiorStatus(
input InferiorStatus, captureID model.CaptureID,
) (*messaging.TargetMessage, error) {
if s.Primary != captureID {
Expand Down Expand Up @@ -285,12 +258,6 @@ func (s *StateMachine) pollOnRemoving(
return nil
}

func (s *StateMachine) HandleInferiorStatus(
input InferiorStatus, from model.CaptureID,
) (*messaging.TargetMessage, error) {
return s.poll(input, from)
}

func (s *StateMachine) HandleAddInferior(
captureID model.CaptureID,
) (*messaging.TargetMessage, error) {
Expand Down Expand Up @@ -366,23 +333,39 @@ func (s *StateMachine) HandleRemoveInferior() (*messaging.TargetMessage, error)
// whether s is affected by the server shutdown.
func (s *StateMachine) HandleCaptureShutdown(
captureID model.CaptureID,
) (*messaging.TargetMessage, bool, error) {
) (*messaging.TargetMessage, bool) {
if s.Primary != captureID && s.Secondary != captureID {
return nil, false
}
oldState := s.State
if s.Primary == captureID {
var msg *messaging.TargetMessage
switch oldState {
case SchedulerStatusAbsent, SchedulerStatusCommiting, SchedulerStatusWorking:
// primary node is stopped, set to absent to reschedule
s.Primary = ""
s.State = SchedulerStatusAbsent
} else if s.Secondary == captureID {
// clear the secondary
s.Secondary = ""
} else {
// r is not affected by the server shutdown.
return nil, false, nil
case SchedulerStatusRemoving:
// check if we are moving this state machine
if s.Secondary == "" {
s.Primary = ""
s.State = SchedulerStatusAbsent
} else {
if s.Secondary == captureID {
// destination capture is stopped during moving, clear secondary node
s.Secondary = ""
} else {
// primary capture is stopped, move to secondary
s.State = SchedulerStatusCommiting
msg = s.Inferior.NewAddInferiorMessage(s.Primary)
}
}
}
log.Info("state transition, server shutdown",
zap.String("statemachine", s.ID.String()),
zap.String("captureID", captureID),
zap.Stringer("old", oldState),
zap.Stringer("new", s.State))
return nil, true, nil
return msg, true
}

func (s *StateMachine) HasRemoved() bool {
Expand Down
10 changes: 1 addition & 9 deletions scheduler/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,9 @@ func (s *Supervisor) handleRemovedNodes(
) ([]*messaging.TargetMessage, error) {
sentMsgs := make([]*messaging.TargetMessage, 0)
if len(removed) > 0 {
var err error
s.StateMachines.Ascend(func(id InferiorID, stateMachine *StateMachine) bool {
for _, captureID := range removed {
msg, affected, err1 := stateMachine.HandleCaptureShutdown(captureID)
if err != nil {
err = errors.Trace(err1)
return false
}
msg, affected := stateMachine.HandleCaptureShutdown(captureID)
if msg != nil {
sentMsgs = append(sentMsgs, msg)
}
Expand All @@ -266,9 +261,6 @@ func (s *Supervisor) handleRemovedNodes(
}
return true
})
if err != nil {
return nil, errors.Trace(err)
}
}
return sentMsgs, nil
}
Expand Down

0 comments on commit c7535dd

Please sign in to comment.