Skip to content

Commit

Permalink
GH-5148 Introduce "soft fail" for corrupt ValueStore (#5157)
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad authored Nov 10, 2024
2 parents 517353e + c47fe2b commit 136be9f
Show file tree
Hide file tree
Showing 17 changed files with 1,299 additions and 57 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ org.eclipse.dash.licenses-1.0.2.jar
e2e/node_modules
e2e/playwright-report
e2e/test-results
.aider*
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;

import static org.eclipse.rdf4j.sail.nativerdf.NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES;

import java.io.IOException;

import org.eclipse.rdf4j.common.io.ByteArrayUtil;
Expand All @@ -20,13 +22,20 @@
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.nativerdf.btree.RecordIterator;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRI;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRIOrBNode;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptUnknownValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A statement iterator that wraps a RecordIterator containing statement records and translates these records to
* {@link Statement} objects.
*/
class NativeStatementIterator extends LookAheadIteration<Statement> {

private static final Logger logger = LoggerFactory.getLogger(NativeStatementIterator.class);

/*-----------*
* Variables *
*-----------*/
Expand Down Expand Up @@ -54,25 +63,42 @@ public NativeStatementIterator(RecordIterator btreeIter, ValueStore valueStore)
@Override
public Statement getNextElement() throws SailException {
try {
byte[] nextValue = btreeIter.next();
byte[] nextValue;
try {
nextValue = btreeIter.next();
} catch (AssertionError | Exception e) {
logger.error("Error while reading next value from btree iterator for {}", btreeIter.toString(), e);
throw e;
}

if (nextValue == null) {
return null;
}

int subjID = ByteArrayUtil.getInt(nextValue, TripleStore.SUBJ_IDX);
Resource subj = (Resource) valueStore.getValue(subjID);
Resource subj = valueStore.getResource(subjID);

int predID = ByteArrayUtil.getInt(nextValue, TripleStore.PRED_IDX);
IRI pred = (IRI) valueStore.getValue(predID);
IRI pred = valueStore.getIRI(predID);

int objID = ByteArrayUtil.getInt(nextValue, TripleStore.OBJ_IDX);
Value obj = valueStore.getValue(objID);

Resource context = null;
int contextID = ByteArrayUtil.getInt(nextValue, TripleStore.CONTEXT_IDX);
if (contextID != 0) {
context = (Resource) valueStore.getValue(contextID);
context = valueStore.getResource(contextID);
}
if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
if (subj == null) {
subj = new CorruptIRIOrBNode(valueStore.getRevision(), subjID, null);
}
if (pred == null) {
pred = new CorruptIRI(valueStore.getRevision(), predID, null, null);
}
if (obj == null) {
obj = new CorruptUnknownValue(valueStore.getRevision(), objID, null);
}
}

return valueStore.createStatement(subj, pred, obj, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.io.FileUtils;
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.mapdb.MapDb3CollectionFactory;
import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.LockManager;
import org.eclipse.rdf4j.common.io.MavenUtil;
Expand Down Expand Up @@ -62,6 +63,17 @@ public class NativeStore extends AbstractNotifyingSail implements FederatedServi

private static final String VERSION = MavenUtil.loadVersion("org.eclipse.rdf4j", "rdf4j-sail-nativerdf", "devel");

/**
* Do not throw an exception when corrupt data is detected. Instead, try to return as much data as possible.
*
* Variable can be set through the system property
* org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes.
*/
@InternalUseOnly
public static boolean SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = "true"
.equalsIgnoreCase(
System.getProperty("org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes"));;

private static final Cleaner REMOVE_STORES_USED_FOR_MEMORY_OVERFLOW = Cleaner.create();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,71 @@ private Set<String> parseIndexSpecList(String indexSpecStr) throws SailException
}

private void initIndexes(Set<String> indexSpecs) throws IOException {

HashSet<String> invalidIndexes = new HashSet<>();

for (String fieldSeq : indexSpecs) {
logger.trace("Initializing index '{}'...", fieldSeq);
indexes.add(new TripleIndex(fieldSeq));
try {
indexes.add(new TripleIndex(fieldSeq, false));
} catch (Exception e) {
if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
invalidIndexes.add(fieldSeq);
logger.warn("Ignoring index because it failed to initialize index '{}'", fieldSeq, e);
} else {
logger.error(
"Failed to initialize index '{}', consider setting org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes to true.",
fieldSeq, e);
throw e;
}

}

}

if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
indexSpecs.removeAll(invalidIndexes);
}

List<TripleIndex> emptyIndexes = new ArrayList<>();
List<TripleIndex> nonEmptyIndexes = new ArrayList<>();

checkIfIndexesAreEmptyOrNot(nonEmptyIndexes, emptyIndexes);

if (!emptyIndexes.isEmpty() && !nonEmptyIndexes.isEmpty()) {
if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
indexes.removeAll(emptyIndexes);
} else {
for (TripleIndex index : emptyIndexes) {
throw new IOException("Index '" + new String(index.getFieldSeq())
+ "' is unexpectedly empty while other indexes are not. Consider setting the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes to true. Index file: "
+ index.getBTree().getFile().getAbsolutePath());
}
}
}

}

private void checkIfIndexesAreEmptyOrNot(List<TripleIndex> nonEmptyIndexes, List<TripleIndex> emptyIndexes)
throws IOException {
for (TripleIndex index : indexes) {
try (RecordIterator recordIterator = index.getBTree().iterateAll()) {
try {
byte[] next = recordIterator.next();
if (next != null) {
next = recordIterator.next();
if (next != null) {
nonEmptyIndexes.add(index);
} else {
emptyIndexes.add(index);
}
} else {
emptyIndexes.add(index);
}
} catch (Throwable ignored) {
emptyIndexes.add(index);
}
}
}
}

Expand Down Expand Up @@ -355,7 +417,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
for (String fieldSeq : addedIndexSpecs) {
logger.debug("Initializing new index '{}'...", fieldSeq);

TripleIndex addedIndex = new TripleIndex(fieldSeq);
TripleIndex addedIndex = new TripleIndex(fieldSeq, true);
BTree addedBTree = null;
RecordIterator sourceIter = null;
try {
Expand Down Expand Up @@ -1122,7 +1184,17 @@ private class TripleIndex {

private final BTree btree;

public TripleIndex(String fieldSeq) throws IOException {
public TripleIndex(String fieldSeq, boolean deleteExistingIndexFile) throws IOException {
if (deleteExistingIndexFile) {
File indexFile = new File(dir, getFilenamePrefix(fieldSeq) + ".dat");
if (indexFile.exists()) {
indexFile.delete();
}
File alloxFile = new File(dir, getFilenamePrefix(fieldSeq) + ".alloc");
if (alloxFile.exists()) {
alloxFile.delete();
}
}
tripleComparator = new TripleComparator(fieldSeq);
btree = new BTree(dir, getFilenamePrefix(fieldSeq), 2048, RECORD_LENGTH, tripleComparator, forceSync);
}
Expand Down
Loading

0 comments on commit 136be9f

Please sign in to comment.