This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 9fd47950809 [opt](iceberg)Add a new appearance to display the pushDown `count` for 2.1 (#37046) (#34928) (#37810) 9fd47950809 is described below commit 9fd4795080963e10fb4ef5e17a2564e9d072c28f Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Tue Jul 16 16:03:44 2024 +0800 [opt](iceberg)Add a new appearance to display the pushDown `count` for 2.1 (#37046) (#34928) (#37810) ## Proposed changes bp: #37046 #34928 --- .../org/apache/doris/datasource/FileScanNode.java | 14 ++- .../datasource/iceberg/source/IcebergScanNode.java | 22 +++-- .../org/apache/doris/nereids/NereidsPlanner.java | 25 ++++- .../glue/translator/PhysicalPlanTranslator.java | 7 ++ .../org/apache/doris/planner/OriginalPlanner.java | 16 ++++ .../java/org/apache/doris/qe/SessionVariable.java | 10 ++ .../iceberg/test_iceberg_optimize_count.out | 25 +++++ .../iceberg/test_iceberg_optimize_count.groovy | 105 +++++++++++++++++++++ 8 files changed, 213 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index c0364670d9f..11a7b13024e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -39,6 +39,7 @@ import org.apache.doris.thrift.TFileScanNode; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Preconditions; @@ -70,6 +71,7 @@ public abstract class FileScanNode extends ExternalScanNode { protected long totalPartitionNum = 0; protected long readPartitionNum = 0; protected long fileSplitSize; + public long rowCount = 0; public FileScanNode(PlanNodeId id, TupleDescriptor 
desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -95,6 +97,10 @@ public abstract class FileScanNode extends ExternalScanNode { planNode.setFileScanNode(fileScanNode); } + public long getPushDownCount() { + return 0; + } + @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); @@ -170,7 +176,13 @@ public abstract class FileScanNode extends ExternalScanNode { output.append(String.format("avgRowSize=%s, ", avgRowSize)); } output.append(String.format("numNodes=%s", numNodes)).append("\n"); - output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp)).append("\n"); + + // pushdown agg + output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp)); + if (pushDownAggNoGroupingOp.equals(TPushAggOp.COUNT)) { + output.append(" (").append(getPushDownCount()).append(")"); + } + output.append("\n"); return output.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 25d28b092fb..bfb2a5aeb34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -78,7 +78,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -213,6 +212,12 @@ public class IcebergScanNode extends FileQueryScanNode { HashSet<String> partitionPathSet = new HashSet<>(); boolean isPartitionedTable = icebergTable.spec().isPartitioned(); + long rowCount = getCountFromSnapshot(); + if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount >= 0) { + this.rowCount = 
rowCount; + return new ArrayList<>(); + } + CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize); try (CloseableIterable<CombinedScanTask> combinedScanTasks = TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) { @@ -266,12 +271,6 @@ public class IcebergScanNode extends FileQueryScanNode { throw new UserException(e.getMessage(), e.getCause()); } - TPushAggOp aggOp = getPushDownAggNoGroupingOp(); - if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) { - // we can create a special empty split and skip the plan process - return Collections.singletonList(splits.get(0)); - } - readPartitionNum = partitionPathSet.size(); return splits; @@ -425,7 +424,7 @@ public class IcebergScanNode extends FileQueryScanNode { // empty table if (snapshot == null) { - return -1; + return 0; } Map<String, String> summary = snapshot.summary(); @@ -442,12 +441,17 @@ public class IcebergScanNode extends FileQueryScanNode { super.toThrift(planNode); if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) { long countFromSnapshot = getCountFromSnapshot(); - if (countFromSnapshot > 0) { + if (countFromSnapshot >= 0) { planNode.setPushDownCount(countFromSnapshot); } } } + @Override + public long getPushDownCount() { + return getCountFromSnapshot(); + } + @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { if (pushdownIcebergPredicates.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 44a5d0dd639..3b92cd88da8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -20,10 +20,12 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.ExplainOptions; import org.apache.doris.analysis.StatementBase; 
+import org.apache.doris.catalog.Column; import org.apache.doris.common.FormatOptions; import org.apache.doris.common.NereidsException; import org.apache.doris.common.Pair; import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.nereids.CascadesContext.Lock; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; @@ -49,14 +51,18 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.CommonResultSet; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ResultSet; +import org.apache.doris.qe.ResultSetMetaData; import org.apache.doris.qe.SessionVariable; import com.google.common.annotations.VisibleForTesting; @@ -66,6 +72,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -540,7 +547,23 @@ public class NereidsPlanner extends Planner { } } - return Optional.empty(); + if (physicalPlan instanceof PhysicalResultSink + && physicalPlan.child(0) instanceof PhysicalHashAggregate && !getScanNodes().isEmpty() + && getScanNodes().get(0) instanceof 
IcebergScanNode) { + List<Column> columns = Lists.newArrayList(); + NamedExpression output = physicalPlan.getOutput().get(0); + columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); + if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) { + ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); + ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList( + Lists.newArrayList(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount)))); + // only support one iceberg scan node and one count, e.g. select count(*) from icetbl; + return Optional.of(resultSet); + } + return Optional.empty(); + } else { + return Optional.empty(); + } } private void setFormatOptions() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6dc72634b19..24c49864187 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -189,6 +189,7 @@ import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.SortNode; import org.apache.doris.planner.TableFunctionNode; import org.apache.doris.planner.UnionNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.tablefunction.TableValuedFunctionIf; import org.apache.doris.thrift.TFetchOption; @@ -1087,6 +1088,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla + storageLayerAggregate.getAggOp()); } + if (storageLayerAggregate.getRelation() instanceof PhysicalFileScan + && pushAggOp.equals(TPushAggOp.COUNT) + && !ConnectContext.get().getSessionVariable().isEnableCountPushDownForExternalTable()) { + pushAggOp = TPushAggOp.NONE; + } + 
context.setRelationPushAggOp( storageLayerAggregate.getRelation().getRelationId(), pushAggOp); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 2657232db89..70e442546c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -41,6 +41,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.Config; import org.apache.doris.common.FormatOptions; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.nereids.PlannerHook; import org.apache.doris.qe.CommonResultSet; import org.apache.doris.qe.ConnectContext; @@ -641,6 +642,21 @@ public class OriginalPlanner extends Planner { List<Column> columns = new ArrayList<>(selectItems.size()); List<String> columnLabels = parsedSelectStmt.getColLabels(); List<String> data = new ArrayList<>(); + if ((singleNodePlanner.getScanNodes().size() > 0 && singleNodePlanner.getScanNodes().get(0) + instanceof IcebergScanNode) && (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0)) { + SelectListItem item = selectItems.get(0); + Expr expr = item.getExpr(); + String columnName = columnLabels.get(0); + columns.add(new Column(columnName, expr.getType())); + data.add(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount)); + ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); + ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data)); + // only support one iceberg scan node and one count, e.g. 
select count(*) from icetbl; + return Optional.of(resultSet); + } + if (!parsedSelectStmt.getTableRefs().isEmpty()) { + return Optional.empty(); + } FormatOptions options = FormatOptions.getDefault(); for (int i = 0; i < selectItems.size(); i++) { SelectListItem item = selectItems.get(i); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 5046acfd5a4..046dc28c4a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -564,6 +564,8 @@ public class SessionVariable implements Serializable, Writable { public static final String FORCE_JNI_SCANNER = "force_jni_scanner"; + public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = "enable_count_push_down_for_external_table"; + public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection"; public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = "max_msg_size_of_result_receiver"; @@ -1757,6 +1759,10 @@ public class SessionVariable implements Serializable, Writable { description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"}) private boolean forceJniScanner = false; + @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE, + description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown optimization for external table"}) + private boolean enableCountPushDownForExternalTable = true; + public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids"; public Set<Integer> getIgnoredRuntimeFilterIds() { @@ -3919,6 +3925,10 @@ public class SessionVariable implements Serializable, Writable { forceJniScanner = force; } + public boolean isEnableCountPushDownForExternalTable() { + return enableCountPushDownForExternalTable; + } + public boolean isForceToLocalShuffle() { return getEnablePipelineXEngine() && enableLocalShuffle && enableNereidsPlanner 
&& forceToLocalShuffle; } diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out new file mode 100644 index 00000000000..f2e945f9cec --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q01 -- +1000 + +-- !q02 -- +1000 + +-- !q03 -- +1000 + +-- !q04 -- +1000 + +-- !q05 -- +1000 + +-- !q06 -- +1000 + +-- !q07 -- +1000 + +-- !q08 -- +1000 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy new file mode 100644 index 00000000000..927d442b8dd --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_optimize_count" + + try { + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """ switch ${catalog_name} """ + sql """ use format_v2 """ + + sqlstr1 = """ select count(*) from sample_cow_orc; """ + sqlstr2 = """ select count(*) from sample_cow_parquet; """ + sqlstr3 = """ select count(*) from sample_mor_orc; """ + sqlstr4 = """ select count(*) from sample_mor_parquet; """ + + // use push down count + sql """ set enable_count_push_down_for_external_table=true; """ + + qt_q01 """${sqlstr1}""" + qt_q02 """${sqlstr2}""" + qt_q03 """${sqlstr3}""" + qt_q04 """${sqlstr4}""" + + explain { + sql("""${sqlstr1}""") + contains """pushdown agg=COUNT (1000)""" + } + explain { + sql("""${sqlstr2}""") + contains """pushdown agg=COUNT (1000)""" + } + explain { + sql("""${sqlstr3}""") + contains """pushdown agg=COUNT (1000)""" + } + explain { + sql("""${sqlstr4}""") + contains """pushdown agg=COUNT (1000)""" + } + + // don't use push down count + sql """ set enable_count_push_down_for_external_table=false; """ + + qt_q05 """${sqlstr1}""" + qt_q06 """${sqlstr2}""" + qt_q07 """${sqlstr3}""" + qt_q08 """${sqlstr4}""" + + 
explain { + sql("""${sqlstr1}""") + contains """pushdown agg=NONE""" + } + explain { + sql("""${sqlstr2}""") + contains """pushdown agg=NONE""" + } + explain { + sql("""${sqlstr3}""") + contains """pushdown agg=NONE""" + } + explain { + sql("""${sqlstr4}""") + contains """pushdown agg=NONE""" + } + + } finally { + sql """ set enable_count_push_down_for_external_table=true; """ + sql """drop catalog if exists ${catalog_name}""" + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org