do not update atime when region has down peers #1118

Closed · wants to merge 4 commits
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -48,4 +48,4 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2

install-mode: "goinstall"
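Side note on this workflow change: as of golangci/golangci-lint-action v3, install-mode: "goinstall" makes the action build golangci-lint from source via go install rather than downloading a prebuilt binary, which keeps the linter compatible with the Go toolchain the repository itself uses.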
134 changes: 52 additions & 82 deletions internal/locate/region_cache.go
@@ -41,6 +41,7 @@ import (
"fmt"
"math"
"math/rand"
"slices"
"sort"
"strings"
"sync"
@@ -148,14 +149,14 @@ const (

// Region presents kv region
type Region struct {
meta *metapb.Region // raw region meta from PD, immutable after init
store unsafe.Pointer // point to region store info, see RegionStore
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload atomic.Bool // the region need to be reloaded in async mode
lastLoad int64 // last region load time
hasUnavailableTiFlashStore bool // has unavailable TiFlash store, if yes, need to trigger async reload periodically
meta *metapb.Region // raw region meta from PD, immutable after init
store unsafe.Pointer // point to region store info, see RegionStore
lastLoad int64 // last region load time
lastAccess int64 // last region access time, see checkRegionCacheTTL
syncFlag int32 // region need be sync in next turn
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload atomic.Bool // the region need to be reloaded in async mode
hasDownPeers bool // the region has down peers, if true, lastAccess won't be updated so that the cached region will expire after RegionCacheTTL.
}

// AccessIndex represent the index for accessIndex array
@@ -180,9 +181,10 @@ type regionStore struct {
// buckets is not accurate and it can change even if the region is not changed.
// It can be stale and buckets keys can be out of the region range.
buckets *metapb.Buckets
// record all storeIDs on which pending peers reside.
// key is storeID, val is peerID.
pendingTiFlashPeerStores map[uint64]uint64
// pendingPeers refers to pdRegion.PendingPeers. It's immutable and can be used to reconstruct pdRegions.
pendingPeers []*metapb.Peer
// downPeers refers to pdRegion.DownPeers. It's immutable and can be used to reconstruct pdRegions.
downPeers []*metapb.Peer
}
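The comments above say these slices are immutable and can be used to reconstruct pd.Region values. A minimal sketch of what such a reconstruction could look like, assuming the pd.Region fields (Meta, Buckets, DownPeers, PendingPeers) and a hypothetical helper name toPDRegion; leader resolution is omitted for brevity:

// toPDRegion rebuilds a pd.Region view from the cached region and its
// regionStore. The peer slices are shared, not copied, which is safe
// only because they are immutable by convention.
func toPDRegion(r *Region, rs *regionStore) *pd.Region {
	return &pd.Region{
		Meta:         r.meta,
		Buckets:      rs.buckets,
		DownPeers:    rs.downPeers,
		PendingPeers: rs.pendingPeers,
	}
}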

func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
@@ -275,12 +277,13 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
// regionStore pull used store from global store map
// to avoid acquire storeMu in later access.
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
buckets: pdRegion.Buckets,
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
buckets: pdRegion.Buckets,
pendingPeers: pdRegion.PendingPeers,
downPeers: pdRegion.DownPeers,
}

leader := pdRegion.Leader
@@ -297,8 +300,8 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
if err != nil {
return nil, err
}
// Filter the peer on a tombstone store.
if addr == "" {
// Filter out the peer on a tombstone|down store.
if addr == "" || slices.ContainsFunc(pdRegion.DownPeers, func(dp *metapb.Peer) bool { return isSamePeer(dp, p) }) {
continue
}
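For reference, the predicate used above can be written as a standalone helper; this is a sketch assuming isSamePeer matches peers by both peer ID and store ID:

// isDownPeer reports whether p is listed among the region's down peers.
// A down peer is skipped during construction the same way a peer on a
// tombstoned store (empty address) is.
func isDownPeer(p *metapb.Peer, downPeers []*metapb.Peer) bool {
	return slices.ContainsFunc(downPeers, func(dp *metapb.Peer) bool {
		return dp.GetId() == p.GetId() && dp.GetStoreId() == p.GetStoreId()
	})
}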

@@ -321,44 +324,17 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
}
rs.stores = append(rs.stores, store)
rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
for _, pendingPeer := range pdRegion.PendingPeers {
if pendingPeer.Id == p.Id {
rs.pendingTiFlashPeerStores[store.storeID] = p.Id
}
}
}
// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
// Maybe we need backoff here.
if len(availablePeers) == 0 {
return nil, errors.Errorf("no available peers, region: {%v}", r.meta)
}

for _, p := range pdRegion.DownPeers {
c.storeMu.RLock()
store, exists := c.storeMu.stores[p.StoreId]
c.storeMu.RUnlock()
if !exists {
store = c.getStoreByStoreID(p.StoreId)
}
addr, err := store.initResolve(bo, c)
if err != nil {
continue
}
// Filter the peer on a tombstone store.
if addr == "" {
continue
}

if store.storeType == tikvrpc.TiFlash {
r.hasUnavailableTiFlashStore = true
break
}
}

rs.workTiKVIdx = leaderAccessIdx
r.meta.Peers = availablePeers

r.setStore(rs)
r.meta.Peers = availablePeers
r.hasDownPeers = len(pdRegion.DownPeers) > 0

// mark region has been init accessed.
r.lastAccess = time.Now().Unix()
@@ -395,7 +371,7 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool {
if ts-lastAccess > regionCacheTTLSec {
return false
}
if atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) {
if r.hasDownPeers || atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) {
return true
}
}
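This one-line change is the heart of the PR. A simplified model of the loop, with names taken from this diff: when hasDownPeers is set, the CAS that would push lastAccess forward is short-circuited, so lastAccess stays at load time and the cached entry expires once regionCacheTTLSec elapses, forcing a fresh load from PD that can observe recovered peers.

// Simplified model of checkRegionCacheTTL after this change.
func checkTTL(r *Region, ts int64) bool {
	for {
		lastAccess := atomic.LoadInt64(&r.lastAccess)
		if ts-lastAccess > regionCacheTTLSec {
			return false // expired: the caller reloads the region from PD
		}
		if r.hasDownPeers || atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) {
			return true // still fresh; atime is bumped only for healthy regions
		}
		// The CAS lost a race with a concurrent reader; retry.
	}
}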
@@ -494,6 +470,7 @@ type RegionCache struct {
// Context for background jobs
ctx context.Context
cancelFunc context.CancelFunc
wg sync.WaitGroup

testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
@@ -535,16 +512,21 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.mu = *newRegionIndexMu(nil)
}

// TODO(zyguan): refine management of background cron jobs
c.wg.Add(1)
go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
// Default use 15s as the update interval.
c.wg.Add(1)
go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
if config.GetGlobalConfig().RegionsRefreshInterval > 0 {
c.timelyRefreshCache(config.GetGlobalConfig().RegionsRefreshInterval)
} else {
// cacheGC is not compatible with timelyRefreshCache
c.wg.Add(1)
go c.cacheGC()
}
c.wg.Add(1)
go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second)
return c
}
@@ -577,6 +559,7 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldReg
// Close releases region cache's resource.
func (c *RegionCache) Close() {
c.cancelFunc()
c.wg.Wait()
}
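The wg.Add(1) calls in NewRegionCache, the deferred wg.Done() in each background loop, and the wg.Wait() here together form the standard pattern for joining background goroutines on shutdown. A sketch of the lifecycle every cron job in this diff now follows (interval and the job body are placeholders):

// Add must happen before the goroutine starts, never inside it,
// to avoid racing with Wait; Done runs in a defer so the goroutine
// is always counted out when it returns.
c.wg.Add(1)
go func() {
	ticker := time.NewTicker(interval)
	defer func() {
		c.wg.Done()
		ticker.Stop()
	}()
	for {
		select {
		case <-c.ctx.Done():
			return // Close() called cancelFunc; this unblocks wg.Wait()
		case <-ticker.C:
			// periodic work
		}
	}
}()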

var reloadRegionInterval = int64(10 * time.Second)
@@ -586,6 +569,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
reloadRegionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&reloadRegionInterval)))
defer func() {
c.wg.Done()
ticker.Stop()
reloadRegionTicker.Stop()
}()
@@ -890,7 +874,7 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
allStores = append(allStores, store.storeID)
}
for _, storeID := range allStores {
if _, ok := regionStore.pendingTiFlashPeerStores[storeID]; !ok {
if !slices.ContainsFunc(regionStore.pendingPeers, func(p *metapb.Peer) bool { return p.StoreId == storeID }) {
nonPendingStores = append(nonPendingStores, storeID)
}
}
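The pendingTiFlashPeerStores map is gone; a linear scan over the immutable pendingPeers slice replaces it, which is cheap given the handful of peers a region has. The filter above, as a standalone sketch:

// filterNonPending keeps only the stores that host no pending peer of
// the region, so TiFlash load balancing avoids lagging replicas.
func filterNonPending(allStores []uint64, pending []*metapb.Peer) []uint64 {
	nonPending := make([]uint64, 0, len(allStores))
	for _, storeID := range allStores {
		hasPending := slices.ContainsFunc(pending, func(p *metapb.Peer) bool {
			return p.GetStoreId() == storeID
		})
		if !hasPending {
			nonPending = append(nonPending, storeID)
		}
	}
	return nonPending
}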
@@ -907,11 +891,6 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
return nil, nil
}

if cachedRegion.hasUnavailableTiFlashStore && time.Now().Unix()-cachedRegion.lastLoad > regionCacheTTLSec {
/// schedule an async reload to avoid load balance issue, refer https://github.com/pingcap/tidb/issues/35418 for details
c.scheduleReloadRegion(cachedRegion)
}

regionStore := cachedRegion.getStore()

// sIdx is for load balance of TiFlash store.
@@ -1618,7 +1597,7 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
if ts-lastAccess > regionCacheTTLSec {
return nil
}
if latestRegion != nil {
if !latestRegion.hasDownPeers {
atomic.CompareAndSwapInt64(&latestRegion.lastAccess, atomic.LoadInt64(&latestRegion.lastAccess), ts)
}
return latestRegion
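Same gating as in checkRegionCacheTTL: for a region with down peers the lastAccess refresh is skipped, so lookups by region ID also let the entry age out after the TTL.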
@@ -1662,26 +1641,6 @@ func (c *RegionCache) GetAllStores() []*Store {
return append(stores, tiflashStores...)
}

func filterUnavailablePeers(region *pd.Region) {
if len(region.DownPeers) == 0 {
return
}
new := region.Meta.Peers[:0]
for _, p := range region.Meta.Peers {
available := true
for _, downPeer := range region.DownPeers {
if p.Id == downPeer.Id && p.StoreId == downPeer.StoreId {
available = false
break
}
}
if available {
new = append(new, p)
}
}
region.Meta.Peers = new
}
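With filterUnavailablePeers deleted, both of its call sites below drop as well; the equivalent down-peer filtering now happens exactly once, inside newRegion, via the slices.ContainsFunc check shown earlier in this diff.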

// loadRegion loads region from pd client, and picks the first peer as leader.
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
// when processing in reverse order.
@@ -1729,7 +1688,6 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
backoffErr = errors.Errorf("region not found for key %q, encode_key: %q", util.HexRegionKeyStr(key), util.HexRegionKey(c.codec.EncodeRegionKey(key)))
continue
}
filterUnavailablePeers(reg)
if len(reg.Meta.Peers) == 0 {
return nil, errors.New("receive Region with no available peer")
}
@@ -1775,7 +1733,6 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
if reg == nil || reg.Meta == nil {
return nil, errors.Errorf("region not found for regionID %d", regionID)
}
filterUnavailablePeers(reg)
if len(reg.Meta.Peers) == 0 {
return nil, errors.New("receive Region with no available peer")
}
@@ -1805,8 +1762,12 @@ func (c *RegionCache) timelyRefreshCache(intervalS uint64) {
return
}
ticker := time.NewTicker(time.Duration(intervalS) * time.Second)
c.wg.Add(1)
go func() {
defer ticker.Stop()
defer func() {
c.wg.Done()
ticker.Stop()
}()
for {
select {
case <-c.ctx.Done():
@@ -2238,7 +2199,10 @@ const cleanRegionNumPerRound = 50
// negligible.
func (c *RegionCache) cacheGC() {
ticker := time.NewTicker(cleanCacheInterval)
defer ticker.Stop()
defer func() {
c.wg.Done()
ticker.Stop()
}()

beginning := newBtreeSearchItem([]byte(""))
iterItem := beginning
@@ -3060,7 +3024,10 @@ func (s *Store) markAlreadySlow() {
// asyncUpdateStoreSlowScore updates the slow score of each store periodically.
func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
defer func() {
c.wg.Done()
ticker.Stop()
}()
for {
select {
case <-c.ctx.Done():
@@ -3112,7 +3079,10 @@ func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) {
// asyncReportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
defer func() {
c.wg.Done()
ticker.Stop()
}()
for {
select {
case <-c.ctx.Done():