Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SPI and Core Support for JDBC Join Pushdown Optimization #24115

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion presto-docs/src/main/sphinx/admin/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,27 @@ The corresponding configuration property is :ref:`admin/properties:\`\`optimizer

Enable retry for failed queries who can potentially be helped by HBO.

The corresponding configuration property is :ref:`admin/properties:\`\`optimizer.retry-query-with-history-based-optimization\`\``.
The corresponding configuration property is :ref:`admin/properties:\`\`optimizer.retry-query-with-history-based-optimization\`\``.

``optimizer_inner_join_pushdown_enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

Enable push down inner join predicates to database. Only allows equality joins to be pushed down.
Use :ref:`admin/properties-session:\`\`optimizer_inequality_join_pushdown_enabled\`\`` along with this configuration to push down inequality join predicates.

The corresponding configuration property is :ref:`admin/properties:\`\`optimizer.inner-join-pushdown-enabled\`\``.

``optimizer_inequality_join_pushdown_enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

Enable push down inner join inequality predicates to database. For this configuration to be enabled, :ref:`admin/properties-session:\`\`optimizer_inner_join_pushdown_enabled\`\`` should be set to ``true``.
The corresponding configuration property is :ref:`admin/properties:\`\`optimizer.inequality-join-pushdown-enabled\`\``.

JDBC Properties
---------------
Expand Down
22 changes: 21 additions & 1 deletion presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,27 @@ The corresponding session property is :ref:`admin/properties-session:\`\`treat-l

Enable retry for failed queries who can potentially be helped by HBO.

The corresponding session property is :ref:`admin/properties-session:\`\`retry-query-with-history-based-optimization\`\``.
The corresponding session property is :ref:`admin/properties-session:\`\`retry-query-with-history-based-optimization\`\``.

``optimizer.inner-join-pushdown-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

Enable push down inner join predicates to database. Only allows equality joins to be pushed down.
Use :ref:`admin/properties:\`\`optimizer.inequality-join-pushdown-enabled\`\`` along with this configuration to push down inequality join predicates.

The corresponding session property is :ref:`admin/properties-session:\`\`optimizer_inner_join_pushdown_enabled\`\``.

``optimizer.inequality-join-pushdown-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

Enable push down inner join inequality predicates to database. For this configuration to be enabled, :ref:`admin/properties:\`\`optimizer.inner-join-pushdown-enabled\`\`` should be set to ``true``.
The corresponding session property is :ref:`admin/properties-session:\`\`optimizer_inequality_join_pushdown_enabled\`\``.

``optimizer.use-histograms``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ public final class SystemSessionProperties
private static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path";
private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
public static final String INNER_JOIN_PUSHDOWN_ENABLED = "optimizer_inner_join_pushdown_enabled";
public static final String INEQUALITY_JOIN_PUSHDOWN_ENABLED = "optimizer_inequality_join_pushdown_enabled";
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";
public static final String NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION = "native_enforce_join_build_input_partition";
public static final String NATIVE_EXECUTION_SCALE_WRITER_THREADS_ENABLED = "native_execution_scale_writer_threads_enabled";
Expand Down Expand Up @@ -1839,6 +1841,16 @@ public SystemSessionProperties(
"Include values node for connector optimizer",
featuresConfig.isIncludeValuesNodeInConnectorOptimizer(),
false),
booleanProperty(
INNER_JOIN_PUSHDOWN_ENABLED,
"Enable Join Predicate Pushdown",
featuresConfig.isInnerJoinPushdownEnabled(),
false),
booleanProperty(
INEQUALITY_JOIN_PUSHDOWN_ENABLED,
"Enable Join Pushdown for Inequality Predicates",
featuresConfig.isInEqualityJoinPushdownEnabled(),
false),
integerProperty(
NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING,
"Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only",
Expand Down Expand Up @@ -3144,6 +3156,16 @@ public static boolean isIncludeValuesNodeInConnectorOptimizer(Session session)
return session.getSystemProperty(INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER, Boolean.class);
}

public static Boolean isInnerJoinPushdownEnabled(Session session)
{
return session.getSystemProperty(INNER_JOIN_PUSHDOWN_ENABLED, Boolean.class);
}

public static Boolean isInEqualityPushdownEnabled(Session session)
{
return session.getSystemProperty(INEQUALITY_JOIN_PUSHDOWN_ENABLED, Boolean.class);
}

public static int getMinColumnarEncodingChannelsToPreferRowWiseEncoding(Session session)
{
return session.getSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,32 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(ConnectorSession s
return new ConnectorTableLayoutResult(layout, constraint.getSummary());
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
if (constraint.getSummary().isNone()) {
return ImmutableList.of();
}

InformationSchemaTableHandle handle = checkTableHandle(table);

Set<QualifiedTablePrefix> prefixes = calculatePrefixesWithSchemaName(session, constraint.getSummary(), constraint.predicate());
if (isTablesEnumeratingTable(handle.getSchemaTableName())) {
Set<QualifiedTablePrefix> tablePrefixes = calculatePrefixesWithTableName(session, prefixes, constraint.getSummary(), constraint.predicate());
// in case of high number of prefixes it is better to populate all data and then filter
if (tablePrefixes.size() <= MAX_PREFIXES_COUNT) {
prefixes = tablePrefixes;
}
}
if (prefixes.size() > MAX_PREFIXES_COUNT) {
// in case of high number of prefixes it is better to populate all data and then filter
prefixes = ImmutableSet.of(new QualifiedTablePrefix(catalogName));
}

ConnectorTableLayout layout = new ConnectorTableLayout(new InformationSchemaTableLayoutHandle(handle, prefixes));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

private boolean isTablesEnumeratingTable(SchemaTableName schemaTableName)
{
return ImmutableSet.of(TABLE_COLUMNS, TABLE_VIEWS, TABLE_TABLES, TABLE_TABLE_PRIVILEGES).contains(schemaTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.function.SqlFunction;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.GrantInfo;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.security.Privilege;
Expand Down Expand Up @@ -517,4 +519,8 @@ default TableLayoutFilterCoverage getTableLayoutFilterCoverage(Session session,
void dropConstraint(Session session, TableHandle tableHandle, Optional<String> constraintName, Optional<String> columnName);

void addConstraint(Session session, TableHandle tableHandle, TableConstraint<String> tableConstraint);
default boolean isPushdownSupportedForFilter(Session session, TableHandle tableHandle, RowExpression filter, Map<VariableReferenceExpression, ColumnHandle> symbolToColumnHandleMap)
{
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.function.SqlFunction;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.GrantInfo;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.security.Privilege;
Expand Down Expand Up @@ -1535,4 +1537,15 @@ public static Function<SchemaTableName, QualifiedObjectName> convertFromSchemaTa
{
return input -> new QualifiedObjectName(catalogName, input.getSchemaName(), input.getTableName());
}

@Override
public boolean isPushdownSupportedForFilter(Session session, TableHandle tableHandle, RowExpression filter, Map<VariableReferenceExpression, ColumnHandle> symbolToColumnHandleMap)
{
ConnectorId connectorId = tableHandle.getConnectorId();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId);
ConnectorSession connectorSession = session.toConnectorSession(catalogMetadata.getConnectorId());
ConnectorMetadata metadata = catalogMetadata.getMetadata();

return metadata.isPushdownSupportedForFilter(connectorSession, tableHandle.getConnectorHandle(), filter, symbolToColumnHandleMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ public class FeaturesConfig

private boolean eagerPlanValidationEnabled;
private int eagerPlanValidationThreadPoolSize = 20;
private boolean innerJoinPushdownEnabled;
private boolean inEqualityJoinPushdownEnabled;

private boolean prestoSparkExecutionEnvironment;
private boolean singleNodeExecutionEnabled;
Expand Down Expand Up @@ -2879,6 +2881,31 @@ public int getEagerPlanValidationThreadPoolSize()
return this.eagerPlanValidationThreadPoolSize;
}

@Config("optimizer.inner-join-pushdown-enabled")
@ConfigDescription("push down inner join predicates to database")
public FeaturesConfig setInnerJoinPushdownEnabled(boolean innerJoinPushdownEnabled)
{
this.innerJoinPushdownEnabled = innerJoinPushdownEnabled;
return this;
}

public boolean isInnerJoinPushdownEnabled()
{
return innerJoinPushdownEnabled;
}

@Config("optimizer.inequality-join-pushdown-enabled")
@ConfigDescription("push down inner join inequality predicates to database")
public FeaturesConfig setInEqualityJoinPushdownEnabled(boolean inEqualityJoinPushdownEnabled)
{
this.inEqualityJoinPushdownEnabled = inEqualityJoinPushdownEnabled;
return this;
}

public boolean isInEqualityJoinPushdownEnabled()
{
return inEqualityJoinPushdownEnabled;
}
public boolean isPrestoSparkExecutionEnvironment()
{
return prestoSparkExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ public Map<ConnectorId, Set<ConnectorPlanOptimizer>> getOptimizers(PlanPhase pha
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getLogicalPlanOptimizers));
case PHYSICAL:
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getPhysicalPlanOptimizers));
case STRUCTURAL:
return ImmutableMap.copyOf(transformValues(planOptimizerProviders, ConnectorPlanOptimizerProvider::getStructuralPlanOptimizers));
default:
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Unknown plan phase " + phase);
}
}

public enum PlanPhase
{
LOGICAL, PHYSICAL
LOGICAL, PHYSICAL, STRUCTURAL
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization;
import com.facebook.presto.sql.planner.optimizations.CheckSubqueryNodesAreRewritten;
import com.facebook.presto.sql.planner.optimizations.CteProjectionAndPredicatePushDown;
import com.facebook.presto.sql.planner.optimizations.GroupInnerJoinsByConnector;
import com.facebook.presto.sql.planner.optimizations.HashGenerationOptimizer;
import com.facebook.presto.sql.planner.optimizations.HistoricalStatisticsEquivalentPlanMarkingOptimizer;
import com.facebook.presto.sql.planner.optimizations.ImplementIntersectAndExceptAsUnion;
Expand Down Expand Up @@ -197,6 +198,7 @@

import static com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager.PlanPhase.LOGICAL;
import static com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager.PlanPhase.PHYSICAL;
import static com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager.PlanPhase.STRUCTURAL;

public class PlanOptimizers
{
Expand Down Expand Up @@ -716,7 +718,6 @@ public PlanOptimizers(
new ApplyConnectorOptimization(() -> planOptimizerManager.getOptimizers(LOGICAL)),
projectionPushDown,
new PruneUnreferencedOutputs());

// Pass after connector optimizer, as it relies on connector optimizer to identify empty input tables and convert them to empty ValuesNode
builder.add(new SimplifyPlanWithEmptyInput(),
new PruneUnreferencedOutputs());
Expand Down Expand Up @@ -759,6 +760,10 @@ public PlanOptimizers(
// After this step, nodes with same `statsEquivalentPlanNode` will share same history based statistics.
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new HistoricalStatisticsEquivalentPlanMarkingOptimizer(statsCalculator)));

builder.add(new GroupInnerJoinsByConnector(metadata));
builder.add(new ApplyConnectorOptimization(() -> planOptimizerManager.getOptimizers(STRUCTURAL)));
builder.add(predicatePushDown, simplifyRowExpressionOptimizer);

builder.add(new IterativeOptimizer(
metadata,
// Because ReorderJoins runs only once,
Expand All @@ -768,7 +773,8 @@ public PlanOptimizers(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new ReorderJoins(costComparator, metadata))));
ImmutableSet.of(
new ReorderJoins(costComparator, metadata))));

// After ReorderJoins, `statsEquivalentPlanNode` will be unassigned to intermediate join nodes.
// We run it again to mark this for intermediate join nodes.
Expand Down
Loading
Loading