Skip to content

Commit

Permalink
scheduler: fix scheduler save config (#7108) (#7165)
Browse files Browse the repository at this point in the history
close #6897

Signed-off-by: husharp <[email protected]>

Co-authored-by: husharp <[email protected]>
Co-authored-by: Hu# <[email protected]>
  • Loading branch information
ti-chi-bot and HuSharp authored Sep 14, 2024
1 parent 9365b94 commit b7396e6
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 8 deletions.
4 changes: 4 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,10 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string)
c.wg.Add(1)
go c.runScheduler(s)
c.schedulers[s.GetName()] = s
if err := schedule.SaveSchedulerConfig(c.cluster.storage, scheduler); err != nil {
log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
return err
}
c.cluster.opt.AddSchedulerCfg(s.GetType(), args)
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,9 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
// whether the schedulers added or removed in dynamic way are recorded in opt
_, newOpt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
_, err = schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
shuffle, err := schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
c.Assert(err, IsNil)
c.Assert(co.addScheduler(shuffle), IsNil)
// suppose we add a new default enable scheduler
config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"})
defer func() {
Expand Down
13 changes: 6 additions & 7 deletions server/schedule/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,16 @@ func CreateScheduler(typ string, opController *OperatorController, storage *core
if !ok {
return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
}
return fn(opController, storage, dec)
}

s, err := fn(opController, storage, dec)
if err != nil {
return nil, err
}
// SaveSchedulerConfig saves the config of the specified scheduler.
func SaveSchedulerConfig(storage *core.Storage, s Scheduler) error {
data, err := s.EncodeConfig()
if err != nil {
return nil, err
return err
}
err = storage.SaveScheduleConfig(s.GetName(), data)
return s, err
return storage.SaveScheduleConfig(s.GetName(), data)
}

// FindSchedulerTypeByName finds the type of the specified name.
Expand Down
15 changes: 15 additions & 0 deletions server/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ type evictLeaderSchedulerConfig struct {
cluster opt.Cluster
}

func (conf *evictLeaderSchedulerConfig) getStores() []uint64 {
conf.mu.RLock()
defer conf.mu.RUnlock()
stores := make([]uint64, 0, len(conf.StoreIDWithRanges))
for storeID := range conf.StoreIDWithRanges {
stores = append(stores, storeID)
}
return stores
}

func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
if len(args) != 1 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
Expand Down Expand Up @@ -189,6 +199,11 @@ func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *ev
}
}

// EvictStoreIDs returns the IDs of the evict-stores.
func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 {
return s.conf.getStores()
}

func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}
Expand Down
59 changes: 59 additions & 0 deletions tests/server/api/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/pingcap/check"
)

const schedulersPrefix = "/pd/api/v1/schedulers"

// dialClient used to dial http request.
var dialClient = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
}

// MustAddScheduler adds a scheduler with HTTP API.
func MustAddScheduler(
c *check.C, serverAddr string,
schedulerName string, args map[string]interface{},
) {
request := map[string]interface{}{
"name": schedulerName,
}
for arg, val := range args {
request[arg] = val
}
data, err := json.Marshal(request)
c.Assert(err, check.IsNil)

httpReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", serverAddr, schedulersPrefix), bytes.NewBuffer(data))
c.Assert(err, check.IsNil)
// Send request.
resp, err := dialClient.Do(httpReq)
c.Assert(err, check.IsNil)
defer resp.Body.Close()
_, err = io.ReadAll(resp.Body)
c.Assert(err, check.IsNil)
c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
}
139 changes: 139 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ package cluster_test
import (
"context"
"fmt"
"github.com/tikv/pd/server/id"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/tests/server/api"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1182,3 +1186,138 @@ func (s *clusterTestSuite) TestTransferLeaderBack(c *C) {
c.Assert(rc.GetMetaCluster(), DeepEquals, meta)
c.Assert(rc.GetStoreCount(), Equals, 3)
}

func (s *clusterTestSuite) TestTransferLeaderForScheduler(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
tc, err := tests.NewTestCluster(ctx, 2)
defer tc.Destroy()
c.Assert(err, IsNil)
err = tc.RunInitialServers()
c.Assert(err, IsNil)
tc.WaitLeader()
// start
leaderServer := tc.GetServer(tc.GetLeader())
c.Assert(leaderServer.BootstrapCluster(), IsNil)
rc := leaderServer.GetServer().GetRaftCluster()
c.Assert(rc, NotNil)

storesNum := 2
grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr())
for i := 1; i <= storesNum; i++ {
store := &metapb.Store{
Id: uint64(i),
Address: "127.0.0.1:" + strconv.Itoa(i),
}
resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
c.Assert(err, IsNil)
c.Assert(resp.GetHeader().GetError().GetType(), Equals, pdpb.ErrorType_OK)
}
// region heartbeat
id := leaderServer.GetAllocator()
putRegionWithLeader(c, rc, id, 1)

time.Sleep(time.Second)
c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)

// Add evict leader scheduler
api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 1,
})
api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 2,
})
// Check scheduler updated.
c.Assert(len(rc.GetSchedulers()), Equals, 4)
checkEvictLeaderSchedulerExist(c, rc, true)
checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})

// transfer PD leader to another PD
tc.ResignLeader()
rc.Stop()
tc.WaitLeader()
leaderServer = tc.GetServer(tc.GetLeader())
rc1 := leaderServer.GetServer().GetRaftCluster()
rc1.Start(leaderServer.GetServer())
c.Assert(err, IsNil)
c.Assert(rc1, NotNil)
// region heartbeat
id = leaderServer.GetAllocator()
putRegionWithLeader(c, rc1, id, 1)
time.Sleep(time.Second)
c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
// Check scheduler updated.
c.Assert(len(rc.GetSchedulers()), Equals, 4)
checkEvictLeaderSchedulerExist(c, rc, true)
checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})

// transfer PD leader back to the previous PD
tc.ResignLeader()
rc1.Stop()
tc.WaitLeader()
leaderServer = tc.GetServer(tc.GetLeader())
rc = leaderServer.GetServer().GetRaftCluster()
rc.Start(leaderServer.GetServer())
c.Assert(rc, NotNil)
// region heartbeat
id = leaderServer.GetAllocator()
putRegionWithLeader(c, rc, id, 1)
time.Sleep(time.Second)
c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
// Check scheduler updated
c.Assert(len(rc.GetSchedulers()), Equals, 4)
checkEvictLeaderSchedulerExist(c, rc, true)
checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})

c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker"), IsNil)
}

func checkEvictLeaderSchedulerExist(c *C, rc *cluster.RaftCluster, exist bool) {
isExistScheduler := func(rc *cluster.RaftCluster, name string) bool {
s := rc.GetSchedulers()
for _, scheduler := range s {
if scheduler == name {
return true
}
}
return false
}

testutil.WaitUntil(c, func(c *C) bool {
return isExistScheduler(rc, schedulers.EvictLeaderName) == exist
})
}

func checkEvictLeaderStoreIDs(c *C, rc *cluster.RaftCluster, expected []uint64) {
handler, ok := rc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
c.Assert(ok, IsTrue)
h, ok := handler.(interface {
EvictStoreIDs() []uint64
})
c.Assert(ok, IsTrue)
var evictStoreIDs []uint64
testutil.WaitUntil(c, func(c *C) bool {
evictStoreIDs = h.EvictStoreIDs()
return len(evictStoreIDs) == len(expected)
})
}

func putRegionWithLeader(c *C, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) {
for i := 0; i < 3; i++ {
regionID, err := id.Alloc()
c.Assert(err, IsNil)
peerID, err := id.Alloc()
c.Assert(err, IsNil)
region := &metapb.Region{
Id: regionID,
Peers: []*metapb.Peer{{Id: peerID, StoreId: storeID}},
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
}
rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
}

time.Sleep(50 * time.Millisecond)
c.Assert(rc.GetStore(storeID).GetLeaderCount(), Equals, 3)
}

0 comments on commit b7396e6

Please sign in to comment.