Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make slow store filtering the highest priority in replica selector v2 #1267

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b h1:LUeYme5++BRU4DSEi2BmdIki0dRki4dFt2/8IhmIXy4=
github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 h1:qiIt9AyEUW5yabTbCIgwxSMKi3p8ZE/YAk1Z6+fJq8M=
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
Expand Down
21 changes: 19 additions & 2 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,23 @@ type RegionCache struct {
clusterID uint64
}

// regionCacheOptions holds the optional settings that RegionCacheOpt functions
// fill in before NewRegionCache builds the cache.
type regionCacheOptions struct {
// noHealthTick, when true, makes NewRegionCache skip scheduling the periodic
// checkAndUpdateStoreHealthStatus background task.
noHealthTick bool
}

// RegionCacheOpt is a functional option for NewRegionCache; each option mutates
// the regionCacheOptions it is given.
type RegionCacheOpt func(*regionCacheOptions)

// RegionCacheNoHealthTick is a RegionCacheOpt that sets noHealthTick, so that
// NewRegionCache does not schedule the periodic checkAndUpdateStoreHealthStatus task.
func RegionCacheNoHealthTick(o *regionCacheOptions) {
o.noHealthTick = true
}

// NewRegionCache creates a RegionCache.
func NewRegionCache(pdClient pd.Client) *RegionCache {
func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
var options regionCacheOptions
for _, o := range opt {
o(&options)
}

c := &RegionCache{
pdClient: pdClient,
}
Expand Down Expand Up @@ -705,7 +720,9 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) })
return false
}, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
if !options.noHealthTick {
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
}
c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
c.bg.schedule(func(ctx context.Context, _ time.Time) bool {
Expand Down
4 changes: 2 additions & 2 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2088,7 +2088,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
}

func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
stats := newStoreHealthStatus()
stats := newStoreHealthStatus(1)
s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
now := time.Now()
stats.tick(now)
Expand Down Expand Up @@ -2124,7 +2124,7 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
}

func (s *testRegionCacheSuite) TestStoreHealthStatus() {
stats := newStoreHealthStatus()
stats := newStoreHealthStatus(1)
now := time.Now()
s.False(stats.IsSlow())

Expand Down
58 changes: 45 additions & 13 deletions internal/locate/replica_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ type ReplicaSelectMixedStrategy struct {
func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2, region *Region) *replica {
replicas := selector.replicas
maxScoreIdxes := make([]int, 0, len(replicas))
maxScore := -1
var maxScore storeSelectionScore = -1
reloadRegion := false
for i, r := range replicas {
epochStale := r.isEpochStale()
Expand Down Expand Up @@ -289,7 +289,7 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc
if r.dataIsNotReady && !isLeader {
// If the replica is failed by data not ready with stale read, we can retry it with replica-read.
// after https://github.com/tikv/tikv/pull/15726, the leader will not return DataIsNotReady error,
// then no need to retry leader again, if you try it again, you may got a NotLeader error.
// then no need to retry leader again. If you try it again, you may get a NotLeader error.
maxAttempt = 2
}
if r.isExhausted(maxAttempt, 0) {
Expand All @@ -310,20 +310,51 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc
return true
}

// storeSelectionScore is a bit set built from the flag* constants; it is the
// value returned by ReplicaSelectMixedStrategy.calculateScore for a replica.
type storeSelectionScore int64

const (
// The definition of the score is:
// MSB LSB
// [unused bits][1 bit: LabelMatches][1 bit: PreferLeader][2 bits: NormalPeer + NotSlow]
flagLabelMatches = 1 << 4
flagPreferLeader = 1 << 3
flagNormalPeer = 1 << 2
flagNotSlow = 1 << 1
flagNotAttempt = 1
// MSB LSB
// [unused bits][1 bit: NotSlow][1 bit: LabelMatches][1 bit: PreferLeader][1 bit: NormalPeer][1 bit: NotAttempted]
flagNotAttempted storeSelectionScore = 1 << iota
flagNormalPeer
flagPreferLeader
flagLabelMatches
flagNotSlow
)

// String renders the score as the names of its set flags joined by "|",
// listed from the most significant flag (NotSlow) down to the least
// significant one (NotAttempted). A score with no bits set is rendered as "0".
// Bits that do not correspond to a known flag are ignored.
func (s storeSelectionScore) String() string {
	if s == 0 {
		return "0"
	}
	// Keep this table in descending flag order so the output reads from the
	// highest-priority factor to the lowest.
	factors := [...]struct {
		flag storeSelectionScore
		name string
	}{
		{flagNotSlow, "NotSlow"},
		{flagLabelMatches, "LabelMatches"},
		{flagPreferLeader, "PreferLeader"},
		{flagNormalPeer, "NormalPeer"},
		{flagNotAttempted, "NotAttempted"},
	}
	res := ""
	for _, f := range factors {
		if s&f.flag == 0 {
			continue
		}
		if res != "" {
			res += "|"
		}
		res += f.name
	}
	return res
}

// calculateScore calculates the score of the replica.
func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) int {
score := 0
func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) storeSelectionScore {
var score storeSelectionScore = 0
if r.store.IsStoreMatch(s.stores) && r.store.IsLabelsMatch(s.labels) {
score |= flagLabelMatches
}
Expand All @@ -338,7 +369,8 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i
}
} else if s.tryLeader {
if len(s.labels) > 0 {
// When the leader has matching labels, prefer leader than other mismatching peers.
// When label matching is enabled, prefer the leader among replicas that have the same label-matching
// result.
score |= flagPreferLeader
} else {
score |= flagNormalPeer
Expand All @@ -357,7 +389,7 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i
score |= flagNotSlow
}
if r.attempts == 0 {
score |= flagNotAttempt
score |= flagNotAttempted
}
return score
}
Expand Down
Loading
Loading