From c1efb484c8663fbfea6e14dfe364af70469aafad Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Sun, 29 Sep 2024 17:41:39 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #8675 close tikv/pd#8674 Signed-off-by: ti-chi-bot --- pkg/schedule/checker/replica_checker.go | 116 ++++++++++++------ pkg/schedule/checker/replica_strategy.go | 7 +- pkg/schedule/checker/rule_checker.go | 30 ++++- pkg/schedule/filter/candidates.go | 5 +- pkg/schedule/filter/candidates_test.go | 4 +- pkg/schedule/schedulers/balance_leader.go | 2 +- pkg/schedule/schedulers/balance_region.go | 2 +- pkg/schedule/schedulers/base_scheduler.go | 18 +++ pkg/schedule/schedulers/evict_leader.go | 19 ++- pkg/schedule/schedulers/evict_slow_store.go | 4 + pkg/schedule/schedulers/evict_slow_trend.go | 4 + pkg/schedule/schedulers/label.go | 5 + pkg/schedule/schedulers/random_merge.go | 5 + pkg/schedule/schedulers/shuffle_leader.go | 2 +- pkg/schedule/schedulers/shuffle_region.go | 4 +- .../schedulers/transfer_witness_leader.go | 16 +++ plugin/scheduler_example/evict_leader.go | 2 +- 17 files changed, 188 insertions(+), 57 deletions(-) diff --git a/pkg/schedule/checker/replica_checker.go b/pkg/schedule/checker/replica_checker.go index 3e23f3bdcac..1b537cb6f4f 100644 --- a/pkg/schedule/checker/replica_checker.go +++ b/pkg/schedule/checker/replica_checker.go @@ -16,6 +16,8 @@ package checker import ( "fmt" + "math/rand" + "time" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -61,17 +63,31 @@ var ( // Location management, mainly used for cross data center deployment. type ReplicaChecker struct { PauseController +<<<<<<< HEAD cluster sche.CheckerCluster conf config.CheckerConfigProvider regionWaitingList cache.Cache +======= + cluster sche.CheckerCluster + conf config.CheckerConfigProvider + pendingProcessedRegions *cache.TTLUint64 + r *rand.Rand +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } // NewReplicaChecker creates a replica checker. func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, regionWaitingList cache.Cache) *ReplicaChecker { return &ReplicaChecker{ +<<<<<<< HEAD cluster: cluster, conf: conf, regionWaitingList: regionWaitingList, +======= + cluster: cluster, + conf: conf, + pendingProcessedRegions: pendingProcessedRegions, + r: rand.New(rand.NewSource(time.Now().UnixNano())), +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } } @@ -81,40 +97,40 @@ func (r *ReplicaChecker) GetType() string { } // Check verifies a region's replicas, creating an operator.Operator if need. -func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator { +func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator { replicaCheckerCounter.Inc() - if r.IsPaused() { + if c.IsPaused() { replicaCheckerPausedCounter.Inc() return nil } - if op := r.checkDownPeer(region); op != nil { + if op := c.checkDownPeer(region); op != nil { replicaCheckerNewOpCounter.Inc() op.SetPriorityLevel(constant.High) return op } - if op := r.checkOfflinePeer(region); op != nil { + if op := c.checkOfflinePeer(region); op != nil { replicaCheckerNewOpCounter.Inc() op.SetPriorityLevel(constant.High) return op } - if op := r.checkMakeUpReplica(region); op != nil { + if op := c.checkMakeUpReplica(region); op != nil { replicaCheckerNewOpCounter.Inc() op.SetPriorityLevel(constant.High) return op } - if op := r.checkRemoveExtraReplica(region); op != nil { + if op := c.checkRemoveExtraReplica(region); op != nil { replicaCheckerNewOpCounter.Inc() return op } - if op := r.checkLocationReplacement(region); op != nil { + if op := c.checkLocationReplacement(region); op != nil { replicaCheckerNewOpCounter.Inc() return op } return nil } -func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsRemoveDownReplicaEnabled() { +func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsRemoveDownReplicaEnabled() { return nil } @@ -124,22 +140,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat continue } storeID := peer.GetStoreId() - store := r.cluster.GetStore(storeID) + store := c.cluster.GetStore(storeID) if store == nil { log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID)) return nil } // Only consider the state of the Store, not `stats.DownSeconds`. - if store.DownTime() < r.conf.GetMaxStoreDownTime() { + if store.DownTime() < c.conf.GetMaxStoreDownTime() { continue } - return r.fixPeer(region, storeID, downStatus) + return c.fixPeer(region, storeID, downStatus) } return nil } -func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsReplaceOfflineReplicaEnabled() { +func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsReplaceOfflineReplicaEnabled() { return nil } @@ -150,7 +166,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope for _, peer := range region.GetPeers() { storeID := peer.GetStoreId() - store := r.cluster.GetStore(storeID) + store := c.cluster.GetStore(storeID) if store == nil { log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID)) return nil @@ -159,32 +175,36 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope continue } - return r.fixPeer(region, storeID, offlineStatus) + return c.fixPeer(region, storeID, offlineStatus) } return nil } -func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsMakeUpReplicaEnabled() { +func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsMakeUpReplicaEnabled() { return nil } - if len(region.GetPeers()) >= r.conf.GetMaxReplicas() { + if len(region.GetPeers()) >= c.conf.GetMaxReplicas() { return nil } log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers()))) - regionStores := r.cluster.GetRegionStores(region) - target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores) + regionStores := c.cluster.GetRegionStores(region) + target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores) if target == 0 { log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID())) replicaCheckerNoTargetStoreCounter.Inc() if filterByTempState { +<<<<<<< HEAD r.regionWaitingList.Put(region.GetID(), nil) +======= + c.pendingProcessedRegions.Put(region.GetID(), nil) +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } return nil } newPeer := &metapb.Peer{StoreId: target} - op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica) + op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica) if err != nil { log.Debug("create make-up-replica operator fail", errs.ZapError(err)) return nil @@ -192,24 +212,28 @@ func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.O return op } -func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsRemoveExtraReplicaEnabled() { +func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsRemoveExtraReplicaEnabled() { return nil } // when add learner peer, the number of peer will exceed max replicas for a while, // just comparing the the number of voters to avoid too many cancel add operator log. - if len(region.GetVoters()) <= r.conf.GetMaxReplicas() { + if len(region.GetVoters()) <= c.conf.GetMaxReplicas() { return nil } log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers()))) - regionStores := r.cluster.GetRegionStores(region) - old := r.strategy(region).SelectStoreToRemove(regionStores) + regionStores := c.cluster.GetRegionStores(region) + old := c.strategy(c.r, region).SelectStoreToRemove(regionStores) if old == 0 { replicaCheckerNoWorstPeerCounter.Inc() +<<<<<<< HEAD r.regionWaitingList.Put(region.GetID(), nil) +======= + c.pendingProcessedRegions.Put(region.GetID(), nil) +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) return nil } - op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old) + op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old) if err != nil { replicaCheckerCreateOpFailedCounter.Inc() return nil @@ -217,13 +241,13 @@ func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *opera return op } -func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsLocationReplacementEnabled() { +func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsLocationReplacementEnabled() { return nil } - strategy := r.strategy(region) - regionStores := r.cluster.GetRegionStores(region) + strategy := c.strategy(c.r, region) + regionStores := c.cluster.GetRegionStores(region) oldStore := strategy.SelectStoreToRemove(regionStores) if oldStore == 0 { replicaCheckerAllRightCounter.Inc() @@ -237,7 +261,7 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper } newPeer := &metapb.Peer{StoreId: newStore} - op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer) + op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer) if err != nil { replicaCheckerCreateOpFailedCounter.Inc() return nil @@ -245,11 +269,11 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper return op } -func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator { +func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator { // Check the number of replicas first. - if len(region.GetVoters()) > r.conf.GetMaxReplicas() { + if len(region.GetVoters()) > c.conf.GetMaxReplicas() { removeExtra := fmt.Sprintf("remove-extra-%s-replica", status) - op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID) + op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID) if err != nil { if status == offlineStatus { replicaCheckerRemoveExtraOfflineFailedCounter.Inc() @@ -261,8 +285,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status return op } - regionStores := r.cluster.GetRegionStores(region) - target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID) + regionStores := c.cluster.GetRegionStores(region) + target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID) if target == 0 { if status == offlineStatus { replicaCheckerNoStoreOfflineCounter.Inc() @@ -271,13 +295,17 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status } log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID())) if filterByTempState { +<<<<<<< HEAD r.regionWaitingList.Put(region.GetID(), nil) +======= + c.pendingProcessedRegions.Put(region.GetID(), nil) +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } return nil } newPeer := &metapb.Peer{StoreId: target} replace := fmt.Sprintf("replace-%s-replica", status) - op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer) + op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer) if err != nil { if status == offlineStatus { replicaCheckerReplaceOfflineFailedCounter.Inc() @@ -289,12 +317,20 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status return op } -func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy { +func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy { return &ReplicaStrategy{ +<<<<<<< HEAD checkerName: replicaCheckerName, cluster: r.cluster, locationLabels: r.conf.GetLocationLabels(), isolationLevel: r.conf.GetIsolationLevel(), +======= + checkerName: c.Name(), + cluster: c.cluster, + locationLabels: c.conf.GetLocationLabels(), + isolationLevel: c.conf.GetIsolationLevel(), +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) region: region, + r: r, } } diff --git a/pkg/schedule/checker/replica_strategy.go b/pkg/schedule/checker/replica_strategy.go index ad85e307bbe..02371a4a08e 100644 --- a/pkg/schedule/checker/replica_strategy.go +++ b/pkg/schedule/checker/replica_strategy.go @@ -15,6 +15,8 @@ package checker import ( + "math/rand" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" @@ -26,6 +28,7 @@ import ( // ReplicaStrategy collects some utilities to manipulate region peers. It // exists to allow replica_checker and rule_checker to reuse common logics. type ReplicaStrategy struct { + r *rand.Rand checkerName string // replica-checker / rule-checker cluster sche.CheckerCluster locationLabels []string @@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores) strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level} - targetCandidate := filter.NewCandidates(s.cluster.GetStores()). + targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()). FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...). KeepTheTopStores(isolationComparer, false) // greater isolation score is better if targetCandidate.Len() == 0 { @@ -143,7 +146,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo if s.fastFailover { level = constant.Urgent } - source := filter.NewCandidates(coLocationStores). + source := filter.NewCandidates(s.r, coLocationStores). FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}). KeepTheTopStores(isolationComparer, true). PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false) diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 464f5e97be8..9d34ec4c7e6 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -18,6 +18,7 @@ import ( "context" "errors" "math" + "math/rand" "time" "github.com/pingcap/kvproto/pkg/metapb" @@ -84,6 +85,7 @@ var ( // RuleChecker fix/improve region by placement rules. type RuleChecker struct { PauseController +<<<<<<< HEAD cluster sche.CheckerCluster ruleManager *placement.RuleManager name string @@ -91,11 +93,21 @@ type RuleChecker struct { pendingList cache.Cache switchWitnessCache *cache.TTLUint64 record *recorder +======= + cluster sche.CheckerCluster + ruleManager *placement.RuleManager + pendingProcessedRegions *cache.TTLUint64 + pendingList cache.Cache + switchWitnessCache *cache.TTLUint64 + record *recorder + r *rand.Rand +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } // NewRuleChecker creates a checker instance. func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker { return &RuleChecker{ +<<<<<<< HEAD cluster: cluster, ruleManager: ruleManager, name: ruleCheckerName, @@ -103,6 +115,15 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage pendingList: cache.NewDefaultCache(maxPendingListLen), switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()), record: newRecord(), +======= + cluster: cluster, + ruleManager: ruleManager, + pendingProcessedRegions: pendingProcessedRegions, + pendingList: cache.NewDefaultCache(maxPendingListLen), + switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()), + record: newRecord(), + r: rand.New(rand.NewSource(time.Now().UnixNano())), +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } } @@ -232,7 +253,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region ruleStores := c.getRuleFitStores(rf) isWitness := rf.Rule.IsWitness && c.isWitnessEnabled() // If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic. - store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores) + store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores) if store == 0 { ruleCheckerNoStoreAddCounter.Inc() c.handleFilterState(region, filterByTempState) @@ -283,7 +304,7 @@ func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *pla fastFailover = false } ruleStores := c.getRuleFitStores(rf) - store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId()) + store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId()) if store == 0 { ruleCheckerNoStoreReplaceCounter.Inc() c.handleFilterState(region, filterByTempState) @@ -424,7 +445,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R isWitness := rf.Rule.IsWitness && c.isWitnessEnabled() // If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic. - strategy := c.strategy(region, rf.Rule, isWitness) + strategy := c.strategy(c.r, region, rf.Rule, isWitness) ruleStores := c.getRuleFitStores(rf) oldStore := strategy.SelectStoreToRemove(ruleStores) if oldStore == 0 { @@ -649,7 +670,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb. return nil, false } -func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy { +func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy { return &ReplicaStrategy{ checkerName: c.name, cluster: c.cluster, @@ -658,6 +679,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa region: region, extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.name, rule.LabelConstraints)}, fastFailover: fastFailover, + r: r, } } diff --git a/pkg/schedule/filter/candidates.go b/pkg/schedule/filter/candidates.go index 35ca4a2e284..9877bbaa0ac 100644 --- a/pkg/schedule/filter/candidates.go +++ b/pkg/schedule/filter/candidates.go @@ -17,7 +17,6 @@ package filter import ( "math/rand" "sort" - "time" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/config" @@ -32,8 +31,8 @@ type StoreCandidates struct { } // NewCandidates creates StoreCandidates with store list. -func NewCandidates(stores []*core.StoreInfo) *StoreCandidates { - return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores} +func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates { + return &StoreCandidates{r: r, Stores: stores} } // FilterSource keeps stores that can pass all source filters. diff --git a/pkg/schedule/filter/candidates_test.go b/pkg/schedule/filter/candidates_test.go index 13e8ed661cc..d421048aee3 100644 --- a/pkg/schedule/filter/candidates_test.go +++ b/pkg/schedule/filter/candidates_test.go @@ -15,7 +15,9 @@ package filter import ( + "math/rand" "testing" + "time" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" @@ -112,7 +114,7 @@ func newTestCandidates(ids ...uint64) *StoreCandidates { for _, id := range ids { stores = append(stores, core.NewStoreInfo(&metapb.Store{Id: id})) } - return NewCandidates(stores) + return NewCandidates(rand.New(rand.NewSource(time.Now().UnixNano())), stores) } func check(re *require.Assertions, candidates *StoreCandidates, ids ...uint64) { diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 692256dc7d4..de951ffbeaf 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -532,7 +532,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { finalFilters = append(l.filters, leaderFilter) } - target := filter.NewCandidates([]*core.StoreInfo{solver.Target}). + target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.Target}). FilterTarget(conf, nil, l.filterCounter, finalFilters...). PickFirst() if target == nil { diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index dca22475808..35e26bdab7d 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -243,7 +243,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co filter.NewPlacementSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, solver.fit), } - candidates := filter.NewCandidates(dstStores).FilterTarget(conf, collector, s.filterCounter, filters...) + candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(conf, collector, s.filterCounter, filters...) if len(candidates.Stores) != 0 { solver.Step++ } diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index f4c8c577767..096207473fc 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -16,6 +16,7 @@ package schedulers import ( "fmt" + "math/rand" "net/http" "time" @@ -61,11 +62,28 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth // BaseScheduler is a basic scheduler for all other complex scheduler type BaseScheduler struct { OpController *operator.Controller +<<<<<<< HEAD } // NewBaseScheduler returns a basic scheduler func NewBaseScheduler(opController *operator.Controller) *BaseScheduler { return &BaseScheduler{OpController: opController} +======= + R *rand.Rand + + name string + tp types.CheckerSchedulerType + conf schedulerConfig +} + +// NewBaseScheduler returns a basic scheduler +func NewBaseScheduler( + opController *operator.Controller, + tp types.CheckerSchedulerType, + conf schedulerConfig, +) *BaseScheduler { + return &BaseScheduler{OpController: opController, tp: tp, conf: conf, R: rand.New(rand.NewSource(time.Now().UnixNano()))} +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } func (s *BaseScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 5cd59583767..c587abfe69f 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -15,6 +15,7 @@ package schedulers import ( + "math/rand" "net/http" "strconv" @@ -253,7 +254,11 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { evictLeaderCounter.Inc() +<<<<<<< HEAD return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil +======= + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf), nil +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator { @@ -276,10 +281,18 @@ type evictLeaderStoresConf interface { getKeyRangesByID(id uint64) []core.KeyRange } +<<<<<<< HEAD func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator { +======= +func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) var ops []*operator.Operator for i := 0; i < batchSize; i++ { +<<<<<<< HEAD once := scheduleEvictLeaderOnce(name, typ, cluster, conf) +======= + once := scheduleEvictLeaderOnce(r, name, cluster, conf) +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) // no more regions if len(once) == 0 { break @@ -293,7 +306,11 @@ func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, c return ops } +<<<<<<< HEAD func scheduleEvictLeaderOnce(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { +======= +func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) stores := conf.getStores() ops := make([]*operator.Operator, 0, len(stores)) for _, storeID := range stores { @@ -324,7 +341,7 @@ func scheduleEvictLeaderOnce(name, typ string, cluster sche.SchedulerCluster, co } filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) - candidates := filter.NewCandidates(cluster.GetFollowerStores(region)). + candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) // Compatible with old TiKV transfer leader logic. target := candidates.RandomPick() diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index aa48d0bc9e9..c1f25cd4dc8 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -266,7 +266,11 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerClust } func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { +<<<<<<< HEAD return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) +======= + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf) +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index d919c1c0f0a..a8d8c81876f 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -370,7 +370,11 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus return nil } storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1) +<<<<<<< HEAD return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) +======= + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf) +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) } func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 90310bcf10e..0f728abd8af 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -110,8 +110,13 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([ } f := filter.NewExcludedFilter(s.GetName(), nil, excludeStores) +<<<<<<< HEAD target := filter.NewCandidates(cluster.GetFollowerStores(region)). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f). +======= + target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)). + FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, f). +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) RandomPick() if target == nil { log.Debug("label scheduler no target found for region", zap.Uint64("region-id", region.GetID())) diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index 44bb5081ef9..d387e1d084c 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -89,8 +89,13 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *randomMergeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { randomMergeCounter.Inc() +<<<<<<< HEAD store := filter.NewCandidates(cluster.GetStores()). FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}). +======= + store := filter.NewCandidates(s.R, cluster.GetStores()). + FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true, OperatorLevel: constant.Low}). +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) RandomPick() if store == nil { randomMergeNoSourceStoreCounter.Inc() diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index a6ff4baf65b..f28a1f3d865 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -92,7 +92,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // 1. random select a valid store. // 2. transfer a leader to the store. shuffleLeaderCounter.Inc() - targetStore := filter.NewCandidates(cluster.GetStores()). + targetStore := filter.NewCandidates(s.R, cluster.GetStores()). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, s.filters...). RandomPick() if targetStore == nil { diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index f9bed18d3fa..bfc7e56106b 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -132,7 +132,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun } func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluster) (*core.RegionInfo, *metapb.Peer) { - candidates := filter.NewCandidates(cluster.GetStores()). + candidates := filter.NewCandidates(s.R, cluster.GetStores()). FilterSource(cluster.GetSchedulerConfig(), nil, nil, s.filters...). Shuffle() @@ -172,7 +172,7 @@ func (s *shuffleRegionScheduler) scheduleAddPeer(cluster sche.SchedulerCluster, scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster.GetSchedulerConfig(), cluster.GetBasicCluster(), cluster.GetRuleManager(), region, store, nil) excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIDs()) - target := filter.NewCandidates(cluster.GetStores()). + target := filter.NewCandidates(s.R, cluster.GetStores()). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, append(s.filters, scoreGuard, excludedFilter)...). RandomPick() if target == nil { diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index c651a8ef872..4fb85072aaa 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -15,6 +15,8 @@ package schedulers import ( + "math/rand" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" @@ -83,7 +85,11 @@ batchLoop: for i := 0; i < batchSize; i++ { select { case region := <-s.regions: +<<<<<<< HEAD op, err := s.scheduleTransferWitnessLeader(name, typ, cluster, region) +======= + op, err := scheduleTransferWitnessLeader(s.R, name, cluster, region) +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) if err != nil { log.Debug("fail to create transfer leader operator", errs.ZapError(err)) continue @@ -100,7 +106,11 @@ batchLoop: return ops } +<<<<<<< HEAD func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { +======= +func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) var filters []filter.Filter unhealthyPeerStores := make(map[uint64]struct{}) for _, peer := range region.GetDownPeers() { @@ -109,8 +119,14 @@ func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ for _, peer := range region.GetPendingPeers() { unhealthyPeerStores[peer.GetStoreId()] = struct{}{} } +<<<<<<< HEAD filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) +======= + filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), + &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) + candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) +>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) // Compatible with old TiKV transfer leader logic. target := candidates.RandomPick() targets := candidates.PickAll() diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index eaeb449f4a6..7bf0887bb27 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -225,7 +225,7 @@ func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bo if region == nil { continue } - target := filter.NewCandidates(cluster.GetFollowerStores(region)). + target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true, OperatorLevel: constant.Urgent}). RandomPick() if target == nil { From c2f65694a858bd4c0c14fe899c25f0364a8bbb3c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 18 Oct 2024 16:52:47 +0800 Subject: [PATCH 2/2] resolve conflicts Signed-off-by: Ryan Leung --- pkg/schedule/checker/replica_checker.go | 43 +++---------------- pkg/schedule/checker/rule_checker.go | 22 +--------- pkg/schedule/schedulers/base_scheduler.go | 20 +-------- pkg/schedule/schedulers/evict_leader.go | 24 ++--------- pkg/schedule/schedulers/evict_slow_store.go | 6 +-- pkg/schedule/schedulers/evict_slow_trend.go | 6 +-- pkg/schedule/schedulers/label.go | 7 +-- pkg/schedule/schedulers/random_merge.go | 7 +-- .../schedulers/transfer_witness_leader.go | 18 +------- 9 files changed, 20 insertions(+), 133 deletions(-) diff --git a/pkg/schedule/checker/replica_checker.go b/pkg/schedule/checker/replica_checker.go index 1b537cb6f4f..8f1d0115ba4 100644 --- a/pkg/schedule/checker/replica_checker.go +++ b/pkg/schedule/checker/replica_checker.go @@ -63,36 +63,24 @@ var ( // Location management, mainly used for cross data center deployment. type ReplicaChecker struct { PauseController -<<<<<<< HEAD cluster sche.CheckerCluster conf config.CheckerConfigProvider regionWaitingList cache.Cache -======= - cluster sche.CheckerCluster - conf config.CheckerConfigProvider - pendingProcessedRegions *cache.TTLUint64 - r *rand.Rand ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + r *rand.Rand } // NewReplicaChecker creates a replica checker. func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, regionWaitingList cache.Cache) *ReplicaChecker { return &ReplicaChecker{ -<<<<<<< HEAD cluster: cluster, conf: conf, regionWaitingList: regionWaitingList, -======= - cluster: cluster, - conf: conf, - pendingProcessedRegions: pendingProcessedRegions, - r: rand.New(rand.NewSource(time.Now().UnixNano())), ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + r: rand.New(rand.NewSource(time.Now().UnixNano())), } } // GetType return ReplicaChecker's type -func (r *ReplicaChecker) GetType() string { +func (c *ReplicaChecker) GetType() string { return replicaCheckerName } @@ -195,11 +183,7 @@ func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.O log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID())) replicaCheckerNoTargetStoreCounter.Inc() if filterByTempState { -<<<<<<< HEAD - r.regionWaitingList.Put(region.GetID(), nil) -======= - c.pendingProcessedRegions.Put(region.GetID(), nil) ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + c.regionWaitingList.Put(region.GetID(), nil) } return nil } @@ -226,11 +210,7 @@ func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *opera old := c.strategy(c.r, region).SelectStoreToRemove(regionStores) if old == 0 { replicaCheckerNoWorstPeerCounter.Inc() -<<<<<<< HEAD - r.regionWaitingList.Put(region.GetID(), nil) -======= - c.pendingProcessedRegions.Put(region.GetID(), nil) ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + c.regionWaitingList.Put(region.GetID(), nil) return nil } op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old) @@ -295,11 +275,7 @@ func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status } log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID())) if filterByTempState { -<<<<<<< HEAD - r.regionWaitingList.Put(region.GetID(), nil) -======= - c.pendingProcessedRegions.Put(region.GetID(), nil) ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + c.regionWaitingList.Put(region.GetID(), nil) } return nil } @@ -319,17 +295,10 @@ func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy { return &ReplicaStrategy{ -<<<<<<< HEAD checkerName: replicaCheckerName, - cluster: r.cluster, - locationLabels: r.conf.GetLocationLabels(), - isolationLevel: r.conf.GetIsolationLevel(), -======= - checkerName: c.Name(), cluster: c.cluster, locationLabels: c.conf.GetLocationLabels(), isolationLevel: c.conf.GetIsolationLevel(), ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) region: region, r: r, } diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 9d34ec4c7e6..2203c68accd 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -85,7 +85,6 @@ var ( // RuleChecker fix/improve region by placement rules. type RuleChecker struct { PauseController -<<<<<<< HEAD cluster sche.CheckerCluster ruleManager *placement.RuleManager name string @@ -93,21 +92,12 @@ type RuleChecker struct { pendingList cache.Cache switchWitnessCache *cache.TTLUint64 record *recorder -======= - cluster sche.CheckerCluster - ruleManager *placement.RuleManager - pendingProcessedRegions *cache.TTLUint64 - pendingList cache.Cache - switchWitnessCache *cache.TTLUint64 - record *recorder - r *rand.Rand ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + r *rand.Rand } // NewRuleChecker creates a checker instance. func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker { return &RuleChecker{ -<<<<<<< HEAD cluster: cluster, ruleManager: ruleManager, name: ruleCheckerName, @@ -115,15 +105,7 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage pendingList: cache.NewDefaultCache(maxPendingListLen), switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()), record: newRecord(), -======= - cluster: cluster, - ruleManager: ruleManager, - pendingProcessedRegions: pendingProcessedRegions, - pendingList: cache.NewDefaultCache(maxPendingListLen), - switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()), - record: newRecord(), - r: rand.New(rand.NewSource(time.Now().UnixNano())), ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + r: rand.New(rand.NewSource(time.Now().UnixNano())), } } diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index 096207473fc..958c5978a45 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -62,28 +62,12 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth // BaseScheduler is a basic scheduler for all other complex scheduler type BaseScheduler struct { OpController *operator.Controller -<<<<<<< HEAD -} - -// NewBaseScheduler returns a basic scheduler -func NewBaseScheduler(opController *operator.Controller) *BaseScheduler { - return &BaseScheduler{OpController: opController} -======= R *rand.Rand - - name string - tp types.CheckerSchedulerType - conf schedulerConfig } // NewBaseScheduler returns a basic scheduler -func NewBaseScheduler( - opController *operator.Controller, - tp types.CheckerSchedulerType, - conf schedulerConfig, -) *BaseScheduler { - return &BaseScheduler{OpController: opController, tp: tp, conf: conf, R: rand.New(rand.NewSource(time.Now().UnixNano()))} ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) +func NewBaseScheduler(opController *operator.Controller) *BaseScheduler { + return &BaseScheduler{OpController: opController, R: rand.New(rand.NewSource(time.Now().UnixNano()))} } func (s *BaseScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index c587abfe69f..88b552e5f80 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -254,11 +254,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { evictLeaderCounter.Inc() -<<<<<<< HEAD - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil -======= - return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf), nil ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil } func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator { @@ -281,18 +277,10 @@ type evictLeaderStoresConf interface { getKeyRangesByID(id uint64) []core.KeyRange } -<<<<<<< HEAD -func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator { -======= -func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) +func scheduleEvictLeaderBatch(r *rand.Rand, name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator { var ops []*operator.Operator for i := 0; i < batchSize; i++ { -<<<<<<< HEAD - once := scheduleEvictLeaderOnce(name, typ, cluster, conf) -======= - once := scheduleEvictLeaderOnce(r, name, cluster, conf) ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + once := scheduleEvictLeaderOnce(r, name, typ, cluster, conf) // no more regions if len(once) == 0 { break @@ -306,11 +294,7 @@ func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerC return ops } -<<<<<<< HEAD -func scheduleEvictLeaderOnce(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { -======= -func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) +func scheduleEvictLeaderOnce(r *rand.Rand, name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { stores := conf.getStores() ops := make([]*operator.Operator, 0, len(stores)) for _, storeID := range stores { diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index c1f25cd4dc8..3ffc29d6022 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -266,11 +266,7 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerClust } func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { -<<<<<<< HEAD - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) -======= - return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf) ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) } func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index a8d8c81876f..829c948a3c9 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -370,11 +370,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus return nil } storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1) -<<<<<<< HEAD - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) -======= - return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf) ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) } func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 0f728abd8af..345f7df4007 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -110,13 +110,8 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([ } f := filter.NewExcludedFilter(s.GetName(), nil, excludeStores) -<<<<<<< HEAD - target := filter.NewCandidates(cluster.GetFollowerStores(region)). - FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f). -======= target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)). - FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, f). ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f). RandomPick() if target == nil { log.Debug("label scheduler no target found for region", zap.Uint64("region-id", region.GetID())) diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index d387e1d084c..0a23907560f 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -89,13 +89,8 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *randomMergeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { randomMergeCounter.Inc() -<<<<<<< HEAD - store := filter.NewCandidates(cluster.GetStores()). - FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}). -======= store := filter.NewCandidates(s.R, cluster.GetStores()). - FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true, OperatorLevel: constant.Low}). ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}). RandomPick() if store == nil { randomMergeNoSourceStoreCounter.Inc() diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index 4fb85072aaa..e0c6dcbdca2 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -85,11 +85,7 @@ batchLoop: for i := 0; i < batchSize; i++ { select { case region := <-s.regions: -<<<<<<< HEAD - op, err := s.scheduleTransferWitnessLeader(name, typ, cluster, region) -======= - op, err := scheduleTransferWitnessLeader(s.R, name, cluster, region) ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) + op, err := s.scheduleTransferWitnessLeader(s.R, name, typ, cluster, region) if err != nil { log.Debug("fail to create transfer leader operator", errs.ZapError(err)) continue @@ -106,11 +102,7 @@ batchLoop: return ops } -<<<<<<< HEAD -func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { -======= -func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) +func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(r *rand.Rand, name, typ string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { var filters []filter.Filter unhealthyPeerStores := make(map[uint64]struct{}) for _, peer := range region.GetDownPeers() { @@ -119,14 +111,8 @@ func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.Sched for _, peer := range region.GetPendingPeers() { unhealthyPeerStores[peer.GetStoreId()] = struct{}{} } -<<<<<<< HEAD filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) - candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) -======= - filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), - &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) ->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675)) // Compatible with old TiKV transfer leader logic. target := candidates.RandomPick() targets := candidates.PickAll()