From 2c75a2e660dffcee3eba622164c0f6a13b11f421 Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 16 Jan 2024 17:35:48 +0800 Subject: [PATCH 1/3] do not update atime when region has down peers Signed-off-by: zyguan --- internal/locate/region_cache.go | 105 ++++++------------------ internal/locate/region_cache_test.go | 100 +++++++++++++++------- internal/locate/region_request3_test.go | 43 ++++++++++ 3 files changed, 142 insertions(+), 106 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index a4f40ef8ef..e905ec9012 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -41,6 +41,7 @@ import ( "fmt" "math" "math/rand" + "slices" "sort" "strings" "sync" @@ -148,14 +149,14 @@ const ( // Region presents kv region type Region struct { - meta *metapb.Region // raw region meta from PD, immutable after init - store unsafe.Pointer // point to region store info, see RegionStore - syncFlag int32 // region need be sync in next turn - lastAccess int64 // last region access time, see checkRegionCacheTTL - invalidReason InvalidReason // the reason why the region is invalidated - asyncReload atomic.Bool // the region need to be reloaded in async mode - lastLoad int64 // last region load time - hasUnavailableTiFlashStore bool // has unavailable TiFlash store, if yes, need to trigger async reload periodically + meta *metapb.Region // raw region meta from PD, immutable after init + store unsafe.Pointer // point to region store info, see RegionStore + lastLoad int64 // last region load time + lastAccess int64 // last region access time, see checkRegionCacheTTL + syncFlag int32 // region need be sync in next turn + invalidReason InvalidReason // the reason why the region is invalidated + asyncReload atomic.Bool // the region need to be reloaded in async mode + hasDownPeers bool // the region has down peers, if true, lastAccess won't be updated so that the cached region will expire after RegionCacheTTL. } // AccessIndex represent the index for accessIndex array @@ -180,9 +181,10 @@ type regionStore struct { // buckets is not accurate and it can change even if the region is not changed. // It can be stale and buckets keys can be out of the region range. buckets *metapb.Buckets - // record all storeIDs on which pending peers reside. - // key is storeID, val is peerID. - pendingTiFlashPeerStores map[uint64]uint64 + // pendingPeers refers to pdRegion.PendingPeers. It's immutable and can be used to reconstruct pdRegions. + pendingPeers []*metapb.Peer + // downPeers refers to pdRegion.DownPeers. It's immutable and can be used to reconstruct pdRegions. + downPeers []*metapb.Peer } func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) { @@ -275,12 +277,13 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio // regionStore pull used store from global store map // to avoid acquire storeMu in later access. rs := ®ionStore{ - workTiKVIdx: 0, - proxyTiKVIdx: -1, - stores: make([]*Store, 0, len(r.meta.Peers)), - pendingTiFlashPeerStores: map[uint64]uint64{}, - storeEpochs: make([]uint32, 0, len(r.meta.Peers)), - buckets: pdRegion.Buckets, + workTiKVIdx: 0, + proxyTiKVIdx: -1, + stores: make([]*Store, 0, len(r.meta.Peers)), + storeEpochs: make([]uint32, 0, len(r.meta.Peers)), + buckets: pdRegion.Buckets, + pendingPeers: pdRegion.PendingPeers, + downPeers: pdRegion.DownPeers, } leader := pdRegion.Leader @@ -297,8 +300,8 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio if err != nil { return nil, err } - // Filter the peer on a tombstone store. - if addr == "" { + // Filter out the peer on a tombstone|down store. + if addr == "" || slices.ContainsFunc(pdRegion.DownPeers, func(dp *metapb.Peer) bool { return isSamePeer(dp, p) }) { continue } @@ -321,11 +324,6 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio } rs.stores = append(rs.stores, store) rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch)) - for _, pendingPeer := range pdRegion.PendingPeers { - if pendingPeer.Id == p.Id { - rs.pendingTiFlashPeerStores[store.storeID] = p.Id - } - } } // TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover. // Maybe we need backoff here. @@ -333,32 +331,10 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio return nil, errors.Errorf("no available peers, region: {%v}", r.meta) } - for _, p := range pdRegion.DownPeers { - c.storeMu.RLock() - store, exists := c.storeMu.stores[p.StoreId] - c.storeMu.RUnlock() - if !exists { - store = c.getStoreByStoreID(p.StoreId) - } - addr, err := store.initResolve(bo, c) - if err != nil { - continue - } - // Filter the peer on a tombstone store. - if addr == "" { - continue - } - - if store.storeType == tikvrpc.TiFlash { - r.hasUnavailableTiFlashStore = true - break - } - } - rs.workTiKVIdx = leaderAccessIdx - r.meta.Peers = availablePeers - r.setStore(rs) + r.meta.Peers = availablePeers + r.hasDownPeers = len(pdRegion.DownPeers) > 0 // mark region has been init accessed. r.lastAccess = time.Now().Unix() @@ -395,7 +371,7 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool { if ts-lastAccess > regionCacheTTLSec { return false } - if atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) { + if r.hasDownPeers || atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) { return true } } @@ -890,7 +866,7 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto allStores = append(allStores, store.storeID) } for _, storeID := range allStores { - if _, ok := regionStore.pendingTiFlashPeerStores[storeID]; !ok { + if !slices.ContainsFunc(regionStore.pendingPeers, func(p *metapb.Peer) bool { return p.StoreId == storeID }) { nonPendingStores = append(nonPendingStores, storeID) } } @@ -907,11 +883,6 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, return nil, nil } - if cachedRegion.hasUnavailableTiFlashStore && time.Now().Unix()-cachedRegion.lastLoad > regionCacheTTLSec { - /// schedule an async reload to avoid load balance issue, refer https://github.com/pingcap/tidb/issues/35418 for details - c.scheduleReloadRegion(cachedRegion) - } - regionStore := cachedRegion.getStore() // sIdx is for load balance of TiFlash store. @@ -1618,7 +1589,7 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { if ts-lastAccess > regionCacheTTLSec { return nil } - if latestRegion != nil { + if !latestRegion.hasDownPeers { atomic.CompareAndSwapInt64(&latestRegion.lastAccess, atomic.LoadInt64(&latestRegion.lastAccess), ts) } return latestRegion @@ -1662,26 +1633,6 @@ func (c *RegionCache) GetAllStores() []*Store { return append(stores, tiflashStores...) } -func filterUnavailablePeers(region *pd.Region) { - if len(region.DownPeers) == 0 { - return - } - new := region.Meta.Peers[:0] - for _, p := range region.Meta.Peers { - available := true - for _, downPeer := range region.DownPeers { - if p.Id == downPeer.Id && p.StoreId == downPeer.StoreId { - available = false - break - } - } - if available { - new = append(new, p) - } - } - region.Meta.Peers = new -} - // loadRegion loads region from pd client, and picks the first peer as leader. // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful // when processing in reverse order. @@ -1729,7 +1680,6 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, backoffErr = errors.Errorf("region not found for key %q, encode_key: %q", util.HexRegionKeyStr(key), util.HexRegionKey(c.codec.EncodeRegionKey(key))) continue } - filterUnavailablePeers(reg) if len(reg.Meta.Peers) == 0 { return nil, errors.New("receive Region with no available peer") } @@ -1775,7 +1725,6 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg if reg == nil || reg.Meta == nil { return nil, errors.Errorf("region not found for regionID %d", regionID) } - filterUnavailablePeers(reg) if len(reg.Meta.Peers) == 0 { return nil, errors.New("receive Region with no available peer") } diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index c84c4e77b3..5550409f38 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -58,6 +58,18 @@ import ( uatomic "go.uber.org/atomic" ) +type inspectedPDClient struct { + pd.Client + getRegion func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) +} + +func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { + if c.getRegion != nil { + return c.getRegion(ctx, c.Client, key, opts...) + } + return c.Client.GetRegion(ctx, key, opts...) +} + func TestRegionCache(t *testing.T) { suite.Run(t, new(testRegionCacheSuite)) } @@ -293,7 +305,28 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...) } -func (s *testRegionCacheSuite) TestTiFlashDownPeersAndAsyncReload() { +func (s *testRegionCacheSuite) TestReloadRegionWithDownPeers() { + cntGetRegion := 0 + s.cache.pdClient = &inspectedPDClient{ + Client: s.cache.pdClient, + getRegion: func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { + cntGetRegion++ + return cli.GetRegion(ctx, key, opts...) + }, + } + s.cluster.MarkPeerDown(s.peer2) + + defer SetRegionCacheTTLSec(regionCacheTTLSec) + SetRegionCacheTTLSec(2) + for i := 0; i < 50; i++ { + time.Sleep(100 * time.Millisecond) + _, err := s.cache.LocateKey(s.bo, []byte("a")) + s.NoError(err) + } + s.Equal(2, cntGetRegion, "should reload region with down peers every RegionCacheTTL") +} + +func (s *testRegionCacheSuite) TestTiFlashRecoveredFromDown() { store3 := s.cluster.AllocID() peer3 := s.cluster.AllocID() s.cluster.AddStore(store3, s.storeAddr(store3)) @@ -313,8 +346,7 @@ func (s *testRegionCacheSuite) TestTiFlashDownPeersAndAsyncReload() { s.Nil(err) s.NotNil(ctx) region := s.cache.GetCachedRegionWithRLock(loc.Region) - s.Equal(region.hasUnavailableTiFlashStore, false) - s.Equal(region.asyncReload.Load(), false) + s.Equal(region.hasDownPeers, false) s.cache.clear() s.cluster.MarkPeerDown(peer3) @@ -323,24 +355,39 @@ func (s *testRegionCacheSuite) TestTiFlashDownPeersAndAsyncReload() { s.Nil(err) s.Equal(loc.Region.id, s.region1) region = s.cache.GetCachedRegionWithRLock(loc.Region) - s.Equal(region.hasUnavailableTiFlashStore, true) - s.Equal(region.asyncReload.Load(), false) + s.Equal(region.hasDownPeers, true) + defer SetRegionCacheTTLSec(regionCacheTTLSec) SetRegionCacheTTLSec(3) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i <= 3; i++ { - s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode) - time.Sleep(1 * time.Second) + for i := 0; i <= 3; i++ { + time.Sleep(1 * time.Second) + loc, err = s.cache.LocateKey(s.bo, []byte("a")) + s.Nil(err) + rpcCtx, err := s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode) + s.Nil(err) + if rpcCtx != nil { + s.NotEqual(s.storeAddr(store3), rpcCtx.Addr, "should not access peer3 when it is down") } - }() - wg.Wait() - s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode) - s.Equal(region.hasUnavailableTiFlashStore, true) - s.Equal(region.asyncReload.Load(), true) + } + newRegion := s.cache.GetCachedRegionWithRLock(loc.Region) + s.NotNil(newRegion) + s.Less(region.lastLoad, newRegion.lastLoad) + s.cluster.RemoveDownPeer(peer3) + for i := 0; ; i++ { + if i > 10 { + s.Fail("should access peer3 after it is up") + break + } + loc, err = s.cache.LocateKey(s.bo, []byte("a")) + s.Nil(err) + rpcCtx, err := s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode) + s.Nil(err) + if rpcCtx != nil && rpcCtx.Addr == s.storeAddr(store3) { + break + } + time.Sleep(1 * time.Second) + } } // TestFilterDownPeersOrPeersOnTombstoneOrDroppedStore verifies the RegionCache filter @@ -1306,7 +1353,6 @@ func (s *testRegionCacheSuite) TestPeersLenChange() { Meta: cpMeta, DownPeers: []*metapb.Peer{{Id: s.peer1, StoreId: s.store1}}, } - filterUnavailablePeers(cpRegion) region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) s.cache.insertRegionToCache(region, true, true) @@ -2011,11 +2057,10 @@ func BenchmarkInsertRegionToCache(b *testing.B) { }, } rs := ®ionStore{ - workTiKVIdx: 0, - proxyTiKVIdx: -1, - stores: make([]*Store, 0, len(r.meta.Peers)), - pendingTiFlashPeerStores: map[uint64]uint64{}, - storeEpochs: make([]uint32, 0, len(r.meta.Peers)), + workTiKVIdx: 0, + proxyTiKVIdx: -1, + stores: make([]*Store, 0, len(r.meta.Peers)), + storeEpochs: make([]uint32, 0, len(r.meta.Peers)), } r.setStore(rs) b.StartTimer() @@ -2049,11 +2094,10 @@ func BenchmarkInsertRegionToCache2(b *testing.B) { }, } rs := ®ionStore{ - workTiKVIdx: 0, - proxyTiKVIdx: -1, - stores: make([]*Store, 0, len(r.meta.Peers)), - pendingTiFlashPeerStores: map[uint64]uint64{}, - storeEpochs: make([]uint32, 0, len(r.meta.Peers)), + workTiKVIdx: 0, + proxyTiKVIdx: -1, + stores: make([]*Store, 0, len(r.meta.Peers)), + storeEpochs: make([]uint32, 0, len(r.meta.Peers)), } r.setStore(rs) b.StartTimer() diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 9e5ce2969a..21d45f78f6 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1695,3 +1695,46 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { // `tryFollower` always try the local peer firstly s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) } + +func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() { + defer SetRegionCacheTTLSec(regionCacheTTLSec) + SetRegionCacheTTLSec(2) + + bo := retry.NewBackoffer(context.Background(), -1) + key := []byte("key") + + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadMixed, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + + downStore := s.cluster.GetStore(s.storeIDs[2]) + s.cluster.MarkPeerDown(s.peerIDs[2]) + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + s.Require().NotEqual(addr, downStore.Address) + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + }} + for i := 0; i < 30; i++ { + time.Sleep(200 * time.Millisecond) + loc, err := s.cache.LocateKey(bo, key) + s.Require().Nil(err) + resp, rpcCtx, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(downStore.Labels)) + s.Require().Nil(err) + s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value), "should access other peers") + } + + s.cluster.RemoveDownPeer(s.peerIDs[2]) + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + }} + for i := 0; i < 30; i++ { + time.Sleep(200 * time.Millisecond) + loc, err := s.cache.LocateKey(bo, key) + s.Require().Nil(err) + _, rpcCtx, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(downStore.Labels)) + s.Require().Nil(err) + if rpcCtx.Addr == downStore.Address { + return + } + } + s.Require().Fail("should access recovered peer after region reloading within RegionCacheTTL") +} From 73075b8efa0c98d038b07a1ed21aa51e9a70783d Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 16 Jan 2024 18:04:22 +0800 Subject: [PATCH 2/3] avoid data race in tests Signed-off-by: zyguan --- internal/locate/region_cache.go | 29 +++++++++++++++++++++---- internal/locate/region_cache_test.go | 14 ++++++++---- internal/locate/region_request3_test.go | 10 ++++++--- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index e905ec9012..7dad49a48a 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -470,6 +470,7 @@ type RegionCache struct { // Context for background jobs ctx context.Context cancelFunc context.CancelFunc + wg sync.WaitGroup testingKnobs struct { // Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set, @@ -511,16 +512,21 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c.mu = *newRegionIndexMu(nil) } + // TODO(zyguan): refine management of background cron jobs + c.wg.Add(1) go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second) c.enableForwarding = config.GetGlobalConfig().EnableForwarding // Default use 15s as the update inerval. + c.wg.Add(1) go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second) if config.GetGlobalConfig().RegionsRefreshInterval > 0 { c.timelyRefreshCache(config.GetGlobalConfig().RegionsRefreshInterval) } else { // cacheGC is not compatible with timelyRefreshCache + c.wg.Add(1) go c.cacheGC() } + c.wg.Add(1) go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second) return c } @@ -553,6 +559,7 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldReg // Close releases region cache's resource. func (c *RegionCache) Close() { c.cancelFunc() + c.wg.Wait() } var reloadRegionInterval = int64(10 * time.Second) @@ -562,6 +569,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { ticker := time.NewTicker(interval) reloadRegionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&reloadRegionInterval))) defer func() { + c.wg.Done() ticker.Stop() reloadRegionTicker.Stop() }() @@ -1754,8 +1762,12 @@ func (c *RegionCache) timelyRefreshCache(intervalS uint64) { return } ticker := time.NewTicker(time.Duration(intervalS) * time.Second) + c.wg.Add(1) go func() { - defer ticker.Stop() + defer func() { + c.wg.Done() + ticker.Stop() + }() for { select { case <-c.ctx.Done(): @@ -2187,7 +2199,10 @@ const cleanRegionNumPerRound = 50 // negligible. func (c *RegionCache) cacheGC() { ticker := time.NewTicker(cleanCacheInterval) - defer ticker.Stop() + defer func() { + c.wg.Done() + ticker.Stop() + }() beginning := newBtreeSearchItem([]byte("")) iterItem := beginning @@ -3009,7 +3024,10 @@ func (s *Store) markAlreadySlow() { // asyncUpdateStoreSlowScore updates the slow score of each store periodically. func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) { ticker := time.NewTicker(interval) - defer ticker.Stop() + defer func() { + c.wg.Done() + ticker.Stop() + }() for { select { case <-c.ctx.Done(): @@ -3061,7 +3079,10 @@ func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) { // asyncReportStoreReplicaFlows reports the statistics on the related replicaFlowsType. func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) { ticker := time.NewTicker(interval) - defer ticker.Stop() + defer func() { + c.wg.Done() + ticker.Stop() + }() for { select { case <-c.ctx.Done(): diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 5550409f38..a171c6f3f4 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -85,6 +85,7 @@ type testRegionCacheSuite struct { region1 uint64 cache *RegionCache bo *retry.Backoffer + onClosed func() } func (s *testRegionCacheSuite) SetupTest() { @@ -104,6 +105,9 @@ func (s *testRegionCacheSuite) SetupTest() { func (s *testRegionCacheSuite) TearDownTest() { s.cache.Close() s.mvccStore.Close() + if s.onClosed != nil { + s.onClosed() + } } func (s *testRegionCacheSuite) storeAddr(id uint64) string { @@ -306,6 +310,9 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { } func (s *testRegionCacheSuite) TestReloadRegionWithDownPeers() { + s.onClosed = func() { SetRegionCacheTTLSec(600) } + SetRegionCacheTTLSec(2) + cntGetRegion := 0 s.cache.pdClient = &inspectedPDClient{ Client: s.cache.pdClient, @@ -316,8 +323,6 @@ func (s *testRegionCacheSuite) TestReloadRegionWithDownPeers() { } s.cluster.MarkPeerDown(s.peer2) - defer SetRegionCacheTTLSec(regionCacheTTLSec) - SetRegionCacheTTLSec(2) for i := 0; i < 50; i++ { time.Sleep(100 * time.Millisecond) _, err := s.cache.LocateKey(s.bo, []byte("a")) @@ -327,6 +332,9 @@ func (s *testRegionCacheSuite) TestReloadRegionWithDownPeers() { } func (s *testRegionCacheSuite) TestTiFlashRecoveredFromDown() { + s.onClosed = func() { SetRegionCacheTTLSec(600) } + SetRegionCacheTTLSec(3) + store3 := s.cluster.AllocID() peer3 := s.cluster.AllocID() s.cluster.AddStore(store3, s.storeAddr(store3)) @@ -357,8 +365,6 @@ func (s *testRegionCacheSuite) TestTiFlashRecoveredFromDown() { region = s.cache.GetCachedRegionWithRLock(loc.Region) s.Equal(region.hasDownPeers, true) - defer SetRegionCacheTTLSec(regionCacheTTLSec) - SetRegionCacheTTLSec(3) for i := 0; i <= 3; i++ { time.Sleep(1 * time.Second) loc, err = s.cache.LocateKey(s.bo, []byte("a")) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 21d45f78f6..3812f8c3e5 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -75,6 +75,7 @@ type testRegionRequestToThreeStoresSuite struct { bo *retry.Backoffer regionRequestSender *RegionRequestSender mvccStore mocktikv.MVCCStore + onClosed func() } func (s *testRegionRequestToThreeStoresSuite) SetupTest() { @@ -91,6 +92,9 @@ func (s *testRegionRequestToThreeStoresSuite) SetupTest() { func (s *testRegionRequestToThreeStoresSuite) TearDownTest() { s.cache.Close() s.mvccStore.Close() + if s.onClosed != nil { + s.onClosed() + } } func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() { @@ -1697,7 +1701,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { } func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() { - defer SetRegionCacheTTLSec(regionCacheTTLSec) + s.onClosed = func() { SetRegionCacheTTLSec(600) } SetRegionCacheTTLSec(2) bo := retry.NewBackoffer(context.Background(), -1) @@ -1713,7 +1717,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() { s.Require().NotEqual(addr, downStore.Address) return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil }} - for i := 0; i < 30; i++ { + for i := 0; i < 15; i++ { time.Sleep(200 * time.Millisecond) loc, err := s.cache.LocateKey(bo, key) s.Require().Nil(err) @@ -1726,7 +1730,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() { s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil }} - for i := 0; i < 30; i++ { + for i := 0; i < 15; i++ { time.Sleep(200 * time.Millisecond) loc, err := s.cache.LocateKey(bo, key) s.Require().Nil(err) From 5e5548ee3f503285d7334b75dab7f2b96422701f Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 16 Jan 2024 23:15:04 +0800 Subject: [PATCH 3/3] fix lint Signed-off-by: zyguan --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 50c4909737..4c86426793 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -48,4 +48,4 @@ jobs: uses: golangci/golangci-lint-action@v3 with: version: v1.51.2 - + install-mode: "goinstall"