Skip to content

Commit

Permalink
add tests for cancelation of the future during a batch load
Browse files Browse the repository at this point in the history
  • Loading branch information
ben-manes committed Feb 25, 2024
1 parent 7f82472 commit b9c446b
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.github.benmanes.caffeine.testing.FutureSubject.assertThat;
import static com.github.benmanes.caffeine.testing.IntSubject.assertThat;
import static com.github.benmanes.caffeine.testing.MapSubject.assertThat;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.truth.Truth.assertThat;
import static java.util.function.Function.identity;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -478,7 +480,8 @@ public void getAllFunction_present_partial(AsyncCache<Int, Int> cache, CacheCont
}

@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING })
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = { CacheExecutor.DIRECT, CacheExecutor.THREADED })
public void getAllFunction_exceeds(AsyncCache<Int, Int> cache, CacheContext context) {
var result = cache.getAll(context.absentKeys(), keys -> {
var moreKeys = new ArrayList<Int>(keys);
Expand Down Expand Up @@ -582,6 +585,52 @@ public void getAllFunction_present_ordered_exceeds(
assertThat(result).containsExactlyKeys(keys).inOrder();
}

@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = CacheExecutor.THREADED)
public void getAllFunction_canceled_individual(AsyncCache<Int, Int> cache, CacheContext context) {
var ready = new AtomicBoolean();
var bulk = cache.getAll(context.absentKeys(), keysToLoad -> {
await().untilTrue(ready);
return Maps.toMap(keysToLoad, Int::negate);
});
for (var key : context.absentKeys()) {
var future = cache.getIfPresent(key);
future.cancel(true);
assertThat(future).hasCompletedExceptionally();
}
ready.set(true);
bulk.join();

await().untilAsserted(() -> {
for (var key : context.absentKeys()) {
assertThat(cache).containsKey(key);
}
});
}

@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = CacheExecutor.THREADED)
public void getAllFunction_canceled_bulk(AsyncCache<Int, Int> cache, CacheContext context) {
var ready = new AtomicBoolean();
var bulk = cache.getAll(context.absentKeys(), keysToLoad -> {
await().untilTrue(ready);
return Maps.toMap(keysToLoad, Int::negate);
});
var pending = context.absentKeys().stream().map(cache::getIfPresent).collect(toImmutableList());

bulk.cancel(true);
ready.set(true);

CompletableFuture.allOf(pending.toArray(CompletableFuture[]::new))
.orTimeout(10, TimeUnit.SECONDS)
.join();
for (var key : context.absentKeys()) {
assertThat(cache).containsKey(key);
}
}

@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING })
public void getAllFunction_badLoader(AsyncCache<Int, Int> cache, CacheContext context) {
Expand Down Expand Up @@ -751,7 +800,8 @@ public void getAllBifunction_present_partial(AsyncCache<Int, Int> cache, CacheCo
}

@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING })
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = { CacheExecutor.DIRECT, CacheExecutor.THREADED })
public void getAllBifunction_exceeds(AsyncCache<Int, Int> cache, CacheContext context) {
var result = cache.getAll(context.absentKeys(), (keys, executor) -> {
var moreKeys = new ArrayList<Int>(keys);
Expand Down Expand Up @@ -845,6 +895,54 @@ public void getAllBifunction_present_ordered_present(
assertThat(result).containsExactlyKeys(keys).inOrder();
}

@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = CacheExecutor.THREADED)
public void getAllBifunction_canceled_individual(
AsyncCache<Int, Int> cache, CacheContext context) {
var ready = new AtomicBoolean();
var bulk = cache.getAll(context.absentKeys(), (keysToLoad, executor) -> {
return CompletableFuture.supplyAsync(() -> {
await().untilTrue(ready);
return Maps.toMap(keysToLoad, Int::negate);
}, executor);
});
for (var key : context.absentKeys()) {
var future = cache.getIfPresent(key);
future.cancel(true);
assertThat(future).hasCompletedExceptionally();
}
ready.set(true);
bulk.join();

await().untilAsserted(() -> {
for (var key : context.absentKeys()) {
assertThat(cache).containsKey(key);
}
});
}

@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = CacheExecutor.THREADED)
public void getAllBifunction_canceled_bulk(AsyncCache<Int, Int> cache, CacheContext context) {
var ready = new AtomicBoolean();
var bulk = cache.getAll(context.absentKeys(), (keysToLoad, executor) -> {
await().untilTrue(ready);
return CompletableFuture.completedFuture(Maps.toMap(keysToLoad, Int::negate));
});
var pending = context.absentKeys().stream().map(cache::getIfPresent).collect(toImmutableList());
bulk.cancel(true);
ready.set(true);

CompletableFuture.allOf(pending.toArray(CompletableFuture[]::new))
.orTimeout(10, TimeUnit.SECONDS)
.join();
for (var key : context.absentKeys()) {
assertThat(cache).containsKey(key);
}
}

@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DISABLED, Listener.REJECTING })
public void getAllBifunction_present_ordered_exceeds(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.github.benmanes.caffeine.testing.FutureSubject.assertThat;
import static com.github.benmanes.caffeine.testing.IntSubject.assertThat;
import static com.github.benmanes.caffeine.testing.MapSubject.assertThat;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.truth.Truth.assertThat;
import static java.util.function.Function.identity;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -320,7 +322,7 @@ public void getAll_present_partial(AsyncLoadingCache<Int, Int> cache, CacheConte
@Test(dataProvider = "caches")
@CacheSpec(loader = Loader.BULK_NEGATIVE_EXCEEDS,
removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = { CacheExecutor.DIRECT, CacheExecutor.DEFAULT })
executor = { CacheExecutor.DIRECT, CacheExecutor.THREADED })
public void getAll_exceeds(AsyncLoadingCache<Int, Int> cache, CacheContext context) {
var result = cache.getAll(context.absentKeys()).join();

Expand Down Expand Up @@ -410,6 +412,68 @@ public void getAll_present_ordered_exceeds(
assertThat(result).containsExactlyKeys(keys).inOrder();
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.ASYNC, removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = CacheExecutor.THREADED)
public void getAll_canceled_individual(CacheContext context) {
var ready = new AtomicBoolean();
var loader = new CacheLoader<Int, Int>() {
@Override public Int load(Int key) {
throw new IllegalStateException();
}
@Override public ImmutableMap<Int, Int> loadAll(Set<? extends Int> keys) {
await().untilTrue(ready);
return keys.stream().collect(toImmutableMap(identity(), Int::negate));
}
};

var cache = context.buildAsync(loader);
var bulk = cache.getAll(context.absentKeys());
for (var key : context.absentKeys()) {
var future = cache.getIfPresent(key);
future.cancel(true);
assertThat(future).hasCompletedExceptionally();
}
ready.set(true);
bulk.join();

await().untilAsserted(() -> {
for (var key : context.absentKeys()) {
assertThat(cache).containsKey(key);
}
});
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.ASYNC, removalListener = { Listener.DISABLED, Listener.REJECTING },
executor = CacheExecutor.THREADED)
public void getAll_canceled_bulk(CacheContext context) {
var ready = new AtomicBoolean();
var loader = new CacheLoader<Int, Int>() {
@Override public Int load(Int key) {
throw new IllegalStateException();
}
@Override public ImmutableMap<Int, Int> loadAll(Set<? extends Int> keys) {
await().untilTrue(ready);
return keys.stream().collect(toImmutableMap(identity(), Int::negate));
}
};

var cache = context.buildAsync(loader);
var bulk = cache.getAll(context.absentKeys());
var pending = context.absentKeys().stream().map(cache::getIfPresent).collect(toImmutableList());

bulk.cancel(true);
ready.set(true);

CompletableFuture.allOf(pending.toArray(CompletableFuture[]::new))
.orTimeout(10, TimeUnit.SECONDS)
.join();
for (var key : context.absentKeys()) {
assertThat(cache).containsKey(key);
}
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.ASYNC, removalListener = { Listener.DISABLED, Listener.REJECTING })
public void getAll_badLoader(CacheContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class AbstractLincheckCacheTest {
private final LoadingCache<Integer, Integer> cache;

public AbstractLincheckCacheTest(Caffeine<Object, Object> builder) {
cache = builder.build(key -> -key);
cache = builder.executor(Runnable::run).build(key -> -key);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ public Object[] factory() {
}

public static final class BoundedLincheckTest extends AbstractLincheckCacheTest {

public BoundedLincheckTest() {
super(Caffeine.newBuilder()
.executor(Runnable::run)
.maximumSize(Long.MAX_VALUE)
.expireAfterWrite(Duration.ofNanos(Long.MAX_VALUE)));
}
Expand Down
2 changes: 1 addition & 1 deletion examples/coalescing-bulkloader-reactor/settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id("com.gradle.enterprise") version "3.16.2"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.1"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.2"
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}

Expand Down
2 changes: 1 addition & 1 deletion examples/graal-native/settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pluginManagement {
}
plugins {
id("com.gradle.enterprise") version "3.16.2"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.1"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.2"
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}

Expand Down
2 changes: 1 addition & 1 deletion examples/hibernate/gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ caffeine = "3.1.8"
h2 = "2.2.224"
hibernate = "6.4.4.Final"
junit = "5.10.2"
log4j2 = "3.0.0-beta1"
log4j2 = "3.0.0-beta2"
slf4j = "2.0.7"
truth = "1.4.1"
versions = "0.51.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/hibernate/settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id("com.gradle.enterprise") version "3.16.2"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.1"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.2"
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}

Expand Down
2 changes: 1 addition & 1 deletion examples/resilience-failsafe/settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id("com.gradle.enterprise") version "3.16.2"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.1"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.2"
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}

Expand Down
2 changes: 1 addition & 1 deletion examples/write-behind-rxjava/settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id("com.gradle.enterprise") version "3.16.2"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.1"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.2"
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}

Expand Down
2 changes: 1 addition & 1 deletion gradle/plugins/settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id("com.gradle.enterprise") version "3.16.2"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.1"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.2"
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ tasks.named<DependencyUpdatesTask>("dependencyUpdates").configure {
resolutionStrategy {
componentSelection {
all {
val stable = listOf("javax.json.bind", "org.jetbrains.kotlin", "org.osgi")
val stable = setOf("com.hazelcast", "javax.json.bind",
"org.jetbrains.kotlin", "org.osgi", "org.slf4j")
if ((candidate.group in stable) && isNonStable(candidate.version)) {
reject("Release candidate")
} else if ((candidate.module == "commons-io") && candidate.version.startsWith("2003")) {
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pluginManagement {
}
plugins {
id("com.gradle.enterprise") version "3.16.2"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.1"
id("com.gradle.common-custom-user-data-gradle-plugin") version "1.12.2"
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}

Expand Down

0 comments on commit b9c446b

Please sign in to comment.