Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-5121: support empty left bind join (OPTIONAL) in FedX #5200

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
Expand Down Expand Up @@ -42,7 +43,9 @@ protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
if (expr instanceof StatementTupleExpr) {
StatementTupleExpr stmt = (StatementTupleExpr) expr;
taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt);

} else if (expr instanceof EmptyStatementPattern) {
EmptyStatementPattern stmt = (EmptyStatementPattern) expr;
taskCreator = new EmptyLeftBoundJoinTaskCreator(strategy, stmt);
} else {
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
+ ". Please report this problem.");
Expand All @@ -67,4 +70,20 @@ public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, Li
}
}

static protected class EmptyLeftBoundJoinTaskCreator implements TaskCreator {
protected final FederationEvalStrategy _strategy;
protected final EmptyStatementPattern _expr;

public EmptyLeftBoundJoinTaskCreator(
FederationEvalStrategy strategy, EmptyStatementPattern expr) {
super();
_strategy = strategy;
_expr = expr;
}

@Override
public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
return new ParallelEmptyBindLeftJoinTask(control, _strategy, _expr, bindings);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.join;

import java.util.List;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.repository.sparql.federation.CollectionIteration;

/**
* A {@link ParallelTaskBase} for executing bind left joins, where the join argument is an
* {@link EmptyStatementPattern}. The effective result is that the input bindings from the left operand are passed
* through.
*
* @author Andreas Schwarte
*/
public class ParallelEmptyBindLeftJoinTask extends ParallelTaskBase<BindingSet> {

protected final FederationEvalStrategy strategy;
protected final EmptyStatementPattern rightArg;
protected final List<BindingSet> bindings;
protected final ParallelExecutor<BindingSet> joinControl;

public ParallelEmptyBindLeftJoinTask(ParallelExecutor<BindingSet> joinControl, FederationEvalStrategy strategy,
EmptyStatementPattern expr, List<BindingSet> bindings) {
this.strategy = strategy;
this.rightArg = expr;
this.bindings = bindings;
this.joinControl = joinControl;
}

@Override
public ParallelExecutor<BindingSet> getControl() {
return joinControl;
}

@Override
protected CloseableIteration<BindingSet> performTaskInternal() throws Exception {
// simply return the input bindings (=> the empty statement pattern cannot add results)
return new CollectionIteration<BindingSet>(bindings);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
conn.add(Values.iri("http://other.com/p30"), FOAF.GENDER, Values.literal("male"));
}

fedxRule.enableDebug();

try {
// run query which joins results from multiple repos
// for a subset of persons there exist names
Expand Down Expand Up @@ -246,4 +244,99 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
}
}

@ParameterizedTest
@ValueSource(booleans = { true, false })
public void test_leftBindJoin_emptyOptional(boolean bindLeftJoinOptimizationEnabled) throws Exception {

prepareTest(
Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl",
"/tests/basic/data_emptyStore.ttl"));

Repository repo1 = getRepository(1);
Repository repo2 = getRepository(2);
Repository repo3 = getRepository(3);

Repository fedxRepo = fedxRule.getRepository();

fedxRule.setConfig(config -> {
config.withBoundJoinBlockSize(10);
config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled);
});

// add some persons
try (RepositoryConnection conn = repo1.getConnection()) {

for (int i = 1; i <= 30; i++) {
var p = Values.iri("http://ex.com/p" + i);
var otherP = Values.iri("http://other.com/p" + i);
conn.add(p, OWL.SAMEAS, otherP);
}
}

// add names for person 1, 4, 7, ...
try (RepositoryConnection conn = repo2.getConnection()) {

for (int i = 1; i <= 30; i += 3) {
var otherP = Values.iri("http://other.com/p" + i);
conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
}
}

// add names for person 2, 5, 8, ...
try (RepositoryConnection conn = repo3.getConnection()) {

for (int i = 2; i <= 30; i += 3) {
var otherP = Values.iri("http://other.com/p" + i);
conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
}
}

try {
// run query which joins results from multiple repos
// for a subset of persons there exist names
// the age does not exist for any person
try (RepositoryConnection conn = fedxRepo.getConnection()) {
String query = "PREFIX foaf: <http://xmlns.com/foaf/0.1/> " +
"SELECT * WHERE { "
+ " ?person owl:sameAs ?otherPerson . "
+ " OPTIONAL { ?otherPerson foaf:name ?name . } " // # @repo2 and @repo3
+ " OPTIONAL { ?otherPerson foaf:age ?age . } " // # does not exist
+ "}";

TupleQuery tupleQuery = conn.prepareTupleQuery(query);
try (TupleQueryResult tqr = tupleQuery.evaluate()) {
var bindings = Iterations.asList(tqr);

Assertions.assertEquals(30, bindings.size());

for (int i = 1; i <= 30; i++) {
var p = Values.iri("http://ex.com/p" + i);
var otherP = Values.iri("http://other.com/p" + i);

// find the bindingset for the person in the unordered result
BindingSet bs = bindings.stream()
.filter(b -> b.getValue("person").equals(p))
.findFirst()
.orElseThrow();

Assertions.assertEquals(otherP, bs.getValue("otherPerson"));
if (i % 3 == 1 || i % 3 == 2) {
// names from repo 2 or 3
Assertions.assertEquals("Person " + i, bs.getValue("name").stringValue());
} else {
// no name for others
Assertions.assertFalse(bs.hasBinding("name"));
}

Assertions.assertEquals(otherP, bs.getValue("otherPerson"));
Assertions.assertFalse(bs.hasBinding("age"));
}
}
}

} finally {
fedxRepo.shutDown();
}
}

}
Loading