From 72c874e9d64b937b8474acecb227bc2bb707c913 Mon Sep 17 00:00:00 2001
From: Codey Oxley
Date: Thu, 18 Apr 2024 14:22:44 -0400
Subject: [PATCH 1/3] dicts: Add option to use the *_byReference variants at creation time

---
 Makefile              |  2 +-
 dict.go               | 75 +++++++++++++++++++++++++++++++++++++-
 dict_test.go          | 32 +++++++++++++++++
 gozstd_test.go        | 61 +++++++++++++++++++++++++++++++
 gozstd_timing_test.go | 84 +++++++++++++++++++++++++++++++++++++------
 5 files changed, 242 insertions(+), 12 deletions(-)

diff --git a/Makefile b/Makefile
index 90ca115..3165163 100644
--- a/Makefile
+++ b/Makefile
@@ -82,7 +82,7 @@ update-zstd:
 	cp zstd/lib/zstd_errors.h .
 
 test:
-	CGO_ENABLED=1 GODEBUG=cgocheck=2 go test -v
+	CGO_ENABLED=1 GOEXPERIMENT=cgocheck2 go test -v
 
 bench:
 	CGO_ENABLED=1 go test -bench=.
diff --git a/dict.go b/dict.go
index af759a5..0594db3 100644
--- a/dict.go
+++ b/dict.go
@@ -23,6 +23,14 @@ static ZSTD_DDict* ZSTD_createDDict_wrapper(uintptr_t dictBuffer, size_t dictSiz
     return ZSTD_createDDict((const void *)dictBuffer, dictSize);
 }
 
+static ZSTD_CDict* ZSTD_createCDict_byReference_wrapper(uintptr_t dictBuffer, size_t dictSize, int compressionLevel) {
+    return ZSTD_createCDict_byReference((const void *)dictBuffer, dictSize, compressionLevel);
+}
+
+static ZSTD_DDict* ZSTD_createDDict_byReference_wrapper(uintptr_t dictBuffer, size_t dictSize) {
+    return ZSTD_createDDict_byReference((const void *)dictBuffer, dictSize);
+}
+
 */
 import "C"
 
@@ -103,7 +111,9 @@ var buildDictLock sync.Mutex
 //
 // A single CDict may be re-used in concurrently running goroutines.
 type CDict struct {
-	p                *C.ZSTD_CDict
+	p *C.ZSTD_CDict
+	// Keep the underlying bytes alive from the GC's point of view (if passing by ref)
+	input            []byte
 	compressionLevel int
 }
 
@@ -114,6 +124,16 @@ func NewCDict(dict []byte) (*CDict, error) {
 	return NewCDictLevel(dict, DefaultCompressionLevel)
 }
 
+// NewCDictByRef creates a new CDict that shares the given dict
+//
+// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
+// to undefined and undesirable behavior.
+//
+// Call Release when the returned dict is no longer used.
+func NewCDictByRef(dict []byte) (*CDict, error) {
+	return NewCDictLevelByRef(dict, DefaultCompressionLevel)
+}
+
 // NewCDictLevel creates new CDict from the given dict
 // using the given compressionLevel.
 //
@@ -136,6 +156,32 @@ func NewCDictLevel(dict []byte, compressionLevel int) (*CDict, error) {
 	return cd, nil
 }
 
+// NewCDictLevelByRef creates a new CDict that shares the given dict using
+// the given compressionLevel.
+//
+// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
+// to undefined and undesirable behavior.
+//
+// Call Release when the returned dict is no longer used.
+func NewCDictLevelByRef(dict []byte, compressionLevel int) (*CDict, error) {
+	if len(dict) == 0 {
+		return nil, fmt.Errorf("dict cannot be empty")
+	}
+
+	cd := &CDict{
+		p: C.ZSTD_createCDict_byReference_wrapper(
+			C.uintptr_t(uintptr(unsafe.Pointer(&dict[0]))),
+			C.size_t(len(dict)),
+			C.int(compressionLevel)),
+		input:            dict,
+		compressionLevel: compressionLevel,
+	}
+	// Prevent from GC'ing of dict during CGO call above.
+	runtime.KeepAlive(dict)
+	runtime.SetFinalizer(cd, freeCDict)
+	return cd, nil
+}
+
 // Release releases resources occupied by cd.
 //
 // cd cannot be used after the release.
@@ -146,6 +192,7 @@ func (cd *CDict) Release() {
 	result := C.ZSTD_freeCDict(cd.p)
 	ensureNoError("ZSTD_freeCDict", result)
 	cd.p = nil
+	cd.input = nil
 }
 
 func freeCDict(v interface{}) {
@@ -157,6 +204,8 @@ func freeCDict(v interface{}) {
 // A single DDict may be re-used in concurrently running goroutines.
 type DDict struct {
 	p *C.ZSTD_DDict
+	// Keep the underlying bytes alive from the GC's point of view (if passing by ref)
+	input []byte
 }
 
 // NewDDict creates new DDict from the given dict.
@@ -178,6 +227,29 @@ func NewDDict(dict []byte) (*DDict, error) {
 	return dd, nil
 }
 
+// NewDDictByRef creates a new DDict that shares the given dict
+//
+// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
+// to undefined and undesirable behavior.
+//
+// Call Release when the returned dict is no longer needed.
+func NewDDictByRef(dict []byte) (*DDict, error) {
+	if len(dict) == 0 {
+		return nil, fmt.Errorf("dict cannot be empty")
+	}
+
+	dd := &DDict{
+		p: C.ZSTD_createDDict_byReference_wrapper(
+			C.uintptr_t(uintptr(unsafe.Pointer(&dict[0]))),
+			C.size_t(len(dict))),
+		input: dict,
+	}
+	// Prevent from GC'ing of dict during CGO call above.
+	runtime.KeepAlive(dict)
+	runtime.SetFinalizer(dd, freeDDict)
+	return dd, nil
+}
+
 // Release releases resources occupied by dd.
 //
 // dd cannot be used after the release.
@@ -189,6 +261,7 @@ func (dd *DDict) Release() {
 	result := C.ZSTD_freeDDict(dd.p)
 	ensureNoError("ZSTD_freeDDict", result)
 	dd.p = nil
+	dd.input = nil
 }
 
 func freeDDict(v interface{}) {
diff --git a/dict_test.go b/dict_test.go
index bd7cce5..b6bc332 100644
--- a/dict_test.go
+++ b/dict_test.go
@@ -43,6 +43,22 @@ func TestCDictCreateRelease(t *testing.T) {
 	}
 }
 
+func TestCDictByRefCreateRelease(t *testing.T) {
+	var samples [][]byte
+	for i := 0; i < 1000; i++ {
+		samples = append(samples, []byte(fmt.Sprintf("sample %d", i)))
+	}
+	dict := BuildDict(samples, 64*1024)
+
+	for i := 0; i < 10; i++ {
+		cd, err := NewCDictByRef(dict)
+		if err != nil {
+			t.Fatalf("cannot create dict: %s", err)
+		}
+		cd.Release()
+	}
+}
+
 func TestDDictCreateRelease(t *testing.T) {
 	var samples [][]byte
 	for i := 0; i < 1000; i++ {
@@ -59,6 +75,22 @@ func TestDDictCreateRelease(t *testing.T) {
 	}
 }
 
+func TestDDictByRefCreateRelease(t *testing.T) {
+	var samples [][]byte
+	for i := 0; i < 1000; i++ {
+		samples = append(samples, []byte(fmt.Sprintf("sample %d", i)))
+	}
+	dict := BuildDict(samples, 64*1024)
+
+	for i := 0; i < 10; i++ {
+		dd, err := NewDDictByRef(dict)
+		if err != nil {
+			t.Fatalf("cannot create dict: %s", err)
+		}
+		dd.Release()
+	}
+}
+
 func TestBuildDict(t *testing.T) {
 	for _, samplesCount := range []int{0, 1, 10, 100, 1000} {
 		t.Run(fmt.Sprintf("samples_%d", samplesCount), func(t *testing.T) {
diff --git a/gozstd_test.go b/gozstd_test.go
index 3a465b8..6475982 100644
--- a/gozstd_test.go
+++ b/gozstd_test.go
@@ -123,6 +123,67 @@ func TestCompressDecompressDistinctConcurrentDicts(t *testing.T) {
 	}
 }
 
+func TestCompressDecompressDistinctConcurrentDictsByRef(t *testing.T) {
+	// Build multiple distinct dicts, sharing the underlying byte array
+	var cdicts []*CDict
+	var ddicts []*DDict
+	defer func() {
+		for _, cd := range cdicts {
+			cd.Release()
+		}
+		for _, dd := range ddicts {
+			dd.Release()
+		}
+	}()
+	for i := 0; i < 4; i++ {
+		var samples [][]byte
+		for j := 0; j < 1000; j++ {
+			sample := fmt.Sprintf("this is %d,%d sample", j, i)
+			samples = append(samples, []byte(sample))
+		}
+		dict := BuildDict(samples, 4*1024)
+		cd, err := NewCDictByRef(dict)
+		if err != nil {
+			t.Fatalf("cannot create CDict: %s", err)
+		}
+		cdicts = append(cdicts, cd)
+		dd, err := NewDDictByRef(dict)
+		if err != nil {
+			t.Fatalf("cannot create DDict: %s", err)
+		}
+		ddicts = append(ddicts, dd)
+	}
+
+	// Build data for the compression.
+	var bb bytes.Buffer
+	i := 0
+	for bb.Len() < 1e4 {
+		fmt.Fprintf(&bb, "%d sample line this is %d", bb.Len(), i)
+		i++
+	}
+	data := bb.Bytes()
+
+	// Run concurrent goroutines compressing/decompressing with distinct dicts.
+	ch := make(chan error, len(cdicts))
+	for i := 0; i < cap(ch); i++ {
+		go func(cd *CDict, dd *DDict) {
+			ch <- testCompressDecompressDistinctConcurrentDicts(cd, dd, data)
+		}(cdicts[i], ddicts[i])
+	}
+
+	// Wait for goroutines to finish.
+	for i := 0; i < cap(ch); i++ {
+		select {
+		case err := <-ch:
+			if err != nil {
+				t.Fatalf("unexpected error: %s", err)
+			}
+		case <-time.After(time.Second):
+			t.Fatalf("timeout")
+		}
+	}
+}
+
 func testCompressDecompressDistinctConcurrentDicts(cd *CDict, dd *DDict, data []byte) error {
 	var compressedData, decompressedData []byte
 	for j := 0; j < 10; j++ {
diff --git a/gozstd_timing_test.go b/gozstd_timing_test.go
index 450dad6..9143f10 100644
--- a/gozstd_timing_test.go
+++ b/gozstd_timing_test.go
@@ -19,16 +19,33 @@ func BenchmarkDecompressDict(b *testing.B) {
 		b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
 			for _, level := range benchCompressionLevels {
 				b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
-					benchmarkDecompressDict(b, blockSize, level)
+					benchmarkDecompressDict(b, blockSize, level, false)
 				})
 			}
 		})
 	}
 }
 
-func benchmarkDecompressDict(b *testing.B, blockSize, level int) {
+func BenchmarkDecompressDictByRef(b *testing.B) {
+	for _, blockSize := range benchBlockSizes {
+		b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
+			for _, level := range benchCompressionLevels {
+				b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
+					benchmarkDecompressDict(b, blockSize, level, true)
+				})
+			}
+		})
+	}
+}
+
+func benchmarkDecompressDict(b *testing.B, blockSize, level int, byReference bool) {
 	block := newBenchString(blockSize)
-	bd := getBenchDicts(level)
+	var bd *benchDicts
+	if byReference {
+		bd = getBenchDictsByRef(level)
+	} else {
+		bd = getBenchDicts(level)
+	}
 	src := CompressDict(nil, block, bd.cd)
 	b.Logf("compressionRatio: %f", float64(len(block))/float64(len(src)))
 	b.ReportAllocs()
@@ -54,16 +71,33 @@ func BenchmarkCompressDict(b *testing.B) {
 		b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
 			for _, level := range benchCompressionLevels {
 				b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
-					benchmarkCompressDict(b, blockSize, level)
+					benchmarkCompressDict(b, blockSize, level, false)
+				})
+			}
+		})
+	}
+}
+
+func BenchmarkCompressDictByRef(b *testing.B) {
+	for _, blockSize := range benchBlockSizes {
+		b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
+			for _, level := range benchCompressionLevels {
+				b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
+					benchmarkCompressDict(b, blockSize, level, true)
 				})
 			}
 		})
 	}
 }
 
-func benchmarkCompressDict(b *testing.B, blockSize, level int) {
+func benchmarkCompressDict(b *testing.B, blockSize, level int, byReference bool) {
 	src := newBenchString(blockSize)
-	bd := getBenchDicts(level)
+	var bd *benchDicts
+	if byReference {
+		bd = getBenchDictsByRef(level)
+	} else {
+		bd = getBenchDicts(level)
+	}
 	b.ReportAllocs()
 	b.SetBytes(int64(len(src)))
 	b.ResetTimer()
@@ -89,15 +123,45 @@ func getBenchDicts(level int) *benchDicts {
 	return tmp
 }
 
+func getBenchDictsByRef(level int) *benchDicts {
+	benchDictsByRefLock.Lock()
+	tmp := benchDictsByRefMap[level]
+	if tmp == nil {
+		tmp = newBenchDictsByRef(level)
+		benchDictsByRefMap[level] = tmp
+	}
+	benchDictsByRefLock.Unlock()
+	return tmp
+}
+
 type benchDicts struct {
 	cd *CDict
 	dd *DDict
 }
 
-var benchDictsMap = make(map[int]*benchDicts)
-var benchDictsLock sync.Mutex
+var (
+	benchDictsMap       = make(map[int]*benchDicts)
+	benchDictsLock      sync.Mutex
+	benchDictsByRefMap  = make(map[int]*benchDicts)
+	benchDictsByRefLock sync.Mutex
+)
 
 func newBenchDicts(level int) *benchDicts {
+	return createNewBenchDicts(NewCDictLevel, NewDDict, level)
+}
+
+func newBenchDictsByRef(level int) *benchDicts {
+	return createNewBenchDicts(NewCDictLevelByRef, NewDDictByRef, level)
+}
+
+// Make it easier to toggle between copying the underlying bytes on creation
+// vs. sharing by reference.
+type (
+	cdictFactory func(dict []byte, level int) (*CDict, error)
+	ddictFactory func(dict []byte) (*DDict, error)
+)
+
+func createNewBenchDicts(createCDict cdictFactory, createDDict ddictFactory, level int) *benchDicts {
 	var samples [][]byte
 	for i := 0; i < 300; i++ {
 		sampleLen := rand.Intn(300)
@@ -106,11 +170,11 @@ func newBenchDicts(level int) *benchDicts {
 	}
 	dict := BuildDict(samples, 32*1024)
 
-	cd, err := NewCDictLevel(dict, level)
+	cd, err := createCDict(dict, level)
 	if err != nil {
 		panic(fmt.Errorf("cannot create CDict: %s", err))
 	}
-	dd, err := NewDDict(dict)
+	dd, err := createDDict(dict)
 	if err != nil {
 		panic(fmt.Errorf("cannot create DDict: %s", err))
 	}

From 1bec57304c6b488d0975fc01943fd0a38fb6bcac Mon Sep 17 00:00:00 2001
From: Codey Oxley
Date: Fri, 19 Apr 2024 18:03:09 -0400
Subject: [PATCH 2/3] use runtime.Pinner instead of storing input as a struct member

---
 dict.go | 28 ++++++++++++++++++----------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/dict.go b/dict.go
index 0594db3..16122b6 100644
--- a/dict.go
+++ b/dict.go
@@ -111,9 +111,8 @@ var buildDictLock sync.Mutex
 //
 // A single CDict may be re-used in concurrently running goroutines.
 type CDict struct {
-	p *C.ZSTD_CDict
-	// Keep the underlying bytes alive from the GC's point of view (if passing by ref)
-	input            []byte
+	p                *C.ZSTD_CDict
+	pinner           runtime.Pinner
 	compressionLevel int
 }
 
@@ -168,12 +167,17 @@ func NewCDictLevelByRef(dict []byte, compressionLevel int) (*CDict, error) {
 		return nil, fmt.Errorf("dict cannot be empty")
 	}
 
+	// Pin the backing array of the input - it must not move while C.ZSTD_CDict
+	// is alive.
+	var pinner runtime.Pinner
+	pinner.Pin(&dict[0])
+
 	cd := &CDict{
 		p: C.ZSTD_createCDict_byReference_wrapper(
 			C.uintptr_t(uintptr(unsafe.Pointer(&dict[0]))),
 			C.size_t(len(dict)),
 			C.int(compressionLevel)),
-		input:            dict,
+		pinner:           pinner,
 		compressionLevel: compressionLevel,
 	}
 	// Prevent from GC'ing of dict during CGO call above.
@@ -192,7 +196,7 @@ func (cd *CDict) Release() {
 	result := C.ZSTD_freeCDict(cd.p)
 	ensureNoError("ZSTD_freeCDict", result)
 	cd.p = nil
-	cd.input = nil
+	cd.pinner.Unpin()
 }
 
 func freeCDict(v interface{}) {
@@ -203,9 +207,8 @@ func freeCDict(v interface{}) {
 //
 // A single DDict may be re-used in concurrently running goroutines.
 type DDict struct {
-	p *C.ZSTD_DDict
-	// Keep the underlying bytes alive from the GC's point of view (if passing by ref)
-	input []byte
+	p      *C.ZSTD_DDict
+	pinner runtime.Pinner
 }
 
 // NewDDict creates new DDict from the given dict.
@@ -238,11 +241,16 @@ func NewDDictByRef(dict []byte) (*DDict, error) {
 		return nil, fmt.Errorf("dict cannot be empty")
 	}
 
+	// Pin the backing array of the input - it must not move while C.ZSTD_DDict
+	// is alive.
+	var pinner runtime.Pinner
+	pinner.Pin(&dict[0])
+
 	dd := &DDict{
 		p: C.ZSTD_createDDict_byReference_wrapper(
 			C.uintptr_t(uintptr(unsafe.Pointer(&dict[0]))),
 			C.size_t(len(dict))),
-		input: dict,
+		pinner: pinner,
 	}
 	// Prevent from GC'ing of dict during CGO call above.
 	runtime.KeepAlive(dict)
@@ -261,7 +269,7 @@ func (dd *DDict) Release() {
 	result := C.ZSTD_freeDDict(dd.p)
 	ensureNoError("ZSTD_freeDDict", result)
 	dd.p = nil
-	dd.input = nil
+	dd.pinner.Unpin()
 }
 
 func freeDDict(v interface{}) {

From 3f7b06978aa39afa1709ebbcd00e41e06968f91a Mon Sep 17 00:00:00 2001
From: Codey Oxley
Date: Wed, 29 May 2024 09:24:38 -0400
Subject: [PATCH 3/3] remove runtime.KeepAlive in the ref variants due to pinning

---
 dict.go | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/dict.go b/dict.go
index 16122b6..590001f 100644
--- a/dict.go
+++ b/dict.go
@@ -180,8 +180,7 @@ func NewCDictLevelByRef(dict []byte, compressionLevel int) (*CDict, error) {
 		pinner:           pinner,
 		compressionLevel: compressionLevel,
 	}
-	// Prevent from GC'ing of dict during CGO call above.
-	runtime.KeepAlive(dict)
+	// No need for runtime.KeepAlive due to explicit pinning
 	runtime.SetFinalizer(cd, freeCDict)
 	return cd, nil
 }
@@ -252,8 +251,7 @@ func NewDDictByRef(dict []byte) (*DDict, error) {
 			C.size_t(len(dict))),
 		pinner: pinner,
 	}
-	// Prevent from GC'ing of dict during CGO call above.
-	runtime.KeepAlive(dict)
+	// No need for runtime.KeepAlive due to explicit pinning
 	runtime.SetFinalizer(dd, freeDDict)
 	return dd, nil
 }