From 35fb84fb4f0c4d86f151aad953c0c14cb6781267 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 24 Sep 2024 13:52:27 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #8632 close tikv/pd#8619 Signed-off-by: ti-chi-bot --- pkg/schedule/schedulers/evict_leader.go | 487 ++++++++++ plugin/scheduler_example/evict_leader.go | 38 + server/schedulers/grant_leader.go | 12 +- .../pd-ctl/tests/scheduler/scheduler_test.go | 909 ++++++++++++++++++ 4 files changed, 1445 insertions(+), 1 deletion(-) create mode 100644 pkg/schedule/schedulers/evict_leader.go create mode 100644 tools/pd-ctl/tests/scheduler/scheduler_test.go diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go new file mode 100644 index 00000000000..ec068b9550b --- /dev/null +++ b/pkg/schedule/schedulers/evict_leader.go @@ -0,0 +1,487 @@ +// Copyright 2017 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulers + +import ( + "net/http" + "strconv" + + "github.com/gorilla/mux" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" + "github.com/tikv/pd/pkg/errs" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/filter" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/schedule/types" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/unrolled/render" + "go.uber.org/zap" +) + +const ( + // EvictLeaderBatchSize is the number of operators to transfer + // leaders by one scheduling + EvictLeaderBatchSize = 3 + lastStoreDeleteInfo = "The last store has been deleted" +) + +type evictLeaderSchedulerConfig struct { + syncutil.RWMutex + schedulerConfig + + StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` + // Batch is used to generate multiple operators by one scheduling + Batch int `json:"batch"` + cluster *core.BasicCluster + removeSchedulerCb func(string) error +} + +func (conf *evictLeaderSchedulerConfig) getStores() []uint64 { + conf.RLock() + defer conf.RUnlock() + stores := make([]uint64, 0, len(conf.StoreIDWithRanges)) + for storeID := range conf.StoreIDWithRanges { + stores = append(stores, storeID) + } + return stores +} + +func (conf *evictLeaderSchedulerConfig) getBatch() int { + conf.RLock() + defer conf.RUnlock() + return conf.Batch +} + +func (conf *evictLeaderSchedulerConfig) clone() *evictLeaderSchedulerConfig { + conf.RLock() + defer conf.RUnlock() + storeIDWithRanges := make(map[uint64][]core.KeyRange) + for id, ranges := range conf.StoreIDWithRanges { + storeIDWithRanges[id] = append(storeIDWithRanges[id], ranges...) + } + return &evictLeaderSchedulerConfig{ + StoreIDWithRanges: storeIDWithRanges, + Batch: conf.Batch, + } +} + +func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { + conf.RLock() + defer conf.RUnlock() + ranges := conf.StoreIDWithRanges[id] + res := make([]string, 0, len(ranges)*2) + for index := range ranges { + res = append(res, (string)(ranges[index].StartKey), (string)(ranges[index].EndKey)) + } + return res +} + +func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, error) { + _, exists := conf.StoreIDWithRanges[id] + if exists { + delete(conf.StoreIDWithRanges, id) + conf.cluster.ResumeLeaderTransfer(id) + return len(conf.StoreIDWithRanges) == 0, nil + } + return false, errs.ErrScheduleConfigNotExist.FastGenByArgs() +} + +func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) { + conf.Lock() + defer conf.Unlock() + // if the store is not existed, no need to resume leader transfer + _, _ = conf.removeStoreLocked(id) +} + +func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) { + if err := conf.cluster.PauseLeaderTransfer(id); err != nil { + log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) + } + conf.StoreIDWithRanges[id] = keyRange +} + +func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { + conf.Lock() + defer conf.Unlock() + conf.resetStoreLocked(id, keyRange) +} + +func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { + conf.RLock() + defer conf.RUnlock() + if ranges, exist := conf.StoreIDWithRanges[id]; exist { + return ranges + } + return nil +} + +func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) { + conf.RLock() + defer conf.RUnlock() + return EncodeConfig(conf) +} + +func (conf *evictLeaderSchedulerConfig) reloadConfig() error { + conf.Lock() + defer conf.Unlock() + newCfg := &evictLeaderSchedulerConfig{} + if err := conf.load(newCfg); err != nil { + return err + } + pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) + conf.StoreIDWithRanges = newCfg.StoreIDWithRanges + conf.Batch = newCfg.Batch + return nil +} + +func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.SchedulerCluster) error { + conf.RLock() + defer conf.RUnlock() + var res error + for id := range conf.StoreIDWithRanges { + if err := cluster.PauseLeaderTransfer(id); err != nil { + res = err + } + } + return res +} + +func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.SchedulerCluster) { + conf.RLock() + defer conf.RUnlock() + for id := range conf.StoreIDWithRanges { + cluster.ResumeLeaderTransfer(id) + } +} + +func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) { + conf.RLock() + defer conf.RUnlock() + if _, exist := conf.StoreIDWithRanges[id]; !exist { + if err := conf.cluster.PauseLeaderTransfer(id); err != nil { + return exist, err + } + } + return true, nil +} + +func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error { + conf.Lock() + defer conf.Unlock() + if id != 0 { + conf.StoreIDWithRanges[id] = newRanges + } + conf.Batch = batch + err := conf.save() + if err != nil && id != 0 { + _, _ = conf.removeStoreLocked(id) + } + return err +} + +func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) { + conf.Lock() + var resp any + last, err := conf.removeStoreLocked(id) + if err != nil { + conf.Unlock() + return resp, err + } + + keyRanges := conf.StoreIDWithRanges[id] + err = conf.save() + if err != nil { + conf.resetStoreLocked(id, keyRanges) + conf.Unlock() + return resp, err + } + if !last { + conf.Unlock() + return resp, nil + } + conf.Unlock() + if err := conf.removeSchedulerCb(types.EvictLeaderScheduler.String()); err != nil { + if !errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) { + conf.resetStore(id, keyRanges) + } + return resp, err + } + resp = lastStoreDeleteInfo + return resp, nil +} + +type evictLeaderScheduler struct { + *BaseScheduler + conf *evictLeaderSchedulerConfig + handler http.Handler +} + +// newEvictLeaderScheduler creates an admin scheduler that transfers all leaders +// out of a store. +func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) Scheduler { + handler := newEvictLeaderHandler(conf) + return &evictLeaderScheduler{ + BaseScheduler: NewBaseScheduler(opController, types.EvictLeaderScheduler, conf), + conf: conf, + handler: handler, + } +} + +// EvictStoreIDs returns the IDs of the evict-stores. +func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 { + return s.conf.getStores() +} + +// ServeHTTP implements the http.Handler interface. +func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + +// GetName implements the Scheduler interface. +func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { + return s.conf.encodeConfig() +} + +// ReloadConfig reloads the config from the storage. +func (s *evictLeaderScheduler) ReloadConfig() error { + return s.conf.reloadConfig() +} + +// PrepareConfig implements the Scheduler interface. +func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { + return s.conf.pauseLeaderTransfer(cluster) +} + +// CleanConfig implements the Scheduler interface. +func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { + s.conf.resumeLeaderTransfer(cluster) +} + +// IsScheduleAllowed implements the Scheduler interface. +func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() + if !allowed { + operator.IncOperatorLimitCounter(s.GetType(), operator.OpLeader) + } + return allowed +} + +// Schedule implements the Scheduler interface. +func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { + evictLeaderCounter.Inc() + return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf), nil +} + +func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator { + regionIDs := make(map[uint64]struct{}) + for i := range dst { + regionIDs[dst[i].RegionID()] = struct{}{} + } + for i := range src { + if _, ok := regionIDs[src[i].RegionID()]; ok { + continue + } + regionIDs[src[i].RegionID()] = struct{}{} + dst = append(dst, src[i]) + } + return dst +} + +type evictLeaderStoresConf interface { + getStores() []uint64 + getKeyRangesByID(id uint64) []core.KeyRange + getBatch() int +} + +func scheduleEvictLeaderBatch(name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { + var ops []*operator.Operator + batchSize := conf.getBatch() + for i := 0; i < batchSize; i++ { + once := scheduleEvictLeaderOnce(name, cluster, conf) + // no more regions + if len(once) == 0 { + break + } + ops = uniqueAppendOperator(ops, once...) + // the batch has been fulfilled + if len(ops) > batchSize { + break + } + } + return ops +} + +func scheduleEvictLeaderOnce(name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { + stores := conf.getStores() + ops := make([]*operator.Operator, 0, len(stores)) + for _, storeID := range stores { + ranges := conf.getKeyRangesByID(storeID) + if len(ranges) == 0 { + continue + } + var filters []filter.Filter + pendingFilter := filter.NewRegionPendingFilter() + downFilter := filter.NewRegionDownFilter() + region := filter.SelectOneRegion(cluster.RandLeaderRegions(storeID, ranges), nil, pendingFilter, downFilter) + if region == nil { + // try to pick unhealthy region + region = filter.SelectOneRegion(cluster.RandLeaderRegions(storeID, ranges), nil) + if region == nil { + evictLeaderNoLeaderCounter.Inc() + continue + } + evictLeaderPickUnhealthyCounter.Inc() + unhealthyPeerStores := make(map[uint64]struct{}) + for _, peer := range region.GetDownPeers() { + unhealthyPeerStores[peer.GetPeer().GetStoreId()] = struct{}{} + } + for _, peer := range region.GetPendingPeers() { + unhealthyPeerStores[peer.GetStoreId()] = struct{}{} + } + filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores)) + } + + filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) + candidates := filter.NewCandidates(cluster.GetFollowerStores(region)). + FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) + // Compatible with old TiKV transfer leader logic. + target := candidates.RandomPick() + targets := candidates.PickAll() + // `targets` MUST contains `target`, so only needs to check if `target` is nil here. + if target == nil { + evictLeaderNoTargetStoreCounter.Inc() + continue + } + targetIDs := make([]uint64, 0, len(targets)) + for _, t := range targets { + targetIDs = append(targetIDs, t.GetID()) + } + op, err := operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpLeader) + if err != nil { + log.Debug("fail to create evict leader operator", errs.ZapError(err)) + continue + } + op.SetPriorityLevel(constant.Urgent) + op.Counters = append(op.Counters, evictLeaderNewOperatorCounter) + ops = append(ops, op) + } + return ops +} + +type evictLeaderHandler struct { + rd *render.Render + config *evictLeaderSchedulerConfig +} + +func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]any + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + var ( + exist bool + err error + id uint64 + newRanges []core.KeyRange + ) + idFloat, inputHasStoreID := input["store_id"].(float64) + if inputHasStoreID { + id = (uint64)(idFloat) + exist, err = handler.config.pauseLeaderTransferIfStoreNotExist(id) + if err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + } + + batch := handler.config.getBatch() + batchFloat, ok := input["batch"].(float64) + if ok { + if batchFloat < 1 || batchFloat > 10 { + handler.config.removeStore(id) + handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]") + return + } + batch = (int)(batchFloat) + } + + ranges, ok := (input["ranges"]).([]string) + if ok { + if !inputHasStoreID { + handler.config.removeStore(id) + handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id")) + return + } + } else if exist { + ranges = handler.config.getRanges(id) + } + + newRanges, err = getKeyRanges(ranges) + if err != nil { + handler.config.removeStore(id) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + + err = handler.config.update(id, newRanges, batch) + if err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") +} + +func (handler *evictLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) { + conf := handler.config.clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + +func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.Request) { + idStr := mux.Vars(r)["store_id"] + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + + resp, err := handler.config.delete(id) + if err != nil { + if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) || errors.ErrorEqual(err, errs.ErrScheduleConfigNotExist.FastGenByArgs()) { + handler.rd.JSON(w, http.StatusNotFound, err.Error()) + } else { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + } + return + } + + handler.rd.JSON(w, http.StatusOK, resp) +} + +func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { + h := &evictLeaderHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) + router.HandleFunc("/delete/{store_id}", h.deleteConfig).Methods(http.MethodDelete) + return router +} diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index cbc09617dad..f3c658f5c48 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -271,9 +271,25 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R args = append(args, handler.config.getRanges(id)...) } +<<<<<<< HEAD handler.config.BuildWithArgs(args) err := handler.config.Persist() +======= + err := handler.config.BuildWithArgs(args) if err != nil { + handler.config.mu.Lock() + handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.mu.Unlock() + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + err = handler.config.Persist() +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) + if err != nil { + handler.config.mu.Lock() + delete(handler.config.StoreIDWitRanges, id) + handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.mu.Unlock() handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) } handler.rd.JSON(w, http.StatusOK, nil) @@ -294,6 +310,7 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R handler.config.mu.Lock() defer handler.config.mu.Unlock() +<<<<<<< HEAD _, exists := handler.config.StoreIDWitRanges[id] if exists { delete(handler.config.StoreIDWitRanges, id) @@ -312,6 +329,27 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R } handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) +======= + ranges, exists := handler.config.StoreIDWitRanges[id] + if !exists { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) + return + } + delete(handler.config.StoreIDWitRanges, id) + handler.config.cluster.ResumeLeaderTransfer(id) + + if err := handler.config.Persist(); err != nil { + handler.config.StoreIDWitRanges[id] = ranges + _ = handler.config.cluster.PauseLeaderTransfer(id) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + var resp any + if len(handler.config.StoreIDWitRanges) == 0 { + resp = noStoreInSchedulerInfo + } + handler.rd.JSON(w, http.StatusOK, resp) +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) } func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go index 40dd5c8a073..a8f1fc452fa 100644 --- a/server/schedulers/grant_leader.go +++ b/server/schedulers/grant_leader.go @@ -286,10 +286,20 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R args = append(args, handler.config.getRanges(id)...) } +<<<<<<< HEAD:server/schedulers/grant_leader.go handler.config.BuildWithArgs(args) err := handler.config.Persist() +======= + err := handler.config.buildWithArgs(args) if err != nil { - handler.config.removeStore(id) + _, _ = handler.config.removeStore(id) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + err = handler.config.persist() +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)):pkg/schedule/schedulers/grant_leader.go + if err != nil { + _, _ = handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go new file mode 100644 index 00000000000..63d0f091b8c --- /dev/null +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -0,0 +1,909 @@ +// Copyright 2019 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler_test + +import ( + "encoding/json" + "fmt" + "reflect" + "strings" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/spf13/cobra" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/versioninfo" + pdTests "github.com/tikv/pd/tests" + ctl "github.com/tikv/pd/tools/pd-ctl/pdctl" + "github.com/tikv/pd/tools/pd-ctl/tests" +) + +type schedulerTestSuite struct { + suite.Suite + env *pdTests.SchedulingTestEnvironment + defaultSchedulers []string +} + +func TestSchedulerTestSuite(t *testing.T) { + suite.Run(t, new(schedulerTestSuite)) +} + +func (suite *schedulerTestSuite) SetupSuite() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipStoreConfigSync", `return(true)`)) + suite.defaultSchedulers = []string{ + "balance-leader-scheduler", + "balance-region-scheduler", + "balance-hot-region-scheduler", + "evict-slow-store-scheduler", + } +} + +func (suite *schedulerTestSuite) SetupTest() { + // use a new environment to avoid affecting other tests + suite.env = pdTests.NewSchedulingTestEnvironment(suite.T()) +} + +func (suite *schedulerTestSuite) TearDownSuite() { + re := suite.Require() + suite.env.Cleanup() + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipStoreConfigSync")) +} + +func (suite *schedulerTestSuite) TearDownTest() { + cleanFunc := func(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + var currentSchedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, ¤tSchedulers) + for _, scheduler := range suite.defaultSchedulers { + if slice.NoneOf(currentSchedulers, func(i int) bool { + return currentSchedulers[i] == scheduler + }) { + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", scheduler}, nil) + re.Contains(echo, "Success!") + } + } + for _, scheduler := range currentSchedulers { + if slice.NoneOf(suite.defaultSchedulers, func(i int) bool { + return suite.defaultSchedulers[i] == scheduler + }) { + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", scheduler}, nil) + re.Contains(echo, "Success!") + } + } + } + suite.env.RunFuncInTwoModes(cleanFunc) + suite.env.Cleanup() +} + +func (suite *schedulerTestSuite) TestScheduler() { + suite.env.RunTestBasedOnMode(suite.checkScheduler) +} + +func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + mustUsage := func(args []string) { + output, err := tests.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "Usage") + } + + checkSchedulerConfigCommand := func(expectedConfig map[string]any, schedulerName string) { + testutil.Eventually(re, func() bool { + configInfo := make(map[string]any) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) + return reflect.DeepEqual(expectedConfig["store-id-ranges"], configInfo["store-id-ranges"]) + }) + } + + leaderServer := cluster.GetLeaderServer() + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + + // scheduler show command + expected := map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, nil, expected) + + // scheduler delete command + args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, args, expected) + + // avoid the influence of the scheduler order + schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"} + + checkStorePause := func(changedStores []uint64, schedulerName string) { + status := func() string { + switch schedulerName { + case "evict-leader-scheduler": + return "paused" + case "grant-leader-scheduler": + return "resumed" + default: + re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) + return "" + } + }() + for _, store := range stores { + isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer() + if slice.AnyOf(changedStores, func(i int) bool { + return store.GetId() == changedStores[i] + }) { + re.True(isStorePaused, + fmt.Sprintf("store %d should be %s with %s", store.GetId(), status, schedulerName)) + } else { + re.False(isStorePaused, + fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName)) + } + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer()) + } + } + } + + for idx := range schedulers { + checkStorePause([]uint64{}, schedulers[idx]) + + // will fail because the scheduler is not existed + args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"} + output := mustExec(re, cmd, args, nil) + re.Contains(output, fmt.Sprintf("Unable to update config: scheduler %s does not exist.", schedulers[idx])) + + // scheduler add command + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, args, expected) + + // scheduler config show command + expectedConfig := make(map[string]any) + expectedConfig["store-id-ranges"] = map[string]any{"2": []any{map[string]any{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) + + // scheduler config update command + args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "evict-slow-store-scheduler": true, + } + + // check update success + checkSchedulerCommand(re, cmd, pdAddr, args, expected) + expectedConfig["store-id-ranges"] = map[string]any{"2": []any{map[string]any{"end-key": "", "start-key": ""}}, "3": []any{map[string]any{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 3}, schedulers[idx]) + + // scheduler delete command + args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, args, expected) + checkStorePause([]uint64{}, schedulers[idx]) + + // scheduler add command + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, args, expected) + checkStorePause([]uint64{2}, schedulers[idx]) + + // scheduler add command twice + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "4"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, args, expected) + + // check add success + expectedConfig["store-id-ranges"] = map[string]any{"2": []any{map[string]any{"end-key": "", "start-key": ""}}, "4": []any{map[string]any{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 4}, schedulers[idx]) + + // scheduler remove command [old] + args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-4"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, args, expected) + + // check remove success + expectedConfig["store-id-ranges"] = map[string]any{"2": []any{map[string]any{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) + + // scheduler remove command, when remove the last store, it should remove whole scheduler + args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, args, expected) + checkStorePause([]uint64{}, schedulers[idx]) + } + + // test remove and add scheduler + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) + re.Equal("Success! The scheduler is created.\n", echo) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}, nil) + re.Equal("Success! The scheduler has been applied to the store.\n", echo) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-2"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { // wait for removed scheduler to be synced to scheduling server. + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) + return strings.Contains(echo, "[404] scheduler not found") + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "Unable to update config: scheduler evict-leader-scheduler does not exist.") + + // test remove and add + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) + re.Contains(echo, "Success") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) + re.Contains(echo, "Success") + + // test show scheduler with paused and disabled status. + checkSchedulerWithStatusCommand := func(status string, expected []string) { + testutil.Eventually(re, func() bool { + var schedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers) + return reflect.DeepEqual(expected, schedulers) + }) + } + + // test scatter range scheduler + for _, name := range []string{ + "test", "test#", "?test", + /* TODO: to handle case like "tes&t", we need to modify the server's JSON render to unescape the HTML characters */ + } { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "scatter-range", "--format=raw", "a", "b", name}, nil) + re.Contains(echo, "Success!") + schedulerName := fmt.Sprintf("scatter-range-%s", name) + // test show scheduler + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, schedulerName) + }) + // test remove scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, schedulerName) + }) + } + + mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") + checkSchedulerWithStatusCommand("paused", []string{ + "balance-leader-scheduler", + }) + result := make(map[string]any) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result) + return len(result) != 0 && result["status"] == "paused" && result["summary"] == "" + }, testutil.WithWaitFor(30*time.Second)) + + mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + checkSchedulerWithStatusCommand("paused", []string{}) + + // set label scheduler to disabled manually. + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "label-scheduler"}, nil) + re.Contains(echo, "Success!") + cfg := leaderServer.GetServer().GetScheduleConfig() + origin := cfg.Schedulers + cfg.Schedulers = sc.SchedulerConfigs{{Type: "label", Disable: true}} + err := leaderServer.GetServer().SetScheduleConfig(*cfg) + re.NoError(err) + checkSchedulerWithStatusCommand("disabled", []string{"label-scheduler"}) + // reset Schedulers in ScheduleConfig + cfg.Schedulers = origin + err = leaderServer.GetServer().SetScheduleConfig(*cfg) + re.NoError(err) + checkSchedulerWithStatusCommand("disabled", []string{}) +} + +func (suite *schedulerTestSuite) TestSchedulerConfig() { + suite.env.RunTestBasedOnMode(suite.checkSchedulerConfig) +} + +func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + // note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region. + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + + // test evict-slow-store && evict-slow-trend schedulers config + evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"} + for _, schedulerName := range evictSlownessSchedulers { + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil) + if strings.Contains(echo, "Success!") { + re.Contains(echo, "Success!") + } else { + re.Contains(echo, "scheduler existed") + } + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, schedulerName) + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil) + re.Contains(echo, "Success! Config updated.") + conf := make(map[string]any) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf) + return conf["recovery-duration"] == 100. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, schedulerName) + }) + } + // test shuffle region config + checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "add", "shuffle-region-scheduler"}, map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "shuffle-region-scheduler": true, + }) + var roles []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + re.Equal([]string{"leader", "follower", "learner"}, roles) + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + return reflect.DeepEqual([]string{"learner"}, roles) + }) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) + re.Equal([]string{"learner"}, roles) + + checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "remove", "shuffle-region-scheduler"}, map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + }) + + // test grant hot region scheduler config + checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "1,2,3"}, map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "grant-hot-region-scheduler": true, + }) + var conf3 map[string]any + expected3 := map[string]any{ + "store-id": []any{float64(1), float64(2), float64(3)}, + "store-leader-id": float64(1), + } + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + re.Equal(expected3, conf3) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + re.Contains(echo, "Success!") + expected3["store-leader-id"] = float64(2) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + return reflect.DeepEqual(expected3, conf3) + }) + checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "remove", "grant-hot-region-scheduler"}, map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + }) + + // test shuffle hot region scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "shuffle-hot-region-scheduler"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, "shuffle-hot-region-scheduler") + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "set", "limit", "127"}, nil) + re.Contains(echo, "Success!") + conf := make(map[string]any) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "show"}, &conf) + return conf["limit"] == 127. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "shuffle-hot-region-scheduler"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, "shuffle-hot-region-scheduler") + }) + + // test evict leader scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, "evict-leader-scheduler") + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler", "set", "batch", "5"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]any) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, &conf) + return conf["batch"] == 5. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, "evict-leader-scheduler") + }) + + // test balance leader config + conf = make(map[string]any) + conf1 := make(map[string]any) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf) + re.Equal(4., conf["batch"]) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) + return conf1["batch"] == 3. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) + re.Contains(echo, "404") + re.Contains(echo, "PD:scheduler:ErrSchedulerNotFound]scheduler not found") + // The scheduling service need time to sync from PD. + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, nil) + return strings.Contains(echo, "404") && strings.Contains(echo, "scheduler not found") + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") +} + +func (suite *schedulerTestSuite) TestHotRegionSchedulerConfig() { + suite.env.RunTestBasedOnMode(suite.checkHotRegionSchedulerConfig) +} + +func (suite *schedulerTestSuite) checkHotRegionSchedulerConfig(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + // note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region. + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + leaderServer := cluster.GetLeaderServer() + // test hot region config + expected1 := map[string]any{ + "min-hot-byte-rate": float64(100), + "min-hot-key-rate": float64(10), + "min-hot-query-rate": float64(10), + "src-tolerance-ratio": 1.05, + "dst-tolerance-ratio": 1.05, + "read-priorities": []any{"byte", "key"}, + "write-leader-priorities": []any{"key", "byte"}, + "write-peer-priorities": []any{"byte", "key"}, + "strict-picking-store": "true", + "rank-formula-version": "v2", + "split-thresholds": 0.2, + "history-sample-duration": "5m0s", + "history-sample-interval": "30s", + } + checkHotSchedulerConfig := func(expect map[string]any) { + testutil.Eventually(re, func() bool { + var conf1 map[string]any + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + return reflect.DeepEqual(expect, conf1) + }) + } + // scheduler show command + expected := map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(re, cmd, pdAddr, nil, expected) + var conf map[string]any + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf) + re.Equal(expected1, conf) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf) + re.Equal(expected1, conf) + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) + re.Contains(echo, "Success!") + expected1["src-tolerance-ratio"] = 1.02 + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "disabled", "true"}, nil) + re.Contains(echo, "Failed!") + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + re.Contains(echo, "Success!") + expected1["read-priorities"] = []any{"byte", "key"} + checkHotSchedulerConfig(expected1) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + re.Contains(echo, "Success!") + expected1["read-priorities"] = []any{"key", "byte"} + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + + // write-priorities is divided into write-leader-priorities and write-peer-priorities + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + re.Contains(echo, "Failed!") + re.Contains(echo, "Config item is not found.") + checkHotSchedulerConfig(expected1) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + expected1["rank-formula-version"] = "v2" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + expected1["rank-formula-version"] = "v1" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + expected1["forbid-rw-type"] = "read" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + expected1["history-sample-duration"] = "1m0s" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "history-sample-duration", "1m"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + expected1["history-sample-interval"] = "1s" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "history-sample-interval", "1s"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + expected1["history-sample-duration"] = "0s" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "history-sample-duration", "0s"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + expected1["history-sample-interval"] = "0s" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "history-sample-interval", "0s"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + // test compatibility + re.Equal("2.0.0", leaderServer.GetClusterVersion().String()) + for _, store := range stores { + version := versioninfo.HotScheduleWithQuery + store.Version = versioninfo.MinSupportedVersion(version).String() + store.LastHeartbeat = time.Now().UnixNano() + pdTests.MustPutStore(re, cluster, store) + } + re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) + // After upgrading, we can use query. + expected1["write-leader-priorities"] = []any{"query", "byte"} + checkHotSchedulerConfig(expected1) + // cannot set qps as write-peer-priorities + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) + re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") + checkHotSchedulerConfig(expected1) +} + +func (suite *schedulerTestSuite) TestSchedulerDiagnostic() { + suite.env.RunTestBasedOnMode(suite.checkSchedulerDiagnostic) +} + +func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) { + result := make(map[string]any) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result) + return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"] + }, testutil.WithTickInterval(50*time.Millisecond)) + } + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + // note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region. + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + + echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil) + re.Contains(echo, "Success!") + checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ") + + // scheduler delete command + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") +} + +func (suite *schedulerTestSuite) TestEvictLeaderScheduler() { + // FIXME: API mode may have the problem + suite.env.RunTestInPDMode(suite.checkEvictLeaderScheduler) +} + +func (suite *schedulerTestSuite) checkEvictLeaderScheduler(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) + output, err := tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") +} + +func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) string { + output, err := tests.ExecuteCommand(cmd, args...) + re.NoError(err) + if v == nil { + return string(output) + } + re.NoError(json.Unmarshal(output, v), string(output)) + return "" +} + +func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) { + output, err := tests.ExecuteCommand(cmd, args...) + re.NoError(err) + if v == nil { + return + } + json.Unmarshal(output, v) +} + +func checkSchedulerCommand(re *require.Assertions, cmd *cobra.Command, pdAddr string, args []string, expected map[string]bool) { + if args != nil { + echo := mustExec(re, cmd, args, nil) + re.Contains(echo, "Success!") + } + testutil.Eventually(re, func() bool { + var schedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers) + if len(schedulers) != len(expected) { + return false + } + for _, scheduler := range schedulers { + if _, ok := expected[scheduler]; !ok { + return false + } + } + return true + }) +} From e87291743f705caf43d2f5dcf105d5bae9789cdd Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 25 Sep 2024 13:16:16 +0800 Subject: [PATCH 2/2] resolve conflicts Signed-off-by: Ryan Leung --- pkg/schedule/schedulers/OWNERS | 7 - pkg/schedule/schedulers/evict_leader.go | 487 ---------- plugin/scheduler_example/evict_leader.go | 32 +- server/schedulers/evict_leader.go | 14 +- server/schedulers/grant_leader.go | 13 +- tests/pdctl/scheduler/scheduler_test.go | 58 ++ .../pd-ctl/tests/scheduler/scheduler_test.go | 909 ------------------ 7 files changed, 78 insertions(+), 1442 deletions(-) delete mode 100644 pkg/schedule/schedulers/OWNERS delete mode 100644 pkg/schedule/schedulers/evict_leader.go delete mode 100644 tools/pd-ctl/tests/scheduler/scheduler_test.go diff --git a/pkg/schedule/schedulers/OWNERS b/pkg/schedule/schedulers/OWNERS deleted file mode 100644 index ae96e4f1f42..00000000000 --- a/pkg/schedule/schedulers/OWNERS +++ /dev/null @@ -1,7 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners -options: - no_parent_owners: true -filters: - "(OWNERS|hot_region_config\\.go)$": - approvers: - - sig-critical-approvers-config diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go deleted file mode 100644 index ec068b9550b..00000000000 --- a/pkg/schedule/schedulers/evict_leader.go +++ /dev/null @@ -1,487 +0,0 @@ -// Copyright 2017 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package schedulers - -import ( - "net/http" - "strconv" - - "github.com/gorilla/mux" - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/core/constant" - "github.com/tikv/pd/pkg/errs" - sche "github.com/tikv/pd/pkg/schedule/core" - "github.com/tikv/pd/pkg/schedule/filter" - "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/plan" - "github.com/tikv/pd/pkg/schedule/types" - "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/syncutil" - "github.com/unrolled/render" - "go.uber.org/zap" -) - -const ( - // EvictLeaderBatchSize is the number of operators to transfer - // leaders by one scheduling - EvictLeaderBatchSize = 3 - lastStoreDeleteInfo = "The last store has been deleted" -) - -type evictLeaderSchedulerConfig struct { - syncutil.RWMutex - schedulerConfig - - StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` - // Batch is used to generate multiple operators by one scheduling - Batch int `json:"batch"` - cluster *core.BasicCluster - removeSchedulerCb func(string) error -} - -func (conf *evictLeaderSchedulerConfig) getStores() []uint64 { - conf.RLock() - defer conf.RUnlock() - stores := make([]uint64, 0, len(conf.StoreIDWithRanges)) - for storeID := range conf.StoreIDWithRanges { - stores = append(stores, storeID) - } - return stores -} - -func (conf *evictLeaderSchedulerConfig) getBatch() int { - conf.RLock() - defer conf.RUnlock() - return conf.Batch -} - -func (conf *evictLeaderSchedulerConfig) clone() *evictLeaderSchedulerConfig { - conf.RLock() - defer conf.RUnlock() - storeIDWithRanges := make(map[uint64][]core.KeyRange) - for id, ranges := range conf.StoreIDWithRanges { - storeIDWithRanges[id] = append(storeIDWithRanges[id], ranges...) - } - return &evictLeaderSchedulerConfig{ - StoreIDWithRanges: storeIDWithRanges, - Batch: conf.Batch, - } -} - -func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { - conf.RLock() - defer conf.RUnlock() - ranges := conf.StoreIDWithRanges[id] - res := make([]string, 0, len(ranges)*2) - for index := range ranges { - res = append(res, (string)(ranges[index].StartKey), (string)(ranges[index].EndKey)) - } - return res -} - -func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, error) { - _, exists := conf.StoreIDWithRanges[id] - if exists { - delete(conf.StoreIDWithRanges, id) - conf.cluster.ResumeLeaderTransfer(id) - return len(conf.StoreIDWithRanges) == 0, nil - } - return false, errs.ErrScheduleConfigNotExist.FastGenByArgs() -} - -func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) { - conf.Lock() - defer conf.Unlock() - // if the store is not existed, no need to resume leader transfer - _, _ = conf.removeStoreLocked(id) -} - -func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) { - if err := conf.cluster.PauseLeaderTransfer(id); err != nil { - log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) - } - conf.StoreIDWithRanges[id] = keyRange -} - -func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { - conf.Lock() - defer conf.Unlock() - conf.resetStoreLocked(id, keyRange) -} - -func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { - conf.RLock() - defer conf.RUnlock() - if ranges, exist := conf.StoreIDWithRanges[id]; exist { - return ranges - } - return nil -} - -func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) { - conf.RLock() - defer conf.RUnlock() - return EncodeConfig(conf) -} - -func (conf *evictLeaderSchedulerConfig) reloadConfig() error { - conf.Lock() - defer conf.Unlock() - newCfg := &evictLeaderSchedulerConfig{} - if err := conf.load(newCfg); err != nil { - return err - } - pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) - conf.StoreIDWithRanges = newCfg.StoreIDWithRanges - conf.Batch = newCfg.Batch - return nil -} - -func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.SchedulerCluster) error { - conf.RLock() - defer conf.RUnlock() - var res error - for id := range conf.StoreIDWithRanges { - if err := cluster.PauseLeaderTransfer(id); err != nil { - res = err - } - } - return res -} - -func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.SchedulerCluster) { - conf.RLock() - defer conf.RUnlock() - for id := range conf.StoreIDWithRanges { - cluster.ResumeLeaderTransfer(id) - } -} - -func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) { - conf.RLock() - defer conf.RUnlock() - if _, exist := conf.StoreIDWithRanges[id]; !exist { - if err := conf.cluster.PauseLeaderTransfer(id); err != nil { - return exist, err - } - } - return true, nil -} - -func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error { - conf.Lock() - defer conf.Unlock() - if id != 0 { - conf.StoreIDWithRanges[id] = newRanges - } - conf.Batch = batch - err := conf.save() - if err != nil && id != 0 { - _, _ = conf.removeStoreLocked(id) - } - return err -} - -func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) { - conf.Lock() - var resp any - last, err := conf.removeStoreLocked(id) - if err != nil { - conf.Unlock() - return resp, err - } - - keyRanges := conf.StoreIDWithRanges[id] - err = conf.save() - if err != nil { - conf.resetStoreLocked(id, keyRanges) - conf.Unlock() - return resp, err - } - if !last { - conf.Unlock() - return resp, nil - } - conf.Unlock() - if err := conf.removeSchedulerCb(types.EvictLeaderScheduler.String()); err != nil { - if !errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) { - conf.resetStore(id, keyRanges) - } - return resp, err - } - resp = lastStoreDeleteInfo - return resp, nil -} - -type evictLeaderScheduler struct { - *BaseScheduler - conf *evictLeaderSchedulerConfig - handler http.Handler -} - -// newEvictLeaderScheduler creates an admin scheduler that transfers all leaders -// out of a store. -func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) Scheduler { - handler := newEvictLeaderHandler(conf) - return &evictLeaderScheduler{ - BaseScheduler: NewBaseScheduler(opController, types.EvictLeaderScheduler, conf), - conf: conf, - handler: handler, - } -} - -// EvictStoreIDs returns the IDs of the evict-stores. -func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 { - return s.conf.getStores() -} - -// ServeHTTP implements the http.Handler interface. -func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.handler.ServeHTTP(w, r) -} - -// GetName implements the Scheduler interface. -func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { - return s.conf.encodeConfig() -} - -// ReloadConfig reloads the config from the storage. -func (s *evictLeaderScheduler) ReloadConfig() error { - return s.conf.reloadConfig() -} - -// PrepareConfig implements the Scheduler interface. -func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - return s.conf.pauseLeaderTransfer(cluster) -} - -// CleanConfig implements the Scheduler interface. -func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { - s.conf.resumeLeaderTransfer(cluster) -} - -// IsScheduleAllowed implements the Scheduler interface. -func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() - if !allowed { - operator.IncOperatorLimitCounter(s.GetType(), operator.OpLeader) - } - return allowed -} - -// Schedule implements the Scheduler interface. -func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { - evictLeaderCounter.Inc() - return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf), nil -} - -func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator { - regionIDs := make(map[uint64]struct{}) - for i := range dst { - regionIDs[dst[i].RegionID()] = struct{}{} - } - for i := range src { - if _, ok := regionIDs[src[i].RegionID()]; ok { - continue - } - regionIDs[src[i].RegionID()] = struct{}{} - dst = append(dst, src[i]) - } - return dst -} - -type evictLeaderStoresConf interface { - getStores() []uint64 - getKeyRangesByID(id uint64) []core.KeyRange - getBatch() int -} - -func scheduleEvictLeaderBatch(name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { - var ops []*operator.Operator - batchSize := conf.getBatch() - for i := 0; i < batchSize; i++ { - once := scheduleEvictLeaderOnce(name, cluster, conf) - // no more regions - if len(once) == 0 { - break - } - ops = uniqueAppendOperator(ops, once...) - // the batch has been fulfilled - if len(ops) > batchSize { - break - } - } - return ops -} - -func scheduleEvictLeaderOnce(name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { - stores := conf.getStores() - ops := make([]*operator.Operator, 0, len(stores)) - for _, storeID := range stores { - ranges := conf.getKeyRangesByID(storeID) - if len(ranges) == 0 { - continue - } - var filters []filter.Filter - pendingFilter := filter.NewRegionPendingFilter() - downFilter := filter.NewRegionDownFilter() - region := filter.SelectOneRegion(cluster.RandLeaderRegions(storeID, ranges), nil, pendingFilter, downFilter) - if region == nil { - // try to pick unhealthy region - region = filter.SelectOneRegion(cluster.RandLeaderRegions(storeID, ranges), nil) - if region == nil { - evictLeaderNoLeaderCounter.Inc() - continue - } - evictLeaderPickUnhealthyCounter.Inc() - unhealthyPeerStores := make(map[uint64]struct{}) - for _, peer := range region.GetDownPeers() { - unhealthyPeerStores[peer.GetPeer().GetStoreId()] = struct{}{} - } - for _, peer := range region.GetPendingPeers() { - unhealthyPeerStores[peer.GetStoreId()] = struct{}{} - } - filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores)) - } - - filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) - candidates := filter.NewCandidates(cluster.GetFollowerStores(region)). - FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) - // Compatible with old TiKV transfer leader logic. - target := candidates.RandomPick() - targets := candidates.PickAll() - // `targets` MUST contains `target`, so only needs to check if `target` is nil here. - if target == nil { - evictLeaderNoTargetStoreCounter.Inc() - continue - } - targetIDs := make([]uint64, 0, len(targets)) - for _, t := range targets { - targetIDs = append(targetIDs, t.GetID()) - } - op, err := operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpLeader) - if err != nil { - log.Debug("fail to create evict leader operator", errs.ZapError(err)) - continue - } - op.SetPriorityLevel(constant.Urgent) - op.Counters = append(op.Counters, evictLeaderNewOperatorCounter) - ops = append(ops, op) - } - return ops -} - -type evictLeaderHandler struct { - rd *render.Render - config *evictLeaderSchedulerConfig -} - -func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.Request) { - var input map[string]any - if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { - return - } - var ( - exist bool - err error - id uint64 - newRanges []core.KeyRange - ) - idFloat, inputHasStoreID := input["store_id"].(float64) - if inputHasStoreID { - id = (uint64)(idFloat) - exist, err = handler.config.pauseLeaderTransferIfStoreNotExist(id) - if err != nil { - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - } - - batch := handler.config.getBatch() - batchFloat, ok := input["batch"].(float64) - if ok { - if batchFloat < 1 || batchFloat > 10 { - handler.config.removeStore(id) - handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]") - return - } - batch = (int)(batchFloat) - } - - ranges, ok := (input["ranges"]).([]string) - if ok { - if !inputHasStoreID { - handler.config.removeStore(id) - handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id")) - return - } - } else if exist { - ranges = handler.config.getRanges(id) - } - - newRanges, err = getKeyRanges(ranges) - if err != nil { - handler.config.removeStore(id) - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - - err = handler.config.update(id, newRanges, batch) - if err != nil { - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") -} - -func (handler *evictLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.clone() - handler.rd.JSON(w, http.StatusOK, conf) -} - -func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.Request) { - idStr := mux.Vars(r)["store_id"] - id, err := strconv.ParseUint(idStr, 10, 64) - if err != nil { - handler.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - - resp, err := handler.config.delete(id) - if err != nil { - if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) || errors.ErrorEqual(err, errs.ErrScheduleConfigNotExist.FastGenByArgs()) { - handler.rd.JSON(w, http.StatusNotFound, err.Error()) - } else { - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - } - return - } - - handler.rd.JSON(w, http.StatusOK, resp) -} - -func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { - h := &evictLeaderHandler{ - config: config, - rd: render.New(render.Options{IndentJSON: true}), - } - router := mux.NewRouter() - router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) - router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) - router.HandleFunc("/delete/{store_id}", h.deleteConfig).Methods(http.MethodDelete) - return router -} diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index f3c658f5c48..98333981973 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -77,13 +77,13 @@ func init() { } // SchedulerType returns the type of the scheduler -//nolint +// nolint func SchedulerType() string { return EvictLeaderType } // SchedulerArgs returns the args for the scheduler -//nolint +// nolint func SchedulerArgs() []string { args := []string{"1"} return args @@ -271,10 +271,6 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R args = append(args, handler.config.getRanges(id)...) } -<<<<<<< HEAD - handler.config.BuildWithArgs(args) - err := handler.config.Persist() -======= err := handler.config.BuildWithArgs(args) if err != nil { handler.config.mu.Lock() @@ -283,8 +279,8 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } + err = handler.config.Persist() ->>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) if err != nil { handler.config.mu.Lock() delete(handler.config.StoreIDWitRanges, id) @@ -310,7 +306,6 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R handler.config.mu.Lock() defer handler.config.mu.Unlock() -<<<<<<< HEAD _, exists := handler.config.StoreIDWitRanges[id] if exists { delete(handler.config.StoreIDWitRanges, id) @@ -329,27 +324,6 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R } handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) -======= - ranges, exists := handler.config.StoreIDWitRanges[id] - if !exists { - handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) - return - } - delete(handler.config.StoreIDWitRanges, id) - handler.config.cluster.ResumeLeaderTransfer(id) - - if err := handler.config.Persist(); err != nil { - handler.config.StoreIDWitRanges[id] = ranges - _ = handler.config.cluster.PauseLeaderTransfer(id) - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - var resp any - if len(handler.config.StoreIDWitRanges) == 0 { - resp = noStoreInSchedulerInfo - } - handler.rd.JSON(w, http.StatusOK, resp) ->>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) } func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index d7ae5e204db..85dd3847af3 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -97,6 +97,9 @@ func (conf *evictLeaderSchedulerConfig) getStores() []uint64 { } func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error { + failpoint.Inject("buildWithArgsErr", func() { + failpoint.Return(errors.New("fail to build with args")) + }) if len(args) != 1 { return errs.ErrSchedulerConfig.FastGenByArgs("id") } @@ -387,8 +390,15 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R args = append(args, handler.config.getRanges(id)...) } - handler.config.BuildWithArgs(args) - err := handler.config.Persist() + err := handler.config.BuildWithArgs(args) + if err != nil { + handler.config.mu.Lock() + handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.mu.Unlock() + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + err = handler.config.Persist() if err != nil { handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go index a8f1fc452fa..79678f69abf 100644 --- a/server/schedulers/grant_leader.go +++ b/server/schedulers/grant_leader.go @@ -286,18 +286,15 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R args = append(args, handler.config.getRanges(id)...) } -<<<<<<< HEAD:server/schedulers/grant_leader.go - handler.config.BuildWithArgs(args) - err := handler.config.Persist() -======= - err := handler.config.buildWithArgs(args) + err := handler.config.BuildWithArgs(args) if err != nil { - _, _ = handler.config.removeStore(id) + handler.config.mu.Lock() + handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.mu.Unlock() handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - err = handler.config.persist() ->>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)):pkg/schedule/schedulers/grant_leader.go + err = handler.config.Persist() if err != nil { _, _ = handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index de0e523dc8e..33083e8eb52 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -22,6 +22,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" @@ -442,3 +443,60 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "split-bucket-scheduler"}, nil) c.Assert(strings.Contains(echo, "Success!"), IsTrue) } + +func (s *schedulerTestSuite) TestEvictLeaderScheduler(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1) + c.Assert(err, IsNil) + defer cluster.Destroy() + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := pdctlCmd.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + for _, store := range stores { + pdctl.MustPutStore(c, leaderServer.GetServer(), store) + } + + pdctl.MustPutRegion(c, cluster, 1, 1, []byte("a"), []byte("b")) + output, err := pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...) + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) + failpoint.Enable("github.com/tikv/pd/server/schedulers/buildWithArgsErr", "return(true)") + output, err = pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "fail to build with args"), IsTrue) + failpoint.Disable("github.com/tikv/pd/server/schedulers/buildWithArgsErr") + output, err = pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...) + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) + output, err = pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) +} diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go deleted file mode 100644 index 63d0f091b8c..00000000000 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ /dev/null @@ -1,909 +0,0 @@ -// Copyright 2019 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package scheduler_test - -import ( - "encoding/json" - "fmt" - "reflect" - "strings" - "testing" - "time" - - "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/spf13/cobra" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/core" - sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/slice" - "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/pkg/versioninfo" - pdTests "github.com/tikv/pd/tests" - ctl "github.com/tikv/pd/tools/pd-ctl/pdctl" - "github.com/tikv/pd/tools/pd-ctl/tests" -) - -type schedulerTestSuite struct { - suite.Suite - env *pdTests.SchedulingTestEnvironment - defaultSchedulers []string -} - -func TestSchedulerTestSuite(t *testing.T) { - suite.Run(t, new(schedulerTestSuite)) -} - -func (suite *schedulerTestSuite) SetupSuite() { - re := suite.Require() - re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipStoreConfigSync", `return(true)`)) - suite.defaultSchedulers = []string{ - "balance-leader-scheduler", - "balance-region-scheduler", - "balance-hot-region-scheduler", - "evict-slow-store-scheduler", - } -} - -func (suite *schedulerTestSuite) SetupTest() { - // use a new environment to avoid affecting other tests - suite.env = pdTests.NewSchedulingTestEnvironment(suite.T()) -} - -func (suite *schedulerTestSuite) TearDownSuite() { - re := suite.Require() - suite.env.Cleanup() - re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipStoreConfigSync")) -} - -func (suite *schedulerTestSuite) TearDownTest() { - cleanFunc := func(cluster *pdTests.TestCluster) { - re := suite.Require() - pdAddr := cluster.GetConfig().GetClientURL() - cmd := ctl.GetRootCmd() - - var currentSchedulers []string - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, ¤tSchedulers) - for _, scheduler := range suite.defaultSchedulers { - if slice.NoneOf(currentSchedulers, func(i int) bool { - return currentSchedulers[i] == scheduler - }) { - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", scheduler}, nil) - re.Contains(echo, "Success!") - } - } - for _, scheduler := range currentSchedulers { - if slice.NoneOf(suite.defaultSchedulers, func(i int) bool { - return suite.defaultSchedulers[i] == scheduler - }) { - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", scheduler}, nil) - re.Contains(echo, "Success!") - } - } - } - suite.env.RunFuncInTwoModes(cleanFunc) - suite.env.Cleanup() -} - -func (suite *schedulerTestSuite) TestScheduler() { - suite.env.RunTestBasedOnMode(suite.checkScheduler) -} - -func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) { - re := suite.Require() - pdAddr := cluster.GetConfig().GetClientURL() - cmd := ctl.GetRootCmd() - - stores := []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 2, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 3, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 4, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - } - - mustUsage := func(args []string) { - output, err := tests.ExecuteCommand(cmd, args...) - re.NoError(err) - re.Contains(string(output), "Usage") - } - - checkSchedulerConfigCommand := func(expectedConfig map[string]any, schedulerName string) { - testutil.Eventually(re, func() bool { - configInfo := make(map[string]any) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) - return reflect.DeepEqual(expectedConfig["store-id-ranges"], configInfo["store-id-ranges"]) - }) - } - - leaderServer := cluster.GetLeaderServer() - for _, store := range stores { - pdTests.MustPutStore(re, cluster, store) - } - - // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. - pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) - - // scheduler show command - expected := map[string]bool{ - "balance-region-scheduler": true, - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, nil, expected) - - // scheduler delete command - args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"} - expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, args, expected) - - // avoid the influence of the scheduler order - schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"} - - checkStorePause := func(changedStores []uint64, schedulerName string) { - status := func() string { - switch schedulerName { - case "evict-leader-scheduler": - return "paused" - case "grant-leader-scheduler": - return "resumed" - default: - re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) - return "" - } - }() - for _, store := range stores { - isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer() - if slice.AnyOf(changedStores, func(i int) bool { - return store.GetId() == changedStores[i] - }) { - re.True(isStorePaused, - fmt.Sprintf("store %d should be %s with %s", store.GetId(), status, schedulerName)) - } else { - re.False(isStorePaused, - fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName)) - } - if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer()) - } - } - } - - for idx := range schedulers { - checkStorePause([]uint64{}, schedulers[idx]) - - // will fail because the scheduler is not existed - args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"} - output := mustExec(re, cmd, args, nil) - re.Contains(output, fmt.Sprintf("Unable to update config: scheduler %s does not exist.", schedulers[idx])) - - // scheduler add command - args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} - expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - schedulers[idx]: true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, args, expected) - - // scheduler config show command - expectedConfig := make(map[string]any) - expectedConfig["store-id-ranges"] = map[string]any{"2": []any{map[string]any{"end-key": "", "start-key": ""}}} - checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) - checkStorePause([]uint64{2}, schedulers[idx]) - - // scheduler config update command - args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"} - expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - schedulers[idx]: true, - "evict-slow-store-scheduler": true, - } - - // check update success - checkSchedulerCommand(re, cmd, pdAddr, args, expected) - expectedConfig["store-id-ranges"] = map[string]any{"2": []any{map[string]any{"end-key": "", "start-key": ""}}, "3": []any{map[string]any{"end-key": "", "start-key": ""}}} - checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) - checkStorePause([]uint64{2, 3}, schedulers[idx]) - - // scheduler delete command - args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]} - expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, args, expected) - checkStorePause([]uint64{}, schedulers[idx]) - - // scheduler add command - args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} - expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - schedulers[idx]: true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, args, expected) - checkStorePause([]uint64{2}, schedulers[idx]) - - // scheduler add command twice - args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "4"} - expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - schedulers[idx]: true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, args, expected) - - // check add success - expectedConfig["store-id-ranges"] = map[string]any{"2": []any{map[string]any{"end-key": "", "start-key": ""}}, "4": []any{map[string]any{"end-key": "", "start-key": ""}}} - checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) - checkStorePause([]uint64{2, 4}, schedulers[idx]) - - // scheduler remove command [old] - args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-4"} - expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - schedulers[idx]: true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, args, expected) - - // check remove success - expectedConfig["store-id-ranges"] = map[string]any{"2": []any{map[string]any{"end-key": "", "start-key": ""}}} - checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) - checkStorePause([]uint64{2}, schedulers[idx]) - - // scheduler remove command, when remove the last store, it should remove whole scheduler - args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-2"} - expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, args, expected) - checkStorePause([]uint64{}, schedulers[idx]) - } - - // test remove and add scheduler - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) - re.NotContains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) - re.Equal("Success! The scheduler is created.\n", echo) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}, nil) - re.Equal("Success! The scheduler has been applied to the store.\n", echo) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-2"}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { // wait for removed scheduler to be synced to scheduling server. - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) - return strings.Contains(echo, "[404] scheduler not found") - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) - re.Contains(echo, "Unable to update config: scheduler evict-leader-scheduler does not exist.") - - // test remove and add - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) - re.Contains(echo, "Success") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) - re.Contains(echo, "Success") - - // test show scheduler with paused and disabled status. - checkSchedulerWithStatusCommand := func(status string, expected []string) { - testutil.Eventually(re, func() bool { - var schedulers []string - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers) - return reflect.DeepEqual(expected, schedulers) - }) - } - - // test scatter range scheduler - for _, name := range []string{ - "test", "test#", "?test", - /* TODO: to handle case like "tes&t", we need to modify the server's JSON render to unescape the HTML characters */ - } { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "scatter-range", "--format=raw", "a", "b", name}, nil) - re.Contains(echo, "Success!") - schedulerName := fmt.Sprintf("scatter-range-%s", name) - // test show scheduler - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - return strings.Contains(echo, schedulerName) - }) - // test remove scheduler - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - return !strings.Contains(echo, schedulerName) - }) - } - - mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) - re.Contains(echo, "Success!") - checkSchedulerWithStatusCommand("paused", []string{ - "balance-leader-scheduler", - }) - result := make(map[string]any) - testutil.Eventually(re, func() bool { - mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result) - return len(result) != 0 && result["status"] == "paused" && result["summary"] == "" - }, testutil.WithWaitFor(30*time.Second)) - - mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) - re.Contains(echo, "Success!") - checkSchedulerWithStatusCommand("paused", []string{}) - - // set label scheduler to disabled manually. - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "label-scheduler"}, nil) - re.Contains(echo, "Success!") - cfg := leaderServer.GetServer().GetScheduleConfig() - origin := cfg.Schedulers - cfg.Schedulers = sc.SchedulerConfigs{{Type: "label", Disable: true}} - err := leaderServer.GetServer().SetScheduleConfig(*cfg) - re.NoError(err) - checkSchedulerWithStatusCommand("disabled", []string{"label-scheduler"}) - // reset Schedulers in ScheduleConfig - cfg.Schedulers = origin - err = leaderServer.GetServer().SetScheduleConfig(*cfg) - re.NoError(err) - checkSchedulerWithStatusCommand("disabled", []string{}) -} - -func (suite *schedulerTestSuite) TestSchedulerConfig() { - suite.env.RunTestBasedOnMode(suite.checkSchedulerConfig) -} - -func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestCluster) { - re := suite.Require() - pdAddr := cluster.GetConfig().GetClientURL() - cmd := ctl.GetRootCmd() - - stores := []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 2, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 3, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 4, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - } - for _, store := range stores { - pdTests.MustPutStore(re, cluster, store) - } - - // note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region. - pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) - - // test evict-slow-store && evict-slow-trend schedulers config - evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"} - for _, schedulerName := range evictSlownessSchedulers { - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil) - if strings.Contains(echo, "Success!") { - re.Contains(echo, "Success!") - } else { - re.Contains(echo, "scheduler existed") - } - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - return strings.Contains(echo, schedulerName) - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil) - re.Contains(echo, "Success! Config updated.") - conf := make(map[string]any) - testutil.Eventually(re, func() bool { - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf) - return conf["recovery-duration"] == 100. - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - return !strings.Contains(echo, schedulerName) - }) - } - // test shuffle region config - checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "add", "shuffle-region-scheduler"}, map[string]bool{ - "balance-region-scheduler": true, - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "shuffle-region-scheduler": true, - }) - var roles []string - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) - re.Equal([]string{"leader", "follower", "learner"}, roles) - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) - return reflect.DeepEqual([]string{"learner"}, roles) - }) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) - re.Equal([]string{"learner"}, roles) - - checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "remove", "shuffle-region-scheduler"}, map[string]bool{ - "balance-region-scheduler": true, - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - }) - - // test grant hot region scheduler config - checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "1,2,3"}, map[string]bool{ - "balance-region-scheduler": true, - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "grant-hot-region-scheduler": true, - }) - var conf3 map[string]any - expected3 := map[string]any{ - "store-id": []any{float64(1), float64(2), float64(3)}, - "store-leader-id": float64(1), - } - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) - re.Equal(expected3, conf3) - - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) - re.Contains(echo, "Success!") - expected3["store-leader-id"] = float64(2) - testutil.Eventually(re, func() bool { - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) - return reflect.DeepEqual(expected3, conf3) - }) - checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "remove", "grant-hot-region-scheduler"}, map[string]bool{ - "balance-region-scheduler": true, - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - }) - - // test shuffle hot region scheduler - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "shuffle-hot-region-scheduler"}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - return strings.Contains(echo, "shuffle-hot-region-scheduler") - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "set", "limit", "127"}, nil) - re.Contains(echo, "Success!") - conf := make(map[string]any) - testutil.Eventually(re, func() bool { - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "show"}, &conf) - return conf["limit"] == 127. - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "shuffle-hot-region-scheduler"}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - return !strings.Contains(echo, "shuffle-hot-region-scheduler") - }) - - // test evict leader scheduler - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - return strings.Contains(echo, "evict-leader-scheduler") - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler", "set", "batch", "5"}, nil) - re.Contains(echo, "Success!") - conf = make(map[string]any) - testutil.Eventually(re, func() bool { - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, &conf) - return conf["batch"] == 5. - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - return !strings.Contains(echo, "evict-leader-scheduler") - }) - - // test balance leader config - conf = make(map[string]any) - conf1 := make(map[string]any) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf) - re.Equal(4., conf["batch"]) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) - re.Contains(echo, "Success!") - testutil.Eventually(re, func() bool { - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) - return conf1["batch"] == 3. - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) - re.NotContains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) - re.Contains(echo, "404") - re.Contains(echo, "PD:scheduler:ErrSchedulerNotFound]scheduler not found") - // The scheduling service need time to sync from PD. - testutil.Eventually(re, func() bool { - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, nil) - return strings.Contains(echo, "404") && strings.Contains(echo, "scheduler not found") - }) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) - re.Contains(echo, "Success!") -} - -func (suite *schedulerTestSuite) TestHotRegionSchedulerConfig() { - suite.env.RunTestBasedOnMode(suite.checkHotRegionSchedulerConfig) -} - -func (suite *schedulerTestSuite) checkHotRegionSchedulerConfig(cluster *pdTests.TestCluster) { - re := suite.Require() - pdAddr := cluster.GetConfig().GetClientURL() - cmd := ctl.GetRootCmd() - - stores := []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 2, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 3, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 4, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - } - for _, store := range stores { - pdTests.MustPutStore(re, cluster, store) - } - // note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region. - pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) - leaderServer := cluster.GetLeaderServer() - // test hot region config - expected1 := map[string]any{ - "min-hot-byte-rate": float64(100), - "min-hot-key-rate": float64(10), - "min-hot-query-rate": float64(10), - "src-tolerance-ratio": 1.05, - "dst-tolerance-ratio": 1.05, - "read-priorities": []any{"byte", "key"}, - "write-leader-priorities": []any{"key", "byte"}, - "write-peer-priorities": []any{"byte", "key"}, - "strict-picking-store": "true", - "rank-formula-version": "v2", - "split-thresholds": 0.2, - "history-sample-duration": "5m0s", - "history-sample-interval": "30s", - } - checkHotSchedulerConfig := func(expect map[string]any) { - testutil.Eventually(re, func() bool { - var conf1 map[string]any - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - return reflect.DeepEqual(expect, conf1) - }) - } - // scheduler show command - expected := map[string]bool{ - "balance-region-scheduler": true, - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "evict-slow-store-scheduler": true, - } - checkSchedulerCommand(re, cmd, pdAddr, nil, expected) - var conf map[string]any - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf) - re.Equal(expected1, conf) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf) - re.Equal(expected1, conf) - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) - re.Contains(echo, "Success!") - expected1["src-tolerance-ratio"] = 1.02 - checkHotSchedulerConfig(expected1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "disabled", "true"}, nil) - re.Contains(echo, "Failed!") - - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) - re.Contains(echo, "Success!") - expected1["read-priorities"] = []any{"byte", "key"} - checkHotSchedulerConfig(expected1) - - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) - re.Contains(echo, "Failed!") - checkHotSchedulerConfig(expected1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) - re.Contains(echo, "Success!") - expected1["read-priorities"] = []any{"key", "byte"} - checkHotSchedulerConfig(expected1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) - re.Contains(echo, "Failed!") - checkHotSchedulerConfig(expected1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) - re.Contains(echo, "Failed!") - checkHotSchedulerConfig(expected1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) - re.Contains(echo, "Failed!") - checkHotSchedulerConfig(expected1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) - re.Contains(echo, "Failed!") - checkHotSchedulerConfig(expected1) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) - re.Contains(echo, "Failed!") - checkHotSchedulerConfig(expected1) - - // write-priorities is divided into write-leader-priorities and write-peer-priorities - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) - re.Contains(echo, "Failed!") - re.Contains(echo, "Config item is not found.") - checkHotSchedulerConfig(expected1) - - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) - re.Contains(echo, "Failed!") - checkHotSchedulerConfig(expected1) - expected1["rank-formula-version"] = "v2" - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) - re.Contains(echo, "Success!") - checkHotSchedulerConfig(expected1) - expected1["rank-formula-version"] = "v1" - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) - re.Contains(echo, "Success!") - checkHotSchedulerConfig(expected1) - - expected1["forbid-rw-type"] = "read" - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) - re.Contains(echo, "Success!") - checkHotSchedulerConfig(expected1) - - expected1["history-sample-duration"] = "1m0s" - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "history-sample-duration", "1m"}, nil) - re.Contains(echo, "Success!") - checkHotSchedulerConfig(expected1) - - expected1["history-sample-interval"] = "1s" - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "history-sample-interval", "1s"}, nil) - re.Contains(echo, "Success!") - checkHotSchedulerConfig(expected1) - - expected1["history-sample-duration"] = "0s" - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "history-sample-duration", "0s"}, nil) - re.Contains(echo, "Success!") - checkHotSchedulerConfig(expected1) - - expected1["history-sample-interval"] = "0s" - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "history-sample-interval", "0s"}, nil) - re.Contains(echo, "Success!") - checkHotSchedulerConfig(expected1) - - // test compatibility - re.Equal("2.0.0", leaderServer.GetClusterVersion().String()) - for _, store := range stores { - version := versioninfo.HotScheduleWithQuery - store.Version = versioninfo.MinSupportedVersion(version).String() - store.LastHeartbeat = time.Now().UnixNano() - pdTests.MustPutStore(re, cluster, store) - } - re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) - // After upgrading, we can use query. - expected1["write-leader-priorities"] = []any{"query", "byte"} - checkHotSchedulerConfig(expected1) - // cannot set qps as write-peer-priorities - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) - re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") - checkHotSchedulerConfig(expected1) -} - -func (suite *schedulerTestSuite) TestSchedulerDiagnostic() { - suite.env.RunTestBasedOnMode(suite.checkSchedulerDiagnostic) -} - -func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestCluster) { - re := suite.Require() - pdAddr := cluster.GetConfig().GetClientURL() - cmd := ctl.GetRootCmd() - - checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) { - result := make(map[string]any) - testutil.Eventually(re, func() bool { - mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result) - return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"] - }, testutil.WithTickInterval(50*time.Millisecond)) - } - - stores := []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 2, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 3, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 4, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - } - for _, store := range stores { - pdTests.MustPutStore(re, cluster, store) - } - - // note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region. - pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) - - echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil) - re.Contains(echo, "Success!") - checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ") - - // scheduler delete command - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) - re.Contains(echo, "Success!") - checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") - - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) - re.Contains(echo, "Success!") - checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") -} - -func (suite *schedulerTestSuite) TestEvictLeaderScheduler() { - // FIXME: API mode may have the problem - suite.env.RunTestInPDMode(suite.checkEvictLeaderScheduler) -} - -func (suite *schedulerTestSuite) checkEvictLeaderScheduler(cluster *pdTests.TestCluster) { - re := suite.Require() - pdAddr := cluster.GetConfig().GetClientURL() - cmd := ctl.GetRootCmd() - - stores := []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 2, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 3, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 4, - State: metapb.StoreState_Up, - LastHeartbeat: time.Now().UnixNano(), - }, - } - for _, store := range stores { - pdTests.MustPutStore(re, cluster, store) - } - - pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) - output, err := tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...) - re.NoError(err) - re.Contains(string(output), "Success!") - output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) - re.NoError(err) - re.Contains(string(output), "Success!") - output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...) - re.NoError(err) - re.Contains(string(output), "Success!") - output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) - re.NoError(err) - re.Contains(string(output), "Success!") - output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}...) - re.NoError(err) - re.Contains(string(output), "Success!") -} - -func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) string { - output, err := tests.ExecuteCommand(cmd, args...) - re.NoError(err) - if v == nil { - return string(output) - } - re.NoError(json.Unmarshal(output, v), string(output)) - return "" -} - -func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) { - output, err := tests.ExecuteCommand(cmd, args...) - re.NoError(err) - if v == nil { - return - } - json.Unmarshal(output, v) -} - -func checkSchedulerCommand(re *require.Assertions, cmd *cobra.Command, pdAddr string, args []string, expected map[string]bool) { - if args != nil { - echo := mustExec(re, cmd, args, nil) - re.Contains(echo, "Success!") - } - testutil.Eventually(re, func() bool { - var schedulers []string - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers) - if len(schedulers) != len(expected) { - return false - } - for _, scheduler := range schedulers { - if _, ok := expected[scheduler]; !ok { - return false - } - } - return true - }) -}