chunk: enhance disk cache fault tolerance (#4848)
Signed-off-by: jiefenghuang <[email protected]>
jiefenghuang authored May 15, 2024
1 parent 0897335 commit 26c8755
Showing 4 changed files with 480 additions and 37 deletions.
80 changes: 45 additions & 35 deletions pkg/chunk/disk_cache.go
@@ -30,7 +30,6 @@ import (
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -48,11 +47,9 @@ import (
var (
stagingDir = "rawstaging"
cacheDir = "raw"
maxIODur = time.Second * 20
maxIOErrors = 10
maxIODur = time.Second * 30
stagingBlocks atomic.Int64
errNotCached = errors.New("not cached")
errCacheDown = errors.New("cache down")
errStageFull = errors.New("space not enough on device")
errStageConcurrency = errors.New("concurrent staging limit reached")
)
@@ -100,9 +97,11 @@ type cacheStore struct {
checksum string // checksum level
uploader func(key, path string, force bool) bool

ioErrCnt uint32
opTs map[time.Duration]func() error
opMu sync.Mutex
opTs map[time.Duration]func() error
opMu sync.Mutex

state dcState
stateLock sync.Mutex
}
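
The `ioErrCnt` counter gives way to a `state`/`stateLock` pair. The `dcState` type itself lives in disk_cache_state.go (one of the four changed files, not expanded on this page), so the following Go sketch is reconstructed purely from the call sites visible in this diff; the state set, thresholds, and transition rules are assumptions, not the commit's actual implementation:

```go
// Hypothetical sketch of disk_cache_state.go, inferred from the call sites
// in this diff; cacheStore and errCacheDown are assumed to exist elsewhere
// in package chunk.
package chunk

import "sync/atomic"

type dcStateKind int32

const (
	dcNormal dcStateKind = iota // healthy: all cache ops allowed
	dcDown                      // persistent IO errors: cache ops rejected
)

// dcState is the behaviour the cacheStore methods in this file depend on.
type dcState interface {
	state() dcStateKind  // compared against dcDown in available()
	onIOErr()            // an EIO or IO timeout was observed
	onIOSucc()           // a successful IO was observed
	beforeCacheOp()      // entry hook of cache/load/remove/stage/removeStage
	afterCacheOp()       // matching (deferred) exit hook
	checkCacheOp() error // non-nil (errCacheDown) when ops must be refused
}

// simpleDCState is an illustrative implementation, NOT the committed one.
type simpleDCState struct {
	kind      atomic.Int32
	errStreak atomic.Int32
	inflight  atomic.Int32
}

// Signature inferred from newCacheStore above; the *cacheStore argument is
// unused in this toy version.
func newDCState(kind dcStateKind, _ *cacheStore) dcState {
	s := &simpleDCState{}
	s.kind.Store(int32(kind))
	return s
}

func (s *simpleDCState) state() dcStateKind { return dcStateKind(s.kind.Load()) }

func (s *simpleDCState) onIOErr() {
	if s.errStreak.Add(1) >= 3 { // threshold is made up for illustration
		s.kind.Store(int32(dcDown))
	}
}

func (s *simpleDCState) onIOSucc() { s.errStreak.Store(0) } // assumed: success clears the streak

func (s *simpleDCState) beforeCacheOp() { s.inflight.Add(1) }
func (s *simpleDCState) afterCacheOp()  { s.inflight.Add(-1) }

func (s *simpleDCState) checkCacheOp() error {
	if s.state() == dcDown {
		return errCacheDown
	}
	return nil
}
```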

func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingPages int, config *Config, uploader func(key, path string, force bool) bool) *cacheStore {
@@ -130,6 +129,9 @@ func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingP
uploader: uploader,
opTs: make(map[time.Duration]func() error),
}
c.stateLock = sync.Mutex{}
c.state = newDCState(dcNormal, c)

c.createDir(c.dir)
br, fr := c.curFreeRatio()
if br < c.freeRatio || fr < c.freeRatio {
@@ -197,11 +199,7 @@ func (cache *cacheStore) checkLockFile() {
}

func (c *cacheStore) available() bool {
return atomic.LoadUint32(&c.ioErrCnt) < uint32(maxIOErrors)
}

func (c *cacheStore) tryShutdown() {
atomic.AddUint32(&c.ioErrCnt, 1)
return c.state.state() != dcDown
}

func (cache *cacheStore) checkErr(f func() error) error {
@@ -221,8 +219,10 @@ func (cache *cacheStore) checkErr(f func() error) error {
if err != nil {
if errors.Is(err, syscall.EIO) || errors.Is(err, utils.ErrFuncTimeout) {
logger.Errorf("cache store is unavailable: %s", err)
cache.tryShutdown()
cache.state.onIOErr()
}
} else {
cache.state.onIOSucc()
}
return err
}
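
With this change, checkErr drives the state machine in both directions: EIO and utils.ErrFuncTimeout push the store toward dcDown via onIOErr, while every success is reported through onIOSucc, giving a flaky but recovering disk a path back to normal. An illustrative call site (not one added by this commit) would funnel raw disk IO through it like so:

```go
// Illustrative only: any syscall-backed IO should go through checkErr so
// that failures and successes both feed cache.state.
err := cache.checkErr(func() error {
	return os.Remove(cache.cachePath(key)) // arbitrary example operation
})
if err != nil {
	logger.Warnf("remove cached block %s: %s", key, err)
}
```
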
@@ -232,20 +232,14 @@ func getFunctionName(f interface{}) string {
}

func (c *cacheStore) checkTimeout() {
var lastReset = time.Now()
for c.available() {
if time.Since(lastReset) > time.Minute*10 {
atomic.StoreUint32(&c.ioErrCnt, 0)
lastReset = time.Now()
}

now := utils.Clock()
cutOff := now - maxIODur
c.opMu.Lock()
for ts := range c.opTs {
if ts < cutOff {
logger.Errorf("IO operation %s on %s is timeout after %s, ", getFunctionName(c.opTs[ts]), c.dir, now-ts)
c.tryShutdown()
c.state.onIOErr()
delete(c.opTs, ts)
}
}
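
checkTimeout no longer resets an error counter every ten minutes; instead it periodically sweeps opTs, a map from start timestamp to in-flight operation guarded by opMu, and reports any entry older than maxIODur (raised from 20s to 30s) as an IO error. The registering side is not expanded in this hunk, but the fields imply a wrapper along these lines (the helper name and exact shape are assumed, not from this commit):

```go
// Assumed registration pattern for the opTs watchdog; "timed" is a
// hypothetical helper name. A real implementation must also handle two
// operations starting at the same timestamp.
func (c *cacheStore) timed(f func() error) error {
	ts := utils.Clock() // duration-typed timestamp, matching the opTs key
	c.opMu.Lock()
	c.opTs[ts] = f // checkTimeout flags this entry once it outlives maxIODur
	c.opMu.Unlock()
	defer func() { // deregister whether f fails or succeeds
		c.opMu.Lock()
		delete(c.opTs, ts)
		c.opMu.Unlock()
	}()
	return f()
}
```
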
@@ -374,6 +368,11 @@ func (cache *cacheStore) refreshCacheKeys() {
}

func (cache *cacheStore) removeStage(key string) error {
cache.state.beforeCacheOp()
defer cache.state.afterCacheOp()
if err := cache.state.checkCacheOp(); err != nil {
return err
}
var err error
if err = cache.removeFile(cache.stagePath(key)); err == nil {
cache.m.stageBlocks.Sub(1)
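
removeStage is the first of several methods (cache, remove, load, and stage below) that now share a three-call prologue: register an in-flight operation, guarantee deregistration via defer, and bail out once the store is marked down, so a dying disk degrades into fast failures instead of hanging callers. Note the asymmetry: best-effort paths such as cache() silently drop the request, while load() propagates the error so callers can fall back, e.g. (illustrative, not from this diff):

```go
// Illustrative caller-side fallback once load starts failing fast with
// errCacheDown: treat a down cache like a miss and go to object storage.
r, err := store.load(key)
if err != nil { // errNotCached or errCacheDown
	return readFromObjectStorage(key) // hypothetical helper
}
defer r.Close()
```
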
@@ -387,7 +386,12 @@
}

func (cache *cacheStore) cache(key string, p *Page, force bool) {
if cache.capacity == 0 || !cache.available() {
cache.state.beforeCacheOp()
defer cache.state.afterCacheOp()
if cache.state.checkCacheOp() != nil {
return
}
if cache.capacity == 0 {
return
}
if cache.rawFull && cache.eviction == "none" {
@@ -432,7 +436,7 @@ func (cache *cacheStore) curFreeRatio() (float32, float32) {

func (cache *cacheStore) flushPage(path string, data []byte) (err error) {
if !cache.available() {
return errors.New("store not available")
return errCacheDown
}

start := time.Now()
@@ -543,6 +547,12 @@ func (cache *cacheStore) getPathFromKey(k cacheKey) string {
}

func (cache *cacheStore) remove(key string) {
cache.state.beforeCacheOp()
defer cache.state.afterCacheOp()
if cache.state.checkCacheOp() != nil {
return
}

cache.Lock()
delete(cache.pages, key)
path := cache.cachePath(key)
@@ -567,6 +577,12 @@ func (cache *cacheStore) remove(key string) {
}

func (cache *cacheStore) load(key string) (ReadCloser, error) {
cache.state.beforeCacheOp()
defer cache.state.afterCacheOp()
if err := cache.state.checkCacheOp(); err != nil {
return nil, err
}

cache.Lock()
defer cache.Unlock()
if p, ok := cache.pages[key]; ok {
@@ -654,6 +670,12 @@ func (cache *cacheStore) add(key string, size int32, atime uint32) {

func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string, error) {
stagingPath := cache.stagePath(key)

cache.state.beforeCacheOp()
defer cache.state.afterCacheOp()
if err := cache.state.checkCacheOp(); err != nil {
return stagingPath, err
}
if cache.stageFull {
return stagingPath, errStageFull
}
@@ -1031,18 +1053,6 @@ func newCacheManager(config *Config, reg prometheus.Registerer, uploader func(ke
return m
}

func getEnvs() {
if os.Getenv("JFS_MAX_IO_ERRORS") != "" {
maxIOErrors, _ = strconv.Atoi(os.Getenv("JFS_MAX_IO_ERRORS"))
logger.Infof("set maxIOErrors to %d", maxIOErrors)
}
if os.Getenv("JFS_MAX_IO_DURATION") != "" {
dur, _ := strconv.Atoi(os.Getenv("JFS_MAX_IO_DURATION"))
maxIODur = time.Duration(dur) * time.Second
logger.Infof("set maxIODur to %d", maxIODur)
}
}

func (m *cacheManager) getMetrics() *cacheManagerMetrics {
return m.metrics
}
@@ -1160,7 +1170,7 @@ func (m *cacheManager) load(key string) (ReadCloser, error) {
r, err := store.load(key)
if err == errNotCached {
legacy := m.getStoreLegacy(key)
if legacy != store && legacy != nil && legacy.available() {
if legacy != store && legacy != nil {
r, err = legacy.load(key)
}
}