Skip to content

Commit

Permalink
scheduler: add metrics when there is potential reverse
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Oct 23, 2024
1 parent d82e41d commit 7a9e69d
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 46 deletions.
10 changes: 6 additions & 4 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,8 @@ func (s *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
batch := s.conf.getBatch()
balanceLeaderScheduleCounter.Inc()

leaderSchedulePolicy := cluster.GetSchedulerConfig().GetLeaderSchedulePolicy()
opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster())
kind := constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy)
solver := newSolver(basePlan, kind, cluster, opInfluence)
solver := newSolver(basePlan, s.tp, cluster, opInfluence)

stores := cluster.GetStores()
scoreFunc := func(store *core.StoreInfo) float64 {
Expand Down Expand Up @@ -516,9 +514,13 @@ func (s *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
func (s *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator {
solver.Step++
defer func() { solver.Step-- }()
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(s.GetName()), solver.targetStoreScore(s.GetName())
solver.calcSourceStoreScore(s.GetName())
solver.calcTargetStoreScore(s.GetName())
if !solver.shouldBalance(s.GetName()) {
balanceLeaderSkipCounter.Inc()
if solver.isPotentialReverse() {
balanceLeaderPotentialReverse.Inc()
}
if collector != nil {
collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed)))
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
sourceStores := filter.SelectSourceStores(stores, s.filters, conf, collector, s.filterCounter)
opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster())
s.OpController.GetFastOpInfluence(cluster.GetBasicCluster(), opInfluence)
kind := constant.NewScheduleKind(constant.RegionKind, constant.BySize)
solver := newSolver(basePlan, kind, cluster, opInfluence)
solver := newSolver(basePlan, s.tp, cluster, opInfluence)

sort.Slice(sourceStores, func(i, j int) bool {
iOp := solver.getOpInfluence(sourceStores[i].GetID())
Expand Down Expand Up @@ -137,7 +136,7 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
// sourceStores is sorted by region score desc, so we pick the first store as source store.
for sourceIndex, solver.Source = range sourceStores {
retryLimit := s.retryQuota.getLimit(solver.Source)
solver.sourceScore = solver.sourceStoreScore(s.GetName())
solver.calcSourceStoreScore(s.GetName())
if sourceIndex == len(sourceStores)-1 {
break
}
Expand Down Expand Up @@ -223,14 +222,17 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
// candidates are sorted by region score desc, so we pick the last store as target store.
for i := range candidates.Stores {
solver.Target = candidates.Stores[len(candidates.Stores)-i-1]
solver.targetScore = solver.targetStoreScore(s.GetName())
solver.calcTargetStoreScore(s.GetName())
regionID := solver.Region.GetID()
sourceID := solver.Source.GetID()
targetID := solver.Target.GetID()
log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID))

if !solver.shouldBalance(s.GetName()) {
balanceRegionSkipCounter.Inc()
if solver.isPotentialReverse() {
balanceRegionPotentialReverse.Inc()
}
if collector != nil {
collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed)))
}
Expand Down
26 changes: 15 additions & 11 deletions pkg/schedule/schedulers/balance_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestInfluenceAmp(t *testing.T) {
re := require.New(t)

R := int64(96)
kind := constant.NewScheduleKind(constant.RegionKind, constant.BySize)

influence := oc.GetOpInfluence(tc.GetBasicCluster())
influence.GetStoreInfluence(1).RegionSize = R
Expand All @@ -60,16 +59,18 @@ func TestInfluenceAmp(t *testing.T) {
region := tc.GetRegion(1).Clone(core.SetApproximateSize(R))
tc.PutRegion(region)
basePlan := plan.NewBalanceSchedulerPlan()
solver := newSolver(basePlan, kind, tc, influence)
solver := newSolver(basePlan, types.BalanceRegionScheduler, tc, influence)
solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
solver.calcSourceStoreScore("")
solver.calcTargetStoreScore("")
re.True(solver.shouldBalance(""))

// It will not schedule if the diff region count is greater than the sum
// of TolerantSizeRatio and influenceAmp*2.
tc.AddRegionStore(1, int(100+influenceAmp+2))
solver.Source = tc.GetStore(1)
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
solver.calcSourceStoreScore("")
solver.calcTargetStoreScore("")
re.False(solver.shouldBalance(""))
re.Less(solver.sourceScore-solver.targetScore, float64(1))
}
Expand Down Expand Up @@ -143,11 +144,12 @@ func TestShouldBalance(t *testing.T) {
region := tc.GetRegion(1).Clone(core.SetApproximateSize(testCase.regionSize))
tc.PutRegion(region)
tc.SetLeaderSchedulePolicy(testCase.kind.String())
kind := constant.NewScheduleKind(constant.LeaderKind, testCase.kind)
basePlan := plan.NewBalanceSchedulerPlan()
solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
solver := newSolver(basePlan, types.BalanceLeaderScheduler, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
solver.kind = constant.NewScheduleKind(constant.LeaderKind, testCase.kind)
solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
solver.calcSourceStoreScore("")
solver.calcTargetStoreScore("")
re.Equal(testCase.expectedResult, solver.shouldBalance(""))
}

Expand All @@ -157,11 +159,12 @@ func TestShouldBalance(t *testing.T) {
tc.AddRegionStore(2, int(testCase.targetCount))
region := tc.GetRegion(1).Clone(core.SetApproximateSize(testCase.regionSize))
tc.PutRegion(region)
kind := constant.NewScheduleKind(constant.RegionKind, testCase.kind)
basePlan := plan.NewBalanceSchedulerPlan()
solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
solver := newSolver(basePlan, types.BalanceRegionScheduler, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
solver.kind = constant.NewScheduleKind(constant.RegionKind, testCase.kind)
solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
solver.calcSourceStoreScore("")
solver.calcTargetStoreScore("")
re.Equal(testCase.expectedResult, solver.shouldBalance(""))
}
}
Expand Down Expand Up @@ -209,7 +212,8 @@ func TestTolerantRatio(t *testing.T) {
for _, t := range tbl {
tc.SetTolerantSizeRatio(t.ratio)
basePlan := plan.NewBalanceSchedulerPlan()
solver := newSolver(basePlan, t.kind, tc, operator.OpInfluence{})
solver := newSolver(basePlan, types.BalanceLeaderScheduler, tc, operator.OpInfluence{})
solver.kind = t.kind
solver.Region = region

sourceScore := t.expectTolerantResource(t.kind)
Expand Down
9 changes: 6 additions & 3 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ func (s *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()

opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster())
kind := constant.NewScheduleKind(constant.WitnessKind, constant.ByCount)
solver := newSolver(basePlan, kind, cluster, opInfluence)
solver := newSolver(basePlan, s.tp, cluster, opInfluence)

stores := cluster.GetStores()
scoreFunc := func(store *core.StoreInfo) float64 {
Expand Down Expand Up @@ -329,9 +328,13 @@ func (s *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *
func (s *balanceWitnessScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator {
solver.Step++
defer func() { solver.Step-- }()
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(s.GetName()), solver.targetStoreScore(s.GetName())
solver.calcSourceStoreScore(s.GetName())
solver.calcTargetStoreScore(s.GetName())
if !solver.shouldBalance(s.GetName()) {
schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc()
if solver.isPotentialReverse() {
schedulerCounter.WithLabelValues(s.GetName(), "potential-reverse").Inc()
}
if collector != nil {
collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed)))
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/schedulers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ var (
balanceLeaderNoTargetStoreCounter = balanceLeaderCounterWithEvent("no-target-store")
balanceLeaderNoFollowerRegionCounter = balanceLeaderCounterWithEvent("no-follower-region")
balanceLeaderSkipCounter = balanceLeaderCounterWithEvent("skip")
balanceLeaderPotentialReverse = balanceLeaderCounterWithEvent("potential-reverse")
balanceLeaderNewOpCounter = balanceLeaderCounterWithEvent("new-operator")

balanceRegionScheduleCounter = balanceRegionCounterWithEvent("schedule")
Expand All @@ -237,6 +238,7 @@ var (
balanceRegionNoLeaderCounter = balanceRegionCounterWithEvent("no-leader")
balanceRegionNewOpCounter = balanceRegionCounterWithEvent("new-operator")
balanceRegionSkipCounter = balanceRegionCounterWithEvent("skip")
balanceRegionPotentialReverse = balanceRegionCounterWithEvent("potential-reverse")
balanceRegionCreateOpFailCounter = balanceRegionCounterWithEvent("create-operator-fail")
balanceRegionNoReplacementCounter = balanceRegionCounterWithEvent("no-replacement")

Expand Down
62 changes: 38 additions & 24 deletions pkg/schedule/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/statistics"
"go.uber.org/zap"
)
Expand All @@ -50,12 +51,25 @@ type solver struct {
tolerantSizeRatio float64
tolerantSource int64
fit *placement.RegionFit

sourceScore float64
targetScore float64
}

func newSolver(basePlan *plan.BalanceSchedulerPlan, kind constant.ScheduleKind, cluster sche.SchedulerCluster, opInfluence operator.OpInfluence) *solver {
sourceScore float64
targetScore float64
sourceDelta int64
targetDelta int64
}

func newSolver(basePlan *plan.BalanceSchedulerPlan, tp types.CheckerSchedulerType, cluster sche.SchedulerCluster, opInfluence operator.OpInfluence) *solver {
var kind constant.ScheduleKind
switch tp {
case types.BalanceLeaderScheduler:
leaderSchedulePolicy := cluster.GetSchedulerConfig().GetLeaderSchedulePolicy()
kind = constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy)
case types.BalanceRegionScheduler:
kind = constant.NewScheduleKind(constant.RegionKind, constant.BySize)
case types.BalanceWitnessScheduler:
kind = constant.NewScheduleKind(constant.WitnessKind, constant.ByCount)
default:
log.Fatal("invalid scheduler type")
}
return &solver{
BalanceSchedulerPlan: basePlan,
SchedulerCluster: cluster,
Expand Down Expand Up @@ -85,7 +99,7 @@ func (p *solver) targetMetricLabel() string {
return strconv.FormatUint(p.targetStoreID(), 10)
}

func (p *solver) sourceStoreScore(scheduleName string) float64 {
func (p *solver) calcSourceStoreScore(scheduleName string) {
sourceID := p.Source.GetID()
tolerantResource := p.getTolerantResource()
// to avoid schedule too much, if A's score greater than B and C a little
Expand All @@ -99,22 +113,20 @@ func (p *solver) sourceStoreScore(scheduleName string) float64 {
opInfluenceStatus.WithLabelValues(scheduleName, strconv.FormatUint(sourceID, 10), "source").Set(float64(influence))
tolerantResourceStatus.WithLabelValues(scheduleName).Set(float64(tolerantResource))
}
var score float64
switch p.kind.Resource {
case constant.LeaderKind:
sourceDelta := influence - tolerantResource
score = p.Source.LeaderScore(p.kind.Policy, sourceDelta)
p.sourceDelta = influence - tolerantResource
p.sourceScore = p.Source.LeaderScore(p.kind.Policy, p.sourceDelta)
case constant.RegionKind:
sourceDelta := influence*influenceAmp - tolerantResource
score = p.Source.RegionScore(p.GetSchedulerConfig().GetRegionScoreFormulaVersion(), p.GetSchedulerConfig().GetHighSpaceRatio(), p.GetSchedulerConfig().GetLowSpaceRatio(), sourceDelta)
p.sourceDelta = influence*influenceAmp - tolerantResource
p.sourceScore = p.Source.RegionScore(p.GetSchedulerConfig().GetRegionScoreFormulaVersion(), p.GetSchedulerConfig().GetHighSpaceRatio(), p.GetSchedulerConfig().GetLowSpaceRatio(), p.sourceDelta)
case constant.WitnessKind:
sourceDelta := influence - tolerantResource
score = p.Source.WitnessScore(sourceDelta)
p.sourceDelta = influence - tolerantResource
p.sourceScore = p.Source.WitnessScore(p.sourceDelta)
}
return score
}

func (p *solver) targetStoreScore(scheduleName string) float64 {
func (p *solver) calcTargetStoreScore(scheduleName string) {
targetID := p.Target.GetID()
// to avoid schedule too much, if A's score less than B and C in small range,
// we want that A can be moved in one region not two
Expand All @@ -129,19 +141,17 @@ func (p *solver) targetStoreScore(scheduleName string) float64 {
if p.GetSchedulerConfig().IsDebugMetricsEnabled() {
opInfluenceStatus.WithLabelValues(scheduleName, strconv.FormatUint(targetID, 10), "target").Set(float64(influence))
}
var score float64
switch p.kind.Resource {
case constant.LeaderKind:
targetDelta := influence + tolerantResource
score = p.Target.LeaderScore(p.kind.Policy, targetDelta)
p.targetDelta = influence + tolerantResource
p.targetScore = p.Target.LeaderScore(p.kind.Policy, p.targetDelta)
case constant.RegionKind:
targetDelta := influence*influenceAmp + tolerantResource
score = p.Target.RegionScore(p.GetSchedulerConfig().GetRegionScoreFormulaVersion(), p.GetSchedulerConfig().GetHighSpaceRatio(), p.GetSchedulerConfig().GetLowSpaceRatio(), targetDelta)
p.targetDelta = influence*influenceAmp + tolerantResource
p.targetScore = p.Target.RegionScore(p.GetSchedulerConfig().GetRegionScoreFormulaVersion(), p.GetSchedulerConfig().GetHighSpaceRatio(), p.GetSchedulerConfig().GetLowSpaceRatio(), p.targetDelta)
case constant.WitnessKind:
targetDelta := influence + tolerantResource
score = p.Target.WitnessScore(targetDelta)
p.targetDelta = influence + tolerantResource
p.targetScore = p.Target.WitnessScore(p.targetDelta)
}
return score
}

// Both of the source store's score and target store's score should be calculated before calling this function.
Expand All @@ -166,6 +176,10 @@ func (p *solver) shouldBalance(scheduleName string) bool {
return shouldBalance
}

// isPotentialReverse reports whether the source side, taken as sourceScore
// plus sourceDelta, is strictly lower than the target side, taken as
// targetScore plus targetDelta.
//
// It only reads the four cached solver fields, so the source and target
// scores must have been calculated before calling it.
// NOTE(review): the deltas are int64 amounts added directly onto float64
// scores; presumably this is intentional, confirm against the score functions.
func (p *solver) isPotentialReverse() bool {
	adjustedSource := p.sourceScore + float64(p.sourceDelta)
	adjustedTarget := p.targetScore + float64(p.targetDelta)
	return adjustedSource < adjustedTarget
}

func (p *solver) getTolerantResource() int64 {
if p.tolerantSource > 0 {
return p.tolerantSource
Expand Down

0 comments on commit 7a9e69d

Please sign in to comment.