Skip to content

Commit

Permalink
Notify JCache listener in key-order on eviction
Browse files Browse the repository at this point in the history
The specification requires that listeners are notified in sequence of
key operations on the cache. Like all other jsr107 implementations,
Caffeine was notifying in entry order due to the eviction event being
published asynchronously to the write. A Caffeine CacheWriter is used
to publish the eviction atomically with the cache update, so a JCache
listener should always receive events in the proper sequence.
  • Loading branch information
ben-manes committed Jul 12, 2015
1 parent ea286bb commit dc6e4e8
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 57 deletions.
2 changes: 1 addition & 1 deletion caffeine/testing.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ testNames.each { testName ->
tasks.withType(Test) {
useTestNG()
if (name.startsWith('slow')) {
maxParallelForks = 4
maxParallelForks = 2
options.includeGroups = ['slow']
} else {
options {
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.5-rc-2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.5-bin.zip
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import com.github.benmanes.caffeine.jcache.configuration.CaffeineConfiguration;
import com.github.benmanes.caffeine.jcache.configuration.TypesafeConfigurator;
import com.github.benmanes.caffeine.jcache.event.EventDispatcher;
import com.github.benmanes.caffeine.jcache.event.JCacheEvictionListener;
import com.github.benmanes.caffeine.jcache.integration.JCacheLoaderAdapter;
import com.github.benmanes.caffeine.jcache.integration.JCacheRemovalListener;
import com.github.benmanes.caffeine.jcache.management.JCacheStatisticsMXBean;
import com.typesafe.config.Config;

Expand Down Expand Up @@ -107,7 +107,7 @@ private final class Builder<K, V> {
final CaffeineConfiguration<K, V> config;

CacheLoader<K, V> cacheLoader;
JCacheRemovalListener<K, V> removalListener;
JCacheEvictionListener<K, V> evictionListener;

Builder(String cacheName, CaffeineConfiguration<K, V> config) {
this.config = config;
Expand All @@ -127,18 +127,15 @@ private final class Builder<K, V> {

/** Creates a configured cache. */
public CacheProxy<K, V> build() {
boolean requiresRemovalListener =
configureMaximumSize() ||
configureMaximumWeight() ||
configureExpireAfterWrite() ||
configureExpireAfterAccess();
if (requiresRemovalListener) {
configureRemovalListener();
boolean evicts = configureMaximumSize() || configureMaximumWeight()
|| configureExpireAfterWrite() || configureExpireAfterAccess();
if (evicts) {
configureEvictionListener();
}

CacheProxy<K, V> cache = isReadThrough() ? newLoadingCacheProxy() : newCacheProxy();
if (requiresRemovalListener) {
removalListener.setCache(cache);
if (evicts) {
evictionListener.setCache(cache);
}
return cache;
}
Expand Down Expand Up @@ -198,9 +195,9 @@ private boolean configureExpireAfterAccess() {
}

/** Configures the removal listener. */
private void configureRemovalListener() {
removalListener = new JCacheRemovalListener<>(dispatcher, statistics);
caffeine.removalListener(removalListener);
private void configureEvictionListener() {
evictionListener = new JCacheEvictionListener<>(dispatcher, statistics);
caffeine.writer(evictionListener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@
*
* @author [email protected] (Ben Manes)
*/
public class SerializationAwareCopyStrategy extends AbstractCopyStrategy<byte[]> {
public class JavaSerializationCopyStrategy extends AbstractCopyStrategy<byte[]> {

public SerializationAwareCopyStrategy() {
public JavaSerializationCopyStrategy() {
super();
}

public SerializationAwareCopyStrategy(Set<Class<?>> immutableClasses,
public JavaSerializationCopyStrategy(Set<Class<?>> immutableClasses,
Map<Class<?>, Function<Object, Object>> deepCopyStrategies) {
super(immutableClasses, deepCopyStrategies);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* A dispatcher that publishes cache events to listeners for asynchronous execution.
* <p>
* A {@link CacheEntryListener} is required to receive events in the order of the actions being
* performed on the associated entry. This implementation supports this through an actor-like model
* performed on the associated key. This implementation supports this through an actor-like model
* by using a dispatch queue per listener. A listener is never executed in parallel on different
* events, but may be executed sequentially on different threads. Batch processing of the dispatch
* queue is not presently supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.benmanes.caffeine.jcache.integration;
package com.github.benmanes.caffeine.jcache.event;

import static java.util.Objects.requireNonNull;

import javax.cache.Cache;

import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.RemovalNotification;
import com.github.benmanes.caffeine.cache.CacheWriter;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.jcache.Expirable;
import com.github.benmanes.caffeine.jcache.event.EventDispatcher;
import com.github.benmanes.caffeine.jcache.management.JCacheStatisticsMXBean;

/**
* A Caffeine listener that publishes eviction events to the JCache listeners.
* A Caffeine {@link CacheWriter} that provides an adapter to publish events in the order of the
* actions being performed on a key.
*
* @author [email protected] (Ben Manes)
*/
public final class JCacheRemovalListener<K, V> implements RemovalListener<K, Expirable<V>> {
public final class JCacheEvictionListener<K, V> implements CacheWriter<K, Expirable<V>> {
private final JCacheStatisticsMXBean statistics;
private final EventDispatcher<K, V> dispatcher;

private Cache<K, V> cache;

public JCacheRemovalListener(EventDispatcher<K, V> dispatcher,
public JCacheEvictionListener(EventDispatcher<K, V> dispatcher,
JCacheStatisticsMXBean statistics) {
this.dispatcher = requireNonNull(dispatcher);
this.statistics = requireNonNull(statistics);
Expand All @@ -52,9 +52,16 @@ public void setCache(Cache<K, V> cache) {
}

@Override
public void onRemoval(RemovalNotification<K, Expirable<V>> notification) {
if (notification.wasEvicted()) {
dispatcher.publishRemoved(cache, notification.getKey(), notification.getValue().get());
public void write(K key, Expirable<V> value) {}

@Override
public void delete(K key, Expirable<V> value, RemovalCause cause) {
if (cause.wasEvicted()) {
if (cause == RemovalCause.EXPIRED) {
dispatcher.publishExpired(cache, key, value.get());
} else {
dispatcher.publishRemoved(cache, key, value.get());
}
dispatcher.ignoreSynchronous();
statistics.recordEvictions(1L);
}
Expand Down
30 changes: 15 additions & 15 deletions jcache/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ caffeine.jcache {
default {
# The strategy for copying the cache entry for value-based storage
store-by-value {

# If enabled, the entry is copied when crossing the API boundary
enabled = false

# It is highly recommended that the default strategy be replaced with a higher performance
# alternative (see https://github.com/eishay/jvm-serializers/wiki).
strategy = "com.github.benmanes.caffeine.jcache.copy.SerializationAwareCopyStrategy"
strategy = "com.github.benmanes.caffeine.jcache.copy.JavaSerializationCopyStrategy"
}

# The list of configuration paths to the listeners that consume this cache's events
listeners = []

Expand All @@ -27,20 +27,20 @@ caffeine.jcache {
# The CacheLoader class for loading entries
loader = null
}

write-through {
# If enabled, the entry is written to the resource before the cache is updated
enabled = false

# The CacheWriter class for writing entries
writer = null
}

# The JMX monitoring configuration
monitoring {
# If cache statistics should be recorded and externalized
statistics = false

# If the configuration should be externalized
management = false
}
Expand All @@ -49,22 +49,22 @@ caffeine.jcache {
policy {
# The expiration threshold before lazily evicting an entry. This single threshold is reset on
# every operation where a duration is specified. As required by the specification, if an entry
# expires but is not accessed and no resource constraints force eviction, then the expired
# expires but is not accessed and no resource constraints force eviction, then the expired
# entry remains in place.
lazy-expiration {
# The duration before a newly created entry is considered expired. If set to 0 then the
# entry is considered to be already expired and will not be added to the cache.
creation = null

# The duration before a updated entry is considered expired. If set to 0 then the entry is
# considered immediately expired.
update = null

# The duration before a read of an entry is considered expired. If set to 0 then the entry
# is considered immediately expired.
access = null
}

# The expiration thresholds before eagerly evicting an entry. This settings correspond to the
# expiration supported natively by Caffeine where expired entries are collected during
# maintenance operations.
Expand All @@ -89,21 +89,21 @@ caffeine.jcache {
# The maximum total weight of entries the cache may contain (requires a weigher). This
# setting cannot be combined with the size configuration.
weight = null

# The weigher class to use when calculating the weight of cache entries
weigher = null
}
}
}

# A catalog of cache listeners; optionally defined in any namespace as referenced by path
listeners {

# An example definition of a listener
example {
# The CacheEntryListener class
class = null

# The CacheEntryEventFilter class that should be applied prior to notifying the listener
filter = null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import org.testng.annotations.Test;

import com.github.benmanes.caffeine.jcache.copy.SerializationAwareCopyStrategy;
import com.github.benmanes.caffeine.jcache.copy.JavaSerializationCopyStrategy;
import com.google.common.collect.Iterables;
import com.typesafe.config.ConfigFactory;

Expand Down Expand Up @@ -78,7 +78,7 @@ static void checkConfig(CaffeineConfiguration<?, ?> config) {
static void checkStoreByValue(CaffeineConfiguration<?, ?> config) {
assertThat(config.isStoreByValue(), is(true));
assertThat(config.getCopyStrategyFactory().create(),
instanceOf(SerializationAwareCopyStrategy.class));
instanceOf(JavaSerializationCopyStrategy.class));
}

static void checkListener(CaffeineConfiguration<?, ?> config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
/**
* @author [email protected] (Ben Manes)
*/
public final class SerializationAwareCopyStrategyTest {
final CopyStrategy copier = new SerializationAwareCopyStrategy();
public final class JavaSerializationCopyStrategyTest {
final CopyStrategy copier = new JavaSerializationCopyStrategy();

@Test(expectedExceptions = NullPointerException.class)
public void null_object() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.benmanes.caffeine.jcache.integration;
package com.github.benmanes.caffeine.jcache.event;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -26,6 +26,7 @@

import javax.cache.Cache;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryRemovedListener;

import org.mockito.Mock;
Expand All @@ -37,27 +38,26 @@
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalNotification;
import com.github.benmanes.caffeine.jcache.Expirable;
import com.github.benmanes.caffeine.jcache.event.EventDispatcher;
import com.github.benmanes.caffeine.jcache.management.JCacheStatisticsMXBean;
import com.google.common.util.concurrent.MoreExecutors;

/**
* @author [email protected] (Ben Manes)
*/
public final class JCacheRemovalListenerTest {
JCacheRemovalListener<Integer, Integer> listener;
public final class JCacheEvictionListenerTest {
JCacheEvictionListener<Integer, Integer> listener;
JCacheStatisticsMXBean statistics;

@Mock Cache<Integer, Integer> cache;
@Mock CacheEntryRemovedListener<Integer, Integer> entryListener;
@Mock EvictionListener entryListener;

@BeforeMethod
public void before() {
MockitoAnnotations.initMocks(this);
statistics = new JCacheStatisticsMXBean();
EventDispatcher<Integer, Integer> dispatcher =
new EventDispatcher<>(MoreExecutors.directExecutor());
listener = new JCacheRemovalListener<>(dispatcher, statistics);
listener = new JCacheEvictionListener<>(dispatcher, statistics);
listener.setCache(cache);
statistics.enable(true);

Expand All @@ -75,14 +75,21 @@ public Iterator<Object[]> notifications() {

@Test(dataProvider = "notifications")
public void publishIfEvicted(RemovalNotification<Integer, Expirable<Integer>> notification) {
listener.onRemoval(notification);
listener.delete(notification.getKey(), notification.getValue(), notification.getCause());

if (notification.wasEvicted()) {
verify(entryListener).onRemoved(any());
if (notification.getCause() == RemovalCause.EXPIRED) {
verify(entryListener).onExpired(any());
} else {
verify(entryListener).onRemoved(any());
}
assertThat(statistics.getCacheEvictions(), is(1L));
} else {
verify(entryListener, never()).onRemoved(any());
assertThat(statistics.getCacheEvictions(), is(0L));
}
}

interface EvictionListener extends CacheEntryRemovedListener<Integer, Integer>,
CacheEntryExpiredListener<Integer, Integer> {}
}

0 comments on commit dc6e4e8

Please sign in to comment.