Skip to content

Commit

Permalink
Adding DiskCache stats
Browse files Browse the repository at this point in the history
  • Loading branch information
ola-rozenfeld committed May 20, 2024
1 parent b994285 commit 9eb7483
Show file tree
Hide file tree
Showing 9 changed files with 381 additions and 120 deletions.
18 changes: 7 additions & 11 deletions go/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"errors"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/actas"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
Expand Down Expand Up @@ -244,6 +245,10 @@ func (c *Client) Close() error {
if c.casConnection != c.connection {
return c.casConnection.Close()
}
if c.diskCache != nil {
// Waits for local disk GC to complete.
c.diskCache.Shutdown()
}
return nil
}

Expand Down Expand Up @@ -354,21 +359,12 @@ func (o *TreeSymlinkOpts) Apply(c *Client) {
}

type DiskCacheOpts struct {
Context context.Context
Path string
MaxCapacityGb float64
DiskCache *diskcache.DiskCache
}

// Apply sets the client's TreeSymlinkOpts.
func (o *DiskCacheOpts) Apply(c *Client) {
if o.Path != "" {
capBytes := uint64(o.MaxCapacityGb * 1024 * 1024 * 1024)
var err error
// TODO(ola): propagate errors from Apply.
if c.diskCache, err = diskcache.New(o.Context, o.Path, capBytes); err != nil {
log.Errorf("Error initializing disk cache on %s: %v", o.Path, err)
}
}
c.diskCache = o.DiskCache
}

// MaxBatchDigests is maximum amount of digests to batch in upload and download operations.
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/diskcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ go_test(
"//go/pkg/testutil",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_pborman_uuid//:go_default_library",
"@com_github_google_uuid//:uuid",
"@org_golang_x_sync//errgroup:go_default_library",
],
)
149 changes: 94 additions & 55 deletions go/pkg/diskcache/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,28 @@ type DiskCache struct {
mu sync.Mutex // protects the queue.
store sync.Map // map of keys to qitems.
queue *priorityQueue // keys by last accessed time.
sizeBytes int64 // total size.
ctx context.Context
shutdown chan bool
shutdownOnce sync.Once
gcTick uint64
gcReq chan uint64
testGcTicks chan uint64
gcDone chan bool
stats *DiskCacheStats
}

type DiskCacheStats struct {
TotalSizeBytes int64
TotalNumFiles int64
NumFilesStored int64
TotalStoredBytes int64
NumFilesGCed int64
TotalGCedSizeBytes int64
NumCacheHits int64
NumCacheMisses int64
TotalCacheHitSizeBytes int64
InitTime time.Duration
TotalGCTime time.Duration
TotalGCDiskOpsTime time.Duration
}

func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache, error) {
Expand All @@ -115,7 +131,11 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache,
},
gcReq: make(chan uint64, maxConcurrentRequests),
shutdown: make(chan bool),
gcDone: make(chan bool),
stats: &DiskCacheStats{},
}
start := time.Now()
defer func() { atomic.AddInt64((*int64)(&res.stats.InitTime), int64(time.Since(start))) }()
heap.Init(res.queue)
if err := os.MkdirAll(root, os.ModePerm); err != nil {
return nil, err

Check failure on line 141 in go/pkg/diskcache/diskcache.go

View workflow job for this annotation

GitHub Actions / lint

error returned from external package is unwrapped: sig: func os.MkdirAll(path string, perm io/fs.FileMode) error (wrapcheck)
Expand All @@ -135,30 +155,31 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache,
return filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
// We log and continue on all errors, because cache read errors are not critical.
if err != nil {
return fmt.Errorf("error reading cache directory: %v", err)
return fmt.Errorf("error reading cache directory: %w", err)
}
if d.IsDir() {
return nil
}
subdir := filepath.Base(filepath.Dir(path))
k, err := res.getKeyFromFileName(subdir + d.Name())
k, err := getKeyFromFileName(subdir + d.Name())
if err != nil {
return fmt.Errorf("error parsing cached file name %s: %v", path, err)
return fmt.Errorf("error parsing cached file name %s: %w", path, err)
}
atime, err := getLastAccessTime(path)
info, err := d.Info()
if err != nil {
return fmt.Errorf("error getting last accessed time of %s: %v", path, err)
return fmt.Errorf("error getting file info of %s: %w", path, err)
}
it := &qitem{
key: k,
lat: atime,
lat: fileInfoToAccessTime(info),
}
size, err := res.getItemSize(k)
if err != nil {
return fmt.Errorf("error getting file size of %s: %v", path, err)
return fmt.Errorf("error getting file size of %s: %w", path, err)
}
res.store.Store(k, it)
atomic.AddInt64(&res.sizeBytes, size)
atomic.AddInt64(&res.stats.TotalSizeBytes, size)
atomic.AddInt64(&res.stats.TotalNumFiles, 1)
res.mu.Lock()
heap.Push(res.queue, it)
res.mu.Unlock()
Expand All @@ -180,42 +201,50 @@ func (d *DiskCache) getItemSize(k key) (int64, error) {
fname := d.getPath(k)
info, err := os.Stat(fname)
if err != nil {
return 0, fmt.Errorf("Error getting info for %s: %v", fname, err)
return 0, fmt.Errorf("error getting info for %s: %w", fname, err)
}
return info.Size(), nil
}

// Releases resources and terminates the GC daemon. Should be the last call to the DiskCache.
// Terminates the GC daemon, waiting for it to complete. No further Store* calls to the DiskCache should be made.
func (d *DiskCache) Shutdown() {
d.shutdown <- true
}

func (d *DiskCache) TotalSizeBytes() uint64 {
return uint64(atomic.LoadInt64(&d.sizeBytes))
d.shutdownOnce.Do(func() {
d.shutdown <- true
<-d.gcDone
log.Infof("DiskCacheStats: %+v", d.stats)
})
}

// This function is defined in https://pkg.go.dev/strings#CutSuffix
// It is copy/pasted here as a hack, because I failed to upgrade the *Reclient* repo to the latest Go 1.20.7.
func CutSuffix(s, suffix string) (before string, found bool) {
if !strings.HasSuffix(s, suffix) {
return s, false
func (d *DiskCache) GetStats() *DiskCacheStats {
// Return a copy for safety.
return &DiskCacheStats{
TotalSizeBytes: atomic.LoadInt64(&d.stats.TotalSizeBytes),
TotalNumFiles: atomic.LoadInt64(&d.stats.TotalNumFiles),
NumFilesStored: atomic.LoadInt64(&d.stats.NumFilesStored),
TotalStoredBytes: atomic.LoadInt64(&d.stats.TotalStoredBytes),
NumFilesGCed: atomic.LoadInt64(&d.stats.NumFilesGCed),
TotalGCedSizeBytes: atomic.LoadInt64(&d.stats.TotalGCedSizeBytes),
NumCacheHits: atomic.LoadInt64(&d.stats.NumCacheHits),
NumCacheMisses: atomic.LoadInt64(&d.stats.NumCacheMisses),
TotalCacheHitSizeBytes: atomic.LoadInt64(&d.stats.TotalCacheHitSizeBytes),
InitTime: time.Duration(atomic.LoadInt64((*int64)(&d.stats.InitTime))),
TotalGCTime: time.Duration(atomic.LoadInt64((*int64)(&d.stats.TotalGCTime))),
}
return s[:len(s)-len(suffix)], true
}

func (d *DiskCache) getKeyFromFileName(fname string) (key, error) {
func getKeyFromFileName(fname string) (key, error) {
pair := strings.Split(fname, ".")
if len(pair) != 2 {
return key{}, fmt.Errorf("expected file name in the form [ac_]hash/size, got %s", fname)
return key{}, fmt.Errorf("expected file name in the form hash[_ac].size, got %s", fname)
}
size, err := strconv.ParseInt(pair[1], 10, 64)
if err != nil {
return key{}, fmt.Errorf("invalid size in digest %s: %s", fname, err)
}
hash, isAc := CutSuffix(pair[0], "ac_")
hash, isAc := strings.CutSuffix(pair[0], "_ac")
dg, err := digest.New(hash, size)
if err != nil {
return key{}, fmt.Errorf("invalid digest from file name %s: %v", fname, err)
return key{}, fmt.Errorf("invalid digest from file name %s: %w", fname, err)
}
return key{digest: dg, isCas: !isAc}, nil
}
Expand Down Expand Up @@ -248,7 +277,10 @@ func (d *DiskCache) StoreCas(dg digest.Digest, path string) error {
if err := copyFile(path, d.getPath(it.key), dg.Size); err != nil {
return err
}
newSize := uint64(atomic.AddInt64(&d.sizeBytes, dg.Size))
newSize := uint64(atomic.AddInt64(&d.stats.TotalSizeBytes, dg.Size))
atomic.AddInt64(&d.stats.TotalNumFiles, 1)
atomic.AddInt64(&d.stats.NumFilesStored, 1)
atomic.AddInt64(&d.stats.TotalStoredBytes, dg.Size)
if newSize > d.maxCapacityBytes {
select {
case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
Expand Down Expand Up @@ -281,7 +313,10 @@ func (d *DiskCache) StoreActionCache(dg digest.Digest, ar *repb.ActionResult) er
if err := os.WriteFile(d.getPath(it.key), bytes, 0644); err != nil {
return err

Check failure on line 314 in go/pkg/diskcache/diskcache.go

View workflow job for this annotation

GitHub Actions / lint

error returned from external package is unwrapped: sig: func os.WriteFile(name string, data []byte, perm io/fs.FileMode) error (wrapcheck)
}
newSize := uint64(atomic.AddInt64(&d.sizeBytes, int64(size)))
newSize := uint64(atomic.AddInt64(&d.stats.TotalSizeBytes, int64(size)))
atomic.AddInt64(&d.stats.TotalNumFiles, 1)
atomic.AddInt64(&d.stats.NumFilesStored, 1)
atomic.AddInt64(&d.stats.TotalStoredBytes, int64(size))
if newSize > d.maxCapacityBytes {
select {
case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
Expand All @@ -292,38 +327,40 @@ func (d *DiskCache) StoreActionCache(dg digest.Digest, ar *repb.ActionResult) er
}

func (d *DiskCache) gc() {
defer func() { d.gcDone <- true }()
for {
select {
case <-d.shutdown:
return
case <-d.ctx.Done():
return
case t := <-d.gcReq:
case <-d.gcReq:
start := time.Now()
// Evict old entries until total size is below cap.
for uint64(atomic.LoadInt64(&d.sizeBytes)) > d.maxCapacityBytes {
for uint64(atomic.LoadInt64(&d.stats.TotalSizeBytes)) > d.maxCapacityBytes {
d.mu.Lock()
it := heap.Pop(d.queue).(*qitem)
d.mu.Unlock()
size, err := d.getItemSize(it.key)
if err != nil {
log.Errorf("error getting item size for %v: %v", it.key, err)
log.Errorf("error getting item size for %v: %w", it.key, err)
size = 0
}
atomic.AddInt64(&d.sizeBytes, -size)
atomic.AddInt64(&d.stats.TotalSizeBytes, -size)
atomic.AddInt64(&d.stats.TotalNumFiles, -1)
atomic.AddInt64(&d.stats.TotalGCedSizeBytes, size)
atomic.AddInt64(&d.stats.NumFilesGCed, 1)
it.mu.Lock()
diskOpsStart := time.Now()
// We only delete the files, and not the prefix directories, because the prefixes are not worth worrying about.
if err := os.Remove(d.getPath(it.key)); err != nil {
log.Errorf("Error removing file: %v", err)
log.Errorf("Error removing file: %w", err)
}
atomic.AddInt64((*int64)(&d.stats.TotalGCDiskOpsTime), int64(time.Since(diskOpsStart)))
d.store.Delete(it.key)
it.mu.Unlock()
}
if d.testGcTicks != nil {
select {
case d.testGcTicks <- t:
default:
}
}
atomic.AddInt64((*int64)(&d.stats.TotalGCTime), int64(time.Since(start)))
}
}
}
Expand Down Expand Up @@ -363,6 +400,7 @@ func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
k := key{digest: dg, isCas: true}
iUntyped, loaded := d.store.Load(k)
if !loaded {
atomic.AddInt64(&d.stats.NumCacheMisses, 1)
return false
}
it := iUntyped.(*qitem)
Expand All @@ -371,58 +409,59 @@ func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
it.mu.RUnlock()
if err != nil {
// It is not possible to prevent a race with GC; hence, we return false on copy errors.
atomic.AddInt64(&d.stats.NumCacheMisses, 1)
return false
}

d.mu.Lock()
d.queue.Bump(it)
d.mu.Unlock()
atomic.AddInt64(&d.stats.NumCacheHits, 1)
atomic.AddInt64(&d.stats.TotalCacheHitSizeBytes, dg.Size)
return true
}

func (d *DiskCache) LoadActionCache(dg digest.Digest) (ar *repb.ActionResult, loaded bool) {
k := key{digest: dg, isCas: false}
iUntyped, loaded := d.store.Load(k)
if !loaded {
atomic.AddInt64(&d.stats.NumCacheMisses, 1)
return nil, false
}
it := iUntyped.(*qitem)
it.mu.RLock()
ar = &repb.ActionResult{}
if err := d.loadActionResult(k, ar); err != nil {
size, err := d.loadActionResult(k, ar)
if err != nil {
// It is not possible to prevent a race with GC; hence, we return false on load errors.
it.mu.RUnlock()
atomic.AddInt64(&d.stats.NumCacheMisses, 1)
return nil, false
}
it.mu.RUnlock()

d.mu.Lock()
d.queue.Bump(it)
d.mu.Unlock()
atomic.AddInt64(&d.stats.NumCacheHits, 1)
atomic.AddInt64(&d.stats.TotalCacheHitSizeBytes, int64(size))
return ar, true
}

func (d *DiskCache) loadActionResult(k key, ar *repb.ActionResult) error {
func (d *DiskCache) loadActionResult(k key, ar *repb.ActionResult) (int, error) {
bytes, err := os.ReadFile(d.getPath(k))
if err != nil {
return err
return 0, err
}
n := len(bytes)
// Required sanity check: sometimes the read pretends to succeed, but doesn't, if
// the file is being concurrently deleted. Empty ActionResult is advised against in
// the RE-API: https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L1052
if len(bytes) == 0 {
return fmt.Errorf("read empty ActionResult for %v", k.digest)
if n == 0 {
return n, fmt.Errorf("read empty ActionResult for %v", k.digest)
}
if err := proto.Unmarshal(bytes, ar); err != nil {
return fmt.Errorf("error unmarshalling %v as ActionResult: %v", bytes, err)
}
return nil
}

func getLastAccessTime(path string) (time.Time, error) {
info, err := os.Stat(path)
if err != nil {
return time.Time{}, err
return n, fmt.Errorf("error unmarshalling %v as ActionResult: %w", bytes, err)
}
return FileInfoToAccessTime(info), nil
return n, nil
}
Loading

0 comments on commit 9eb7483

Please sign in to comment.