diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java index a30ed66cf7..86da1a8104 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java @@ -13,6 +13,7 @@ import java.util.List; import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern; import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; @@ -42,7 +43,9 @@ protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) { if (expr instanceof StatementTupleExpr) { StatementTupleExpr stmt = (StatementTupleExpr) expr; taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt); - + } else if (expr instanceof EmptyStatementPattern) { + EmptyStatementPattern stmt = (EmptyStatementPattern) expr; + taskCreator = new EmptyLeftBoundJoinTaskCreator(strategy, stmt); } else { throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName() + ". 
Please report this problem.");
@@ -67,4 +70,23 @@ public ParallelTask getTask(ParallelExecutor control, Li
 		}
 	}
 
+	/**
+	 * A {@link TaskCreator} for the case that the join argument is an {@link EmptyStatementPattern}: each call to
+	 * {@link #getTask(ParallelExecutor, List)} wraps the given bindings in a {@link ParallelEmptyBindLeftJoinTask}.
+	 */
+	protected static class EmptyLeftBoundJoinTaskCreator implements TaskCreator {
+		protected final FederationEvalStrategy _strategy;
+		protected final EmptyStatementPattern _expr;
+
+		public EmptyLeftBoundJoinTaskCreator(
+				FederationEvalStrategy strategy, EmptyStatementPattern expr) {
+			_strategy = strategy;
+			_expr = expr;
+		}
+
+		@Override
+		public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
+			return new ParallelEmptyBindLeftJoinTask(control, _strategy, _expr, bindings);
+		}
+	}
 }
diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelEmptyBindLeftJoinTask.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelEmptyBindLeftJoinTask.java
new file mode 100644
index 0000000000..9a24357838
--- /dev/null
+++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelEmptyBindLeftJoinTask.java
@@ -0,0 +1,62 @@
+/*******************************************************************************
+ * Copyright (c) 2024 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.federated.evaluation.join;
+
+import java.util.List;
+
+import org.eclipse.rdf4j.common.iteration.CloseableIteration;
+import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
+import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
+import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
+import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
+import org.eclipse.rdf4j.query.BindingSet;
+import org.eclipse.rdf4j.repository.sparql.federation.CollectionIteration;
+
+/**
+ * A {@link ParallelTaskBase} for executing bind left joins, where the join argument is an
+ * {@link EmptyStatementPattern}. The effective result is that the input bindings from the left operand are passed
+ * through.
+ *
+ * @author Andreas Schwarte
+ */
+public class ParallelEmptyBindLeftJoinTask extends ParallelTaskBase<BindingSet> {
+
+	protected final FederationEvalStrategy strategy;
+	protected final EmptyStatementPattern rightArg;
+	protected final List<BindingSet> bindings;
+	protected final ParallelExecutor<BindingSet> joinControl;
+
+	/**
+	 * @param joinControl the executor controlling this task, returned from {@link #getControl()}
+	 * @param strategy    the federation evaluation strategy (stored; not read by this class)
+	 * @param expr        the empty right-hand join argument (stored; not read by this class)
+	 * @param bindings    the input bindings of the left operand, returned unchanged as the task result
+	 */
+	public ParallelEmptyBindLeftJoinTask(ParallelExecutor<BindingSet> joinControl, FederationEvalStrategy strategy,
+			EmptyStatementPattern expr, List<BindingSet> bindings) {
+		this.strategy = strategy;
+		this.rightArg = expr;
+		this.bindings = bindings;
+		this.joinControl = joinControl;
+	}
+
+	@Override
+	public ParallelExecutor<BindingSet> getControl() {
+		return joinControl;
+	}
+
+	@Override
+	protected CloseableIteration<BindingSet> performTaskInternal() throws Exception {
+		// simply return the input bindings (=> the empty statement pattern cannot add results)
+		return new CollectionIteration<>(bindings);
+	}
+
+}
diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java
index 19d3d0386e..d19359f5bd 100644
--- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java
+++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java
@@ -192,8 +192,6 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
 			conn.add(Values.iri("http://other.com/p30"), FOAF.GENDER, Values.literal("male"));
 		}
 
-		fedxRule.enableDebug();
-
 		try {
 			// run query which joins results from multiple repos
 			// for a subset of persons there exist names
@@ -246,4 +244,105 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
 		}
 	}
 
+	/**
+	 * Verifies that an OPTIONAL clause which matches in no federation member (here: foaf:age) does not affect the
+	 * result: all 30 persons are returned, names are bound where they exist and ?age is never bound. Runs with the
+	 * bind left join optimization both enabled and disabled.
+	 */
+	@ParameterizedTest
+	@ValueSource(booleans = { true, false })
+	public void test_leftBindJoin_emptyOptional(boolean bindLeftJoinOptimizationEnabled) throws Exception {
+
+		prepareTest(
+				Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl",
+						"/tests/basic/data_emptyStore.ttl"));
+
+		Repository repo1 = getRepository(1);
+		Repository repo2 = getRepository(2);
+		Repository repo3 = getRepository(3);
+
+		Repository fedxRepo = fedxRule.getRepository();
+
+		fedxRule.setConfig(config -> {
+			config.withBoundJoinBlockSize(10);
+			config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled);
+		});
+
+		// add some persons
+		try (RepositoryConnection conn = repo1.getConnection()) {
+
+			for (int i = 1; i <= 30; i++) {
+				var p = Values.iri("http://ex.com/p" + i);
+				var otherP = Values.iri("http://other.com/p" + i);
+				conn.add(p, OWL.SAMEAS, otherP);
+			}
+		}
+
+		// add names for person 1, 4, 7, ...
+		try (RepositoryConnection conn = repo2.getConnection()) {
+
+			for (int i = 1; i <= 30; i += 3) {
+				var otherP = Values.iri("http://other.com/p" + i);
+				conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
+			}
+		}
+
+		// add names for person 2, 5, 8, ...
+		try (RepositoryConnection conn = repo3.getConnection()) {
+
+			for (int i = 2; i <= 30; i += 3) {
+				var otherP = Values.iri("http://other.com/p" + i);
+				conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
+			}
+		}
+
+		try {
+			// run query which joins results from multiple repos
+			// for a subset of persons there exist names
+			// the age does not exist for any person
+			try (RepositoryConnection conn = fedxRepo.getConnection()) {
+				String query = "PREFIX foaf: <" + FOAF.NAMESPACE + "> "
+						+ "PREFIX owl: <" + OWL.NAMESPACE + "> "
+						+ "SELECT * WHERE { "
+						+ " ?person owl:sameAs ?otherPerson . "
+						+ " OPTIONAL { ?otherPerson foaf:name ?name . } " // # @repo2 and @repo3
+						+ " OPTIONAL { ?otherPerson foaf:age ?age . } " // # does not exist
+						+ "}";
+
+				TupleQuery tupleQuery = conn.prepareTupleQuery(query);
+				try (TupleQueryResult tqr = tupleQuery.evaluate()) {
+					var bindings = Iterations.asList(tqr);
+
+					Assertions.assertEquals(30, bindings.size());
+
+					for (int i = 1; i <= 30; i++) {
+						var p = Values.iri("http://ex.com/p" + i);
+						var otherP = Values.iri("http://other.com/p" + i);
+
+						// find the bindingset for the person in the unordered result
+						BindingSet bs = bindings.stream()
+								.filter(b -> b.getValue("person").equals(p))
+								.findFirst()
+								.orElseThrow(() -> new AssertionError("No result for " + p));
+
+						Assertions.assertEquals(otherP, bs.getValue("otherPerson"));
+						if (i % 3 != 0) {
+							// names from repo 2 (i % 3 == 1) or repo 3 (i % 3 == 2)
+							Assertions.assertEquals("Person " + i, bs.getValue("name").stringValue());
+						} else {
+							// no name for others
+							Assertions.assertFalse(bs.hasBinding("name"));
+						}
+
+						// the second OPTIONAL never matches
+						Assertions.assertFalse(bs.hasBinding("age"));
+					}
+				}
+			}
+
+		} finally {
+			fedxRepo.shutDown();
+		}
+	}
+
 }