From 5b7c29e8da0574ce552f2e34bb5f1f8cdd52f757 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Fri, 4 Nov 2022 12:22:01 +0800 Subject: [PATCH] scheduler: modify baseSchedulePlan to fit new balance region impl (#5614) ref tikv/pd#5257, ref tikv/pd#5544 modify baseSchedulePlan to fit new balance region impl Signed-off-by: Cabinfever_B Co-authored-by: Ti Chi Robot --- server/schedule/filter/filters.go | 1 - server/schedulers/balance_plan.go | 27 +++++++++++++------------- server/schedulers/balance_plan_test.go | 6 ++---- server/schedulers/balance_region.go | 4 ++++ server/schedulers/balance_test.go | 6 +++--- 5 files changed, 23 insertions(+), 21 deletions(-) diff --git a/server/schedule/filter/filters.go b/server/schedule/filter/filters.go index b663730d50d..0b1563a0348 100644 --- a/server/schedule/filter/filters.go +++ b/server/schedule/filter/filters.go @@ -109,7 +109,6 @@ func SelectTargetStores(stores []*core.StoreInfo, filters []Filter, opt *config. if collector != nil { collector.Collect(plan.SetResource(s), plan.SetStatus(status)) } - return false } return true diff --git a/server/schedulers/balance_plan.go b/server/schedulers/balance_plan.go index 421b24ab9ac..04dd1a494c1 100644 --- a/server/schedulers/balance_plan.go +++ b/server/schedulers/balance_plan.go @@ -72,6 +72,7 @@ func (p *balanceSchedulerPlan) GetResource(step int) uint64 { if p.step < step { return 0 } + // Please use with care. Add a nil check if need in the future switch step { case pickSource: return p.source.GetID() @@ -114,10 +115,10 @@ func (p *balanceSchedulerPlan) Clone(opts ...plan.Option) plan.Plan { // BalancePlanSummary is used to summarize for BalancePlan func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error) { // storeStatusCounter is used to count the number of various statuses of each store - var storeStatusCounter map[uint64]map[plan.Status]int + storeStatusCounter := make(map[uint64]map[plan.Status]int) // statusCounter is used to count the number of status which is regarded as best status of each store statusCounter := make(map[uint64]plan.Status) - maxStep := -1 + storeMaxStep := make(map[uint64]int) normal := true for _, pi := range plans { p, ok := pi.(*balanceSchedulerPlan) @@ -129,16 +130,6 @@ func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error) if step > pickTarget { step = pickTarget } - if step > maxStep { - storeStatusCounter = make(map[uint64]map[plan.Status]int) - maxStep = step - normal = true - } else if step < maxStep { - continue - } - if !p.status.IsNormal() { - normal = false - } var store uint64 // `step == pickRegion` is a special processing in summary, because we want to exclude the factor of region // and consider the failure as the status of source store. @@ -147,8 +138,18 @@ func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error) } else { store = p.GetResource(step) } - if _, ok := storeStatusCounter[store]; !ok { + maxStep, ok := storeMaxStep[store] + if !ok { + maxStep = -1 + } + if step > maxStep { storeStatusCounter[store] = make(map[plan.Status]int) + storeMaxStep[store] = step + } else if step < maxStep { + continue + } + if !p.status.IsNormal() { + normal = false } storeStatusCounter[store][*p.status]++ } diff --git a/server/schedulers/balance_plan_test.go b/server/schedulers/balance_plan_test.go index 266dc22b60c..842fc984b3d 100644 --- a/server/schedulers/balance_plan_test.go +++ b/server/schedulers/balance_plan_test.go @@ -223,19 +223,17 @@ func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult4() { func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult5() { plans := make([]plan.Plan, 0) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreDown)}) + plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreRemoveLimitThrottled)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[1], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[4], step: 2, status: plan.NewStatus(plan.StatusStoreDown)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[1], step: 3, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[4], step: 4, status: plan.NewStatus(plan.StatusCreateOperatorFailed)}) statuses, isNormal, err := BalancePlanSummary(plans) suite.NoError(err) suite.False(isNormal) @@ -245,7 +243,7 @@ func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult5() { 2: plan.NewStatus(plan.StatusStoreAlreadyHasPeer), 3: plan.NewStatus(plan.StatusStoreNotMatchRule), 4: plan.NewStatus(plan.StatusStoreNotMatchRule), - 5: plan.NewStatus(plan.StatusCreateOperatorFailed), + 5: plan.NewStatus(plan.StatusStoreRemoveLimitThrottled), })) } diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index 8b0c21ab8cf..b71ccc082f5 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -173,6 +173,10 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool) baseRegionFilters = append(baseRegionFilters, filter.NewRegionEmptyFilter(cluster)) } + if collector != nil && len(sourceStores) > 0 { + collector.Collect(plan.SetResource(sourceStores[0]), plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed))) + } + solver.step++ var sourceIndex int diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 84499fbf3e1..b1addea160e 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -787,15 +787,15 @@ func TestBalanceRegionSchedule1(t *testing.T) { // test region replicate not match opt.SetMaxReplicas(3) ops, plans := sb.Schedule(tc, true) - re.Len(plans, 100) + re.Len(plans, 101) re.Empty(ops) - re.Equal(int(plans[0].GetStatus().StatusCode), plan.StatusRegionNotReplicated) + re.Equal(int(plans[1].GetStatus().StatusCode), plan.StatusRegionNotReplicated) tc.SetStoreOffline(1) opt.SetMaxReplicas(1) ops, plans = sb.Schedule(tc, true) re.NotEmpty(ops) - re.Len(plans, 3) + re.Len(plans, 4) re.True(plans[0].GetStatus().IsOK()) }