Skip to content

Commit

Permalink
scheduler: modify baseSchedulePlan to fit new balance region impl (#5614
Browse files Browse the repository at this point in the history
)

ref #5257, ref #5544

modify baseSchedulePlan to fit new balance region impl

Signed-off-by: Cabinfever_B <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
CabinfeverB and ti-chi-bot authored Nov 4, 2022
1 parent 99528a6 commit 5b7c29e
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 21 deletions.
1 change: 0 additions & 1 deletion server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func SelectTargetStores(stores []*core.StoreInfo, filters []Filter, opt *config.
if collector != nil {
collector.Collect(plan.SetResource(s), plan.SetStatus(status))
}

return false
}
return true
Expand Down
27 changes: 14 additions & 13 deletions server/schedulers/balance_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (p *balanceSchedulerPlan) GetResource(step int) uint64 {
if p.step < step {
return 0
}
// Please use with care. Add a nil check if need in the future
switch step {
case pickSource:
return p.source.GetID()
Expand Down Expand Up @@ -114,10 +115,10 @@ func (p *balanceSchedulerPlan) Clone(opts ...plan.Option) plan.Plan {
// BalancePlanSummary is used to summarize for BalancePlan
func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error) {
// storeStatusCounter is used to count the number of various statuses of each store
var storeStatusCounter map[uint64]map[plan.Status]int
storeStatusCounter := make(map[uint64]map[plan.Status]int)
// statusCounter is used to count the number of status which is regarded as best status of each store
statusCounter := make(map[uint64]plan.Status)
maxStep := -1
storeMaxStep := make(map[uint64]int)
normal := true
for _, pi := range plans {
p, ok := pi.(*balanceSchedulerPlan)
Expand All @@ -129,16 +130,6 @@ func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error)
if step > pickTarget {
step = pickTarget
}
if step > maxStep {
storeStatusCounter = make(map[uint64]map[plan.Status]int)
maxStep = step
normal = true
} else if step < maxStep {
continue
}
if !p.status.IsNormal() {
normal = false
}
var store uint64
// `step == pickRegion` is a special processing in summary, because we want to exclude the factor of region
// and consider the failure as the status of source store.
Expand All @@ -147,8 +138,18 @@ func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error)
} else {
store = p.GetResource(step)
}
if _, ok := storeStatusCounter[store]; !ok {
maxStep, ok := storeMaxStep[store]
if !ok {
maxStep = -1
}
if step > maxStep {
storeStatusCounter[store] = make(map[plan.Status]int)
storeMaxStep[store] = step
} else if step < maxStep {
continue
}
if !p.status.IsNormal() {
normal = false
}
storeStatusCounter[store][*p.status]++
}
Expand Down
6 changes: 2 additions & 4 deletions server/schedulers/balance_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,17 @@ func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult4() {

func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult5() {
plans := make([]plan.Plan, 0)
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreDown)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreRemoveLimitThrottled)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[1], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[4], step: 2, status: plan.NewStatus(plan.StatusStoreDown)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[1], step: 3, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)})
plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[4], step: 4, status: plan.NewStatus(plan.StatusCreateOperatorFailed)})
statuses, isNormal, err := BalancePlanSummary(plans)
suite.NoError(err)
suite.False(isNormal)
Expand All @@ -245,7 +243,7 @@ func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult5() {
2: plan.NewStatus(plan.StatusStoreAlreadyHasPeer),
3: plan.NewStatus(plan.StatusStoreNotMatchRule),
4: plan.NewStatus(plan.StatusStoreNotMatchRule),
5: plan.NewStatus(plan.StatusCreateOperatorFailed),
5: plan.NewStatus(plan.StatusStoreRemoveLimitThrottled),
}))
}

Expand Down
4 changes: 4 additions & 0 deletions server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool)
baseRegionFilters = append(baseRegionFilters, filter.NewRegionEmptyFilter(cluster))
}

if collector != nil && len(sourceStores) > 0 {
collector.Collect(plan.SetResource(sourceStores[0]), plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed)))
}

solver.step++
var sourceIndex int

Expand Down
6 changes: 3 additions & 3 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,15 +787,15 @@ func TestBalanceRegionSchedule1(t *testing.T) {
// test region replicate not match
opt.SetMaxReplicas(3)
ops, plans := sb.Schedule(tc, true)
re.Len(plans, 100)
re.Len(plans, 101)
re.Empty(ops)
re.Equal(int(plans[0].GetStatus().StatusCode), plan.StatusRegionNotReplicated)
re.Equal(int(plans[1].GetStatus().StatusCode), plan.StatusRegionNotReplicated)

tc.SetStoreOffline(1)
opt.SetMaxReplicas(1)
ops, plans = sb.Schedule(tc, true)
re.NotEmpty(ops)
re.Len(plans, 3)
re.Len(plans, 4)
re.True(plans[0].GetStatus().IsOK())
}

Expand Down

0 comments on commit 5b7c29e

Please sign in to comment.