Race Condition in Cache Refresh #1690

Closed
seashore115 opened this issue May 9, 2024 · 9 comments

Comments

@seashore115

I currently use a Caffeine cache so that lookups never miss while an entry is being refreshed. My expectation is that during the refresh the cache always returns the stale value, and after the refresh completes it returns the latest value. The cache should never miss and should load only once. However, when I ran the test below, it looks like the cache either refreshed twice or a cache miss triggered another load.

Here is the failing test case:

private static int callCount = 0;
private final LoadingCache<Integer, Integer> cache = Caffeine.newBuilder()
        .removalListener(this::listenRemoval)
        .executor(MoreExecutors.directExecutor())
        .recordStats()
        .build(new CacheLoader<>() {
            @Override
            public Integer load(Integer _key) {
                Integer val = getValue();
                System.out.println("Loading value: " + val);
                return val;
            }
        });

private void listenRemoval(Integer key, Integer _value, RemovalCause cause) {
    // We don't want to reload if the value was just replaced
    if (cause.wasEvicted() || cause == RemovalCause.EXPLICIT) {
        cache.refresh(key);
    }
}

private Integer getValue() {
    callCount++;
    return callCount;
}

@RepeatedTest(20)
public void testCacheEvictionRefresh() throws BrokenBarrierException, InterruptedException {
    cache.get(1);
    assertThat(cache.stats().loadCount()).isEqualTo(1);

    final CyclicBarrier gate = new CyclicBarrier(7);

    Thread t1 = new Thread(){
        public void run(){
            try {
                gate.await();
                System.out.println("t1 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t2 = new Thread(){
        public void run(){
            try {
                gate.await();
                System.out.println("t2 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t6 = new Thread(){
        public void run(){
            try {
                gate.await();
                Thread.sleep(5);
                System.out.println("t6 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t3 = new Thread(){
        public void run(){
            try {
                gate.await();
                Thread.sleep(5);
                System.out.println("t3 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t4 = new Thread(){
        public void run(){
            try {
                gate.await();
                System.out.println("Cache Invalidate start");
                cache.invalidateAll();
                System.out.println("Cache Invalidate end");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};
    Thread t5 = new Thread(){
        public void run(){
            try {
                gate.await();
                System.out.println("t5 Cache retrieval:" + cache.get(1));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            //do stuff
        }};

    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();
    t6.start();

    gate.await();
    assertThat(cache.stats().loadCount()).isEqualTo(1);
    Thread.sleep(2000);
}

The result shows:
Loading value: 1
Cache Invalidate start
t1 Cache retrieval:1
t2 Cache retrieval:1
t5 Cache retrieval:1
Loading value: 2
Loading value: 3
t3 Cache retrieval:3
t6 Cache retrieval:3
Cache Invalidate end

@ben-manes
Owner

Can you see if this is related to #1478? Sorry, I haven't been able to work on these issues.

@seashore115
Author

Hi Ben, thanks for the quick reply. I don't think #1478 applies: as you mentioned, refresh there has to work together with expiration, whereas my test case first invalidates the cache and then triggers the refresh. So I think these are different issues.

@ben-manes
Owner

Thanks. If you override CacheLoader.reload, then you'll be able to distinguish whether it was a cache miss or a second reload. I'll try to trace through this tonight. I recall we sometimes had to be aggressive in discarding a refresh in order to have stronger linearizable behavior, so a discarded reload may happen (e.g. if a write explicitly forced it to be discarded).

@ben-manes
Owner

I modified your test slightly to remove the races so that it fails consistently. What I see is:

  1. The entry is explicitly loaded (a miss + load)
  2. Some threads hit
  3. The cache is cleared
  4. The removal listener triggers a refresh
  5. Some threads hit as the refresh completes

Since the cache entry wasn't present, the refresh acts like an asynchronous load, so only CacheLoader.load is called as there is no stale value being replaced. Since that is a new load, but not a cache miss, the loadCount=2 and the missCount=1.

I think this is working as expected because of the explicit invalidation causing the entry to be discarded, so a new load is required. The loadCount includes refreshes, which I think you assumed it did not?

test code
import static com.google.common.truth.Truth.assertThat;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.RepetitionInfo;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.google.common.util.concurrent.MoreExecutors;

public class Issue1690 {
  private static int callCount = 0;
  private final LoadingCache<Integer, Integer> cache =
      Caffeine.newBuilder()
          .removalListener(this::listenRemoval)
          .executor(MoreExecutors.directExecutor()).recordStats()
          .build(new CacheLoader<>() {
            @Override
            public Integer load(Integer _key) {
              Integer val = getValue();
              System.out.println("Loading value: " + val);
              return val;
            }
            @Override
            public Integer reload(Integer _key, Integer _oldValue) {
              Integer val = getValue();
              System.out.printf("Reloading value: %s -> %s%n", _oldValue, val);
              return val;
            }
          });

  private void listenRemoval(Integer key, Integer _value, RemovalCause cause) {
    // We don't want to reload if the value was just replaced
    if (cause.wasEvicted() || cause == RemovalCause.EXPLICIT) {
      System.out.printf("Refreshing %s -> %s%n", key, _value);
      cache.refresh(key);
    }
  }

  private Integer getValue() {
    callCount++;
    return callCount;
  }

  @RepeatedTest(20)
  public void testCacheEvictionRefresh(RepetitionInfo info) throws BrokenBarrierException, InterruptedException {
    System.out.println("START #" + info.getCurrentRepetition());

    cache.get(1);
    System.out.println(cache.stats());
    assertThat(cache.stats().loadCount()).isEqualTo(1);

    final CyclicBarrier gate = new CyclicBarrier(7);

    Thread t1 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          System.out.println("t1 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t2 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          System.out.println("t2 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t6 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          Thread.sleep(5);
          System.out.println("t6 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t3 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          Thread.sleep(5);
          System.out.println("t3 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t4 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          System.out.println("Cache Invalidate start");
          cache.invalidateAll();
          System.out.println("Cache Invalidate end");
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };
    Thread t5 = new Thread() {
      @Override
      public void run() {
        try {
          gate.await();
          System.out.println("t5 Cache retrieval:" + cache.get(1));
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        // do stuff
      }
    };

    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();
    t6.start();

    gate.await();
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    t6.join();

    System.out.println(cache.stats());
    assertThat(cache.stats().loadCount()).isEqualTo(1);
    Thread.sleep(2000);

    System.out.println("END\n\n");
  }
}
test output
Loading value: 1
CacheStats{hitCount=0, missCount=1, loadSuccessCount=1, loadFailureCount=0, totalLoadTime=163958, evictionCount=0, evictionWeight=0}
Cache Invalidate start
t1 Cache retrieval:1
t5 Cache retrieval:1
t2 Cache retrieval:1
Refreshing 1 -> 1
Loading value: 2
Cache Invalidate end
t3 Cache retrieval:2
t6 Cache retrieval:2
CacheStats{hitCount=5, missCount=1, loadSuccessCount=2, loadFailureCount=0, totalLoadTime=1510375, evictionCount=0, evictionWeight=0}

@seashore115
Author

Hi Ben, thanks for looking into it. I ran your test locally, but missCount is greater than 1:
START #1

Loading value: 1
CacheStats{hitCount=0, missCount=1, loadSuccessCount=1, loadFailureCount=0, totalLoadTime=381299, evictionCount=0, evictionWeight=0}
Cache Invalidate start
t1 Cache retrieval:1
t5 Cache retrieval:1
t2 Cache retrieval:1
Refreshing 1 -> 1
Loading value: 2
Loading value: 3
t6 Cache retrieval:3
t3 Cache retrieval:3
Cache Invalidate end
CacheStats{hitCount=4, missCount=2, loadSuccessCount=3, loadFailureCount=0, totalLoadTime=2637255, evictionCount=0, evictionWeight=0}

Is this expected? If so, how can I avoid a cache miss during the invalidateAll() and the subsequent refresh?

@ben-manes
Owner

You'd have a race condition because when invalidated the entry is gone, so a lookup might start before the refresh does.
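To illustrate the gap, here is a minimal single-threaded model that uses a plain ConcurrentHashMap as a stand-in for the cache; it is not Caffeine's internal implementation. It replays the interleaving from the test output by hand: the invalidation removes the entry, a reader runs before the refresh and triggers its own load, and then the refresh loads again. Replacing the value in place instead never leaves the key absent, so the reader hits the stale value. The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the race; a ConcurrentHashMap stands in for the cache.
public class InvalidateGapDemo {
  // Counts how many times the (hypothetical) loader runs.
  static final AtomicInteger loads = new AtomicInteger();

  static Integer load(Integer key) {
    return loads.incrementAndGet();
  }

  // Invalidate first, then a reader slips in before the refresh runs.
  public static int invalidateThenRefresh() {
    loads.set(0);
    var map = new ConcurrentHashMap<Integer, Integer>();
    map.computeIfAbsent(1, InvalidateGapDemo::load); // initial load
    map.remove(1);                                   // models invalidateAll()
    map.computeIfAbsent(1, InvalidateGapDemo::load); // reader misses and loads
    map.put(1, load(1));                             // refresh loads again
    return loads.get();
  }

  // Reload off to the side and swap the value in; the key is never absent.
  public static int replaceInPlace() {
    loads.set(0);
    var map = new ConcurrentHashMap<Integer, Integer>();
    map.computeIfAbsent(1, InvalidateGapDemo::load); // initial load
    Integer refreshed = load(1);                     // reload while old value stays visible
    map.computeIfAbsent(1, InvalidateGapDemo::load); // reader hits the stale value
    map.replace(1, refreshed);                       // publish the new value
    return loads.get();
  }

  public static void main(String[] args) {
    System.out.println("invalidate then refresh: " + invalidateThenRefresh() + " loads"); // 3 loads
    System.out.println("replace in place: " + replaceInPlace() + " loads");               // 2 loads
  }
}
```

The three loads in the first case correspond to the "Loading value: 1, 2, 3" lines in the reporter's output.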

It sounds like refresh is being used to reload the cache on its own rather than paired with expiration. Refresh probably doesn't make sense alone, but Guava Cache didn't disallow it. The intent was to let hot entries be reloaded while cold ones fade away through expiration, so that hot entries don't take a latency hit when they expire.

If you are instead trying to periodically reload an unbounded cache, then it's better to use a simple scheduled task. That way you can replace the values in place, without a temporary gap where the entry is absent, and callers see the current value while the new one is being loaded. Is that what you're aiming for?
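A minimal sketch of that scheduled-task approach, assuming an unbounded ConcurrentHashMap and a hypothetical per-key loader function: the task computes each new value first and then overwrites the mapping in place, so readers see either the current or the new value and never an absent key. The names ScheduledReloader, reloadAll and loader are illustrative and not part of Caffeine.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical periodic reloader for an unbounded map.
public class ScheduledReloader<K, V> {
  private final Map<K, V> data = new ConcurrentHashMap<>();
  private final Function<K, V> loader; // hypothetical per-key loader

  public ScheduledReloader(Function<K, V> loader) {
    this.loader = loader;
  }

  // Returns the current value; a load runs only the first time a key is requested.
  public V get(K key) {
    return data.computeIfAbsent(key, loader);
  }

  // Recompute every known key outside the map's locks, then swap the value in.
  // The old value stays visible until replace() installs the new one.
  public void reloadAll() {
    for (K key : data.keySet()) {
      V fresh = loader.apply(key);
      data.replace(key, fresh);
    }
  }

  // Runs reloadAll() periodically; the caller owns (and should shut down) the scheduler.
  public ScheduledExecutorService start(long period, TimeUnit unit) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(this::reloadAll, period, period, unit);
    return scheduler;
  }

  public static void main(String[] args) {
    int[] version = {0};
    var reloader = new ScheduledReloader<String, String>(k -> k + "-v" + (++version[0]));
    System.out.println(reloader.get("a")); // a-v1 (first access loads)
    reloader.reloadAll();                  // what the scheduled task runs
    System.out.println(reloader.get("a")); // a-v2
  }
}
```

Using replace rather than put means a key removed concurrently by some other code path is not resurrected by the reload.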

@seashore115
Author

Gotcha. Yep, you are correct.

Is this what you recommend?
executor.submit(() -> cache.asMap().replace(key, newValue));
If so, will it lock the hot key during that time?

@ben-manes
Owner

It will lock for other writes to that key, but reads are lock-free. The lock duration will be small.

If you don't need any other features then you can use a ConcurrentHashMap directly. An even simpler approach is to have an immutable map that you rebuild periodically.

volatile Map<K, V> data = Map.of();

scheduledExecutor.scheduleWithFixedDelay(() -> {
    // Build the new map off to the side, then publish it with a single volatile write
    var results = loadAll();
    data = Map.copyOf(results);
}, 0, 1, TimeUnit.MINUTES);
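A self-contained version of that snapshot approach, assuming a hypothetical loadAll supplier that returns the full data set: each run builds a fresh immutable copy and publishes it with one volatile write, so readers never take a lock and always see a complete snapshot. SnapshotCache and refresh are illustrative names.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical snapshot cache that is rebuilt wholesale on a schedule.
public class SnapshotCache<K, V> {
  // Readers see whichever snapshot was most recently published.
  private volatile Map<K, V> data = Map.of();
  private final Supplier<Map<K, V>> loadAll; // hypothetical bulk loader

  public SnapshotCache(Supplier<Map<K, V>> loadAll) {
    this.loadAll = loadAll;
  }

  public V get(K key) {
    return data.get(key); // lock-free read of the current snapshot
  }

  // Build the new map fully, then publish it with one volatile write.
  public void refresh() {
    data = Map.copyOf(loadAll.get());
  }

  // Initial delay 0 loads immediately, matching the snippet above.
  public ScheduledExecutorService start(long period, TimeUnit unit) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(this::refresh, 0, period, unit);
    return scheduler;
  }

  public static void main(String[] args) {
    int[] version = {0};
    var cache = new SnapshotCache<String, Integer>(() -> Map.of("a", ++version[0]));
    cache.refresh();
    System.out.println(cache.get("a")); // 1
    cache.refresh();
    System.out.println(cache.get("a")); // 2
  }
}
```

One design note: if loadAll throws, ScheduledExecutorService suppresses all subsequent runs of the task, so a production version would likely catch and log inside refresh and keep serving the previous snapshot.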

@seashore115
Author

Gotcha, thanks Ben. That works.
