Skip to content

Commit

Permalink
Make slow store filtering the highest priority in replica selector v2 (
Browse files Browse the repository at this point in the history
…#1267)

* Add some logs

Signed-off-by: MyonKeminta <[email protected]>

* Make slow store filtering the highest priority in replica selector v2

Signed-off-by: MyonKeminta <[email protected]>

* Add non stale read case to TestMultiReplicaInOneAZ

Signed-off-by: MyonKeminta <[email protected]>

* Enrich the multi replica in one AZ case but it failed...

Signed-off-by: MyonKeminta <[email protected]>

* update test to adapt the fix on master branch

Signed-off-by: MyonKeminta <[email protected]>

* Remove TestMultiReplicaInOneAZ

Signed-off-by: MyonKeminta <[email protected]>

---------

Signed-off-by: MyonKeminta <[email protected]>
Co-authored-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta and MyonKeminta authored Apr 8, 2024
1 parent 8fc819c commit 642a09b
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 43 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b h1:LUeYme5++BRU4DSEi2BmdIki0dRki4dFt2/8IhmIXy4=
github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 h1:qiIt9AyEUW5yabTbCIgwxSMKi3p8ZE/YAk1Z6+fJq8M=
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
Expand Down
21 changes: 19 additions & 2 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,23 @@ type RegionCache struct {
clusterID uint64
}

// regionCacheOptions holds the optional settings that RegionCacheOpt functions apply before
// NewRegionCache builds the cache.
type regionCacheOptions struct {
// noHealthTick, when true, makes NewRegionCache skip scheduling the
// checkAndUpdateStoreHealthStatus background task.
noHealthTick bool
}

// RegionCacheOpt is an option function that mutates the regionCacheOptions used by NewRegionCache.
type RegionCacheOpt func(*regionCacheOptions)

// RegionCacheNoHealthTick is a RegionCacheOpt that sets noHealthTick, so that NewRegionCache does
// not schedule the checkAndUpdateStoreHealthStatus background task.
func RegionCacheNoHealthTick(o *regionCacheOptions) {
o.noHealthTick = true
}

// NewRegionCache creates a RegionCache.
func NewRegionCache(pdClient pd.Client) *RegionCache {
func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
var options regionCacheOptions
for _, o := range opt {
o(&options)
}

c := &RegionCache{
pdClient: pdClient,
}
Expand Down Expand Up @@ -705,7 +720,9 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) })
return false
}, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
if !options.noHealthTick {
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
}
c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
c.bg.schedule(func(ctx context.Context, _ time.Time) bool {
Expand Down
4 changes: 2 additions & 2 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2088,7 +2088,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
}

func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
stats := newStoreHealthStatus()
stats := newStoreHealthStatus(1)
s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
now := time.Now()
stats.tick(now)
Expand Down Expand Up @@ -2124,7 +2124,7 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
}

func (s *testRegionCacheSuite) TestStoreHealthStatus() {
stats := newStoreHealthStatus()
stats := newStoreHealthStatus(1)
now := time.Now()
s.False(stats.IsSlow())

Expand Down
58 changes: 45 additions & 13 deletions internal/locate/replica_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ type ReplicaSelectMixedStrategy struct {
func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2, region *Region) *replica {
replicas := selector.replicas
maxScoreIdxes := make([]int, 0, len(replicas))
maxScore := -1
var maxScore storeSelectionScore = -1
reloadRegion := false
for i, r := range replicas {
epochStale := r.isEpochStale()
Expand Down Expand Up @@ -289,7 +289,7 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc
if r.dataIsNotReady && !isLeader {
// If the replica is failed by data not ready with stale read, we can retry it with replica-read.
// after https://github.com/tikv/tikv/pull/15726, the leader will not return DataIsNotReady error,
// then no need to retry leader again, if you try it again, you may got a NotLeader error.
// then no need to retry leader again. If you try it again, you may get a NotLeader error.
maxAttempt = 2
}
if r.isExhausted(maxAttempt, 0) {
Expand All @@ -310,20 +310,51 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc
return true
}

// storeSelectionScore is the score calculateScore assigns to a replica. It is a set of flag* bits
// OR-ed together; ReplicaSelectMixedStrategy.next tracks the maximum score among the replicas.
type storeSelectionScore int64

const (
// The definition of the score is:
// MSB LSB
// [unused bits][1 bit: LabelMatches][1 bit: PreferLeader][2 bits: NormalPeer + NotSlow]
flagLabelMatches = 1 << 4
flagPreferLeader = 1 << 3
flagNormalPeer = 1 << 2
flagNotSlow = 1 << 1
flagNotAttempt = 1
// MSB LSB
// [unused bits][1 bit: NotSlow][1 bit: LabelMatches][1 bit: PreferLeader][1 bit: NormalPeer][1 bit: NotAttempted]
flagNotAttempted storeSelectionScore = 1 << iota
flagNormalPeer
flagPreferLeader
flagLabelMatches
flagNotSlow
)

// String renders the score as the names of the factors set in it, joined by "|", in the fixed
// order NotSlow, LabelMatches, PreferLeader, NormalPeer, NotAttempted.
//
// A zero score is rendered as "0". Bits that do not correspond to one of the listed flags are
// ignored, so a non-zero score made only of such bits yields an empty string.
func (s storeSelectionScore) String() string {
	if s == 0 {
		return "0"
	}
	// The table order defines the output order.
	factors := [...]struct {
		flag storeSelectionScore
		name string
	}{
		{flagNotSlow, "NotSlow"},
		{flagLabelMatches, "LabelMatches"},
		{flagPreferLeader, "PreferLeader"},
		{flagNormalPeer, "NormalPeer"},
		{flagNotAttempted, "NotAttempted"},
	}
	res := ""
	for _, f := range factors {
		if s&f.flag == 0 {
			continue
		}
		if res != "" {
			res += "|"
		}
		res += f.name
	}
	return res
}

// calculateScore calculates the score of the replica.
func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) int {
score := 0
func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) storeSelectionScore {
var score storeSelectionScore = 0
if r.store.IsStoreMatch(s.stores) && r.store.IsLabelsMatch(s.labels) {
score |= flagLabelMatches
}
Expand All @@ -338,7 +369,8 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i
}
} else if s.tryLeader {
if len(s.labels) > 0 {
// When the leader has matching labels, prefer leader than other mismatching peers.
// When label matching is enabled, prefer selecting the leader among replicas that have the same
// label-matching result.
score |= flagPreferLeader
} else {
score |= flagNormalPeer
Expand All @@ -357,7 +389,7 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i
score |= flagNotSlow
}
if r.attempts == 0 {
score |= flagNotAttempt
score |= flagNotAttempted
}
return score
}
Expand Down
Loading

0 comments on commit 642a09b

Please sign in to comment.