diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 2d16e235be..b74c10af51 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -326,11 +326,11 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio var leaderAccessIdx AccessIndex availablePeers := r.meta.GetPeers()[:0] for _, p := range r.meta.Peers { - store, exists := c.getStore(p.StoreId) + store, exists := c.stores.get(p.StoreId) if !exists { - store = c.getStoreOrInsertDefault(p.StoreId) + store = c.stores.getOrInsertDefault(p.StoreId) } - addr, err := store.initResolve(bo, c) + addr, err := store.initResolve(bo, c.stores) if err != nil { return nil, err } @@ -633,25 +633,11 @@ type RegionCache struct { mu regionIndexMu - storeMu struct { - sync.RWMutex - stores map[uint64]*Store - } - tiflashComputeStoreMu struct { - sync.RWMutex - needReload bool - stores []*Store - } - notifyCheckCh chan struct{} + stores storeCache // runner for background jobs bg *bgRunner - testingKnobs struct { - // Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set, - // requestLiveness always returns unreachable. - mockRequestLiveness atomic.Pointer[livenessFunc] - } clusterID uint64 } @@ -681,10 +667,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { c.codec = codecPDClient.GetCodec() } - c.storeMu.stores = make(map[uint64]*Store) - c.tiflashComputeStoreMu.needReload = true - c.tiflashComputeStoreMu.stores = make([]*Store, 0) - c.notifyCheckCh = make(chan struct{}, 1) + c.stores = newStoreCache(pdClient) c.bg = newBackgroundRunner(context.Background()) c.enableForwarding = config.GetGlobalConfig().EnableForwarding if c.pdClient != nil { @@ -719,7 +702,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { } needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) }) return false - }, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents()) + }, time.Duration(refreshStoreInterval/4)*time.Second, c.stores.getCheckStoreEvents()) if !options.noHealthTick { c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second) } @@ -741,10 +724,6 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { // only used fot test. func newTestRegionCache() *RegionCache { c := &RegionCache{} - c.storeMu.stores = make(map[uint64]*Store) - c.tiflashComputeStoreMu.needReload = true - c.tiflashComputeStoreMu.stores = make([]*Store, 0) - c.notifyCheckCh = make(chan struct{}, 1) c.bg = newBackgroundRunner(context.Background()) c.mu = *newRegionIndexMu(nil) return c @@ -753,7 +732,7 @@ func newTestRegionCache() *RegionCache { // clear clears all cached data in the RegionCache. It's only used in tests. func (c *RegionCache) clear() { c.mu.refresh(nil) - c.clearStores() + c.stores.clear() } // thread unsafe, should use with lock @@ -778,9 +757,9 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(* } }() - needCheckStores = c.filterStores(needCheckStores, needCheck) + needCheckStores = c.stores.filter(needCheckStores, needCheck) for _, store := range needCheckStores { - _, err := store.reResolve(c) + _, err := store.reResolve(c.stores) tikverr.Log(err) } return needCheckStores @@ -788,12 +767,13 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(* // SetRegionCacheStore is used to set a store in region cache, for testing only func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { - c.putStore(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels)) + c.stores.put(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels)) } // SetPDClient replaces pd client,for testing only func (c *RegionCache) SetPDClient(client pd.Client) { c.pdClient = client + c.stores = newStoreCache(client) } // RPCContext contains data that is needed to send RPC to a region. @@ -1057,7 +1037,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, return nil, nil } if store.getResolveState() == needCheck { - _, err := store.reResolve(c) + _, err := store.reResolve(c.stores) tikverr.Log(err) } regionStore.workTiFlashIdx.Store(int32(accessIdx)) @@ -1406,7 +1386,7 @@ func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionS metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() } // schedule a store addr resolve. - c.markStoreNeedCheck(s) + c.stores.markStoreNeedCheck(s) return incEpochStoreIdx } @@ -1759,14 +1739,14 @@ func (c *RegionCache) searchCachedRegionByID(regionID uint64) (*Region, bool) { // GetStoresByType gets stores by type `typ` func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store { - return c.filterStores(nil, func(s *Store) bool { + return c.stores.filter(nil, func(s *Store) bool { return s.getResolveState() == resolved && s.storeType == typ }) } // GetAllStores gets TiKV and TiFlash stores. func (c *RegionCache) GetAllStores() []*Store { - return c.filterStores(nil, func(s *Store) bool { + return c.stores.filter(nil, func(s *Store) bool { return s.getResolveState() == resolved && (s.storeType == tikvrpc.TiKV || s.storeType == tikvrpc.TiFlash) }) } @@ -2037,7 +2017,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S addr = store.addr return case unresolved: - addr, err = store.initResolve(bo, c) + addr, err = store.initResolve(bo, c.stores) return case deleted: addr = c.changeToActiveStore(region, store.storeID) @@ -2094,7 +2074,7 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor // changeToActiveStore replace the deleted store in the region by an up-to-date store in the stores map. // The order is guaranteed by reResolve() which adds the new store before marking old store deleted. func (c *RegionCache) changeToActiveStore(region *Region, storeID uint64) (addr string) { - store, _ := c.getStore(storeID) + store, _ := c.stores.get(storeID) for { oldRegionStore := region.getStore() newRegionStore := oldRegionStore.clone() @@ -2204,19 +2184,19 @@ func (c *RegionCache) PDClient() pd.Client { // GetTiFlashStores returns the information of all tiflash nodes. Like `GetAllStores`, the method only returns resolved // stores so that users won't be bothered by tombstones. (related issue: https://github.com/pingcap/tidb/issues/46602) func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store { - return c.filterStores(nil, func(s *Store) bool { + return c.stores.filter(nil, func(s *Store) bool { return s.storeType == tikvrpc.TiFlash && labelFilter(s.labels) && s.getResolveState() == resolved }) } // GetTiFlashComputeStores returns all stores with lable . func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) { - stores, needReload := c.listTiflashComputeStores() + stores, needReload := c.stores.listTiflashComputeStores() if needReload { - stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c) + stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c.stores) if err == nil { - c.setTiflashComputeStores(stores) + c.stores.setTiflashComputeStores(stores) } return stores, err } @@ -2266,7 +2246,7 @@ func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool // InvalidateTiFlashComputeStores set needReload be true, // and will refresh tiflash_compute store cache next time. func (c *RegionCache) InvalidateTiFlashComputeStores() { - c.markTiflashComputeStoresNeedReload() + c.stores.markTiflashComputeStoresNeedReload() } // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if @@ -2652,7 +2632,7 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() { }() healthDetails := make(map[uint64]HealthStatusDetail) now := time.Now() - c.forEachStore(func(store *Store) { + c.stores.forEach(func(store *Store) { store.healthStatus.tick(now) healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail() }) @@ -2664,7 +2644,7 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() { // reportStoreReplicaFlows reports the statistics on the related replicaFlowsType. func (c *RegionCache) reportStoreReplicaFlows() { - c.forEachStore(func(store *Store) { + c.stores.forEach(func(store *Store) { for destType := toLeader; destType < numReplicaFlowsType; destType++ { metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType))) store.resetReplicaFlowsStats(destType) @@ -2683,7 +2663,7 @@ func contains(startKey, endKey, key []byte) bool { } func (c *RegionCache) onHealthFeedback(feedback *tikvpb.HealthFeedback) { - store, ok := c.getStore(feedback.GetStoreId()) + store, ok := c.stores.get(feedback.GetStoreId()) if !ok { logutil.BgLogger().Info("dropped health feedback info due to unknown store id", zap.Uint64("storeID", feedback.GetStoreId())) return diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 8bffee0c86..544302cae6 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -331,8 +331,8 @@ func (s *testRegionCacheSuite) TestStoreLabels() { } for _, testcase := range testcases { s.T().Log(testcase.storeID) - store := s.cache.getStoreOrInsertDefault(testcase.storeID) - _, err := store.initResolve(s.bo, s.cache) + store := s.cache.stores.getOrInsertDefault(testcase.storeID) + _, err := store.initResolve(s.bo, s.cache.stores) s.Nil(err) labels := []*metapb.StoreLabel{ { @@ -340,7 +340,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() { Value: fmt.Sprintf("%v", testcase.storeID), }, } - stores := s.cache.filterStores(nil, func(s *Store) bool { return s.IsLabelsMatch(labels) }) + stores := s.cache.stores.filter(nil, func(s *Store) bool { return s.IsLabelsMatch(labels) }) s.Equal(len(stores), 1) s.Equal(stores[0].labels, labels) } @@ -372,9 +372,9 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Check resolving normal stores. The resolve state should be resolved. for _, storeMeta := range s.cluster.GetAllStores() { - store := cache.getStoreOrInsertDefault(storeMeta.GetId()) + store := cache.stores.getOrInsertDefault(storeMeta.GetId()) s.Equal(store.getResolveState(), unresolved) - addr, err := store.initResolve(bo, cache) + addr, err := store.initResolve(bo, cache.stores) s.Nil(err) s.Equal(addr, storeMeta.GetAddress()) s.Equal(store.getResolveState(), resolved) @@ -390,26 +390,26 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { } // Mark the store needCheck. The resolve state should be resolved soon. - store := cache.getStoreOrInsertDefault(s.store1) - cache.markStoreNeedCheck(store) + store := cache.stores.getOrInsertDefault(s.store1) + cache.stores.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), resolved) // Mark the store needCheck and it becomes a tombstone. The resolve state should be tombstone. s.cluster.MarkTombstone(s.store1) - cache.markStoreNeedCheck(store) + cache.stores.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), tombstone) s.cluster.StartStore(s.store1) // Mark the store needCheck and it's deleted from PD. The resolve state should be tombstone. cache.clear() - store = cache.getStoreOrInsertDefault(s.store1) - store.initResolve(bo, cache) + store = cache.stores.getOrInsertDefault(s.store1) + store.initResolve(bo, cache.stores) s.Equal(store.getResolveState(), resolved) storeMeta := s.cluster.GetStore(s.store1) s.cluster.RemoveStore(s.store1) - cache.markStoreNeedCheck(store) + cache.stores.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), tombstone) s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...) @@ -417,14 +417,14 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Mark the store needCheck and its address and labels are changed. // The resolve state should be deleted and a new store is added to the cache. cache.clear() - store = cache.getStoreOrInsertDefault(s.store1) - store.initResolve(bo, cache) + store = cache.stores.getOrInsertDefault(s.store1) + store.initResolve(bo, cache.stores) s.Equal(store.getResolveState(), resolved) s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"}) - cache.markStoreNeedCheck(store) + cache.stores.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), deleted) - newStore := cache.getStoreOrInsertDefault(s.store1) + newStore := cache.stores.getOrInsertDefault(s.store1) s.Equal(newStore.getResolveState(), resolved) s.Equal(newStore.addr, store.addr+"0") s.Equal(newStore.labels, []*metapb.StoreLabel{{Key: "k", Value: "v"}}) @@ -432,9 +432,9 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Check initResolve()ing a tombstone store. The resolve state should be tombstone. cache.clear() s.cluster.MarkTombstone(s.store1) - store = cache.getStoreOrInsertDefault(s.store1) + store = cache.stores.getOrInsertDefault(s.store1) for i := 0; i < 2; i++ { - addr, err := store.initResolve(bo, cache) + addr, err := store.initResolve(bo, cache.stores) s.Nil(err) s.Equal(addr, "") s.Equal(store.getResolveState(), tombstone) @@ -446,9 +446,9 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { cache.clear() storeMeta = s.cluster.GetStore(s.store1) s.cluster.RemoveStore(s.store1) - store = cache.getStoreOrInsertDefault(s.store1) + store = cache.stores.getOrInsertDefault(s.store1) for i := 0; i < 2; i++ { - addr, err := store.initResolve(bo, cache) + addr, err := store.initResolve(bo, cache.stores) s.Nil(err) s.Equal(addr, "") s.Equal(store.getResolveState(), tombstone) @@ -485,7 +485,7 @@ func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() { s.Run("WithStaleStores", func() { cntGetRegion = 0 s.cache.clear() - store2 := s.cache.getStoreOrInsertDefault(s.store2) + store2 := s.cache.stores.getOrInsertDefault(s.store2) for i := 0; i < 50; i++ { atomic.StoreUint32(&store2.epoch, uint32(i)) @@ -499,7 +499,7 @@ func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() { s.Run("WithUnreachableStores", func() { cntGetRegion = 0 s.cache.clear() - store2 := s.cache.getStoreOrInsertDefault(s.store2) + store2 := s.cache.stores.getOrInsertDefault(s.store2) atomic.StoreUint32(&store2.livenessState, uint32(unreachable)) defer atomic.StoreUint32(&store2.livenessState, uint32(reachable)) @@ -1774,7 +1774,7 @@ func (s *testRegionCacheSuite) TestBuckets() { newMeta := proto.Clone(cachedRegion.meta).(*metapb.Region) newMeta.RegionEpoch.Version++ newMeta.RegionEpoch.ConfVer++ - _, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.getStoreOrInsertDefault(s.store1)}, []*metapb.Region{newMeta}) + _, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.stores.getOrInsertDefault(s.store1)}, []*metapb.Region{newMeta}) s.Nil(err) cachedRegion = s.getRegion([]byte("a")) s.Equal(newBuckets, cachedRegion.getStore().buckets) @@ -2049,13 +2049,13 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { // init region cache s.cache.LocateKey(s.bo, []byte("a")) - store1, _ := s.cache.getStore(s.store1) + store1, _ := s.cache.stores.get(s.store1) s.Require().NotNil(store1) s.Require().Equal(resolved, store1.getResolveState()) // setup mock liveness func store1Liveness := uint32(unreachable) - s.cache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { + s.cache.stores.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { if s.storeID == store1.storeID { return livenessState(atomic.LoadUint32(&store1Liveness)) } @@ -2064,7 +2064,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { // start health check loop atomic.StoreUint32(&store1.livenessState, store1Liveness) - startHealthCheckLoop(s.cache, store1, livenessState(store1Liveness), time.Second) + startHealthCheckLoop(s.cache.bg, s.cache.stores, store1, livenessState(store1Liveness), time.Second) // update store meta s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...) @@ -2075,7 +2075,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { }, 3*time.Second, time.Second) // assert that the new store should be added and it's also not reachable - newStore1, _ := s.cache.getStore(store1.storeID) + newStore1, _ := s.cache.stores.get(store1.storeID) s.Require().NotEqual(reachable, newStore1.getLivenessState()) // recover store1 @@ -2156,7 +2156,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() { _, err := s.cache.LocateKey(s.bo, []byte("k")) s.Nil(err) - store1, exists := s.cache.getStore(s.store1) + store1, exists := s.cache.stores.get(s.store1) s.True(exists) s.False(store1.healthStatus.IsSlow()) @@ -2197,7 +2197,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() { s.False(store1.healthStatus.IsSlow()) s.Equal(int64(50), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore) - store2, exists := s.cache.getStore(s.store2) + store2, exists := s.cache.stores.get(s.store2) s.True(exists) // Store 2 is never affected by updating store 1 s.LessOrEqual(store2.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore, int64(1)) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index d84711fb63..77b22c50e9 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1273,7 +1273,7 @@ func (s *baseReplicaSelector) getBaseReplicaSelector() *baseReplicaSelector { } func (s *baseReplicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { - return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache) + return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache.bg, s.regionCache.stores) } func (s *baseReplicaSelector) invalidateReplicaStore(replica *replica, cause error) { @@ -1287,7 +1287,7 @@ func (s *baseReplicaSelector) invalidateReplicaStore(replica *replica, cause err ) metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() // schedule a store addr resolve. - s.regionCache.markStoreNeedCheck(store) + s.regionCache.stores.markStoreNeedCheck(store) store.healthStatus.markAlreadySlow() } } @@ -2329,7 +2329,7 @@ func (s *RegionRequestSender) onRegionError( zap.Stringer("storeNotMatch", storeNotMatch), zap.Stringer("ctx", ctx), ) - s.regionCache.markStoreNeedCheck(ctx.Store) + s.regionCache.stores.markStoreNeedCheck(ctx.Store) s.regionCache.InvalidateCachedRegion(ctx.Region) // It's possible the address of store is not changed but the DNS resolves to a different address in k8s environment, // so we always reconnect in this case. diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index b28fc34d05..32ed218d7b 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -110,7 +110,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() { s.NotNil(region) oldStoreLimit := kv.StoreLimit.Load() kv.StoreLimit.Store(500) - s.cache.getStoreOrInsertDefault(s.storeIDs[0]).tokenCount.Store(500) + s.cache.stores.getOrInsertDefault(s.storeIDs[0]).tokenCount.Store(500) // cause there is only one region in this cluster, regionID maps this leader. resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.NotNil(err) @@ -256,7 +256,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { return innerClient.SendRequest(ctx, addr, req, timeout) }} var storeState = uint32(unreachable) - sender.regionCache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { + sender.regionCache.stores.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { if s.addr == leaderAddr { return livenessState(atomic.LoadUint32(&storeState)) } @@ -536,7 +536,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - unreachable.injectConstantLiveness(cache) + unreachable.injectConstantLiveness(cache.stores) s.IsType(&accessKnownLeader{}, replicaSelector.state) _, err = replicaSelector.next(s.bo, req) s.Nil(err) @@ -572,7 +572,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Do not try to use proxy if livenessState is unknown instead of unreachable. refreshEpochs(regionStore) cache.enableForwarding = true - unknown.injectConstantLiveness(cache) + unknown.injectConstantLiveness(cache.stores) replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) @@ -594,7 +594,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - unreachable.injectConstantLiveness(cache) + unreachable.injectConstantLiveness(cache.stores) s.Eventually(func() bool { return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable }, 3*time.Second, 200*time.Millisecond) @@ -878,7 +878,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { } return nil } - reachable.injectConstantLiveness(s.cache) + reachable.injectConstantLiveness(s.cache.stores) s.Eventually(func() bool { stores := getReplicaSelectorRegionStores() return stores[0].getLivenessState() == reachable && @@ -1004,7 +1004,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { } // Runs out of all replicas and then returns a send error. - unreachable.injectConstantLiveness(s.cache) + unreachable.injectConstantLiveness(s.cache.stores) reloadRegion() for _, store := range s.storeIDs { s.cluster.StopStore(store) @@ -1292,7 +1292,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { } // Test for write request. - reachable.injectConstantLiveness(s.cache) + reachable.injectConstantLiveness(s.cache.stores) resetStats() req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{}) req.ReplicaReadType = kv.ReplicaReadLeader diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index ac6dd20011..022fb13a62 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -85,7 +85,7 @@ func (s *testRegionCacheStaleReadSuite) SetupTest() { } func (s *testRegionCacheStaleReadSuite) TearDownTest() { - s.cache.setMockRequestLiveness(nil) + s.cache.stores.setMockRequestLiveness(nil) s.cache.Close() s.mvccStore.Close() } @@ -223,7 +223,7 @@ func (s *testRegionCacheStaleReadSuite) setClient() { return }} - s.cache.setMockRequestLiveness(func(ctx context.Context, store *Store) livenessState { + s.cache.stores.setMockRequestLiveness(func(ctx context.Context, store *Store) livenessState { _, ok := s.injection.unavailableStoreIDs[store.storeID] if ok { return unreachable diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 1ab228ca0a..a86de2a121 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -765,7 +765,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { }() req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1}) regionRequestSender := NewRegionRequestSender(s.cache, fnClient) - reachable.injectConstantLiveness(regionRequestSender.regionCache) + reachable.injectConstantLiveness(regionRequestSender.regionCache.stores) regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) } }() diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 97b7e52bcc..bd73bd8be1 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -2589,7 +2589,7 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) { defer s.TearDownTest() s.changeRegionLeader(3) - store, exists := s.cache.getStore(1) + store, exists := s.cache.stores.get(1) s.True(exists) for _, staleRead := range []bool{false, true} { @@ -3056,7 +3056,7 @@ func (ca *replicaSelectorAccessPathCase) genAccessErr(regionCache *RegionCache, } if err != nil { // inject unreachable liveness. - unreachable.injectConstantLiveness(regionCache) + unreachable.injectConstantLiveness(regionCache.stores) } return regionErr, err } @@ -3094,7 +3094,7 @@ func (c *replicaSelectorAccessPathCase) Format() string { func (s *testReplicaSelectorSuite) resetStoreState() { // reset slow score, since serverIsBusyErr will mark the store is slow, and affect remaining test cases. - reachable.injectConstantLiveness(s.cache) // inject reachable liveness. + reachable.injectConstantLiveness(s.cache.stores) // inject reachable liveness. rc := s.getRegion() s.NotNil(rc) for _, store := range rc.getStore().stores { diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 94ea5bfedb..c8af9e0e86 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" + pd "github.com/tikv/pd/client" "go.uber.org/zap" "golang.org/x/sync/singleflight" "google.golang.org/grpc" @@ -47,7 +48,59 @@ type testingKnobs interface { setMockRequestLiveness(f livenessFunc) } -func (c *RegionCache) getMockRequestLiveness() livenessFunc { +type storeRegistry interface { + fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) + fetchAllStores(ctx context.Context) ([]*metapb.Store, error) +} + +type storeCache interface { + testingKnobs + storeRegistry + get(id uint64) (store *Store, exists bool) + getOrInsertDefault(id uint64) *Store + put(store *Store) + clear() + forEach(f func(*Store)) + filter(dst []*Store, predicate func(*Store) bool) []*Store + listTiflashComputeStores() (stores []*Store, needReload bool) + setTiflashComputeStores(stores []*Store) + markTiflashComputeStoresNeedReload() + markStoreNeedCheck(store *Store) + getCheckStoreEvents() <-chan struct{} +} + +func newStoreCache(pdClient pd.Client) *storeCacheImpl { + c := &storeCacheImpl{pdClient: pdClient} + c.notifyCheckCh = make(chan struct{}, 1) + c.storeMu.stores = make(map[uint64]*Store) + c.tiflashComputeStoreMu.needReload = true + c.tiflashComputeStoreMu.stores = make([]*Store, 0) + return c +} + +type storeCacheImpl struct { + pdClient pd.Client + + testingKnobs struct { + // Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set, + // requestLiveness always returns unreachable. + mockRequestLiveness atomic.Pointer[livenessFunc] + } + + notifyCheckCh chan struct{} + storeMu struct { + sync.RWMutex + stores map[uint64]*Store + } + + tiflashComputeStoreMu struct { + sync.RWMutex + needReload bool + stores []*Store + } +} + +func (c *storeCacheImpl) getMockRequestLiveness() livenessFunc { f := c.testingKnobs.mockRequestLiveness.Load() if f == nil { return nil @@ -55,46 +108,26 @@ func (c *RegionCache) getMockRequestLiveness() livenessFunc { return *f } -func (c *RegionCache) setMockRequestLiveness(f livenessFunc) { +func (c *storeCacheImpl) setMockRequestLiveness(f livenessFunc) { c.testingKnobs.mockRequestLiveness.Store(&f) } -type storeRegistry interface { - fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) - fetchAllStores(ctx context.Context) ([]*metapb.Store, error) -} - -func (c *RegionCache) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) { +func (c *storeCacheImpl) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) { return c.pdClient.GetStore(ctx, id) } -func (c *RegionCache) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) { +func (c *storeCacheImpl) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) { return c.pdClient.GetAllStores(ctx) } -type storeCache interface { - storeRegistry - getStore(id uint64) (store *Store, exists bool) - getStoreOrInsertDefault(id uint64) *Store - putStore(store *Store) - clearStores() - forEachStore(f func(*Store)) - filterStores(dst []*Store, predicate func(*Store) bool) []*Store - listTiflashComputeStores() (stores []*Store, needReload bool) - setTiflashComputeStores(stores []*Store) - markTiflashComputeStoresNeedReload() - markStoreNeedCheck(store *Store) - getCheckStoreEvents() <-chan struct{} -} - -func (c *RegionCache) getStore(id uint64) (store *Store, exists bool) { +func (c *storeCacheImpl) get(id uint64) (store *Store, exists bool) { c.storeMu.RLock() store, exists = c.storeMu.stores[id] c.storeMu.RUnlock() return } -func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store { +func (c *storeCacheImpl) getOrInsertDefault(id uint64) *Store { c.storeMu.Lock() store, exists := c.storeMu.stores[id] if !exists { @@ -105,19 +138,19 @@ func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store { return store } -func (c *RegionCache) putStore(store *Store) { +func (c *storeCacheImpl) put(store *Store) { c.storeMu.Lock() c.storeMu.stores[store.storeID] = store c.storeMu.Unlock() } -func (c *RegionCache) clearStores() { +func (c *storeCacheImpl) clear() { c.storeMu.Lock() c.storeMu.stores = make(map[uint64]*Store) c.storeMu.Unlock() } -func (c *RegionCache) forEachStore(f func(*Store)) { +func (c *storeCacheImpl) forEach(f func(*Store)) { c.storeMu.RLock() defer c.storeMu.RUnlock() for _, s := range c.storeMu.stores { @@ -125,7 +158,7 @@ func (c *RegionCache) forEachStore(f func(*Store)) { } } -func (c *RegionCache) filterStores(dst []*Store, predicate func(*Store) bool) []*Store { +func (c *storeCacheImpl) filter(dst []*Store, predicate func(*Store) bool) []*Store { c.storeMu.RLock() for _, store := range c.storeMu.stores { if predicate == nil || predicate(store) { @@ -136,7 +169,7 @@ func (c *RegionCache) filterStores(dst []*Store, predicate func(*Store) bool) [] return dst } -func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bool) { +func (c *storeCacheImpl) listTiflashComputeStores() (stores []*Store, needReload bool) { c.tiflashComputeStoreMu.RLock() needReload = c.tiflashComputeStoreMu.needReload stores = c.tiflashComputeStoreMu.stores @@ -144,20 +177,20 @@ func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bo return } -func (c *RegionCache) setTiflashComputeStores(stores []*Store) { +func (c *storeCacheImpl) setTiflashComputeStores(stores []*Store) { c.tiflashComputeStoreMu.Lock() c.tiflashComputeStoreMu.stores = stores c.tiflashComputeStoreMu.needReload = false c.tiflashComputeStoreMu.Unlock() } -func (c *RegionCache) markTiflashComputeStoresNeedReload() { +func (c *storeCacheImpl) markTiflashComputeStoresNeedReload() { c.tiflashComputeStoreMu.Lock() c.tiflashComputeStoreMu.needReload = true c.tiflashComputeStoreMu.Unlock() } -func (c *RegionCache) markStoreNeedCheck(store *Store) { +func (c *storeCacheImpl) markStoreNeedCheck(store *Store) { if store.changeResolveStateTo(resolved, needCheck) { select { case c.notifyCheckCh <- struct{}{}: @@ -166,7 +199,7 @@ func (c *RegionCache) markStoreNeedCheck(store *Store) { } } -func (c *RegionCache) getCheckStoreEvents() <-chan struct{} { +func (c *storeCacheImpl) getCheckStoreEvents() <-chan struct{} { return c.notifyCheckCh } @@ -435,7 +468,7 @@ func (s *Store) reResolve(c storeCache) (bool, error) { if s.addr == addr { newStore.healthStatus = s.healthStatus } - c.putStore(newStore) + c.put(newStore) s.setResolveState(deleted) return false, nil } @@ -536,7 +569,7 @@ func (s *Store) getLivenessState() livenessState { return livenessState(atomic.LoadUint32(&s.livenessState)) } -func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, c *RegionCache) (liveness livenessState) { +func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, scheduler *bgRunner, c storeCache) (liveness livenessState) { liveness = requestLiveness(bo.GetCtx(), s, c) if liveness == reachable { return @@ -560,15 +593,15 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil { return } - startHealthCheckLoop(c, s, liveness, reResolveInterval) + startHealthCheckLoop(scheduler, c, s, liveness, reResolveInterval) } return } -func startHealthCheckLoop(c *RegionCache, s *Store, liveness livenessState, reResolveInterval time.Duration) { +func startHealthCheckLoop(scheduler *bgRunner, c storeCache, s *Store, liveness livenessState, reResolveInterval time.Duration) { lastCheckPDTime := time.Now() - c.bg.schedule(func(ctx context.Context, t time.Time) bool { + scheduler.schedule(func(ctx context.Context, t time.Time) bool { if t.Sub(lastCheckPDTime) > reResolveInterval { lastCheckPDTime = t @@ -584,7 +617,7 @@ func startHealthCheckLoop(c *RegionCache, s *Store, liveness livenessState, reRe return true } // if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve). - newStore, _ := c.getStore(s.storeID) + newStore, _ := c.get(s.storeID) logutil.BgLogger().Info("[health check] store meta changed", zap.Uint64("storeID", s.storeID), zap.String("oldAddr", s.addr),