Skip to content

Commit

Permalink
Fix leak in read cache tracking
Browse files Browse the repository at this point in the history
A race could cause the read cache tracking to overcount, eventually
evicting the entire read cache, and degenerating into always reading
from disk.
  • Loading branch information
cberner committed Oct 2, 2024
1 parent c620e53 commit 3dcdc23
Showing 1 changed file with 65 additions and 3 deletions.
68 changes: 65 additions & 3 deletions src/tree_store/page_store/cached_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,11 @@ impl PagedCachedFile {
if cache_size + buffer.len() <= self.max_read_cache_bytes {
let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
let mut lock = self.read_cache[cache_slot].write().unwrap();
lock.insert(*offset, buffer, CachePriority::High);
if let Some(replaced) = lock.insert(*offset, buffer, CachePriority::High) {
// A race could cause us to replace an existing buffer
self.read_cache_bytes
.fetch_sub(replaced.len(), Ordering::AcqRel);
}
} else {
self.read_cache_bytes
.fetch_sub(buffer.len(), Ordering::AcqRel);
Expand All @@ -369,7 +373,11 @@ impl PagedCachedFile {
if cache_size + buffer.len() <= self.max_read_cache_bytes {
let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
let mut lock = self.read_cache[cache_slot].write().unwrap();
lock.insert(*offset, buffer, CachePriority::Low);
if let Some(replaced) = lock.insert(*offset, buffer, CachePriority::Low) {
// A race could cause us to replace an existing buffer
self.read_cache_bytes
.fetch_sub(replaced.len(), Ordering::AcqRel);
}
} else {
self.read_cache_bytes
.fetch_sub(buffer.len(), Ordering::AcqRel);
Expand Down Expand Up @@ -443,7 +451,15 @@ impl PagedCachedFile {
let buffer: Arc<[u8]> = self.read_direct(offset, len)?.into();
let cache_size = self.read_cache_bytes.fetch_add(len, Ordering::AcqRel);
let mut write_lock = self.read_cache[cache_slot].write().unwrap();
write_lock.insert(offset, buffer.clone(), cache_policy(&buffer));
let cache_size = if let Some(replaced) =
write_lock.insert(offset, buffer.clone(), cache_policy(&buffer))
{
// A race could cause us to replace an existing buffer
self.read_cache_bytes
.fetch_sub(replaced.len(), Ordering::AcqRel)
} else {
cache_size
};
let mut removed = 0;
if cache_size + len > self.max_read_cache_bytes {
while removed < len {
Expand Down Expand Up @@ -566,3 +582,49 @@ impl PagedCachedFile {
})
}
}

#[cfg(test)]
mod test {
use crate::backends::InMemoryBackend;
use crate::tree_store::page_store::cached_file::PagedCachedFile;
use crate::tree_store::{CachePriority, PageHint};
use crate::StorageBackend;
use std::sync::atomic::Ordering;
use std::sync::Arc;

#[test]
fn cache_leak() {
let backend = InMemoryBackend::new();
backend.set_len(1024).unwrap();
let cached_file = PagedCachedFile::new(Box::new(backend), 128, 1024, 128).unwrap();
let cached_file = Arc::new(cached_file);

let t1 = {
let cached_file = cached_file.clone();
std::thread::spawn(move || {
for _ in 0..1000 {
cached_file
.read(0, 128, PageHint::None, CachePriority::default_btree)
.unwrap();
cached_file.invalidate_cache(0, 128);
}
})
};
let t2 = {
let cached_file = cached_file.clone();
std::thread::spawn(move || {
for _ in 0..1000 {
cached_file
.read(0, 128, PageHint::None, CachePriority::default_btree)
.unwrap();
cached_file.invalidate_cache(0, 128);
}
})
};

t1.join().unwrap();
t2.join().unwrap();
cached_file.invalidate_cache(0, 128);
assert_eq!(cached_file.read_cache_bytes.load(Ordering::Acquire), 0);
}
}

0 comments on commit 3dcdc23

Please sign in to comment.