Skip to content

Commit

Permalink
fix race when using entry pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Yiling-J committed Oct 28, 2024
1 parent d40c55f commit 5fc37f1
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions internal/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ func (s *Store[K, V]) setShard(shard *Shard[K, V], hash uint64, key K, value V,
entry.expire.Store(expire)
entry.weight.Store(cost)
entry.policyWeight = 0
entry.flag = Flag{}
s.setEntry(hash, shard, cost, entry, nvmClean)
return shard, entry, true

Expand Down Expand Up @@ -474,9 +473,10 @@ func (s *Store[K, V]) index(key K) (uint64, int) {
}

func (s *Store[K, V]) postDelete(entry *Entry[K, V]) {
var zero V
entry.value = zero
if s.entryPool != nil {
var zero V
entry.value = zero
entry.flag = Flag{}
s.entryPool.Put(entry)
}
}
Expand Down Expand Up @@ -512,12 +512,15 @@ func (s *Store[K, V]) removeEntry(entry *Entry[K, V], reason RemoveReason) {
}

if rn <= s.probability {
s.secondaryCacheBuf <- SecondaryCacheItem[K, V]{
select {
case s.secondaryCacheBuf <- SecondaryCacheItem[K, V]{
entry: entry,
reason: reason,
shard: shard,
}:
return
default:
}
return
}
}
shard.mu.Lock()
Expand Down Expand Up @@ -606,6 +609,14 @@ func (s *Store[K, V]) sinkWrite(item WriteBufItem[K, V]) {
case EVICTE:
s.removeEntry(entry, EVICTED)
case UPDATE:
// recheck hash if entry pool enabled to avoid race
if s.entryPool != nil {
hh := s.hasher.hash(entry.key)
if hh != item.hash {
return
}
}

// update entry policy weight
entry.policyWeight += item.costChange

Expand All @@ -619,13 +630,6 @@ func (s *Store[K, V]) sinkWrite(item WriteBufItem[K, V]) {
}

if item.costChange != 0 {
// recheck hash if entry pool enabled to avoid race
if s.entryPool != nil {
hh := s.hasher.hash(entry.key)
if hh != item.hash {
return
}
}
// update policy weight
s.policy.UpdateCost(entry, item.costChange)
}
Expand Down Expand Up @@ -836,10 +840,12 @@ func (s *Store[K, V]) processSecondary() {
if item.reason == EVICTED {
item.shard.mu.Lock()
deleted := item.shard.delete(item.entry)
item.shard.mu.Unlock()
if deleted {
s.policyMu.Lock()
s.postDelete(item.entry)
s.policyMu.Unlock()
}
item.shard.mu.Unlock()
}
} else {
item.shard.mu.RUnlock(tk)
Expand Down

0 comments on commit 5fc37f1

Please sign in to comment.