Skip to content

Commit

Permalink
renew transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Dec 17, 2024
1 parent 9dd290e commit 5190da1
Showing 1 changed file with 34 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public SerializingStore.IterationStatistic forEach(StoreEntryConsumer<ByteIterab
}

public Stream<ByteIterable> getAllKeys() {
XodusIterator spliterator = new XodusIterator(environment, store);
XodusKeyIterator spliterator = new XodusKeyIterator(environment, store, timeoutHalfMillis);
return StreamSupport.stream(spliterator, false).onClose(spliterator::onClose);
}

Expand Down Expand Up @@ -142,26 +142,56 @@ public String toString() {
return "XodusStore[" + environment.getLocation() + ":" +store.getName() +"}";
}

private static class XodusIterator extends Spliterators.AbstractSpliterator<ByteIterable> {
private static class XodusKeyIterator extends Spliterators.AbstractSpliterator<ByteIterable> {

private final long timeoutHalfMillis;
private final Environment environment;
private final Store store;
private Transaction transaction;
private Cursor cursor;
private long start;

protected XodusIterator(Environment environment, Store store) {
private final AtomicReference<ByteIterable> lastKey = new AtomicReference<>();

protected XodusKeyIterator(Environment environment, Store store, long timeoutHalfMillis) {
super(Long.MAX_VALUE, Spliterator.ORDERED);
this.timeoutHalfMillis = timeoutHalfMillis;
this.environment = environment;
this.store = store;

refreshCursor();
}

private void refreshCursor() {
if (transaction != null && !transaction.isFinished()) {
transaction.abort();
}

start = System.currentTimeMillis();
transaction = environment.beginReadonlyTransaction();
cursor = store.openCursor(transaction);

if (lastKey.get() != null) {
cursor.getSearchKey(lastKey.get());
}
}

@Override
public boolean tryAdvance(Consumer<? super ByteIterable> action) {
if (System.currentTimeMillis() - start >= timeoutHalfMillis) {
// refresh transaction after half of the timeout
refreshCursor();
}

if (cursor.getNext()) {
action.accept(cursor.getKey());
ByteIterable key = cursor.getKey();
lastKey.set(key);
action.accept(key);
return true;
}
else {
cursor.close();
onClose();
return false;
}
}
Expand Down

0 comments on commit 5190da1

Please sign in to comment.