Skip to content

Commit

Permalink
Join Pushdown SPI and Core changes
Browse files Browse the repository at this point in the history
Co-Authored-By: Ajas M <[email protected]>
Co-Authored-By: Glerin Pinhero <[email protected]>
Co-Authored-By: Thanzeel Hassan <[email protected]>
Co-Authored-By: Anant Aneja <[email protected]>
  • Loading branch information
5 people committed Dec 2, 2024
1 parent b19167e commit 471eb62
Show file tree
Hide file tree
Showing 23 changed files with 1,786 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ public final class SystemSessionProperties
private static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path";
private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
public static final String INNER_JOIN_PUSHDOWN_ENABLED = "optimizer_inner_join_pushdown_enabled";
public static final String INEQUALITY_JOIN_PUSHDOWN_ENABLED = "optimizer_inequality_join_pushdown_enabled";
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";

private final List<PropertyMetadata<?>> sessionProperties;
Expand Down Expand Up @@ -1826,6 +1828,16 @@ public SystemSessionProperties(
"Whether to evaluate project node on values node",
featuresConfig.getInlineProjectionsOnValues(),
false),
booleanProperty(
INNER_JOIN_PUSHDOWN_ENABLED,
"Enable Join Predicate Pushdown",
featuresConfig.isInnerJoinPushdownEnabled(),
false),
booleanProperty(
INEQUALITY_JOIN_PUSHDOWN_ENABLED,
"Enable Join Pushdown for Inequality Predicates",
featuresConfig.isInEqualityJoinPushdownEnabled(),
false),
integerProperty(
NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING,
"Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only",
Expand Down Expand Up @@ -3107,6 +3119,16 @@ public static boolean isInlineProjectionsOnValues(Session session)
return session.getSystemProperty(INLINE_PROJECTIONS_ON_VALUES, Boolean.class);
}

public static Boolean isInnerJoinPushdownEnabled(Session session)
{
return session.getSystemProperty(INNER_JOIN_PUSHDOWN_ENABLED, Boolean.class);
}

public static Boolean isInEqualityPushdownEnabled(Session session)
{
return session.getSystemProperty(INEQUALITY_JOIN_PUSHDOWN_ENABLED, Boolean.class);
}

public static int getMinColumnarEncodingChannelsToPreferRowWiseEncoding(Session session)
{
return session.getSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,32 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(ConnectorSession s
return new ConnectorTableLayoutResult(layout, constraint.getSummary());
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
if (constraint.getSummary().isNone()) {
return ImmutableList.of();
}

InformationSchemaTableHandle handle = checkTableHandle(table);

Set<QualifiedTablePrefix> prefixes = calculatePrefixesWithSchemaName(session, constraint.getSummary(), constraint.predicate());
if (isTablesEnumeratingTable(handle.getSchemaTableName())) {
Set<QualifiedTablePrefix> tablePrefixes = calculatePrefixesWithTableName(session, prefixes, constraint.getSummary(), constraint.predicate());
// in case of high number of prefixes it is better to populate all data and then filter
if (tablePrefixes.size() <= MAX_PREFIXES_COUNT) {
prefixes = tablePrefixes;
}
}
if (prefixes.size() > MAX_PREFIXES_COUNT) {
// in case of high number of prefixes it is better to populate all data and then filter
prefixes = ImmutableSet.of(new QualifiedTablePrefix(catalogName));
}

ConnectorTableLayout layout = new ConnectorTableLayout(new InformationSchemaTableLayoutHandle(handle, prefixes));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

private boolean isTablesEnumeratingTable(SchemaTableName schemaTableName)
{
return ImmutableSet.of(TABLE_COLUMNS, TABLE_VIEWS, TABLE_TABLES, TABLE_TABLE_PRIVILEGES).contains(schemaTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.function.SqlFunction;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.GrantInfo;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.security.Privilege;
Expand Down Expand Up @@ -518,4 +520,8 @@ default TableLayoutFilterCoverage getTableLayoutFilterCoverage(Session session,
void dropConstraint(Session session, TableHandle tableHandle, Optional<String> constraintName, Optional<String> columnName);

void addConstraint(Session session, TableHandle tableHandle, TableConstraint<String> tableConstraint);
default boolean isPushdownSupportedForFilter(Session session, TableHandle tableHandle, RowExpression filter, Map<VariableReferenceExpression, ColumnHandle> symbolToColumnHandleMap)
{
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.function.SqlFunction;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.GrantInfo;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.security.Privilege;
Expand Down Expand Up @@ -1549,4 +1551,15 @@ public static Function<SchemaTableName, QualifiedObjectName> convertFromSchemaTa
{
return input -> new QualifiedObjectName(catalogName, input.getSchemaName(), input.getTableName());
}

@Override
public boolean isPushdownSupportedForFilter(Session session, TableHandle tableHandle, RowExpression filter, Map<VariableReferenceExpression, ColumnHandle> symbolToColumnHandleMap)
{
ConnectorId connectorId = tableHandle.getConnectorId();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId);
ConnectorSession connectorSession = session.toConnectorSession(catalogMetadata.getConnectorId());
ConnectorMetadata metadata = catalogMetadata.getMetadata();

return metadata.isPushdownSupportedForFilter(connectorSession, tableHandle.getConnectorHandle(), filter, symbolToColumnHandleMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public class FeaturesConfig

private boolean eagerPlanValidationEnabled;
private int eagerPlanValidationThreadPoolSize = 20;
private boolean innerJoinPushdownEnabled;
private boolean inEqualityJoinPushdownEnabled;

private boolean prestoSparkExecutionEnvironment;

Expand Down Expand Up @@ -2849,6 +2851,31 @@ public int getEagerPlanValidationThreadPoolSize()
return this.eagerPlanValidationThreadPoolSize;
}

@Config("optimizer.inner-join-pushdown-enabled")
@ConfigDescription("push down inner join predicates to database")
public FeaturesConfig setInnerJoinPushdownEnabled(boolean innerJoinPushdownEnabled)
{
this.innerJoinPushdownEnabled = innerJoinPushdownEnabled;
return this;
}

public boolean isInnerJoinPushdownEnabled()
{
return innerJoinPushdownEnabled;
}

@Config("optimizer.inequality-join-pushdown-enabled")
@ConfigDescription("push down inner join inequality predicates to database")
public FeaturesConfig setInEqualityJoinPushdownEnabled(boolean inEqualityJoinPushdownEnabled)
{
this.inEqualityJoinPushdownEnabled = inEqualityJoinPushdownEnabled;
return this;
}

public boolean isInEqualityJoinPushdownEnabled()
{
return inEqualityJoinPushdownEnabled;
}
public boolean isPrestoSparkExecutionEnvironment()
{
return prestoSparkExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ public Map<ConnectorId, Set<ConnectorPlanOptimizer>> getOptimizers(PlanPhase pha
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getLogicalPlanOptimizers));
case PHYSICAL:
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getPhysicalPlanOptimizers));
case STRUCTURAL:
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getStructuralPlanOptimizers));
default:
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Unknown plan phase " + phase);
}
}

public enum PlanPhase
{
LOGICAL, PHYSICAL
LOGICAL, PHYSICAL, STRUCTURAL
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization;
import com.facebook.presto.sql.planner.optimizations.CheckSubqueryNodesAreRewritten;
import com.facebook.presto.sql.planner.optimizations.CteProjectionAndPredicatePushDown;
import com.facebook.presto.sql.planner.optimizations.GroupInnerJoinsByConnector;
import com.facebook.presto.sql.planner.optimizations.HashGenerationOptimizer;
import com.facebook.presto.sql.planner.optimizations.HistoricalStatisticsEquivalentPlanMarkingOptimizer;
import com.facebook.presto.sql.planner.optimizations.ImplementIntersectAndExceptAsUnion;
Expand Down Expand Up @@ -196,6 +197,7 @@

import static com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager.PlanPhase.LOGICAL;
import static com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager.PlanPhase.PHYSICAL;
import static com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager.PlanPhase.STRUCTURAL;

public class PlanOptimizers
{
Expand Down Expand Up @@ -715,7 +717,6 @@ public PlanOptimizers(
new ApplyConnectorOptimization(() -> planOptimizerManager.getOptimizers(LOGICAL)),
projectionPushDown,
new PruneUnreferencedOutputs());

// Pass after connector optimizer, as it relies on connector optimizer to identify empty input tables and convert them to empty ValuesNode
builder.add(new SimplifyPlanWithEmptyInput(),
new PruneUnreferencedOutputs());
Expand Down Expand Up @@ -758,6 +759,10 @@ public PlanOptimizers(
// After this step, nodes with same `statsEquivalentPlanNode` will share same history based statistics.
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new HistoricalStatisticsEquivalentPlanMarkingOptimizer(statsCalculator)));

builder.add(new GroupInnerJoinsByConnector(metadata));
builder.add(new ApplyConnectorOptimization(() -> planOptimizerManager.getOptimizers(STRUCTURAL)));
builder.add(predicatePushDown, simplifyRowExpressionOptimizer);

builder.add(new IterativeOptimizer(
metadata,
// Because ReorderJoins runs only once,
Expand All @@ -767,7 +772,8 @@ public PlanOptimizers(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new ReorderJoins(costComparator, metadata))));
ImmutableSet.of(
new ReorderJoins(costComparator, metadata))));

// After ReorderJoins, `statsEquivalentPlanNode` will be unassigned to intermediate join nodes.
// We run it again to mark this for intermediate join nodes.
Expand Down
Loading

0 comments on commit 471eb62

Please sign in to comment.