AsyncCacheLoader for asynchronous caches (fixes #53)
Creating an AsyncLoadingCache previously required passing a CacheLoader
to Caffeine.buildAsync(), optionally as a lambda. This is convenient
when the computations are written in a synchronous fashion and the
cache provides the asynchronous wrapping.

That API was clunky when the computation is defined asynchronously,
such as when bubbling up a future from another library. In that case
the asyncLoad default method had to be overridden in addition to load
(which might never be called). It offered all the power needed, but in
a cumbersome fashion.

AsyncCacheLoader resolves this API flaw by exposing the asynchronous
computations that a loader can perform (asyncLoad, asyncLoadAll, and
asyncReload). It can be used as a lambda for buildAsync, e.g.
(key, executor) -> future.

Backwards compatibility is retained by having CacheLoader extend
AsyncCacheLoader. The asyncLoad and asyncLoadAll methods were already
defined as defaults, so only asyncReload was introduced. This approach
provides symmetry, low integration effort, and minimal impact to the
API.
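
As a minimal sketch of the two styles (Key, Graph, createExpensiveGraph, and
findGraphAsync are hypothetical application types and methods, not part of
this change):

// Synchronous style: a CacheLoader lambda; the cache wraps it on the builder's executor.
AsyncLoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumSize(10_000)
    .buildAsync(key -> createExpensiveGraph(key));

// Asynchronous style: an AsyncCacheLoader lambda that bubbles up a future from another library.
AsyncLoadingCache<Key, Graph> asyncGraphs = Caffeine.newBuilder()
    .maximumSize(10_000)
    .buildAsync((key, executor) -> findGraphAsync(key));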
ben-manes committed Feb 18, 2016
1 parent 16e2173 commit 0337c68
Showing 22 changed files with 326 additions and 67 deletions.
@@ -31,7 +31,7 @@

import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.yahoo.ycsb.generator.IntegerGenerator;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;

/**
@@ -60,9 +60,9 @@ public static class ThreadState {

public ComputeBenchmark() {
ints = new Integer[SIZE];
IntegerGenerator generator = new ScrambledZipfianGenerator(ITEMS);
NumberGenerator generator = new ScrambledZipfianGenerator(ITEMS);
for (int i = 0; i < SIZE; i++) {
ints[i] = generator.nextInt();
ints[i] = generator.nextValue().intValue();
}
}

@@ -20,7 +20,7 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

import com.yahoo.ycsb.generator.IntegerGenerator;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;

/**
@@ -46,9 +46,9 @@ public void setup() {
sketch = new FrequencySketch<>();
sketch.ensureCapacity(ITEMS);

IntegerGenerator generator = new ScrambledZipfianGenerator(ITEMS);
NumberGenerator generator = new ScrambledZipfianGenerator(ITEMS);
for (int i = 0; i < SIZE; i++) {
ints[i] = generator.nextInt();
ints[i] = generator.nextValue().intValue();
sketch.increment(i);
}
}
@@ -27,7 +27,7 @@
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import com.yahoo.ycsb.generator.IntegerGenerator;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;

/**
@@ -85,9 +85,9 @@ public void setup() {
cache.cleanUp();

// Populate with a realistic access distribution
IntegerGenerator generator = new ScrambledZipfianGenerator(ITEMS);
NumberGenerator generator = new ScrambledZipfianGenerator(ITEMS);
for (int i = 0; i < SIZE; i++) {
ints[i] = generator.nextInt();
ints[i] = generator.nextValue().intValue();
cache.put(ints[i], Boolean.TRUE);
}
}
@@ -19,7 +19,7 @@

import com.github.benmanes.caffeine.cache.BasicCache;
import com.github.benmanes.caffeine.cache.CacheType;
import com.yahoo.ycsb.generator.IntegerGenerator;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;

/**
@@ -42,9 +42,9 @@ public final class CacheProfiler extends ProfilerHook {
CacheProfiler() {
ints = new Integer[SIZE];
cache = cacheType.create(2 * SIZE);
IntegerGenerator generator = new ScrambledZipfianGenerator(ITEMS);
NumberGenerator generator = new ScrambledZipfianGenerator(ITEMS);
for (int i = 0; i < SIZE; i++) {
ints[i] = generator.nextInt();
ints[i] = generator.nextValue().intValue();
cache.put(ints[i], Boolean.TRUE);
}

@@ -0,0 +1,98 @@
/*
* Copyright 2016 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.benmanes.caffeine.cache;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;

/**
* Computes or retrieves values asynchronously, based on a key, for use in populating an
* {@link AsyncLoadingCache}.
* <p>
* Most implementations will only need to implement {@link #asyncLoad}. Other methods may be
* overridden as desired.
* <p>
* Usage example:
* <pre>{@code
* AsyncCacheLoader<Key, Graph> loader = (key, executor) ->
* createExpensiveGraphAsync(key, executor);
* AsyncLoadingCache<Key, Graph> cache = Caffeine.newBuilder().buildAsync(loader);
* }</pre>
*
* @author [email protected] (Ben Manes)
*/
@ThreadSafe
@FunctionalInterface
public interface AsyncCacheLoader<K, V> {

/**
* Asynchronously computes or retrieves the value corresponding to {@code key}.
*
* @param key the non-null key whose value should be loaded
* @param executor the executor with which the entry is asynchronously loaded
* @return the future value associated with {@code key}
*/
@Nonnull
CompletableFuture<V> asyncLoad(@Nonnull K key, @Nonnull Executor executor);

/**
* Asynchronously computes or retrieves the values corresponding to {@code keys}. This method is
* called by {@link AsyncLoadingCache#getAll}.
* <p>
* If the returned map doesn't contain all requested {@code keys} then the entries it does contain
* will be cached and {@code getAll} will return the partial results. If the returned map contains
* extra keys not present in {@code keys} then all returned entries will be cached, but only the
* entries for {@code keys} will be returned from {@code getAll}.
* <p>
* This method should be overridden when bulk retrieval is significantly more efficient than many
* individual lookups. Note that {@link AsyncLoadingCache#getAll} will defer to individual calls
* to {@link AsyncLoadingCache#get} if this method is not overridden.
*
* @param keys the unique, non-null keys whose values should be loaded
* @param executor the executor with which the entries are asynchronously loaded
* @return a future containing the map from each key in {@code keys} to the value associated with
* that key; <b>may not contain null values</b>
*/
@Nonnull
default CompletableFuture<Map<K, V>> asyncLoadAll(
@Nonnull Iterable<? extends K> keys, @Nonnull Executor executor) {
throw new UnsupportedOperationException();
}

/**
* Asynchronously computes or retrieves a replacement value corresponding to an already-cached
* {@code key}. If the replacement value is not found then the mapping will be removed if
* {@code null} is computed. This method is called when an existing cache entry is refreshed by
* {@link Caffeine#refreshAfterWrite}, or through a call to {@link LoadingCache#refresh}.
* <p>
* <b>Note:</b> <i>all exceptions thrown by this method will be logged and then swallowed</i>.
*
* @param key the non-null key whose value should be loaded
* @param oldValue the non-null old value corresponding to {@code key}
* @param executor the executor with which the entry is asynchronously loaded
* @return a future containing the new value associated with {@code key}, or containing
* {@code null} if the mapping is to be removed
*/
@Nonnull
default CompletableFuture<V> asyncReload(
@Nonnull K key, @Nonnull V oldValue, @Nonnull Executor executor) {
return asyncLoad(key, executor);
}
}
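
As an illustrative sketch only (not part of the diff): a loader that also supplies the
optional bulk method, so that AsyncLoadingCache.getAll can fetch all keys in one round
trip instead of one future per key. The findGraphAsync and fetchAllAsync helpers are
assumed to already return futures; Key and Graph are placeholder types as in the
Javadoc example above.

AsyncCacheLoader<Key, Graph> bulkLoader = new AsyncCacheLoader<Key, Graph>() {
  @Override public CompletableFuture<Graph> asyncLoad(Key key, Executor executor) {
    // Single-key lookups delegate to the per-key future.
    return findGraphAsync(key);
  }
  @Override public CompletableFuture<Map<Key, Graph>> asyncLoadAll(
      Iterable<? extends Key> keys, Executor executor) {
    // One bulk request covering all of the requested keys.
    return fetchAllAsync(keys);
  }
};
AsyncLoadingCache<Key, Graph> graphs = Caffeine.newBuilder().buildAsync(bulkLoader);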
@@ -93,7 +93,7 @@ CompletableFuture<V> get(@Nonnull K key,
* @throws RuntimeException or Error if the mappingFunction does when constructing the future,
* in which case the mapping is left unestablished
*/
@CheckForNull
@Nonnull
CompletableFuture<V> get(@Nonnull K key,
@Nonnull BiFunction<? super K, Executor, CompletableFuture<V>> mappingFunction);

@@ -2675,14 +2675,14 @@ static final class BoundedLocalAsyncLoadingCache<K, V>
Policy<K, V> policy;

@SuppressWarnings("unchecked")
BoundedLocalAsyncLoadingCache(Caffeine<K, V> builder, CacheLoader<? super K, V> loader) {
BoundedLocalAsyncLoadingCache(Caffeine<K, V> builder, AsyncCacheLoader<? super K, V> loader) {
super(LocalCacheFactory.newBoundedLocalCache((Caffeine<K, CompletableFuture<V>>) builder,
asyncLoader(loader, builder), true), loader);
isWeighted = builder.isWeighted();
}

private static <K, V> CacheLoader<? super K, CompletableFuture<V>> asyncLoader(
CacheLoader<? super K, V> loader, Caffeine<?, ?> builder) {
AsyncCacheLoader<? super K, V> loader, Caffeine<?, ?> builder) {
Executor executor = builder.getExecutor();
return key -> loader.asyncLoad(key, executor);
}
@@ -44,7 +44,7 @@
@ThreadSafe
@FunctionalInterface
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public interface CacheLoader<K, V> {
public interface CacheLoader<K, V> extends AsyncCacheLoader<K, V> {

/**
* Computes or retrieves the value corresponding to {@code key}.
@@ -96,7 +96,7 @@ default Map<K, V> loadAll(@Nonnull Iterable<? extends K> keys) throws Exception
* @param executor the executor that asynchronously loads the entry
* @return the future value associated with {@code key}
*/
@Nonnull
@Override @Nonnull
default CompletableFuture<V> asyncLoad(@Nonnull K key, @Nonnull Executor executor) {
requireNonNull(key);
requireNonNull(executor);
@@ -129,9 +129,9 @@ default CompletableFuture<V> asyncLoad(@Nonnull K key, @Nonnull Executor executo
* @return a future containing the map from each key in {@code keys} to the value associated with
* that key; <b>may not contain null values</b>
*/
@Nonnull
@Override @Nonnull
default CompletableFuture<Map<K, V>> asyncLoadAll(
Iterable<? extends K> keys, @Nonnull Executor executor) {
@Nonnull Iterable<? extends K> keys, @Nonnull Executor executor) {
requireNonNull(keys);
requireNonNull(executor);
return CompletableFuture.supplyAsync(() -> {
@@ -166,4 +166,34 @@ default CompletableFuture<Map<K, V>> asyncLoadAll(
default V reload(@Nonnull K key, @Nonnull V oldValue) throws Exception {
return load(key);
}

/**
* Asynchronously computes or retrieves a replacement value corresponding to an already-cached
* {@code key}. If the replacement value is not found then the mapping will be removed if
* {@code null} is computed. This method is called when an existing cache entry is refreshed by
* {@link Caffeine#refreshAfterWrite}, or through a call to {@link LoadingCache#refresh}.
* <p>
* <b>Note:</b> <i>all exceptions thrown by this method will be logged and then swallowed</i>.
*
* @param key the non-null key whose value should be loaded
* @param oldValue the non-null old value corresponding to {@code key}
* @param executor the executor with which the entry is asynchronously loaded
* @return a future containing the new value associated with {@code key}, or containing
* {@code null} if the mapping is to be removed
*/
@Override @Nonnull
default CompletableFuture<V> asyncReload(
@Nonnull K key, @Nonnull V oldValue, @Nonnull Executor executor) {
requireNonNull(key);
requireNonNull(executor);
return CompletableFuture.supplyAsync(() -> {
try {
return reload(key, oldValue);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new CompletionException(e);
}
}, executor);
}
}
@@ -833,9 +833,9 @@ public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
/**
* Builds a cache, which either returns a {@link CompletableFuture} already loaded or currently
* computing the value for a given key or atomically computes the value asynchronously through a
* supplied mapping function or the supplied {@code CacheLoader}. If the asynchronous
* computation fails then the entry will be automatically removed. Note that multiple threads can
* concurrently load values for distinct keys.
* supplied mapping function or the supplied {@code CacheLoader}. If the asynchronous computation
* fails or computes a {@code null} value then the entry will be automatically removed. Note that
* multiple threads can concurrently load values for distinct keys.
* <p>
* This method does not alter the state of this {@code Caffeine} instance, so it can be invoked
* again to create multiple independent caches.
@@ -850,6 +850,29 @@ public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
@Nonnull
public <K1 extends K, V1 extends V> AsyncLoadingCache<K1, V1> buildAsync(
@Nonnull CacheLoader<? super K1, V1> loader) {
return buildAsync((AsyncCacheLoader<? super K1, V1>) loader);
}

/**
* Builds a cache, which either returns a {@link CompletableFuture} already loaded or currently
* computing the value for a given key or atomically computes the value asynchronously through a
* supplied mapping function or the supplied {@code AsyncCacheLoader}. If the asynchronous
* computation fails or computes a {@code null} value then the entry will be automatically
* removed. Note that multiple threads can concurrently load values for distinct keys.
* <p>
* This method does not alter the state of this {@code Caffeine} instance, so it can be invoked
* again to create multiple independent caches.
*
* @param loader the cache loader used to obtain new values
* @param <K1> the key type of the loader
* @param <V1> the value type of the loader
* @return a cache having the requested features
* @throws IllegalStateException if the value strength is weak or soft
* @throws NullPointerException if the specified cache loader is null
*/
@Nonnull
public <K1 extends K, V1 extends V> AsyncLoadingCache<K1, V1> buildAsync(
@Nonnull AsyncCacheLoader<? super K1, V1> loader) {
requireState(valueStrength == null);
requireState(writer == null);
requireWeightWithWeigher();
@@ -59,12 +59,13 @@ abstract class LocalAsyncLoadingCache<C extends LocalCache<K, CompletableFuture<

final C cache;
final boolean canBulkLoad;
final CacheLoader<K, V> loader;
final AsyncCacheLoader<K, V> loader;

LoadingCacheView localCacheView;

@SuppressWarnings("unchecked")
LocalAsyncLoadingCache(C cache, CacheLoader<? super K, V> loader) {
this.loader = (CacheLoader<K, V>) loader;
LocalAsyncLoadingCache(C cache, AsyncCacheLoader<? super K, V> loader) {
this.loader = (AsyncCacheLoader<K, V>) loader;
this.canBulkLoad = canBulkLoad(loader);
this.cache = cache;
}
@@ -73,13 +74,24 @@ abstract class LocalAsyncLoadingCache<C extends LocalCache<K, CompletableFuture<
protected abstract Policy<K, V> policy();

/** Returns whether the supplied cache loader has bulk load functionality. */
private static boolean canBulkLoad(CacheLoader<?, ?> loader) {
private static boolean canBulkLoad(AsyncCacheLoader<?, ?> loader) {
try {
Method loadAll = loader.getClass().getMethod(
"loadAll", Iterable.class);
Method asyncLoadAll = loader.getClass().getMethod(
Class<?> defaultLoaderClass = AsyncCacheLoader.class;
if (loader instanceof CacheLoader<?, ?>) {
defaultLoaderClass = CacheLoader.class;

Method classLoadAll = loader.getClass().getMethod("loadAll", Iterable.class);
Method defaultLoadAll = CacheLoader.class.getMethod("loadAll", Iterable.class);
if (!classLoadAll.equals(defaultLoadAll)) {
return true;
}
}

Method classAsyncLoadAll = loader.getClass().getMethod(
"asyncLoadAll", Iterable.class, Executor.class);
return !loadAll.isDefault() || !asyncLoadAll.isDefault();
Method defaultAsyncLoadAll = defaultLoaderClass.getMethod(
"asyncLoadAll", Iterable.class, Executor.class);
return !classAsyncLoadAll.equals(defaultAsyncLoadAll);
} catch (NoSuchMethodException | SecurityException e) {
logger.log(Level.WARNING, "Cannot determine if CacheLoader can bulk load", e);
return false;
@@ -436,12 +448,26 @@ public void refresh(K key) {
BiFunction<K, CompletableFuture<V>, CompletableFuture<V>> refreshFunction =
(k, oldValueFuture) -> {
try {
V oldValue = (oldValueFuture == null) ? null : oldValueFuture.join();
V newValue = (oldValue == null) ? loader.load(key) : loader.reload(key, oldValue);
return (newValue == null) ? null : CompletableFuture.completedFuture(newValue);
V oldValue = Async.getWhenSuccessful(oldValueFuture);
if (loader instanceof CacheLoader<?, ?>) {
CacheLoader<K, V> cacheLoader = (CacheLoader<K, V>) loader;
V newValue = (oldValue == null)
? cacheLoader.load(key)
: cacheLoader.reload(key, oldValue);
return (newValue == null) ? null : CompletableFuture.completedFuture(newValue);
} else {
// Hint that the async task should be run on this async task's thread
CompletableFuture<V> newValueFuture = (oldValue == null)
? loader.asyncLoad(key, Runnable::run)
: loader.asyncReload(key, oldValue, Runnable::run);
V newValue = newValueFuture.get();
return (newValue == null) ? null : newValueFuture;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return LocalCache.throwUnchecked(e);
} catch (ExecutionException e) {
return LocalCache.throwUnchecked(e.getCause());
} catch (Exception e) {
return LocalCache.throwUnchecked(e);
}
