From 5190da1de1d43724133272ff17cd380e6d53660f Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:54:35 +0100 Subject: [PATCH] renew transactions --- .../io/storage/xodus/stores/XodusStore.java | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/XodusStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/XodusStore.java index ed0ce73b0a..75d3b82122 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/XodusStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/XodusStore.java @@ -86,7 +86,7 @@ public SerializingStore.IterationStatistic forEach(StoreEntryConsumer getAllKeys() { - XodusIterator spliterator = new XodusIterator(environment, store); + XodusKeyIterator spliterator = new XodusKeyIterator(environment, store, timeoutHalfMillis); return StreamSupport.stream(spliterator, false).onClose(spliterator::onClose); } @@ -142,26 +142,56 @@ public String toString() { return "XodusStore[" + environment.getLocation() + ":" +store.getName() +"}"; } - private static class XodusIterator extends Spliterators.AbstractSpliterator { + private static class XodusKeyIterator extends Spliterators.AbstractSpliterator { + private final long timeoutHalfMillis; + private final Environment environment; + private final Store store; private Transaction transaction; private Cursor cursor; + private long start; - protected XodusIterator(Environment environment, Store store) { + private final AtomicReference lastKey = new AtomicReference<>(); + + protected XodusKeyIterator(Environment environment, Store store, long timeoutHalfMillis) { super(Long.MAX_VALUE, Spliterator.ORDERED); + this.timeoutHalfMillis = timeoutHalfMillis; + this.environment = environment; + this.store = store; + + refreshCursor(); + } + + private void refreshCursor() { + if (transaction != null && !transaction.isFinished()) { + transaction.abort(); + } + start = System.currentTimeMillis(); transaction = environment.beginReadonlyTransaction(); cursor = store.openCursor(transaction); + + if (lastKey.get() != null) { + cursor.getSearchKey(lastKey.get()); + } } @Override public boolean tryAdvance(Consumer action) { + if (System.currentTimeMillis() - start >= timeoutHalfMillis) { + // refresh transaction after half of the timeout + refreshCursor(); + } + if (cursor.getNext()) { - action.accept(cursor.getKey()); + ByteIterable key = cursor.getKey(); + lastKey.set(key); + action.accept(key); return true; } else { cursor.close(); + onClose(); return false; } }