From c7f0b0e2f7870b8271869a539595aa8eb1cd1fff Mon Sep 17 00:00:00 2001 From: canoriz <83388275+canoriz@users.noreply.github.com> Date: Thu, 16 Nov 2023 14:51:48 +0800 Subject: [PATCH] Add expirable LRU finalizer to fix expirable LRU's goroutine leak --- expirable/expirable_lru.go | 66 ++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/expirable/expirable_lru.go b/expirable/expirable_lru.go index 89978d6..3d16c03 100644 --- a/expirable/expirable_lru.go +++ b/expirable/expirable_lru.go @@ -4,6 +4,7 @@ package expirable import ( + "runtime" "sync" "time" @@ -13,20 +14,35 @@ import ( // EvictCallback is used to get a callback when a cache entry is evicted type EvictCallback[K comparable, V any] func(key K, value V) -// LRU implements a thread-safe LRU with expirable entries. +// This is a wrapper around lru[K,V] to make delete-expired routine +// stop when LRU is garbage collected. +// +// This trick ensures that the deleteExpired goroutine (is +// running DeleteExpired on lru forever) does not keep the +// returned LRU object from being garbage collected. +// When it is garbage collected, the finalizer of LRU +// stops the deleteExpired goroutine, after which lru can +// be collected. type LRU[K comparable, V any] struct { + *lru[K, V] +} + +// LRU implements a thread-safe LRU with expirable entries. +type lru[K comparable, V any] struct { size int evictList *internal.LruList[K, V] items map[K]*internal.Entry[K, V] onEvict EvictCallback[K, V] // expirable options - mu sync.Mutex + mu sync.Mutex + ttl time.Duration done chan struct{} // buckets for expiration buckets []bucket[K, V] + // uint8 because it's number between 0 and numBuckets nextCleanupBucket uint8 } @@ -59,7 +75,7 @@ func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V], ttl time ttl = noEvictionTTL } - res := LRU[K, V]{ + res := lru[K, V]{ ttl: ttl, size: size, evictList: internal.NewList[K, V](), @@ -75,9 +91,6 @@ func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V], ttl time } // enable deleteExpired() running in separate goroutine for cache with non-zero TTL - // - // Important: done channel is never closed, so deleteExpired() goroutine will never exit, - // it's decided to add functionality to close it in the version later than v2. if res.ttl != noEvictionTTL { go func(done <-chan struct{}) { ticker := time.NewTicker(res.ttl / numBuckets) @@ -92,12 +105,17 @@ func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V], ttl time } }(res.done) } - return &res + + wrap := LRU[K, V]{&res} + runtime.SetFinalizer(&wrap, func(c *LRU[K, V]) { + close(c.lru.done) + }) + return &wrap } // Purge clears the cache completely. // onEvict is called for each evicted key. -func (c *LRU[K, V]) Purge() { +func (c *lru[K, V]) Purge() { c.mu.Lock() defer c.mu.Unlock() for k, v := range c.items { @@ -117,7 +135,7 @@ func (c *LRU[K, V]) Purge() { // Add adds a value to the cache. Returns true if an eviction occurred. // Returns false if there was no eviction: the item was already in the cache, // or the size was not exceeded. -func (c *LRU[K, V]) Add(key K, value V) (evicted bool) { +func (c *lru[K, V]) Add(key K, value V) (evicted bool) { c.mu.Lock() defer c.mu.Unlock() now := time.Now() @@ -146,7 +164,7 @@ func (c *LRU[K, V]) Add(key K, value V) (evicted bool) { } // Get looks up a key's value from the cache. -func (c *LRU[K, V]) Get(key K) (value V, ok bool) { +func (c *lru[K, V]) Get(key K) (value V, ok bool) { c.mu.Lock() defer c.mu.Unlock() var ent *internal.Entry[K, V] @@ -163,7 +181,7 @@ func (c *LRU[K, V]) Get(key K) (value V, ok bool) { // Contains checks if a key is in the cache, without updating the recent-ness // or deleting it for being stale. -func (c *LRU[K, V]) Contains(key K) (ok bool) { +func (c *lru[K, V]) Contains(key K) (ok bool) { c.mu.Lock() defer c.mu.Unlock() _, ok = c.items[key] @@ -172,7 +190,7 @@ func (c *LRU[K, V]) Contains(key K) (ok bool) { // Peek returns the key value (or undefined if not found) without updating // the "recently used"-ness of the key. -func (c *LRU[K, V]) Peek(key K) (value V, ok bool) { +func (c *lru[K, V]) Peek(key K) (value V, ok bool) { c.mu.Lock() defer c.mu.Unlock() var ent *internal.Entry[K, V] @@ -188,7 +206,7 @@ func (c *LRU[K, V]) Peek(key K) (value V, ok bool) { // Remove removes the provided key from the cache, returning if the // key was contained. -func (c *LRU[K, V]) Remove(key K) bool { +func (c *lru[K, V]) Remove(key K) bool { c.mu.Lock() defer c.mu.Unlock() if ent, ok := c.items[key]; ok { @@ -199,7 +217,7 @@ func (c *LRU[K, V]) Remove(key K) bool { } // RemoveOldest removes the oldest item from the cache. -func (c *LRU[K, V]) RemoveOldest() (key K, value V, ok bool) { +func (c *lru[K, V]) RemoveOldest() (key K, value V, ok bool) { c.mu.Lock() defer c.mu.Unlock() if ent := c.evictList.Back(); ent != nil { @@ -210,7 +228,7 @@ func (c *LRU[K, V]) RemoveOldest() (key K, value V, ok bool) { } // GetOldest returns the oldest entry -func (c *LRU[K, V]) GetOldest() (key K, value V, ok bool) { +func (c *lru[K, V]) GetOldest() (key K, value V, ok bool) { c.mu.Lock() defer c.mu.Unlock() if ent := c.evictList.Back(); ent != nil { @@ -220,7 +238,7 @@ func (c *LRU[K, V]) GetOldest() (key K, value V, ok bool) { } // Keys returns a slice of the keys in the cache, from oldest to newest. -func (c *LRU[K, V]) Keys() []K { +func (c *lru[K, V]) Keys() []K { c.mu.Lock() defer c.mu.Unlock() keys := make([]K, 0, len(c.items)) @@ -232,7 +250,7 @@ func (c *LRU[K, V]) Keys() []K { // Values returns a slice of the values in the cache, from oldest to newest. // Expired entries are filtered out. -func (c *LRU[K, V]) Values() []V { +func (c *lru[K, V]) Values() []V { c.mu.Lock() defer c.mu.Unlock() values := make([]V, len(c.items)) @@ -249,14 +267,14 @@ func (c *LRU[K, V]) Values() []V { } // Len returns the number of items in the cache. -func (c *LRU[K, V]) Len() int { +func (c *lru[K, V]) Len() int { c.mu.Lock() defer c.mu.Unlock() return c.evictList.Length() } // Resize changes the cache size. Size of 0 means unlimited. -func (c *LRU[K, V]) Resize(size int) (evicted int) { +func (c *lru[K, V]) Resize(size int) (evicted int) { c.mu.Lock() defer c.mu.Unlock() if size <= 0 { @@ -287,14 +305,14 @@ func (c *LRU[K, V]) Resize(size int) (evicted int) { // } // removeOldest removes the oldest item from the cache. Has to be called with lock! -func (c *LRU[K, V]) removeOldest() { +func (c *lru[K, V]) removeOldest() { if ent := c.evictList.Back(); ent != nil { c.removeElement(ent) } } // removeElement is used to remove a given list element from the cache. Has to be called with lock! -func (c *LRU[K, V]) removeElement(e *internal.Entry[K, V]) { +func (c *lru[K, V]) removeElement(e *internal.Entry[K, V]) { c.evictList.Remove(e) delete(c.items, e.Key) c.removeFromBucket(e) @@ -305,7 +323,7 @@ func (c *LRU[K, V]) removeElement(e *internal.Entry[K, V]) { // deleteExpired deletes expired records from the oldest bucket, waiting for the newest entry // in it to expire first. -func (c *LRU[K, V]) deleteExpired() { +func (c *lru[K, V]) deleteExpired() { c.mu.Lock() bucketIdx := c.nextCleanupBucket timeToExpire := time.Until(c.buckets[bucketIdx].newestEntry) @@ -323,7 +341,7 @@ func (c *LRU[K, V]) deleteExpired() { } // addToBucket adds entry to expire bucket so that it will be cleaned up when the time comes. Has to be called with lock! -func (c *LRU[K, V]) addToBucket(e *internal.Entry[K, V]) { +func (c *lru[K, V]) addToBucket(e *internal.Entry[K, V]) { bucketID := (numBuckets + c.nextCleanupBucket - 1) % numBuckets e.ExpireBucket = bucketID c.buckets[bucketID].entries[e.Key] = e @@ -333,6 +351,6 @@ func (c *LRU[K, V]) addToBucket(e *internal.Entry[K, V]) { } // removeFromBucket removes the entry from its corresponding bucket. Has to be called with lock! -func (c *LRU[K, V]) removeFromBucket(e *internal.Entry[K, V]) { +func (c *lru[K, V]) removeFromBucket(e *internal.Entry[K, V]) { delete(c.buckets[e.ExpireBucket].entries, e.Key) }