Skip to content

Commit

Permalink
Feat: cmd-warmup add option "check" and "evict" (#4370)
Browse files Browse the repository at this point in the history
Signed-off-by: jiefeng <[email protected]>
  • Loading branch information
jiefenghuang authored Jan 23, 2024
1 parent 2bce542 commit 5e4d424
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 64 deletions.
86 changes: 71 additions & 15 deletions cmd/warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cmd
import (
"bufio"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -28,8 +29,10 @@ import (
"syscall"
"time"

"github.com/dustin/go-humanize"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -71,6 +74,14 @@ $ juicefs warmup -f /tmp/filelist`,
Aliases: []string{"b"},
Usage: "run in background",
},
&cli.BoolFlag{
Name: "evict",
Usage: "evict cached blocks",
},
&cli.BoolFlag{
Name: "check",
Usage: "check whether the data blocks are cached or not",
},
},
}
}
Expand Down Expand Up @@ -128,35 +139,59 @@ END:
}

// sendCommand sends a cache command (warm up, evict or check, as selected by action) for a batch of paths to the controller file
func sendCommand(cf *os.File, batch []string, threads uint, background bool, dspin *utils.DoubleSpinner) {
func sendCommand(cf *os.File, action vfs.CacheAction, batch []string, threads uint, background bool, dspin *utils.DoubleSpinner) vfs.CacheResponse {
paths := strings.Join(batch, "\n")
var back uint8
if background {
back = 1
}
wb := utils.NewBuffer(8 + 4 + 3 + uint32(len(paths)))
headerLen, bodyLen := uint32(8), uint32(4+len(paths)+2+1+1)
wb := utils.NewBuffer(headerLen + bodyLen)
wb.Put32(meta.FillCache)
wb.Put32(4 + 3 + uint32(len(paths)))
wb.Put32(bodyLen)

wb.Put32(uint32(len(paths)))
wb.Put([]byte(paths))
wb.Put16(uint16(threads))
wb.Put8(back)
wb.Put8(uint8(action))

if _, err := cf.Write(wb.Bytes()); err != nil {
logger.Fatalf("Write message: %s", err)
}

var resp vfs.CacheResponse
if background {
logger.Infof("Warm-up cache for %d paths in background", len(batch))
return
logger.Infof("%s for %d paths in background", action, len(batch))
return resp
}
if _, errno := readProgress(cf, func(count, bytes uint64) {
dspin.SetCurrent(int64(count), int64(bytes))
}); errno != 0 {
logger.Fatalf("Warm up failed: %s", errno)

lastCnt, lastBytes := dspin.Current()
data, errno := readProgress(cf, func(fileCount, totalBytes uint64) {
dspin.SetCurrent(lastCnt+int64(fileCount), lastBytes+int64(totalBytes))
})

if errno != 0 {
logger.Fatalf("%s failed: %s", action, errno)
}

err := json.Unmarshal(data, &resp)
if err != nil {
logger.Fatalf("unmarshal error: %s", err)
}

return resp
}

func warmup(ctx *cli.Context) error {
setup(ctx, 0)

evict := ctx.Bool("evict")
check := ctx.Bool("check")
if evict && check {
logger.Fatalf("--check and --evict can't be used together")
}

var paths []string
for _, p := range ctx.Args().Slice() {
if abs, err := filepath.Abs(p); err == nil {
Expand Down Expand Up @@ -186,7 +221,7 @@ func warmup(ctx *cli.Context) error {
}
}
if len(paths) == 0 {
logger.Infof("Nothing to warm up")
logger.Infof("no path")
return nil
}

Expand Down Expand Up @@ -214,11 +249,20 @@ func warmup(ctx *cli.Context) error {
logger.Warnf("threads should be larger than 0, reset it to 1")
threads = 1
}

action := vfs.WarmupCache
if evict {
action = vfs.EvictCache
} else if check {
action = vfs.CheckCache
}

background := ctx.Bool("background")
start := len(mp)
batch := make([]string, 0, batchMax)
progress := utils.NewProgress(background)
dspin := progress.AddDoubleSpinner("Warming up")
dspin := progress.AddDoubleSpinner(action.String())
total := &vfs.CacheResponse{}
for _, path := range paths {
if mp == "/" {
inode, err := utils.GetFileInode(path)
Expand All @@ -234,18 +278,30 @@ func warmup(ctx *cli.Context) error {
continue
}
if len(batch) >= batchMax {
sendCommand(controller, batch, threads, background, dspin)
resp := sendCommand(controller, action, batch, threads, background, dspin)
total.Add(resp)
batch = batch[0:]
}
}
if len(batch) > 0 {
sendCommand(controller, batch, threads, background, dspin)
resp := sendCommand(controller, action, batch, threads, background, dspin)
total.Add(resp)
}
progress.Done()

if !background {
count, bytes := dspin.Current()
logger.Infof("Successfully warmed up %d files (%d bytes)", count, bytes)
switch action {
case vfs.WarmupCache:
logger.Infof("%s: %d files (%s bytes)", action, count, humanize.IBytes(uint64(bytes)))
case vfs.EvictCache:
logger.Infof("%s: %d files (%s bytes)", action, count, humanize.IBytes(uint64(bytes)))
case vfs.CheckCache:
logger.Infof("%s: %d files (%s of %s (%2.1f%%)) cached", action, count,
humanize.IBytes(uint64(bytes)-total.MissBytes),
humanize.IBytes(uint64(bytes)),
float64(uint64(bytes)-total.MissBytes)*100/float64(bytes))
}
}

return nil
}
24 changes: 24 additions & 0 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,30 @@ func (store *cachedStore) FillCache(id uint64, length uint32) error {
return err
}

// EvictCache drops the cached blocks of one slice. It builds a read view of
// the slice identified by id with the given length, enumerates that view's
// block keys, and removes each key from store.bcache. It always returns nil.
func (store *cachedStore) EvictCache(id uint64, length uint32) error {
	rs := sliceForRead(id, int(length), store)
	for _, key := range rs.keys() {
		store.bcache.remove(key)
	}
	return nil
}

// CheckCache reports how many bytes of one slice are missing from the cache.
// It builds a read view of the slice identified by id with the given length
// and tries to load every block key from store.bcache. A block whose load
// fails is counted as missing, and its block size is added to the result.
// A block that loads successfully only has its reader closed.
// The returned error is always nil.
func (store *cachedStore) CheckCache(id uint64, length uint32) (uint64, error) {
	rs := sliceForRead(id, int(length), store)
	var missing uint64
	for idx, key := range rs.keys() {
		rd, loadErr := store.bcache.load(key)
		if loadErr != nil {
			// Any load failure is treated as "not cached".
			missing += uint64(rs.blockSize(idx))
			continue
		}
		// The reader was opened only to probe for presence; release it.
		_ = rd.Close()
	}
	return missing, nil
}

// UsedMemory returns the value reported by store.bcache.usedMemory().
func (store *cachedStore) UsedMemory() int64 {
	used := store.bcache.usedMemory()
	return used
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/chunk/cached_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/juicedata/juicefs/pkg/object"
"github.com/stretchr/testify/assert"
)

func forgetSlice(store ChunkStore, sliceId uint64, size int) error {
Expand Down Expand Up @@ -246,6 +247,29 @@ func TestFillCache(t *testing.T) {
if cnt, used := bcache.stats(); cnt != 2 || used != expect {
t.Fatalf("cache cnt %d used %d, expect cnt 2 used %d", cnt, used, expect)
}

// check
missBytes, err := store.CheckCache(10, 1024)
assert.Nil(t, err)
assert.Equal(t, uint64(0), missBytes)

missBytes, err = store.CheckCache(11, uint32(bsize))
assert.Nil(t, err)
assert.Equal(t, uint64(0), missBytes)

// evict slice 11
err = store.EvictCache(11, uint32(bsize))
assert.Nil(t, err)

// stat
if cnt, used := bcache.stats(); cnt != 1 || used != 1024+4096 { // only chunk 10 cached
t.Fatalf("cache cnt %d used %d, expect cnt 1 used 5120", cnt, used)
}

// check again
missBytes, err = store.CheckCache(11, uint32(bsize))
assert.Nil(t, err)
assert.Equal(t, uint64(bsize), missBytes)
}

func BenchmarkCachedRead(b *testing.B) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type ChunkStore interface {
NewWriter(id uint64) Writer
Remove(id uint64, length int) error
FillCache(id uint64, length uint32) error
EvictCache(id uint64, length uint32) error
CheckCache(id uint64, length uint32) (uint64, error)
UsedMemory() int64
UpdateLimit(upload, download int64)
}
Loading

0 comments on commit 5e4d424

Please sign in to comment.