Skip to content

Commit

Permalink
Add optimized spliterator implementations
Browse files Browse the repository at this point in the history
The default spliterator did not provide any characteristics, like
distinct or concurrent. These optimized ones are better suited if
for stream processing.

The tests identified a bug in ConcurrentHashMap which incorrectly
estimates the size for any odd map after a split. When checking the
ConcurrentSkipListMap behavior, it reports Integer.MAX_VALUE for any
split of a populated map. This issue has been reported.
  • Loading branch information
ben-manes committed Feb 22, 2016
1 parent 4fdb6c7 commit da23464
Show file tree
Hide file tree
Showing 3 changed files with 416 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -2122,11 +2123,6 @@ public void clear() {
cache.clear();
}

@Override
public Iterator<K> iterator() {
return new KeyIterator<>(cache);
}

@Override
public boolean contains(Object obj) {
return cache.containsKey(obj);
Expand All @@ -2137,6 +2133,16 @@ public boolean remove(Object obj) {
return (cache.remove(obj) != null);
}

@Override
public Iterator<K> iterator() {
return new KeyIterator<>(cache);
}

@Override
public Spliterator<K> spliterator() {
return new KeySpliterator<>(cache);
}

@Override
public Object[] toArray() {
if (cache.collectKeys()) {
Expand Down Expand Up @@ -2193,6 +2199,75 @@ public void remove() {
}
}

/** An adapter to safely externalize the key spliterator. */
static final class KeySpliterator<K, V> implements Spliterator<K> {
final Spliterator<Node<K, V>> spliterator;
final BoundedLocalCache<K, V> cache;

KeySpliterator(BoundedLocalCache<K, V> cache) {
this(cache, cache.data.values().spliterator());
}

KeySpliterator(BoundedLocalCache<K, V> cache, Spliterator<Node<K, V>> spliterator) {
this.spliterator = requireNonNull(spliterator);
this.cache = requireNonNull(cache);
}

@Override
public void forEachRemaining(Consumer<? super K> action) {
requireNonNull(action);
long now = cache.expirationTicker().read();
Consumer<Node<K, V>> consumer = node -> {
K key = node.getKey();
V value = node.getValue();
if ((key != null) && (value != null) && !cache.hasExpired(node, now) && node.isAlive()) {
action.accept(key);
}
};
spliterator.forEachRemaining(consumer);
}

@Override
public boolean tryAdvance(Consumer<? super K> action) {
requireNonNull(action);
boolean[] advanced = { false };
long now = cache.expirationTicker().read();
Consumer<Node<K, V>> consumer = node -> {
K key = node.getKey();
V value = node.getValue();
if ((key != null) && (value != null) && !cache.hasExpired(node, now) && node.isAlive()) {
action.accept(key);
advanced[0] = true;
}
};
for (;;) {
if (spliterator.tryAdvance(consumer)) {
if (advanced[0]) {
return true;
}
continue;
}
return false;
}
}

@Override
public Spliterator<K> trySplit() {
Spliterator<Node<K, V>> split = spliterator.trySplit();
return (split == null) ? null : new KeySpliterator<>(cache, split);
}

@Override
public long estimateSize() {
return spliterator.estimateSize();
}

@Override
public int characteristics() {
return Spliterator.DISTINCT | Spliterator.CONCURRENT | Spliterator.NONNULL;
}
}

/** An adapter to safely externalize the values. */
static final class ValuesView<K, V> extends AbstractCollection<V> {
final BoundedLocalCache<K, V> cache;
Expand All @@ -2211,6 +2286,11 @@ public void clear() {
cache.clear();
}

@Override
public boolean contains(Object o) {
return cache.containsValue(o);
}

@Override
public boolean removeIf(Predicate<? super V> filter) {
requireNonNull(filter);
Expand All @@ -2229,8 +2309,8 @@ public Iterator<V> iterator() {
}

@Override
public boolean contains(Object o) {
return cache.containsValue(o);
public Spliterator<V> spliterator() {
return new ValueSpliterator<>(cache);
}
}

Expand Down Expand Up @@ -2258,6 +2338,75 @@ public void remove() {
}
}

/** An adapter to safely externalize the value spliterator. */
static final class ValueSpliterator<K, V> implements Spliterator<V> {
final Spliterator<Node<K, V>> spliterator;
final BoundedLocalCache<K, V> cache;

ValueSpliterator(BoundedLocalCache<K, V> cache) {
this(cache, cache.data.values().spliterator());
}

ValueSpliterator(BoundedLocalCache<K, V> cache, Spliterator<Node<K, V>> spliterator) {
this.spliterator = requireNonNull(spliterator);
this.cache = requireNonNull(cache);
}

@Override
public void forEachRemaining(Consumer<? super V> action) {
requireNonNull(action);
long now = cache.expirationTicker().read();
Consumer<Node<K, V>> consumer = node -> {
K key = node.getKey();
V value = node.getValue();
if ((key != null) && (value != null) && !cache.hasExpired(node, now) && node.isAlive()) {
action.accept(value);
}
};
spliterator.forEachRemaining(consumer);
}

@Override
public boolean tryAdvance(Consumer<? super V> action) {
requireNonNull(action);
boolean[] advanced = { false };
long now = cache.expirationTicker().read();
Consumer<Node<K, V>> consumer = node -> {
K key = node.getKey();
V value = node.getValue();
if ((key != null) && (value != null) && !cache.hasExpired(node, now) && node.isAlive()) {
action.accept(value);
advanced[0] = true;
}
};
for (;;) {
if (spliterator.tryAdvance(consumer)) {
if (advanced[0]) {
return true;
}
continue;
}
return false;
}
}

@Override
public Spliterator<V> trySplit() {
Spliterator<Node<K, V>> split = spliterator.trySplit();
return (split == null) ? null : new ValueSpliterator<>(cache, split);
}

@Override
public long estimateSize() {
return spliterator.estimateSize();
}

@Override
public int characteristics() {
return Spliterator.CONCURRENT | Spliterator.NONNULL;
}
}

/** An adapter to safely externalize the entries. */
static final class EntrySetView<K, V> extends AbstractSet<Entry<K, V>> {
final BoundedLocalCache<K, V> cache;
Expand All @@ -2276,11 +2425,6 @@ public void clear() {
cache.clear();
}

@Override
public Iterator<Entry<K, V>> iterator() {
return new EntryIterator<>(cache);
}

@Override
public boolean contains(Object obj) {
if (!(obj instanceof Entry<?, ?>)) {
Expand Down Expand Up @@ -2311,6 +2455,16 @@ public boolean removeIf(Predicate<? super Entry<K, V>> filter) {
}
return removed;
}

@Override
public Iterator<Entry<K, V>> iterator() {
return new EntryIterator<>(cache);
}

@Override
public Spliterator<Entry<K, V>> spliterator() {
return new EntrySpliterator<>(cache);
}
}

/** An adapter to safely externalize the entry iterator. */
Expand Down Expand Up @@ -2373,6 +2527,75 @@ public void remove() {
}
}

/** An adapter to safely externalize the entry spliterator. */
static final class EntrySpliterator<K, V> implements Spliterator<Entry<K, V>> {
final Spliterator<Node<K, V>> spliterator;
final BoundedLocalCache<K, V> cache;

EntrySpliterator(BoundedLocalCache<K, V> cache) {
this(cache, cache.data.values().spliterator());
}

EntrySpliterator(BoundedLocalCache<K, V> cache, Spliterator<Node<K, V>> spliterator) {
this.spliterator = requireNonNull(spliterator);
this.cache = requireNonNull(cache);
}

@Override
public void forEachRemaining(Consumer<? super Entry<K, V>> action) {
requireNonNull(action);
long now = cache.expirationTicker().read();
Consumer<Node<K, V>> consumer = node -> {
K key = node.getKey();
V value = node.getValue();
if ((key != null) && (value != null) && !cache.hasExpired(node, now) && node.isAlive()) {
action.accept(new WriteThroughEntry<K, V>(cache, key, value));
}
};
spliterator.forEachRemaining(consumer);
}

@Override
public boolean tryAdvance(Consumer<? super Entry<K, V>> action) {
requireNonNull(action);
boolean[] advanced = { false };
long now = cache.expirationTicker().read();
Consumer<Node<K, V>> consumer = node -> {
K key = node.getKey();
V value = node.getValue();
if ((key != null) && (value != null) && !cache.hasExpired(node, now) && node.isAlive()) {
action.accept(new WriteThroughEntry<K, V>(cache, key, value));
advanced[0] = true;
}
};
for (;;) {
if (spliterator.tryAdvance(consumer)) {
if (advanced[0]) {
return true;
}
continue;
}
return false;
}
}

@Override
public Spliterator<Entry<K, V>> trySplit() {
Spliterator<Node<K, V>> split = spliterator.trySplit();
return (split == null) ? null : new EntrySpliterator<>(cache, split);
}

@Override
public long estimateSize() {
return spliterator.estimateSize();
}

@Override
public int characteristics() {
return Spliterator.DISTINCT | Spliterator.CONCURRENT | Spliterator.NONNULL;
}
}

/** Creates a serialization proxy based on the common configuration shared by all cache types. */
static <K, V> SerializationProxy<K, V> makeSerializationProxy(
BoundedLocalCache<?, ?> cache, boolean isWeighted) {
Expand Down
Loading

0 comments on commit da23464

Please sign in to comment.