Refactor (#51)

* remove nvm and rename tests

* fix loading cache race

* update ci

* update Readme

* fix queue race

* fix update race

* fix policy weight

* update race test

* fix policy/hashmap inconsistency

* fix persist tests

* update some comments

* minor improvement

* atomic sketch

* fix test

* fix test

* fix race

* update ci

* minor improvements

* reduce allocation

* add a simple run script

* minor fix
Yiling-J authored Oct 10, 2024
1 parent 0ddf7a3 commit 02529ef
Showing 55 changed files with 1,154 additions and 3,086 deletions.
46 changes: 30 additions & 16 deletions .github/workflows/go.yaml
@@ -28,7 +28,7 @@ jobs:
name: test
strategy:
matrix:
go: ["1.19.x", "1.22.x", "1.23.x"]
go: ["1.22.x", "1.23.x"]
runs-on: ubuntu-latest
steps:
- name: Setup Go
@@ -44,13 +44,12 @@
- name: Upload coverage to codecov.io
uses: codecov/codecov-action@v3

test-os:
name: test-os
test-race-1:
name: test-race-1
strategy:
matrix:
go: ["1.19.x", "1.22.x", "1.23.x"]
os: [macos-latest, windows-latest, ubuntu-latest]
runs-on: ${{ matrix.os }}
go: ["1.23.x"]
runs-on: ubuntu-latest
steps:
- name: Setup Go
with:
@@ -60,17 +59,13 @@
- uses: actions/checkout@v2

- name: Test
run: |
go test ./... -run=TestPersistOS
go test ./... -run=TestHybridCacheGetSetNoRace
go test ./... -run=TestNvmResize
run: go test ./... -run=TestCacheRace_GetSetDeleteExpire -count=1 -race

build-os:
name: build-os
test-race-2:
name: test-race-2
strategy:
matrix:
go: ["1.19.x", "1.22.x", "1.23.x"]
os: [darwin, windows, freebsd, solaris, illumos, openbsd, plan9]
go: ["1.23.x"]
runs-on: ubuntu-latest
steps:
- name: Setup Go
@@ -80,5 +75,24 @@

- uses: actions/checkout@v2

- name: Build
run: GOOS=${{ matrix.os }} CGO_ENABLED=0 go build
- name: Test
run: go test ./... -run=TestCacheRace_ -count=1

test-os:
name: test-os
strategy:
matrix:
go: ["1.22.x", "1.23.x"]
os: [macos-latest, windows-latest, ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Setup Go
with:
go-version: ${{ matrix.go }}
uses: actions/setup-go@v2

- uses: actions/checkout@v2

- name: Test
run: |
go test ./... -run=TestPersistOS
9 changes: 6 additions & 3 deletions Makefile
@@ -1,7 +1,10 @@
.PHONY: test testx lint bench cover
.PHONY: test test-race testx lint bench cover

test:
go test ./... -race
go test -skip=TestCacheRace_ ./...

test-race:
go test ./... -run=TestCacheRace_ -count=1 -race

testx:
go test ./... -v -failfast
@@ -10,5 +13,5 @@ lint:
golangci-lint run

cover:
go test -race -timeout 2000s -coverprofile=cover.out -coverpkg=./... ./...
go test -timeout 2000s -coverprofile=cover.out -coverpkg=./... -skip=TestCacheRace_ ./...
go tool cover -html=cover.out -o cover.html
89 changes: 14 additions & 75 deletions README.md
@@ -20,7 +20,7 @@ High performance in-memory & hybrid cache inspired by [Caffeine](https://github.
- [Benchmarks](#benchmarks)
* [throughput](#throughput)
* [hit ratios](#hit-ratios)
- [Hybrid Cache(Experimental)](#hybrid-cacheexperimental)
- [Secondary Cache(Experimental)](#secondary-cacheexperimental)
- [Support](#support)

## Requirements
@@ -246,92 +246,31 @@ BenchmarkCache/zipf_ristretto_reads=0%,writes=100%-8 2028530 670.6
![hit ratios](benchmarks/results/oltp.png)


## Hybrid Cache(Experimental)
## Secondary Cache(Experimental)

The HybridCache feature enables Theine to extend the DRAM cache to NVM. With HybridCache, Theine can seamlessly move items stored in the cache between DRAM and NVM as they are accessed. Using HybridCache, you can shrink the cache's DRAM footprint and replace it with NVM such as flash. This also lets you achieve large cache capacities at the same or relatively lower power and dollar cost.
SecondaryCache is the interface for caching data on a secondary tier, which can be a non-volatile medium or an alternate form of caching such as compressed data. Its purpose is to support other ways of caching an object, such as persistent or compressed data. It can be viewed as an extension of Theine’s current in-memory cache.

#### Design
Hybrid Cache is inspired by CacheLib's HybridCache. See [introduction](https://cachelib.org/docs/Cache_Library_User_Guides/HybridCache) and [architecture](https://cachelib.org/docs/Cache_Library_Architecture_Guide/hybrid_cache) from CacheLib's guide.
Currently, the SecondaryCache interface has one implementation inspired by CacheLib's Hybrid Cache.

When you use HybridCache, items allocated in the cache can live on NVM or DRAM based on how they are accessed. Irrespective of where they are, **when you access them, they are always served from DRAM**.

Items start their lifetime on DRAM. As an item becomes cold it gets evicted from DRAM when the cache is full. Theine spills it to a cache on the NVM device. Upon subsequent access through `Get()`, if the item is not in DRAM, Theine looks it up in the HybridCache and, if found, moves it to DRAM. When the HybridCache fills up, subsequent insertions into the HybridCache from DRAM will throw away colder items from the HybridCache.

Like CacheLib, Theine's hybrid cache also has **BigHash** and **Block Cache**. It's highly recommended to read the CacheLib architecture design before using the hybrid cache. Here is a brief introduction to these two engines (copied from CacheLib):

- **BigHash** is effectively a giant fixed-bucket hash map on the device. To read or write, the entire bucket is read (and, in case of a write, updated and written back). A Bloom filter is used to reduce the number of IOs. When a bucket is full, items are evicted in FIFO manner. You don't pay any RAM price here (except for the Bloom filter, which is 2GB for a 1TB BigHash, tunable).
- **Block Cache**, on the other hand, divides the device into equally sized regions (16MB, tunable) and fills a region with items of the same size class, or, in log mode, fills regions sequentially with items of different sizes. Sometimes we call log mode “stack alloc”. BC stores a compact index in memory: key hash to offset. The full key is not stored in memory, and if a collision happens (super rare), the old item will look as if it was evicted. In your calculations, use 12 bytes of overhead per item to estimate RAM usage. For example, if your average item size is 4KB and the cache size is 500GB, you'll need around 1.4GB of memory.

#### Using Hybrid Cache

To use HybridCache, you need to create an NVM cache with NvmBuilder. NewNvmBuilder requires 2 params: the first is the cache file name, the second is the cache size in bytes. Theine will use direct I/O to read/write the file.

```go
nvm, err := theine.NewNvmBuilder[int, int]("cache", 150<<20).[settings...].Build()
```

Then enable hybrid mode in your Theine builder.
```go
client, err := theine.NewBuilder[int, int](100).Hybrid(nvm).Build()
type SecondaryCache[K comparable, V any] interface {
Get(key K) (value V, cost int64, expire int64, ok bool, err error)
Set(key K, value V, cost int64, expire int64) error
Delete(key K) error
HandleAsyncError(err error)
}
```
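
As an illustration of the contract only (not part of theine-go), a minimal sketch of an in-memory implementation backed by a mutex-guarded map might look like the following; `MapSecondary` and its internals are hypothetical names used purely for demonstration.

```go
import (
	"log"
	"sync"
)

// mapItem bundles the value with the cost and expire metadata Theine passes to Set.
type mapItem[V any] struct {
	value  V
	cost   int64
	expire int64
}

// MapSecondary is a hypothetical in-memory SecondaryCache implementation,
// shown only to illustrate the shape of the interface.
type MapSecondary[K comparable, V any] struct {
	mu   sync.Mutex
	data map[K]mapItem[V]
}

func NewMapSecondary[K comparable, V any]() *MapSecondary[K, V] {
	return &MapSecondary[K, V]{data: map[K]mapItem[V]{}}
}

func (m *MapSecondary[K, V]) Get(key K) (value V, cost int64, expire int64, ok bool, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	item, ok := m.data[key]
	if !ok {
		return value, 0, 0, false, nil
	}
	return item.value, item.cost, item.expire, true, nil
}

func (m *MapSecondary[K, V]) Set(key K, value V, cost int64, expire int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = mapItem[V]{value: value, cost: cost, expire: expire}
	return nil
}

func (m *MapSecondary[K, V]) Delete(key K) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.data, key)
	return nil
}

// HandleAsyncError receives errors from Theine's asynchronous eviction path,
// since entries are written to the secondary tier off the request path.
func (m *MapSecondary[K, V]) HandleAsyncError(err error) {
	if err != nil {
		log.Printf("secondary cache error: %v", err)
	}
}
```

A real secondary tier would typically serialize values and honor the `expire` timestamp; this sketch only stores what it is given.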

#### NVM Builder Settings

All settings are optional, unless marked as "Required".

* **[Common]** `BlockSize` default 4096

Device block size in bytes (minimum IO granularity).
* **[Common]** `KeySerializer` default JsonSerializer

KeySerializer is used to marshal/unmarshal between your key type and bytes.
```go
type Serializer[T any] interface {
Marshal(v T) ([]byte, error)
Unmarshal(raw []byte, v *T) error
}
```
* **[Common]** `ValueSerializer` default JsonSerializer
If you plan to use a remote cache or database, such as Redis, as a secondary cache, keep in mind that the in-memory cache remains the primary source of truth. Evicted entries from memory are sent to the secondary cache. This approach differs from most tiered cache systems, where the remote cache is treated as the primary source of truth and is written to first.
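
For reference, wiring a secondary tier into the builder might look like the sketch below, assuming the hypothetical `MapSecondary` from the earlier sketch; the capacity and option values are illustrative, and `Workers`/`AdmProbability` are the hybrid-builder options exercised in the project's builder tests.

```go
// A sketch only: MapSecondary is the hypothetical implementation sketched
// earlier, and the capacity/option values are illustrative.
secondary := NewMapSecondary[int, int]()

builder := theine.NewBuilder[int, int](10000)
client, err := builder.
	Hybrid(secondary).  // attach the secondary tier
	Workers(2).         // goroutines that sync evicted entries to the secondary
	AdmProbability(1).  // admit every evicted entry into the secondary tier
	Build()
if err != nil {
	panic(err)
}
_ = client // *theine.HybridCache[int, int]
```

Entries evicted from the in-memory tier are handed to the worker goroutines, which write them to the secondary cache; errors on that path are reported through `HandleAsyncError`.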

ValueSerializer is used to marshal/unmarshal between your value type and bytes. Same interface as KeySerializer.
* **[Common]** `ErrorHandler` default do nothing

Theine evicts entries to NVM asynchronously, so errors are handled by this error handler.
* **[BlockCache]** `RegionSize` default 16 << 20 (16 MB)

Region size in bytes.
* **[BlockCache]** `CleanRegionSize` default 3

How many regions are reserved for future writes. Set this to be equivalent to your per-second write rate; this should ensure that writes do not have to retry while waiting for a region reclamation to finish.
* **[BigHash]** `BucketSize` default 4 << 10 (4 KB)

Bucket size in bytes.
* **[BigHash]** `BigHashPct` default 10

Percentage of space to reserve for BigHash. Set the percentage > 0 to enable BigHash. The remaining part is for BlockCache. The value has to be in the range of [0, 100]. Setting it to 100 will disable BlockCache.
* **[BigHash]** `BigHashMaxItemSize` default (bucketSize - 80)

Maximum size of a small item to be stored in BigHash. Must be less than (bucket size - 80).
* **[BigHash]** `BucketBfSize` default 8 bytes

Bloom filter size, bytes per bucket.

#### Hybrid Mode Settings

After you call `Hybrid(...)` on a cache builder, Theine converts the current builder to a hybrid builder. The hybrid builder has several settings.

* `Workers` default 2

Theine evicts entries in a separate policy goroutine, but inserts into NVM can be done in parallel. To make this work, Theine sends evicted entries to workers, and each worker syncs data to the NVM cache. This setting controls how many workers are used to sync data.

* `AdmProbability` default 1

This is an admission policy for endurance and performance reasons. When entries are evicted from the DRAM cache, this policy controls the insertion percentage. A value of 1 means that all entries evicted from DRAM will be inserted into NVM. Values should be in the range of [0, 1].
#### Secondary Cache Implementations
NVM: https://github.com/Yiling-J/theine-nvm

#### Limitations
- Cache Persistence is not currently supported, but it may be added in the future. You can still use the Persistence API in a hybrid-enabled cache, but only the DRAM part of the cache will be saved or loaded.
- The removal listener will only receive REMOVED events, which are generated when an entry is explicitly removed by calling the Delete API.
- No Range/Len API.


## Support
Feel free to open an issue or ask question in discussions.
19 changes: 5 additions & 14 deletions builder_test.go
@@ -2,11 +2,11 @@ package theine_test

import (
"context"
"os"
"reflect"
"testing"

"github.com/Yiling-J/theine-go"
"github.com/Yiling-J/theine-go/internal"
"github.com/stretchr/testify/require"
)

@@ -36,18 +36,16 @@ func TestBuilder(t *testing.T) {
// hybrid cache
_, err = builder.Hybrid(nil).Build()
require.Error(t, err)
nvm, err := theine.NewNvmBuilder[int, int]("afoo", 500<<10).RegionSize(5 << 10).KeySerializer(&IntSerializer{}).ValueSerializer(&IntSerializer{}).BucketBfSize(16).Build()
defer os.Remove("afoo")
require.Nil(t, err)
_, err = builder.Hybrid(nvm).Workers(0).Build()
secondary := internal.NewSimpleMapSecondary[int, int]()
_, err = builder.Hybrid(secondary).Workers(0).Build()
require.Error(t, err)
builderH := builder.Hybrid(nvm).Workers(1).AdmProbability(0.8)
builderH := builder.Hybrid(secondary).Workers(1).AdmProbability(0.8)
cacheH, err := builderH.Build()
require.Nil(t, err)
require.Equal(t, reflect.TypeOf(&theine.HybridCache[int, int]{}), reflect.TypeOf(cacheH))

// loading + hybrid
builderLH := builderL.Hybrid(nvm)
builderLH := builderL.Hybrid(secondary)
cacheLH, err := builderLH.Build()
require.Nil(t, err)
require.Equal(t, reflect.TypeOf(&theine.HybridLoadingCache[int, int]{}), reflect.TypeOf(cacheLH))
@@ -61,10 +59,3 @@ func TestBuilder(t *testing.T) {
require.Nil(t, err)
require.Equal(t, reflect.TypeOf(&theine.HybridLoadingCache[int, int]{}), reflect.TypeOf(cacheLH))
}

func TestNvmBuilder(t *testing.T) {
_, err := theine.NewNvmBuilder[int, int]("afoo", 100<<10).BlockSize(512).BucketSize(4 << 10).RegionSize(20 << 10).CleanRegionSize(3).KeySerializer(&IntSerializer{}).ValueSerializer(&IntSerializer{}).BigHashPct(20).Build()
defer os.Remove("afoo")
require.Nil(t, err)

}
139 changes: 139 additions & 0 deletions cache_race_test.go
@@ -0,0 +1,139 @@
package theine

import (
"math/rand"
"sync"
"testing"
"time"

"github.com/Yiling-J/theine-go/internal"
"github.com/stretchr/testify/require"
)

func keyGen() []uint64 {
keys := []uint64{}
r := rand.New(rand.NewSource(0))
z := rand.NewZipf(r, 1.01, 9.0, 200000)
for i := 0; i < 2<<16; i++ {
keys = append(keys, z.Uint64())
}
return keys
}

func TestCacheRace_GetSet(t *testing.T) {
for _, size := range []int{500, 2000, 10000, 50000} {
builder := NewBuilder[uint64, uint64](int64(size))
builder.RemovalListener(func(key, value uint64, reason RemoveReason) {})
client, err := builder.Build()
require.Nil(t, err)
var wg sync.WaitGroup
keys := keyGen()

for i := 1; i <= 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
rd := rand.Intn(2 << 16)
for i := 0; i < 100000; i++ {
keyGet := keys[(i+rd)&(2<<16-1)]
keyUpdate := keys[(i+3*rd)&(2<<16-1)]

v, ok := client.Get(keyGet)
if ok && v != keyGet {
panic(keyGet)
}
if !ok {
client.SetWithTTL(keyGet, keyGet, 1, 0)
}

client.SetWithTTL(keyUpdate, keyUpdate, int64(i%10+1), 0)
}
}()
}
wg.Wait()
client.store.Wait()

require.True(
t, client.Len() < size+internal.WriteBufferSize,
)

di := client.store.DebugInfo()

require.Equal(t, client.Len(), int(di.TotalCount()))
require.True(t, di.TotalWeight() <= int64(size+size/10))
require.Equal(t, di.ProbationWeight, di.ProbationWeightField)
require.Equal(t, di.ProtectedWeight, di.ProtectedWeightField)

for i := 0; i < len(di.QueueWeight); i++ {
require.Equal(t, di.QueueWeight[i], di.QueueWeightField[i])
}

client.store.RangeEntry(func(entry *internal.Entry[uint64, uint64]) {
require.Equal(t, entry.Weight(), entry.PolicyWeight(), entry.Position())
})

client.Close()
}
}

func TestCacheRace_GetSetDeleteExpire(t *testing.T) {
for _, size := range []int{500, 2000, 10000, 50000} {
builder := NewBuilder[uint64, uint64](int64(size))
builder.RemovalListener(func(key, value uint64, reason RemoveReason) {})
client, err := builder.Build()
require.Nil(t, err)
var wg sync.WaitGroup
keys := keyGen()

for i := 1; i <= 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
rd := rand.Intn(2 << 16)
for i := 0; i < 100000; i++ {
key := keys[(i+rd)&(2<<16-1)]
v, ok := client.Get(key)
if ok && v != key {
panic(key)
}
if i%3 == 0 {
client.SetWithTTL(key, key, int64(i%10+1), time.Second*time.Duration(i%25+5))
}
if i%5 == 0 {
client.Delete(key)
}
if i%5000 == 0 {
client.Range(func(key, value uint64) bool {
return true
})
}
}
}()
}
wg.Wait()

client.store.Wait()

require.True(
t, client.Len() < size+internal.WriteBufferSize,
)

di := client.store.DebugInfo()

require.Equal(t, client.Len(), int(di.TotalCount()))
require.True(t, di.TotalWeight() <= int64(size+size/10))
require.Equal(t, di.ProbationWeight, di.ProbationWeightField)
require.Equal(t, di.ProtectedWeight, di.ProtectedWeightField)

for i := 0; i < len(di.QueueWeight); i++ {
require.Equal(t, di.QueueWeight[i], di.QueueWeightField[i])
}

client.store.RangeEntry(func(entry *internal.Entry[uint64, uint64]) {
require.Equal(t, entry.Weight(), entry.PolicyWeight(), entry.Position())
})

client.Close()

}
}
