From 909bb30ffd7c38c6211a5f24196630689037febe Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 26 Mar 2024 17:59:15 +0800 Subject: [PATCH 1/6] Add some logs Signed-off-by: MyonKeminta --- internal/locate/region_cache_test.go | 4 ++-- internal/locate/slow_score.go | 6 +++++- internal/locate/store_cache.go | 29 ++++++++++++++++++++++------ 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 441df39c7..3369fc4fd 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -2088,7 +2088,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { } func (s *testRegionCacheSuite) TestTiKVSideSlowScore() { - stats := newStoreHealthStatus() + stats := newStoreHealthStatus(1) s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1)) now := time.Now() stats.tick(now) @@ -2124,7 +2124,7 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() { } func (s *testRegionCacheSuite) TestStoreHealthStatus() { - stats := newStoreHealthStatus() + stats := newStoreHealthStatus(1) now := time.Now() s.False(stats.IsSlow()) diff --git a/internal/locate/slow_score.go b/internal/locate/slow_score.go index d67c846b3..dc595a95c 100644 --- a/internal/locate/slow_score.go +++ b/internal/locate/slow_score.go @@ -159,7 +159,11 @@ func (ss *SlowScoreStat) resetSlowScore() { } func (ss *SlowScoreStat) isSlow() bool { - return ss.getSlowScore() >= slowScoreThreshold + return clientSideSlowScoreIsSlow(ss.getSlowScore()) +} + +func clientSideSlowScoreIsSlow(value uint64) bool { + return value >= slowScoreThreshold } // replicaFlowsType indicates the type of the destination replica of flows. diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 235cbd0a1..befa87c9d 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -214,7 +214,7 @@ func newStore( peerAddr: peerAddr, saddr: statusAddr, // Make sure healthStatus field is never null. - healthStatus: newStoreHealthStatus(), + healthStatus: newStoreHealthStatus(id), } } @@ -223,7 +223,7 @@ func newUninitializedStore(id uint64) *Store { return &Store{ storeID: id, // Make sure healthStatus field is never null. - healthStatus: newStoreHealthStatus(), + healthStatus: newStoreHealthStatus(id), } } @@ -794,6 +794,9 @@ const ( ) type StoreHealthStatus struct { + // Used for logging. + storeID uint64 + isSlow atomic.Bool // A statistic for counting the request latency to this store @@ -816,8 +819,18 @@ type HealthStatusDetail struct { TiKVSideSlowScore int64 } -func newStoreHealthStatus() *StoreHealthStatus { - return &StoreHealthStatus{} +func (d HealthStatusDetail) IsSlow() bool { + return clientSideSlowScoreIsSlow(uint64(d.ClientSideSlowScore)) || d.TiKVSideSlowScore >= tikvSlowScoreSlowThreshold +} + +func (d HealthStatusDetail) String() string { + return fmt.Sprintf("{ ClientSideSlowScore: %d, TiKVSideSlowScore: %d }", d.ClientSideSlowScore, d.TiKVSideSlowScore) +} + +func newStoreHealthStatus(storeID uint64) *StoreHealthStatus { + return &StoreHealthStatus{ + storeID: storeID, + } } // IsSlow returns whether current Store is slow. 
@@ -936,8 +949,12 @@ func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime } func (s *StoreHealthStatus) updateSlowFlag() { - isSlow := s.clientSideSlowScore.isSlow() || s.tikvSideSlowScore.score.Load() >= tikvSlowScoreSlowThreshold - s.isSlow.Store(isSlow) + healthDetail := s.GetHealthStatusDetail() + isSlow := healthDetail.IsSlow() + old := s.isSlow.Swap(isSlow) + if old != isSlow { + logutil.BgLogger().Info("store health status changed", zap.Bool("isSlow", isSlow), zap.Stringer("healthDetail", healthDetail)) + } } // setTiKVSlowScoreLastUpdateTimeForTest force sets last update time of TiKV server side slow score to specified value. From 60a8844529a75331ff08c07396a5bf22d1b0f252 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Sun, 7 Apr 2024 18:00:14 +0800 Subject: [PATCH 2/6] Make slow store filtering the highest priority in replica selector v2 Signed-off-by: MyonKeminta --- go.sum | 2 - internal/locate/region_cache.go | 21 ++- internal/locate/replica_selector.go | 55 ++++-- internal/locate/replica_selector_test.go | 213 +++++++++++++++++++++-- internal/locate/slow_score.go | 12 +- internal/locate/store_cache.go | 35 ++-- 6 files changed, 288 insertions(+), 50 deletions(-) diff --git a/go.sum b/go.sum index 05d73b3c2..cf2c4a7d9 100644 --- a/go.sum +++ b/go.sum @@ -111,8 +111,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b h1:LUeYme5++BRU4DSEi2BmdIki0dRki4dFt2/8IhmIXy4= -github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg= github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 h1:qiIt9AyEUW5yabTbCIgwxSMKi3p8ZE/YAk1Z6+fJq8M= github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index ce6582bea..8e8e8de2b 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -655,8 +655,23 @@ type RegionCache struct { clusterID uint64 } +type regionCacheOptions struct { + noHealthTick bool +} + +type RegionCacheOpt func(*regionCacheOptions) + +func RegionCacheNoHealthTick(o *regionCacheOptions) { + o.noHealthTick = true +} + // NewRegionCache creates a RegionCache. 
-func NewRegionCache(pdClient pd.Client) *RegionCache { +func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { + var options regionCacheOptions + for _, o := range opt { + o(&options) + } + c := &RegionCache{ pdClient: pdClient, } @@ -705,7 +720,9 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) }) return false }, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents()) - c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second) + if !options.noHealthTick { + c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second) + } c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second) if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 { c.bg.schedule(func(ctx context.Context, _ time.Time) bool { diff --git a/internal/locate/replica_selector.go b/internal/locate/replica_selector.go index 5d5a927bb..df8fabb9c 100644 --- a/internal/locate/replica_selector.go +++ b/internal/locate/replica_selector.go @@ -232,7 +232,7 @@ type ReplicaSelectMixedStrategy struct { func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2, region *Region) *replica { replicas := selector.replicas maxScoreIdxes := make([]int, 0, len(replicas)) - maxScore := -1 + var maxScore storeSelectionScore = -1 reloadRegion := false for i, r := range replicas { epochStale := r.isEpochStale() @@ -285,7 +285,7 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc if r.dataIsNotReady && !isLeader { // If the replica is failed by data not ready with stale read, we can retry it with replica-read. // after https://github.com/tikv/tikv/pull/15726, the leader will not return DataIsNotReady error, - // then no need to retry leader again, if you try it again, you may got a NotLeader error. + // then no need to retry leader again. If you try it again, you may get a NotLeader error. maxAttempt = 2 } if r.isExhausted(maxAttempt, 0) { @@ -306,20 +306,51 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc return true } +type storeSelectionScore int64 + const ( // The definition of the score is: - // MSB LSB - // [unused bits][1 bit: LabelMatches][1 bit: PreferLeader][2 bits: NormalPeer + NotSlow] - flagLabelMatches = 1 << 4 - flagPreferLeader = 1 << 3 - flagNormalPeer = 1 << 2 - flagNotSlow = 1 << 1 - flagNotAttempt = 1 + // MSB LSB + // [unused bits][1 bit: NotSlow][1 bit: LabelMatches][1 bit: PreferLeader][1 bit: NormalPeer][1 bit: NotAttempted] + flagNotAttempted storeSelectionScore = 1 << iota + flagNormalPeer + flagPreferLeader + flagLabelMatches + flagNotSlow ) +func (s storeSelectionScore) String() string { + if s == 0 { + return "0" + } + res := "" + appendFactor := func(name string) { + if len(res) != 0 { + res += "|" + } + res += name + } + if (s & flagNotSlow) != 0 { + appendFactor("NotSlow") + } + if (s & flagLabelMatches) != 0 { + appendFactor("LableMatches") + } + if (s & flagPreferLeader) != 0 { + appendFactor("PreferLeader") + } + if (s & flagNormalPeer) != 0 { + appendFactor("NormalPeer") + } + if (s & flagNotAttempted) != 0 { + appendFactor("NotAttempted") + } + return res +} + // calculateScore calculates the score of the replica. 
-func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) int { - score := 0 +func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) storeSelectionScore { + var score storeSelectionScore = 0 if r.store.IsStoreMatch(s.stores) && r.store.IsLabelsMatch(s.labels) { score |= flagLabelMatches } @@ -353,7 +384,7 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i score |= flagNotSlow } if r.attempts == 0 { - score |= flagNotAttempt + score |= flagNotAttempted } return score } diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index bb6cd01a6..4c7746194 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -52,7 +52,8 @@ func (s *testReplicaSelectorSuite) SetupTest(t *testing.T) { s.cluster = mocktikv.NewCluster(s.mvccStore) s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(s.cluster, 3) pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} - s.cache = NewRegionCache(pdCli) + // Disable the tick on health status. + s.cache = NewRegionCache(pdCli, RegionCacheNoHealthTick) s.bo = retry.NewNoopBackoff(context.Background()) s.SetT(t) s.SetS(s) @@ -166,29 +167,29 @@ func TestReplicaSelectorCalculateScore(t *testing.T) { score := strategy.calculateScore(r, isLeader) s.Equal(r.store.healthStatus.IsSlow(), false) if isLeader { - s.Equal(score, flagLabelMatches+flagNotSlow+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNotSlow+flagNotAttempted) } else { - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotSlow+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotSlow+flagNotAttempted) } r.store.healthStatus.markAlreadySlow() s.Equal(r.store.healthStatus.IsSlow(), true) score = strategy.calculateScore(r, isLeader) if isLeader { - s.Equal(score, flagLabelMatches+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNotAttempted) } else { - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted) } strategy.tryLeader = true score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted) strategy.preferLeader = true score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted) strategy.learnerOnly = true strategy.tryLeader = false strategy.preferLeader = false score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagLabelMatches+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNotAttempted) labels := []*metapb.StoreLabel{ { Key: "zone", @@ -197,7 +198,7 @@ func TestReplicaSelectorCalculateScore(t *testing.T) { } strategy.labels = labels score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagNotAttempt) + s.Equal(score, flagNotAttempted) strategy = ReplicaSelectMixedStrategy{ leaderIdx: rc.getStore().workTiKVIdx, @@ -206,9 +207,9 @@ func TestReplicaSelectorCalculateScore(t *testing.T) { } score = strategy.calculateScore(r, isLeader) if isLeader { - s.Equal(score, flagPreferLeader+flagNotAttempt) + s.Equal(score, flagPreferLeader+flagNotAttempted) } else { - s.Equal(score, flagNormalPeer+flagNotAttempt) + s.Equal(score, flagNormalPeer+flagNotAttempted) } strategy = ReplicaSelectMixedStrategy{ @@ -217,10 +218,10 @@ func 
TestReplicaSelectorCalculateScore(t *testing.T) { labels: labels, } score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagNormalPeer+flagNotAttempt) + s.Equal(score, flagNormalPeer+flagNotAttempted) r.store.labels = labels score = strategy.calculateScore(r, isLeader) - s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt) + s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted) r.store.labels = nil } } @@ -1738,6 +1739,44 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) { s.True(s.runMultiCaseAndCompare(cas)) } +func TestMultiReplicaInOneAZ(t *testing.T) { + s := new(testReplicaSelectorSuite) + s.SetupTest(t) + defer s.TearDownTest() + + rc := s.getRegion() + for i := uint64(4); i <= 6; i++ { + s.cluster.AddStore(i, fmt.Sprintf("store%d", i), &metapb.StoreLabel{ + Key: "id", + Value: strconv.Itoa(int(i - 3)), + }) + s.cluster.AddPeer(rc.GetID(), i, s.cluster.AllocID()) + s.Equal(s.cluster.GetStore(i - 3).Labels[0], s.cluster.GetStore(i).Labels[0]) + } + rc.invalidate(Other) + + s.changeRegionLeader(2) + ca := replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: true, + accessErr: []RegionErrorType{ServerIsBusyErr}, + label: &metapb.StoreLabel{Key: "id", Value: "3"}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store3, replica-read: false, stale-read: true}", + "{addr: store2, replica-read: false, stale-read: false}", + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) +} + func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { s := new(testReplicaSelectorSuite) s.SetupTest(t) @@ -2507,6 +2546,152 @@ func TestReplicaReadAccessPathByLearnerCase(t *testing.T) { s.True(s.runCaseAndCompare(ca)) } +func TestReplicaReadAvoidSlowStore(t *testing.T) { + s := new(testReplicaSelectorSuite) + s.SetupTest(t) + defer s.TearDownTest() + + s.changeRegionLeader(3) + store, exists := s.cache.getStore(1) + s.True(exists) + + for _, staleRead := range []bool{false, true} { + for _, withLabel := range []bool{false, true} { + var label *metapb.StoreLabel + if withLabel { + label = &metapb.StoreLabel{Key: "id", Value: "1"} + } + + s.T().Logf("test case: stale read: %v, with label: %v, slow: false", staleRead, withLabel) + + ca := replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + timeout: 0, + busyThresholdMs: 0, + label: label, + accessErr: []RegionErrorType{}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store1, replica-read: %v, stale-read: %v}", !staleRead, staleRead), + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) + + s.T().Logf("test case: stale read: %v, with label: %v, slow: true", staleRead, withLabel) + expectedFirstStore := 2 + if withLabel { + // Leader is preferred in this case + expectedFirstStore = 3 + } + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + timeout: 0, + busyThresholdMs: 0, + label: label, + accessErr: []RegionErrorType{}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store%v, replica-read: %v, stale-read: %v}", expectedFirstStore, !staleRead, staleRead), + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + 
regionIsValid: true, + }, + beforeRun: func() { + s.resetStoreState() + store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now()) + }, + } + // v1 doesn't support avoiding slow stores. We only test this on v2. + s.True(s.runCase(ca, true)) + + s.T().Logf("test case: stale read: %v, with label: %v, slow: false, encoutner err: true", staleRead, withLabel) + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + timeout: 0, + busyThresholdMs: 0, + label: label, + accessErr: []RegionErrorType{ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store1, replica-read: %v, stale-read: %v}", !staleRead, staleRead), + // Retry leader. + // For stale read, it fallbacks to leader read. However, replica-read doesn't do so. + fmt.Sprintf("{addr: store3, replica-read: %v, stale-read: false}", !staleRead), + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + if !staleRead && !withLabel { + // v1 and v2 are inconsistent in this case. Skip running on v1. + s.True(s.runCase(ca, true)) + } else { + s.True(s.runCaseAndCompare(ca)) + } + + s.T().Logf("test case: stale read: %v, with label: %v, slow: true, encoutner err: true", staleRead, withLabel) + var expectedSecondPath string + if expectedFirstStore == 3 { + // Retry on store 2 which is a follower. + // Stale-read mode falls back to replica-read mode. + expectedSecondPath = "{addr: store2, replica-read: true, stale-read: false}" + } else { + if staleRead { + // Retry in leader read mode + expectedSecondPath = "{addr: store3, replica-read: false, stale-read: false}" + } else { + // Retry with the same mode, which is replica-read mode. 
+ expectedSecondPath = "{addr: store3, replica-read: true, stale-read: false}" + } + } + + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + timeout: 0, + busyThresholdMs: 0, + label: label, + accessErr: []RegionErrorType{ServerIsBusyErr}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store%v, replica-read: %v, stale-read: %v}", expectedFirstStore, !staleRead, staleRead), + expectedSecondPath, + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + beforeRun: func() { + s.resetStoreState() + store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now()) + }, + } + s.True(s.runCase(ca, true)) + } + } +} + func TestReplicaReadAccessPathByGenError(t *testing.T) { s := new(testReplicaSelectorSuite) s.SetupTest(t) @@ -2871,7 +3056,7 @@ func (s *testReplicaSelectorSuite) resetStoreState() { for _, store := range rc.getStore().stores { store.loadStats.Store(nil) store.healthStatus.clientSideSlowScore.resetSlowScore() - store.healthStatus.updateTiKVServerSideSlowScore(0, time.Now()) + store.healthStatus.resetTiKVServerSideSlowScoreForTest() store.healthStatus.updateSlowFlag() atomic.StoreUint32(&store.livenessState, uint32(reachable)) store.setResolveState(resolved) diff --git a/internal/locate/slow_score.go b/internal/locate/slow_score.go index dc595a95c..28ed88fe1 100644 --- a/internal/locate/slow_score.go +++ b/internal/locate/slow_score.go @@ -124,7 +124,7 @@ func (ss *SlowScoreStat) updateSlowScore() { } atomic.CompareAndSwapUint64(&ss.avgTimecost, avgTimecost, ss.tsCntSlidingWindow.Avg()) - // Resets the counter of inteval timecost + // Resets the counter of interval timecost atomic.StoreUint64(&ss.intervalTimecost, 0) atomic.StoreUint64(&ss.intervalUpdCount, 0) } @@ -155,15 +155,17 @@ func (ss *SlowScoreStat) markAlreadySlow() { // resetSlowScore resets the slow score to 0. It's used for test. func (ss *SlowScoreStat) resetSlowScore() { - atomic.StoreUint64(&ss.avgScore, 0) + *ss = SlowScoreStat{ + avgScore: 1, + } } func (ss *SlowScoreStat) isSlow() bool { - return clientSideSlowScoreIsSlow(ss.getSlowScore()) + return clientSideSlowScoreIsSlow(ss.getSlowScore()) } func clientSideSlowScoreIsSlow(value uint64) bool { - return value >= slowScoreThreshold + return value >= slowScoreThreshold } // replicaFlowsType indicates the type of the destination replica of flows. @@ -174,7 +176,7 @@ const ( toLeader replicaFlowsType = iota // toFollower indicates that flows are sent to followers' replica toFollower - // numflowsDestType reserved to keep max replicaFlowsType value. + // numReplicaFlowsType is reserved to keep max replicaFlowsType value. numReplicaFlowsType ) diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index befa87c9d..e033dcc78 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -214,7 +214,7 @@ func newStore( peerAddr: peerAddr, saddr: statusAddr, // Make sure healthStatus field is never null. - healthStatus: newStoreHealthStatus(id), + healthStatus: newStoreHealthStatus(id), } } @@ -223,7 +223,7 @@ func newUninitializedStore(id uint64) *Store { return &Store{ storeID: id, // Make sure healthStatus field is never null. - healthStatus: newStoreHealthStatus(id), + healthStatus: newStoreHealthStatus(id), } } @@ -794,8 +794,8 @@ const ( ) type StoreHealthStatus struct { - // Used for logging. - storeID uint64 + // Used for logging. 
+ storeID uint64 isSlow atomic.Bool @@ -820,17 +820,17 @@ type HealthStatusDetail struct { } func (d HealthStatusDetail) IsSlow() bool { - return clientSideSlowScoreIsSlow(uint64(d.ClientSideSlowScore)) || d.TiKVSideSlowScore >= tikvSlowScoreSlowThreshold + return clientSideSlowScoreIsSlow(uint64(d.ClientSideSlowScore)) || d.TiKVSideSlowScore >= tikvSlowScoreSlowThreshold } func (d HealthStatusDetail) String() string { - return fmt.Sprintf("{ ClientSideSlowScore: %d, TiKVSideSlowScore: %d }", d.ClientSideSlowScore, d.TiKVSideSlowScore) + return fmt.Sprintf("{ ClientSideSlowScore: %d, TiKVSideSlowScore: %d }", d.ClientSideSlowScore, d.TiKVSideSlowScore) } func newStoreHealthStatus(storeID uint64) *StoreHealthStatus { - return &StoreHealthStatus{ - storeID: storeID, - } + return &StoreHealthStatus{ + storeID: storeID, + } } // IsSlow returns whether current Store is slow. @@ -948,13 +948,18 @@ func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) } +func (s *StoreHealthStatus) resetTiKVServerSideSlowScoreForTest() { + s.setTiKVSlowScoreLastUpdateTimeForTest(time.Now().Add(-time.Hour * 2)) + s.updateTiKVServerSideSlowScore(1, time.Now().Add(-time.Hour)) +} + func (s *StoreHealthStatus) updateSlowFlag() { - healthDetail := s.GetHealthStatusDetail() - isSlow := healthDetail.IsSlow() - old := s.isSlow.Swap(isSlow) - if old != isSlow { - logutil.BgLogger().Info("store health status changed", zap.Bool("isSlow", isSlow), zap.Stringer("healthDetail", healthDetail)) - } + healthDetail := s.GetHealthStatusDetail() + isSlow := healthDetail.IsSlow() + old := s.isSlow.Swap(isSlow) + if old != isSlow { + logutil.BgLogger().Info("store health status changed", zap.Uint64("storeID", s.storeID), zap.Bool("isSlow", isSlow), zap.Stringer("healthDetail", healthDetail)) + } } // setTiKVSlowScoreLastUpdateTimeForTest force sets last update time of TiKV server side slow score to specified value. From 8ebbe6a882762f6d01e41622cd23dea7a20aa059 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Sun, 7 Apr 2024 18:10:44 +0800 Subject: [PATCH 3/6] Add non stale read case to TestMultiReplicaInOneAZ Signed-off-by: MyonKeminta --- internal/locate/replica_selector_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 4c7746194..283f6d3f3 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -1756,6 +1756,7 @@ func TestMultiReplicaInOneAZ(t *testing.T) { rc.invalidate(Other) s.changeRegionLeader(2) + // For stale read mode, always retry leader in the second attempt. ca := replicaSelectorAccessPathCase{ reqType: tikvrpc.CmdGet, readType: kv.ReplicaReadMixed, @@ -1775,6 +1776,27 @@ func TestMultiReplicaInOneAZ(t *testing.T) { }, } s.True(s.runCaseAndCompare(ca)) + + // For non-stale-read, retrying leader is not that high priority. 
+ ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: false, + accessErr: []RegionErrorType{ServerIsBusyErr}, + label: &metapb.StoreLabel{Key: "id", Value: "3"}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store3, replica-read: true, stale-read: false}", + "{addr: store6, replica-read: true, stale-read: false}", + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) } func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { From 05eec3f6f7a5e49b7aff92862716253afa882da5 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Sun, 7 Apr 2024 18:21:32 +0800 Subject: [PATCH 4/6] Enrich the multi replcia in one AZ case but it failed... Signed-off-by: MyonKeminta --- internal/locate/replica_selector.go | 3 +- internal/locate/replica_selector_test.go | 44 ++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/internal/locate/replica_selector.go b/internal/locate/replica_selector.go index df8fabb9c..67917604b 100644 --- a/internal/locate/replica_selector.go +++ b/internal/locate/replica_selector.go @@ -365,7 +365,8 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) s } } else if s.tryLeader { if len(s.labels) > 0 { - // When the leader has matching labels, prefer leader than other mismatching peers. + // When label matching is enabled, prefer selecting the leader for replicas that has same label-matching + // results. score |= flagPreferLeader } else { score |= flagNormalPeer diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 283f6d3f3..76fe71b29 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -1797,6 +1797,50 @@ func TestMultiReplicaInOneAZ(t *testing.T) { }, } s.True(s.runCaseAndCompare(ca)) + + s.changeRegionLeader(4) + // Prefer choosing leader for replicas with same label matching results. 
+ for _, staleRead := range []bool{false, true} { + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: staleRead, + accessErr: []RegionErrorType{}, + label: &metapb.StoreLabel{Key: "id", Value: "1"}, + expect: &accessPathResult{ + accessPath: []string{ + fmt.Sprintf("{addr: store4, replica-read: %v, stale-read: %v}", !staleRead, staleRead), + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) + } + + ca = replicaSelectorAccessPathCase{ + reqType: tikvrpc.CmdGet, + readType: kv.ReplicaReadMixed, + staleRead: false, + accessErr: []RegionErrorType{ServerIsBusyErr, ServerIsBusyErr}, + label: &metapb.StoreLabel{Key: "id", Value: "3"}, + expect: &accessPathResult{ + accessPath: []string{ + "{addr: store3, replica-read: true, stale-read: false}", + "{addr: store6, replica-read: true, stale-read: false}", + "{addr: store4, replica-read: true, stale-read: false}", + }, + respErr: "", + respRegionError: nil, + backoffCnt: 0, + backoffDetail: []string{}, + regionIsValid: true, + }, + } + s.True(s.runCaseAndCompare(ca)) } func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { From d97f9709e64ce11d589ade5b6cffb6c9c0232d71 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 8 Apr 2024 11:46:36 +0800 Subject: [PATCH 5/6] update test to adapt the fix on master branch Signed-off-by: MyonKeminta --- internal/locate/replica_selector_test.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 9784046c1..2bcf56b71 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -2747,6 +2747,19 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) { s.True(s.runCase(ca, true)) s.T().Logf("test case: stale read: %v, with label: %v, slow: false, encoutner err: true", staleRead, withLabel) + var expectedSecondPath string + if staleRead { + // Retry leader, and fallback to leader-read mode. + expectedSecondPath = "{addr: store3, replica-read: false, stale-read: false}" + } else { + if withLabel { + // Prefer retrying leader. + expectedSecondPath = "{addr: store3, replica-read: true, stale-read: false}" + } else { + // Retry any another replica. + expectedSecondPath = "{addr: store2, replica-read: true, stale-read: false}" + } + } ca = replicaSelectorAccessPathCase{ reqType: tikvrpc.CmdGet, readType: kv.ReplicaReadMixed, @@ -2760,7 +2773,7 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) { fmt.Sprintf("{addr: store1, replica-read: %v, stale-read: %v}", !staleRead, staleRead), // Retry leader. // For stale read, it fallbacks to leader read. However, replica-read doesn't do so. - fmt.Sprintf("{addr: store3, replica-read: %v, stale-read: false}", !staleRead), + expectedSecondPath, }, respErr: "", respRegionError: nil, @@ -2769,15 +2782,9 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) { regionIsValid: true, }, } - if !staleRead && !withLabel { - // v1 and v2 are inconsistent in this case. Skip running on v1. - s.True(s.runCase(ca, true)) - } else { - s.True(s.runCaseAndCompare(ca)) - } + s.True(s.runCaseAndCompare(ca)) s.T().Logf("test case: stale read: %v, with label: %v, slow: true, encoutner err: true", staleRead, withLabel) - var expectedSecondPath string if expectedFirstStore == 3 { // Retry on store 2 which is a follower. // Stale-read mode falls back to replica-read mode. 
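The expected access paths in the test cases above follow from the score ordering introduced in PATCH 2/6: flagNotSlow occupies the highest bit of storeSelectionScore, so a replica on a non-slow store outranks a slow one no matter how many of the lower-order factors (label match, prefer-leader, normal peer, not yet attempted) the slow replica satisfies. A minimal standalone sketch of that comparison follows; the flag names mirror the patch, while the main function and the printed comparison are illustrative only and not part of the change.

package main

import "fmt"

// Mirrors the bit ordering from replica_selector.go in PATCH 2/6: higher bits
// dominate the numeric comparison, so NotSlow outweighs every lower-order factor.
type storeSelectionScore int64

const (
	flagNotAttempted storeSelectionScore = 1 << iota
	flagNormalPeer
	flagPreferLeader
	flagLabelMatches
	flagNotSlow
)

func main() {
	// A slow replica that satisfies every other criterion...
	slowButOtherwiseIdeal := flagLabelMatches | flagPreferLeader | flagNormalPeer | flagNotAttempted // 0b01111 = 15
	// ...still scores below a non-slow replica that satisfies none of them.
	notSlowOnly := flagNotSlow // 0b10000 = 16

	fmt.Println(notSlowOnly > slowButOtherwiseIdeal) // true: the non-slow replica is preferred
}
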
From 3ce76a93421f482fa28e15fefd16cf65f4466df2 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 8 Apr 2024 11:53:48 +0800 Subject: [PATCH 6/6] Remove TestMultiReplicaInOneAZ Signed-off-by: MyonKeminta --- internal/locate/replica_selector_test.go | 104 ----------------------- 1 file changed, 104 deletions(-) diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 2bcf56b71..521700d8a 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -1802,110 +1802,6 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) { s.True(s.runMultiCaseAndCompare(cas)) } -func TestMultiReplicaInOneAZ(t *testing.T) { - s := new(testReplicaSelectorSuite) - s.SetupTest(t) - defer s.TearDownTest() - - rc := s.getRegion() - for i := uint64(4); i <= 6; i++ { - s.cluster.AddStore(i, fmt.Sprintf("store%d", i), &metapb.StoreLabel{ - Key: "id", - Value: strconv.Itoa(int(i - 3)), - }) - s.cluster.AddPeer(rc.GetID(), i, s.cluster.AllocID()) - s.Equal(s.cluster.GetStore(i - 3).Labels[0], s.cluster.GetStore(i).Labels[0]) - } - rc.invalidate(Other) - - s.changeRegionLeader(2) - // For stale read mode, always retry leader in the second attempt. - ca := replicaSelectorAccessPathCase{ - reqType: tikvrpc.CmdGet, - readType: kv.ReplicaReadMixed, - staleRead: true, - accessErr: []RegionErrorType{ServerIsBusyErr}, - label: &metapb.StoreLabel{Key: "id", Value: "3"}, - expect: &accessPathResult{ - accessPath: []string{ - "{addr: store3, replica-read: false, stale-read: true}", - "{addr: store2, replica-read: false, stale-read: false}", - }, - respErr: "", - respRegionError: nil, - backoffCnt: 0, - backoffDetail: []string{}, - regionIsValid: true, - }, - } - s.True(s.runCaseAndCompare(ca)) - - // For non-stale-read, retrying leader is not that high priority. - ca = replicaSelectorAccessPathCase{ - reqType: tikvrpc.CmdGet, - readType: kv.ReplicaReadMixed, - staleRead: false, - accessErr: []RegionErrorType{ServerIsBusyErr}, - label: &metapb.StoreLabel{Key: "id", Value: "3"}, - expect: &accessPathResult{ - accessPath: []string{ - "{addr: store3, replica-read: true, stale-read: false}", - "{addr: store6, replica-read: true, stale-read: false}", - }, - respErr: "", - respRegionError: nil, - backoffCnt: 0, - backoffDetail: []string{}, - regionIsValid: true, - }, - } - s.True(s.runCaseAndCompare(ca)) - - s.changeRegionLeader(4) - // Prefer choosing leader for replicas with same label matching results. 
- for _, staleRead := range []bool{false, true} { - ca = replicaSelectorAccessPathCase{ - reqType: tikvrpc.CmdGet, - readType: kv.ReplicaReadMixed, - staleRead: staleRead, - accessErr: []RegionErrorType{}, - label: &metapb.StoreLabel{Key: "id", Value: "1"}, - expect: &accessPathResult{ - accessPath: []string{ - fmt.Sprintf("{addr: store4, replica-read: %v, stale-read: %v}", !staleRead, staleRead), - }, - respErr: "", - respRegionError: nil, - backoffCnt: 0, - backoffDetail: []string{}, - regionIsValid: true, - }, - } - s.True(s.runCaseAndCompare(ca)) - } - - ca = replicaSelectorAccessPathCase{ - reqType: tikvrpc.CmdGet, - readType: kv.ReplicaReadMixed, - staleRead: false, - accessErr: []RegionErrorType{ServerIsBusyErr, ServerIsBusyErr}, - label: &metapb.StoreLabel{Key: "id", Value: "3"}, - expect: &accessPathResult{ - accessPath: []string{ - "{addr: store3, replica-read: true, stale-read: false}", - "{addr: store6, replica-read: true, stale-read: false}", - "{addr: store4, replica-read: true, stale-read: false}", - }, - respErr: "", - respRegionError: nil, - backoffCnt: 0, - backoffDetail: []string{}, - regionIsValid: true, - }, - } - s.True(s.runCaseAndCompare(ca)) -} - func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { s := new(testReplicaSelectorSuite) s.SetupTest(t)
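
For reference, the transition-only logging added in PATCH 1/6 relies on atomic.Bool.Swap returning the previous value, so the state change is reported exactly once per flip rather than on every health tick. Below is a self-contained sketch of that pattern under simplified assumptions: the struct and method names echo store_cache.go, but fmt-based printing stands in for the real logutil/zap logging and the main function is hypothetical.

package main

import (
	"fmt"
	"sync/atomic"
)

// Sketch of the slow-flag update from PATCH 1/6: Swap returns the old value,
// so a message is emitted only when the flag actually transitions.
type storeHealthStatus struct {
	storeID uint64
	isSlow  atomic.Bool
}

func (s *storeHealthStatus) updateSlowFlag(isSlow bool) {
	old := s.isSlow.Swap(isSlow)
	if old != isSlow {
		// The real code logs via logutil.BgLogger() with zap fields,
		// including the storeID and the full health detail.
		fmt.Printf("store %d health status changed: isSlow=%v\n", s.storeID, isSlow)
	}
}

func main() {
	s := &storeHealthStatus{storeID: 1}
	s.updateSlowFlag(true)  // logged: false -> true
	s.updateSlowFlag(true)  // silent: no transition
	s.updateSlowFlag(false) // logged: true -> false
}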