From 46a2f51ea08f88779ae8ee8092984cbc83a7593f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20G=C3=B3rzy=C5=84ski?= Date: Wed, 9 Oct 2024 18:08:12 +0200 Subject: [PATCH] Google Cloud Storage leverage CustomTime and Object Lifecycle Rules and implementation of StoreCleaner interface (#20) --- cachestore.go | 6 +++ gcstorage/gcstorage.go | 99 ++++++++++++++++++++++++++++++------- gcstorage/gcstorage_test.go | 48 ++++++++++++++++++ 3 files changed, 136 insertions(+), 17 deletions(-) diff --git a/cachestore.go b/cachestore.go index 3e15e4c..dff81d8 100644 --- a/cachestore.go +++ b/cachestore.go @@ -65,6 +65,12 @@ type Store[V any] interface { GetOrSetWithLockEx(ctx context.Context, key string, getter func(context.Context, string) (V, error), ttl time.Duration) (V, error) } +type StoreCleaner interface { + // CleanExpiredEvery cleans expired keys every d duration. + // If onError is not nil, it will be called when an error occurs. + CleanExpiredEvery(ctx context.Context, d time.Duration, onError func(err error)) +} + type Backend interface { Apply(*StoreOptions) } diff --git a/gcstorage/gcstorage.go b/gcstorage/gcstorage.go index 9bcf37c..b324bf9 100644 --- a/gcstorage/gcstorage.go +++ b/gcstorage/gcstorage.go @@ -1,12 +1,12 @@ package gcstorage import ( - "cmp" "context" "encoding/json" "errors" "fmt" "io" + "strings" "time" "cloud.google.com/go/storage" @@ -17,12 +17,15 @@ import ( const DefaultTTL = time.Second * 24 * 60 * 60 // 1 day in seconds +const keySuffix = ".cachestore" + type cacheObject[T any] struct { Object T `json:"object"` ExpiresAt time.Time `json:"expires_at"` } var _ cachestore.Store[any] = &GCStorage[any]{} +var _ cachestore.StoreCleaner = &GCStorage[any]{} type GCStorage[V any] struct { keyPrefix string @@ -50,6 +53,20 @@ func NewWithBackend[V any](backend cachestore.Backend, opts ...cachestore.StoreO return New[V](cfg, cfg.StoreOptions) } +// New creates a new GCStorage instance. +// +// The GCStorage instance is a cachestore.Store implementation that uses Google Cloud Storage as the backend. The object +// is serialized to JSON and stored in the bucket. The key is prefixed with the keyPrefix and the object is stored in the +// bucket with the keyPrefix + key + `.cachestore`. The object is stored with a custom time of the expiry time. +// +// Please note that: The Google Cloud Storage bucket must have proper Object Lifecycle Management rules to delete +// the objects after expiry automatically. In case the bucket does not have Object Lifecycle Management rules, the +// objects will not be deleted. In that case the objects will be deleted only if CleanExpiredEvery is working in the +// background. +// +// Required Lifecycle Management Rule for automatic deletion: +// 1. Delete object 0+ days since object's custom time +// 2. Name matches suffix '.cachestore' func New[V any](cfg *Config, opts ...cachestore.StoreOptions) (cachestore.Store[V], error) { for _, opt := range opts { opt.Apply(&cfg.StoreOptions) @@ -72,14 +89,13 @@ func New[V any](cfg *Config, opts ...cachestore.StoreOptions) (cachestore.Store[ return &GCStorage[V]{ keyPrefix: cfg.KeyPrefix, defaultKeyExpiry: cfg.DefaultKeyExpiry, - client: client, bucketHandle: client.Bucket(cfg.Bucket), }, nil } func (g *GCStorage[V]) Exists(ctx context.Context, key string) (bool, error) { - attr, err := g.bucketHandle.Object(g.keyPrefix + key).Attrs(ctx) + attr, err := g.bucketHandle.Object(g.objectKey(key)).Attrs(ctx) if err != nil { if errors.Is(err, storage.ErrObjectNotExist) { return false, nil @@ -87,14 +103,8 @@ func (g *GCStorage[V]) Exists(ctx context.Context, key string) (bool, error) { return false, fmt.Errorf("cachestore/gcstorage: get attrs returned error: %w", err) } - if attr.Metadata["expires_at"] != "" { - expiresAt, err := time.Parse(time.RFC3339, attr.Metadata["expires_at"]) - if err != nil { - return false, fmt.Errorf("cachestore/gcstorage: time parse returned error: %w", err) - } - if !expiresAt.IsZero() && expiresAt.Before(time.Now()) { - return false, nil - } + if !attr.CustomTime.IsZero() && attr.CustomTime.Before(time.Now()) { + return false, nil } return true, nil } @@ -124,12 +134,11 @@ func (g *GCStorage[V]) SetEx(ctx context.Context, key string, value V, ttl time. return err } - obj := g.bucketHandle.Object(g.keyPrefix + key) + obj := g.bucketHandle.Object(g.objectKey(key)) w := obj.NewWriter(ctx) w.ObjectAttrs.ContentType = "application/json" - w.ObjectAttrs.Metadata = map[string]string{ - "expires_at": expiresAt.Format(time.RFC3339), - } + w.ObjectAttrs.CustomTime = expiresAt + if _, err := w.Write(data); err != nil { _ = w.Close() return fmt.Errorf("cachestore/gcstorage: write returned error: %w", err) @@ -146,7 +155,7 @@ func (g *GCStorage[V]) Get(ctx context.Context, key string) (V, bool, error) { } func (g *GCStorage[V]) GetEx(ctx context.Context, key string) (V, *time.Duration, bool, error) { - obj := g.bucketHandle.Object(g.keyPrefix + key) + obj := g.bucketHandle.Object(g.objectKey(key)) r, err := obj.NewReader(ctx) if err != nil { if errors.Is(err, storage.ErrObjectNotExist) { @@ -204,7 +213,7 @@ func (g *GCStorage[V]) BatchGet(ctx context.Context, keys []string) ([]V, []bool } func (g *GCStorage[V]) Delete(ctx context.Context, key string) error { - return g.bucketHandle.Object(g.keyPrefix + key).Delete(ctx) + return g.bucketHandle.Object(g.objectKey(key)).Delete(ctx) } func (g *GCStorage[V]) DeletePrefix(ctx context.Context, keyPrefix string) error { @@ -228,6 +237,10 @@ func (g *GCStorage[V]) DeletePrefix(ctx context.Context, keyPrefix string) error return fmt.Errorf("cachestore/gcstorage: it next error: %w", err) } + if !strings.HasSuffix(objAttrs.Name, keySuffix) { + continue + } + if err = g.bucketHandle.Object(objAttrs.Name).Delete(ctx); err != nil { return fmt.Errorf("cachestore/gcstorage: object delete error: %w", err) } @@ -250,6 +263,10 @@ func (g *GCStorage[V]) ClearAll(ctx context.Context) error { return fmt.Errorf("cachestore/gcstorage: it next error: %w", err) } + if !strings.HasSuffix(objAttrs.Name, keySuffix) { + continue + } + if err = g.bucketHandle.Object(objAttrs.Name).Delete(ctx); err != nil { return fmt.Errorf("cachestore/gcstorage: object delete error: %w", err) } @@ -265,6 +282,53 @@ func (g *GCStorage[V]) GetOrSetWithLockEx(ctx context.Context, key string, gette return *new(V), cachestore.ErrNotSupported } +func (g *GCStorage[V]) CleanExpiredEvery(ctx context.Context, d time.Duration, onError func(err error)) { + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + objIt := g.bucketHandle.Objects(ctx, &storage.Query{ + Prefix: g.keyPrefix, + }) + + for { + objAttrs, err := objIt.Next() + if err != nil { + if errors.Is(err, iterator.Done) { + break + } + return + } + + if !strings.HasSuffix(objAttrs.Name, keySuffix) { + continue + } + + if objAttrs.CustomTime.IsZero() { + continue + } + + if objAttrs.CustomTime.Before(time.Now()) { + if err = g.bucketHandle.Object(objAttrs.Name).Delete(ctx); err != nil { + if onError != nil { + onError(fmt.Errorf("cachestore/gcstorage: delete error: %w", err)) + } + continue + } + } + } + } + } +} + +func (g *GCStorage[V]) objectKey(key string) string { + return fmt.Sprintf("%s%s%s", g.keyPrefix, key, keySuffix) +} + func serialize[V any](value V) ([]byte, error) { return json.Marshal(value) } @@ -276,4 +340,5 @@ func deserialize[V any](data []byte) (V, error) { } return out, nil + } diff --git a/gcstorage/gcstorage_test.go b/gcstorage/gcstorage_test.go index 83e9d4c..1a95cc5 100644 --- a/gcstorage/gcstorage_test.go +++ b/gcstorage/gcstorage_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "cloud.google.com/go/storage" + "github.com/goware/cachestore" "github.com/stretchr/testify/require" ) @@ -101,6 +103,23 @@ func TestGCStorage(t *testing.T) { require.False(t, ok) }) + t.Run("Exists_Expiry", func(t *testing.T) { + ctx := context.Background() + + err = store.SetEx(ctx, "foo", "bar", 1*time.Second) + require.NoError(t, err) + + ok, err := store.Exists(ctx, "foo") + require.NoError(t, err) + require.True(t, ok) + + time.Sleep(2 * time.Second) + + ok, err = store.Exists(ctx, "foo") + require.NoError(t, err) + require.False(t, ok) + }) + t.Run("Delete", func(t *testing.T) { ctx := context.Background() @@ -181,4 +200,33 @@ func TestGCStorage(t *testing.T) { require.Equal(t, []string{"", "", ""}, values) require.Equal(t, []bool{false, false, false}, exists) }) + + t.Run("CleanExpiredEvery", func(t *testing.T) { + ctx := context.Background() + + // Set a key with 1 second expiry + err = store.SetEx(ctx, "foo", "bar", 1*time.Second) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + // Start the cleaner + cCtx, cancel := context.WithCancel(ctx) + defer cancel() + + storeCleaner, ok := store.(cachestore.StoreCleaner) + require.True(t, ok) + + go storeCleaner.CleanExpiredEvery(cCtx, 1*time.Second, nil) + + // Wait for cleaner to clean the expired key + time.Sleep(2 * time.Second) + + // Check if the key is deleted + gcsClient, err := storage.NewClient(ctx) + require.NoError(t, err) + + _, err = gcsClient.Bucket("my-bucket").Object("test/foo.cachestore").Attrs(ctx) + require.ErrorIs(t, err, storage.ErrObjectNotExist) + }) }