From 642a09bef1d992a568db4c46a12492c30e089d35 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Mon, 8 Apr 2024 14:23:38 +0800 Subject: [PATCH] Make slow store filtering the highest priority in replica selector v2 (#1267) * Add some logs Signed-off-by: MyonKeminta * Make slow store filtering the highest priority in replica selector v2 Signed-off-by: MyonKeminta * Add non stale read case to TestMultiReplicaInOneAZ Signed-off-by: MyonKeminta * Enrich the multi replcia in one AZ case but it failed... Signed-off-by: MyonKeminta * update test to adapt the fix on master branch Signed-off-by: MyonKeminta * Remove TestMultiReplicaInOneAZ Signed-off-by: MyonKeminta --------- Signed-off-by: MyonKeminta Co-authored-by: MyonKeminta --- go.sum | 2 - internal/locate/region_cache.go | 21 ++- internal/locate/region_cache_test.go | 4 +- internal/locate/replica_selector.go | 58 ++++++-- internal/locate/replica_selector_test.go | 182 +++++++++++++++++++++-- internal/locate/slow_score.go | 14 +- internal/locate/store_cache.go | 34 ++++- 7 files changed, 272 insertions(+), 43 deletions(-) diff --git a/go.sum b/go.sum index 05d73b3c24..cf2c4a7d90 100644 --- a/go.sum +++ b/go.sum @@ -111,8 +111,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b h1:LUeYme5++BRU4DSEi2BmdIki0dRki4dFt2/8IhmIXy4= -github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg= github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 h1:qiIt9AyEUW5yabTbCIgwxSMKi3p8ZE/YAk1Z6+fJq8M= github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index e8000f78c9..e109db5a7b 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -655,8 +655,23 @@ type RegionCache struct { clusterID uint64 } +type regionCacheOptions struct { + noHealthTick bool +} + +type RegionCacheOpt func(*regionCacheOptions) + +func RegionCacheNoHealthTick(o *regionCacheOptions) { + o.noHealthTick = true +} + // NewRegionCache creates a RegionCache. -func NewRegionCache(pdClient pd.Client) *RegionCache { +func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { + var options regionCacheOptions + for _, o := range opt { + o(&options) + } + c := &RegionCache{ pdClient: pdClient, } @@ -705,7 +720,9 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) }) return false }, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents()) - c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second) + if !options.noHealthTick { + c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second) + } c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second) if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 { c.bg.schedule(func(ctx context.Context, _ time.Time) bool { diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 441df39c7f..3369fc4fd1 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -2088,7 +2088,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { } func (s *testRegionCacheSuite) TestTiKVSideSlowScore() { - stats := newStoreHealthStatus() + stats := newStoreHealthStatus(1) s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1)) now := time.Now() stats.tick(now) @@ -2124,7 +2124,7 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() { } func (s *testRegionCacheSuite) TestStoreHealthStatus() { - stats := newStoreHealthStatus() + stats := newStoreHealthStatus(1) now := time.Now() s.False(stats.IsSlow()) diff --git a/internal/locate/replica_selector.go b/internal/locate/replica_selector.go index 3a20eac7f7..895add05ee 100644 --- a/internal/locate/replica_selector.go +++ b/internal/locate/replica_selector.go @@ -236,7 +236,7 @@ type ReplicaSelectMixedStrategy struct { func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2, region *Region) *replica { replicas := selector.replicas maxScoreIdxes := make([]int, 0, len(replicas)) - maxScore := -1 + var maxScore storeSelectionScore = -1 reloadRegion := false for i, r := range replicas { epochStale := r.isEpochStale() @@ -289,7 +289,7 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc if r.dataIsNotReady && !isLeader { // If the replica is failed by data not ready with stale read, we can retry it with replica-read. // after https://github.com/tikv/tikv/pull/15726, the leader will not return DataIsNotReady error, - // then no need to retry leader again, if you try it again, you may got a NotLeader error. + // then no need to retry leader again. If you try it again, you may get a NotLeader error. maxAttempt = 2 } if r.isExhausted(maxAttempt, 0) { @@ -310,20 +310,51 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc return true } +type storeSelectionScore int64 + const ( // The definition of the score is: - // MSB LSB - // [unused bits][1 bit: LabelMatches][1 bit: PreferLeader][2 bits: NormalPeer + NotSlow] - flagLabelMatches = 1 << 4 - flagPreferLeader = 1 << 3 - flagNormalPeer = 1 << 2 - flagNotSlow = 1 << 1 - flagNotAttempt = 1 + // MSB LSB + // [unused bits][1 bit: NotSlow][1 bit: LabelMatches][1 bit: PreferLeader][1 bit: NormalPeer][1 bit: NotAttempted] + flagNotAttempted storeSelectionScore = 1 << iota + flagNormalPeer + flagPreferLeader + flagLabelMatches + flagNotSlow ) +func (s storeSelectionScore) String() string { + if s == 0 { + return "0" + } + res := "" + appendFactor := func(name string) { + if len(res) != 0 { + res += "|" + } + res += name + } + if (s & flagNotSlow) != 0 { + appendFactor("NotSlow") + } + if (s & flagLabelMatches) != 0 { + appendFactor("LableMatches") + } + if (s & flagPreferLeader) != 0 { + appendFactor("PreferLeader") + } + if (s & flagNormalPeer) != 0 { + appendFactor("NormalPeer") + } + if (s & flagNotAttempted) != 0 { + appendFactor("NotAttempted") + } + return res +} + // calculateScore calculates the score of the replica. -func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) int { - score := 0 +func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) storeSelectionScore { + var score storeSelectionScore = 0 if r.store.IsStoreMatch(s.stores) && r.store.IsLabelsMatch(s.labels) { score |= flagLabelMatches } @@ -338,7 +369,8 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i } } else if s.tryLeader { if len(s.labels) > 0 { - // When the leader has matching labels, prefer leader than other mismatching peers. + // When label matching is enabled, prefer selecting the leader for replicas that has same label-matching + // results. score |= flagPreferLeader } else { score |= flagNormalPeer @@ -357,7 +389,7 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i score |= flagNotSlow } if r.attempts == 0 { - score |= flagNotAttempt + score |= flagNotAttempted } return score } diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index bdb8325b64..521700d8ae 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -52,7 +52,8 @@ func (s *testReplicaSelectorSuite) SetupTest(t *testing.T) { s.cluster = mocktikv.NewCluster(s.mvccStore) s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(s.cluster, 3) pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} - s.cache = NewRegionCache(pdCli) + // Disable the tick on health status. + s.cache = NewRegionCache(pdCli, RegionCacheNoHealthTick) s.bo = retry.NewNoopBackoff(context.Background()) s.SetT(t) s.SetS(s) @@ -166,29 +167,29 @@ func TestReplicaSelectorCalculateScore(t *testing.T) { score := strategy.calculateScore(r, isLeader) s.Equal(r.store.healthStatus.IsSlow(), false) if isLeader { - s.Equal(score, flagLabelMatches+flagNotSlow+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNotSlow+flagNotAttempted) } else { - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotSlow+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotSlow+flagNotAttempted) } r.store.healthStatus.markAlreadySlow() s.Equal(r.store.healthStatus.IsSlow(), true) score = strategy.calculateScore(r, isLeader) if isLeader { - s.Equal(score, flagLabelMatches+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNotAttempted) } else { - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted) } strategy.tryLeader = true score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted) strategy.preferLeader = true score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted) strategy.learnerOnly = true strategy.tryLeader = false strategy.preferLeader = false score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagLabelMatches+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNotAttempted) labels := []*metapb.StoreLabel{ { Key: "zone", @@ -197,7 +198,7 @@ func TestReplicaSelectorCalculateScore(t *testing.T) { } strategy.labels = labels score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagNotAttempt) + s.Equal(score, flagNotAttempted) strategy = ReplicaSelectMixedStrategy{ leaderIdx: rc.getStore().workTiKVIdx, @@ -206,9 +207,9 @@ func TestReplicaSelectorCalculateScore(t *testing.T) { } score = strategy.calculateScore(r, isLeader) if isLeader { - s.Equal(score, flagPreferLeader+flagNotAttempt) + s.Equal(score, flagPreferLeader+flagNotAttempted) } else { - s.Equal(score, flagNormalPeer+flagNotAttempt) + s.Equal(score, flagNormalPeer+flagNotAttempted) } strategy = ReplicaSelectMixedStrategy{ @@ -217,10 +218,10 @@ func TestReplicaSelectorCalculateScore(t *testing.T) { labels: labels, } score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagNormalPeer+flagNotAttempt) + s.Equal(score, flagNormalPeer+flagNotAttempted) r.store.labels = labels score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted) r.store.labels = nil } } @@ -2570,6 +2571,159 @@ func TestReplicaReadAccessPathByLearnerCase(t *testing.T) { s.True(s.runCaseAndCompare(ca)) } +func TestReplicaReadAvoidSlowStore(t *testing.T) { + s := new(testReplicaSelectorSuite) + s.SetupTest(t) + defer s.TearDownTest() + + s.changeRegionLeader(3) + store, exists := s.cache.getStore(1) + s.True(exists) + + for _, staleRead := range []bool{false, true} { + for _, withLabel := range []bool{false, true} { + var label *metapb.StoreLabel + if withLabel { + label = &metapb.StoreLabel{Key: "id", Value: "1"} + } + + s.T().Logf("test case: stale read: %v, with label: %v, slow: false", staleRead, withLabel) + + ca := replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + timeout: 0, + busyThresholdMs: 0, + label: label, + accessErr: []RegionErrorType{}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store1, replica-read: %v, stale-read: %v}", !staleRead, staleRead), + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) + + s.T().Logf("test case: stale read: %v, with label: %v, slow: true", staleRead, withLabel) + expectedFirstStore := 2 + if withLabel { + // Leader is preferred in this case + expectedFirstStore = 3 + } + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + timeout: 0, + busyThresholdMs: 0, + label: label, + accessErr: []RegionErrorType{}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store%v, replica-read: %v, stale-read: %v}", expectedFirstStore, !staleRead, staleRead), + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + beforeRun: func() { + s.resetStoreState() + store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now()) + }, + } + // v1 doesn't support avoiding slow stores. We only test this on v2. + s.True(s.runCase(ca, true)) + + s.T().Logf("test case: stale read: %v, with label: %v, slow: false, encoutner err: true", staleRead, withLabel) + var expectedSecondPath string + if staleRead { + // Retry leader, and fallback to leader-read mode. + expectedSecondPath = "{addr: store3, replica-read: false, stale-read: false}" + } else { + if withLabel { + // Prefer retrying leader. + expectedSecondPath = "{addr: store3, replica-read: true, stale-read: false}" + } else { + // Retry any another replica. + expectedSecondPath = "{addr: store2, replica-read: true, stale-read: false}" + } + } + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + timeout: 0, + busyThresholdMs: 0, + label: label, + accessErr: []RegionErrorType{ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store1, replica-read: %v, stale-read: %v}", !staleRead, staleRead), + // Retry leader. + // For stale read, it fallbacks to leader read. However, replica-read doesn't do so. + expectedSecondPath, + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) + + s.T().Logf("test case: stale read: %v, with label: %v, slow: true, encoutner err: true", staleRead, withLabel) + if expectedFirstStore == 3 { + // Retry on store 2 which is a follower. + // Stale-read mode falls back to replica-read mode. + expectedSecondPath = "{addr: store2, replica-read: true, stale-read: false}" + } else { + if staleRead { + // Retry in leader read mode + expectedSecondPath = "{addr: store3, replica-read: false, stale-read: false}" + } else { + // Retry with the same mode, which is replica-read mode. + expectedSecondPath = "{addr: store3, replica-read: true, stale-read: false}" + } + } + + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + timeout: 0, + busyThresholdMs: 0, + label: label, + accessErr: []RegionErrorType{ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store%v, replica-read: %v, stale-read: %v}", expectedFirstStore, !staleRead, staleRead), + expectedSecondPath, + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + beforeRun: func() { + s.resetStoreState() + store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now()) + }, + } + s.True(s.runCase(ca, true)) + } + } +} + func TestReplicaReadAccessPathByGenError(t *testing.T) { s := new(testReplicaSelectorSuite) s.SetupTest(t) @@ -2934,7 +3088,7 @@ func (s *testReplicaSelectorSuite) resetStoreState() { for _, store := range rc.getStore().stores { store.loadStats.Store(nil) store.healthStatus.clientSideSlowScore.resetSlowScore() - store.healthStatus.updateTiKVServerSideSlowScore(0, time.Now()) + store.healthStatus.resetTiKVServerSideSlowScoreForTest() store.healthStatus.updateSlowFlag() atomic.StoreUint32(&store.livenessState, uint32(reachable)) store.setResolveState(resolved) diff --git a/internal/locate/slow_score.go b/internal/locate/slow_score.go index d67c846b3c..28ed88fe16 100644 --- a/internal/locate/slow_score.go +++ b/internal/locate/slow_score.go @@ -124,7 +124,7 @@ func (ss *SlowScoreStat) updateSlowScore() { } atomic.CompareAndSwapUint64(&ss.avgTimecost, avgTimecost, ss.tsCntSlidingWindow.Avg()) - // Resets the counter of inteval timecost + // Resets the counter of interval timecost atomic.StoreUint64(&ss.intervalTimecost, 0) atomic.StoreUint64(&ss.intervalUpdCount, 0) } @@ -155,11 +155,17 @@ func (ss *SlowScoreStat) markAlreadySlow() { // resetSlowScore resets the slow score to 0. It's used for test. func (ss *SlowScoreStat) resetSlowScore() { - atomic.StoreUint64(&ss.avgScore, 0) + *ss = SlowScoreStat{ + avgScore: 1, + } } func (ss *SlowScoreStat) isSlow() bool { - return ss.getSlowScore() >= slowScoreThreshold + return clientSideSlowScoreIsSlow(ss.getSlowScore()) +} + +func clientSideSlowScoreIsSlow(value uint64) bool { + return value >= slowScoreThreshold } // replicaFlowsType indicates the type of the destination replica of flows. @@ -170,7 +176,7 @@ const ( toLeader replicaFlowsType = iota // toFollower indicates that flows are sent to followers' replica toFollower - // numflowsDestType reserved to keep max replicaFlowsType value. + // numReplicaFlowsType is reserved to keep max replicaFlowsType value. numReplicaFlowsType ) diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 5defacbe40..94ea5bfedb 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -214,7 +214,7 @@ func newStore( peerAddr: peerAddr, saddr: statusAddr, // Make sure healthStatus field is never null. - healthStatus: newStoreHealthStatus(), + healthStatus: newStoreHealthStatus(id), } } @@ -223,7 +223,7 @@ func newUninitializedStore(id uint64) *Store { return &Store{ storeID: id, // Make sure healthStatus field is never null. - healthStatus: newStoreHealthStatus(), + healthStatus: newStoreHealthStatus(id), } } @@ -794,6 +794,9 @@ const ( ) type StoreHealthStatus struct { + // Used for logging. + storeID uint64 + isSlow atomic.Bool // A statistic for counting the request latency to this store @@ -816,8 +819,18 @@ type HealthStatusDetail struct { TiKVSideSlowScore int64 } -func newStoreHealthStatus() *StoreHealthStatus { - return &StoreHealthStatus{} +func (d HealthStatusDetail) IsSlow() bool { + return clientSideSlowScoreIsSlow(uint64(d.ClientSideSlowScore)) || d.TiKVSideSlowScore >= tikvSlowScoreSlowThreshold +} + +func (d HealthStatusDetail) String() string { + return fmt.Sprintf("{ ClientSideSlowScore: %d, TiKVSideSlowScore: %d }", d.ClientSideSlowScore, d.TiKVSideSlowScore) +} + +func newStoreHealthStatus(storeID uint64) *StoreHealthStatus { + return &StoreHealthStatus{ + storeID: storeID, + } } // IsSlow returns whether current Store is slow. @@ -935,9 +948,18 @@ func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) } +func (s *StoreHealthStatus) resetTiKVServerSideSlowScoreForTest() { + s.setTiKVSlowScoreLastUpdateTimeForTest(time.Now().Add(-time.Hour * 2)) + s.updateTiKVServerSideSlowScore(1, time.Now().Add(-time.Hour)) +} + func (s *StoreHealthStatus) updateSlowFlag() { - isSlow := s.clientSideSlowScore.isSlow() || s.tikvSideSlowScore.score.Load() >= tikvSlowScoreSlowThreshold - s.isSlow.Store(isSlow) + healthDetail := s.GetHealthStatusDetail() + isSlow := healthDetail.IsSlow() + old := s.isSlow.Swap(isSlow) + if old != isSlow { + logutil.BgLogger().Info("store health status changed", zap.Uint64("storeID", s.storeID), zap.Bool("isSlow", isSlow), zap.Stringer("healthDetail", healthDetail)) + } } // setTiKVSlowScoreLastUpdateTimeForTest force sets last update time of TiKV server side slow score to specified value.