Skip to content

Commit

Permalink
scheduler: consider leader score when evict leader
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Dec 12, 2024
1 parent 7df0ff9 commit 6b0e6ed
Show file tree
Hide file tree
Showing 14 changed files with 49 additions and 24 deletions.
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement.
if region.GetLeader().GetId() != peer.GetId() && rf.Rule.Role == placement.Leader {
ruleCheckerFixLeaderRoleCounter.Inc()
if c.allowLeader(fit, peer) {
return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, peer.GetStoreId(), []uint64{}, 0)
return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, peer.GetStoreId(), 0)
}
ruleCheckerNotAllowLeaderCounter.Inc()
return nil, errPeerCannotBeLeader
Expand All @@ -329,7 +329,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement.
ruleCheckerFixFollowerRoleCounter.Inc()
for _, p := range region.GetPeers() {
if c.allowLeader(fit, p) {
return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, p.GetStoreId(), []uint64{}, 0)
return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, p.GetStoreId(), 0)
}
}
ruleCheckerNoNewLeaderCounter.Inc()
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type SchedulerConfigProvider interface {
IsTraceRegionFlow() bool

GetTolerantSizeRatio() float64
GetLeaderSchedulePolicy() constant.SchedulePolicy

IsDebugMetricsEnabled() bool
IsDiagnosticAllowed() bool
Expand Down Expand Up @@ -112,6 +111,7 @@ type SharedConfigProvider interface {
IsCrossTableMergeEnabled() bool
IsOneWayMergeEnabled() bool
GetMergeScheduleLimit() uint64
GetLeaderSchedulePolicy() constant.SchedulePolicy
GetRegionScoreFormulaVersion() string
GetSchedulerMaxWaitingOperator() uint64
GetStoreLimitByType(uint64, storelimit.Type) float64
Expand Down
18 changes: 18 additions & 0 deletions pkg/schedule/filter/comparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ func RegionScoreComparer(conf config.SharedConfigProvider) StoreComparer {
}
}

// LeaderScoreComparer creates a StoreComparer to sort store by leader
// score.
func LeaderScoreComparer(conf config.SchedulerConfigProvider) StoreComparer {
leaderSchedulePolicy := conf.GetLeaderSchedulePolicy()
return func(a, b *core.StoreInfo) int {
sa := a.LeaderScore(leaderSchedulePolicy, 0)
sb := b.LeaderScore(leaderSchedulePolicy, 0)
switch {
case sa > sb:
return 1
case sa < sb:
return -1
default:
return 0
}
}
}

// IsolationComparer creates a StoreComparer to sort store by isolation score.
func IsolationComparer(locationLabels []string, regionStores []*core.StoreInfo) StoreComparer {
return func(a, b *core.StoreInfo) int {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err
return errors.Errorf("region has no voter in store %v", storeID)
}

op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, newLeader.GetStoreId(), []uint64{}, operator.OpAdmin)
op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, newLeader.GetStoreId(), operator.OpAdmin)
if err != nil {
log.Debug("fail to create transfer leader operator", errs.ZapError(err))
return err
Expand Down
3 changes: 1 addition & 2 deletions pkg/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ func CreateRemovePeerOperator(desc string, ci sche.SharedCluster, kind OpKind, r
}

// CreateTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store.
func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) {
func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, kind OpKind) (*Operator, error) {
return NewBuilder(desc, ci, region, SkipOriginJointStateCheck).
SetLeader(targetStoreID).
SetLeaders(targetStoreIDs).
Build(kind)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/operator/create_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (suite *createOperatorTestSuite) TestCreateTransferLeaderOperator() {
}
for _, testCase := range testCases {
region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: testCase.originPeers}, testCase.originPeers[0])
op, err := CreateTransferLeaderOperator("test", suite.cluster, region, testCase.targetLeaderStoreID, []uint64{}, 0)
op, err := CreateTransferLeaderOperator("test", suite.cluster, region, testCase.targetLeaderStoreID, 0)

if testCase.isErr {
re.Error(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (s *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
}
solver.Step++
defer func() { solver.Step-- }()
op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader)
op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create balance leader operator", errs.ZapError(err))
if collector != nil {
Expand Down
29 changes: 19 additions & 10 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,19 +361,12 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl
filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).
FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
// Compatible with old TiKV transfer leader logic.
target := candidates.RandomPick()
targets := candidates.PickAll()
// `targets` MUST contains `target`, so only needs to check if `target` is nil here.
if target == nil {

if len(candidates.Stores) == 0 {
evictLeaderNoTargetStoreCounter.Inc()
continue
}
targetIDs := make([]uint64, 0, len(targets))
for _, t := range targets {
targetIDs = append(targetIDs, t.GetID())
}
op, err := operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpLeader)
op, err := createOperatorWithSort(name, cluster, candidates, region)
if err != nil {
log.Debug("fail to create evict leader operator", errs.ZapError(err))
continue
Expand All @@ -385,6 +378,22 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl
return ops
}

func createOperatorWithSort(name string, cluster sche.SchedulerCluster, candidates *filter.StoreCandidates, region *core.RegionInfo) (*operator.Operator, error) {
// we will pick low leader score store firstly.
candidates.Sort(filter.RegionScoreComparer(cluster.GetSharedConfig()))
var (
op *operator.Operator
err error
)
for _, target := range candidates.Stores {
op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpLeader)
if op != nil && err == nil {
return op, err
}
}
return op, err
}

type evictLeaderHandler struct {
rd *render.Render
config *evictLeaderSchedulerConfig
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/grant_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (s *grantHotRegionScheduler) transfer(cluster sche.SchedulerCluster, region
dstStore := &metapb.Peer{StoreId: destStoreIDs[i]}

if isLeader {
op, err = operator.CreateTransferLeaderOperator(s.GetName()+"-leader", cluster, srcRegion, dstStore.StoreId, []uint64{}, operator.OpLeader)
op, err = operator.CreateTransferLeaderOperator(s.GetName()+"-leader", cluster, srcRegion, dstStore.StoreId, operator.OpLeader)
} else {
op, err = operator.CreateMovePeerOperator(s.GetName()+"-move", cluster, srcRegion, operator.OpRegion|operator.OpLeader, srcStore.GetID(), dstStore)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,6 @@ func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dst
bs,
region,
dstStoreID,
[]uint64{},
operator.OpHotRegion)
} else {
srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
case movePeer:
op, err = operator.CreateMovePeerOperator("move-peer-test", tc, region, operator.OpAdmin, 2, &metapb.Peer{Id: region.GetID()*10000 + 1, StoreId: 4})
case transferLeader:
op, err = operator.CreateTransferLeaderOperator("transfer-leader-test", tc, region, 2, []uint64{}, operator.OpAdmin)
op, err = operator.CreateTransferLeaderOperator("transfer-leader-test", tc, region, 2, operator.OpAdmin)
}
re.NoError(err)
re.NotNil(op)
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*ope
continue
}

op, err := operator.CreateTransferLeaderOperator("label-reject-leader", cluster, region, target.GetID(), []uint64{}, operator.OpLeader)
op, err := operator.CreateTransferLeaderOperator("label-reject-leader", cluster, region, target.GetID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create transfer label reject leader operator", errs.ZapError(err))
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/shuffle_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool)
shuffleLeaderNoFollowerCounter.Inc()
return nil, nil
}
op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, targetStore.GetID(), []uint64{}, operator.OpAdmin)
op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, targetStore.GetID(), operator.OpAdmin)
if err != nil {
log.Debug("fail to create shuffle leader operator", errs.ZapError(err))
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) (
if target == nil {
continue
}
op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), []uint64{}, operator.OpLeader)
op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create evict leader operator", errs.ZapError(err))
continue
Expand Down

0 comments on commit 6b0e6ed

Please sign in to comment.