This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9fd47950809 [opt](iceberg)Add a new appearance to display the pushDown `count` for 2.1 (#37046) (#34928) (#37810)
9fd47950809 is described below

commit 9fd4795080963e10fb4ef5e17a2564e9d072c28f
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Tue Jul 16 16:03:44 2024 +0800

    [opt](iceberg)Add a new appearance to display the pushDown `count` for 2.1 (#37046) (#34928) (#37810)
    
    ## Proposed changes
    
    bp: #37046 #34928
---
 .../org/apache/doris/datasource/FileScanNode.java  |  14 ++-
 .../datasource/iceberg/source/IcebergScanNode.java |  22 +++--
 .../org/apache/doris/nereids/NereidsPlanner.java   |  25 ++++-
 .../glue/translator/PhysicalPlanTranslator.java    |   7 ++
 .../org/apache/doris/planner/OriginalPlanner.java  |  16 ++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  10 ++
 .../iceberg/test_iceberg_optimize_count.out        |  25 +++++
 .../iceberg/test_iceberg_optimize_count.groovy     | 105 +++++++++++++++++++++
 8 files changed, 213 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index c0364670d9f..11a7b13024e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TFileScanNode;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Preconditions;
@@ -70,6 +71,7 @@ public abstract class FileScanNode extends ExternalScanNode {
     protected long totalPartitionNum = 0;
     protected long readPartitionNum = 0;
     protected long fileSplitSize;
+    public long rowCount = 0;
 
     public FileScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -95,6 +97,10 @@ public abstract class FileScanNode extends ExternalScanNode {
         planNode.setFileScanNode(fileScanNode);
     }
 
+    public long getPushDownCount() {
+        return 0;
+    }
+
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
         StringBuilder output = new StringBuilder();
@@ -170,7 +176,13 @@ public abstract class FileScanNode extends 
ExternalScanNode {
             output.append(String.format("avgRowSize=%s, ", avgRowSize));
         }
         output.append(String.format("numNodes=%s", numNodes)).append("\n");
-        output.append(prefix).append(String.format("pushdown agg=%s", 
pushDownAggNoGroupingOp)).append("\n");
+
+        // pushdown agg
+        output.append(prefix).append(String.format("pushdown agg=%s", 
pushDownAggNoGroupingOp));
+        if (pushDownAggNoGroupingOp.equals(TPushAggOp.COUNT)) {
+            output.append(" (").append(getPushDownCount()).append(")");
+        }
+        output.append("\n");
 
         return output.toString();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 25d28b092fb..bfb2a5aeb34 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -78,7 +78,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -213,6 +212,12 @@ public class IcebergScanNode extends FileQueryScanNode {
         HashSet<String> partitionPathSet = new HashSet<>();
         boolean isPartitionedTable = icebergTable.spec().isPartitioned();
 
+        long rowCount = getCountFromSnapshot();
+        if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount 
>= 0) {
+            this.rowCount = rowCount;
+            return new ArrayList<>();
+        }
+
         CloseableIterable<FileScanTask> fileScanTasks = 
TableScanUtil.splitFiles(scan.planFiles(), splitSize);
         try (CloseableIterable<CombinedScanTask> combinedScanTasks =
                 TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
@@ -266,12 +271,6 @@ public class IcebergScanNode extends FileQueryScanNode {
             throw new UserException(e.getMessage(), e.getCause());
         }
 
-        TPushAggOp aggOp = getPushDownAggNoGroupingOp();
-        if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
-            // we can create a special empty split and skip the plan process
-            return Collections.singletonList(splits.get(0));
-        }
-
         readPartitionNum = partitionPathSet.size();
 
         return splits;
@@ -425,7 +424,7 @@ public class IcebergScanNode extends FileQueryScanNode {
 
         // empty table
         if (snapshot == null) {
-            return -1;
+            return 0;
         }
 
         Map<String, String> summary = snapshot.summary();
@@ -442,12 +441,17 @@ public class IcebergScanNode extends FileQueryScanNode {
         super.toThrift(planNode);
         if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
             long countFromSnapshot = getCountFromSnapshot();
-            if (countFromSnapshot > 0) {
+            if (countFromSnapshot >= 0) {
                 planNode.setPushDownCount(countFromSnapshot);
             }
         }
     }
 
+    @Override
+    public long getPushDownCount() {
+        return getCountFromSnapshot();
+    }
+
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
         if (pushdownIcebergPredicates.isEmpty()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 44a5d0dd639..3b92cd88da8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -20,10 +20,12 @@ package org.apache.doris.nereids;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.ExplainOptions;
 import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.common.FormatOptions;
 import org.apache.doris.common.NereidsException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
 import org.apache.doris.nereids.CascadesContext.Lock;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -49,14 +51,18 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import 
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.RuntimeFilter;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.CommonResultSet;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ResultSet;
+import org.apache.doris.qe.ResultSetMetaData;
 import org.apache.doris.qe.SessionVariable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -66,6 +72,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
@@ -540,7 +547,23 @@ public class NereidsPlanner extends Planner {
             }
         }
 
-        return Optional.empty();
+        if (physicalPlan instanceof PhysicalResultSink
+                && physicalPlan.child(0) instanceof PhysicalHashAggregate && 
!getScanNodes().isEmpty()
+                && getScanNodes().get(0) instanceof IcebergScanNode) {
+            List<Column> columns = Lists.newArrayList();
+            NamedExpression output = physicalPlan.getOutput().get(0);
+            columns.add(new Column(output.getName(), 
output.getDataType().toCatalogDataType()));
+            if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) {
+                ResultSetMetaData metadata = new 
CommonResultSet.CommonResultSetMetaData(columns);
+                ResultSet resultSet = new CommonResultSet(metadata, 
Collections.singletonList(
+                        Lists.newArrayList(String.valueOf(((IcebergScanNode) 
getScanNodes().get(0)).rowCount))));
+                // only support one iceberg scan node and one count, e.g. 
select count(*) from icetbl;
+                return Optional.of(resultSet);
+            }
+            return Optional.empty();
+        } else {
+            return Optional.empty();
+        }
     }
 
     private void setFormatOptions() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 6dc72634b19..24c49864187 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -189,6 +189,7 @@ import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.SortNode;
 import org.apache.doris.planner.TableFunctionNode;
 import org.apache.doris.planner.UnionNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticConstants;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
 import org.apache.doris.thrift.TFetchOption;
@@ -1087,6 +1088,12 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         + storageLayerAggregate.getAggOp());
         }
 
+        if (storageLayerAggregate.getRelation() instanceof PhysicalFileScan
+                && pushAggOp.equals(TPushAggOp.COUNT)
+                && !ConnectContext.get().getSessionVariable().isEnableCountPushDownForExternalTable()) {
+            pushAggOp = TPushAggOp.NONE;
+        }
+
         context.setRelationPushAggOp(
                 storageLayerAggregate.getRelation().getRelationId(), 
pushAggOp);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 2657232db89..70e442546c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -41,6 +41,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FormatOptions;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
 import org.apache.doris.nereids.PlannerHook;
 import org.apache.doris.qe.CommonResultSet;
 import org.apache.doris.qe.ConnectContext;
@@ -641,6 +642,21 @@ public class OriginalPlanner extends Planner {
         List<Column> columns = new ArrayList<>(selectItems.size());
         List<String> columnLabels = parsedSelectStmt.getColLabels();
         List<String> data = new ArrayList<>();
+        if ((singleNodePlanner.getScanNodes().size() > 0 && 
singleNodePlanner.getScanNodes().get(0)
+                instanceof IcebergScanNode) && (((IcebergScanNode) 
getScanNodes().get(0)).rowCount > 0)) {
+            SelectListItem item = selectItems.get(0);
+            Expr expr = item.getExpr();
+            String columnName = columnLabels.get(0);
+            columns.add(new Column(columnName, expr.getType()));
+            data.add(String.valueOf(((IcebergScanNode) 
getScanNodes().get(0)).rowCount));
+            ResultSetMetaData metadata = new 
CommonResultSet.CommonResultSetMetaData(columns);
+            ResultSet resultSet = new CommonResultSet(metadata, 
Collections.singletonList(data));
+            // only support one iceberg scan node and one count, e.g. select 
count(*) from icetbl;
+            return Optional.of(resultSet);
+        }
+        if (!parsedSelectStmt.getTableRefs().isEmpty()) {
+            return Optional.empty();
+        }
         FormatOptions options = FormatOptions.getDefault();
         for (int i = 0; i < selectItems.size(); i++) {
             SelectListItem item = selectItems.get(i);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 5046acfd5a4..046dc28c4a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -564,6 +564,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
 
+    public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = 
"enable_count_push_down_for_external_table";
+
     public static final String SHOW_ALL_FE_CONNECTION = 
"show_all_fe_connection";
 
     public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = 
"max_msg_size_of_result_receiver";
@@ -1757,6 +1759,10 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"强制使用jni方式读取外表", "Force the use of jni mode to read 
external table"})
     private boolean forceJniScanner = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
+            description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown 
optimization for external table"})
+    private boolean enableCountPushDownForExternalTable = true;
+
     public static final String IGNORE_RUNTIME_FILTER_IDS = 
"ignore_runtime_filter_ids";
 
     public Set<Integer> getIgnoredRuntimeFilterIds() {
@@ -3919,6 +3925,10 @@ public class SessionVariable implements Serializable, 
Writable {
         forceJniScanner = force;
     }
 
+    public boolean isEnableCountPushDownForExternalTable() {
+        return enableCountPushDownForExternalTable;
+    }
+
     public boolean isForceToLocalShuffle() {
         return getEnablePipelineXEngine() && enableLocalShuffle && 
enableNereidsPlanner && forceToLocalShuffle;
     }
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out
 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out
new file mode 100644
index 00000000000..f2e945f9cec
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+1000
+
+-- !q02 --
+1000
+
+-- !q03 --
+1000
+
+-- !q04 --
+1000
+
+-- !q05 --
+1000
+
+-- !q06 --
+1000
+
+-- !q07 --
+1000
+
+-- !q08 --
+1000
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
new file mode 100644
index 00000000000..927d442b8dd
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_optimize_count", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "test_iceberg_optimize_count"
+
+    try {
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='rest',
+                'uri' = 'http://${externalEnvIp}:${rest_port}',
+                "s3.access_key" = "admin",
+                "s3.secret_key" = "password",
+                "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+                "s3.region" = "us-east-1"
+            );"""
+
+        sql """ switch ${catalog_name} """
+        sql """ use format_v2 """
+
+        sqlstr1 = """ select count(*) from sample_cow_orc; """
+        sqlstr2 = """ select count(*) from sample_cow_parquet; """
+        sqlstr3 = """ select count(*) from sample_mor_orc; """
+        sqlstr4 = """ select count(*) from sample_mor_parquet; """
+
+        // use push down count
+        sql """ set enable_count_push_down_for_external_table=true; """
+
+        qt_q01 """${sqlstr1}""" 
+        qt_q02 """${sqlstr2}""" 
+        qt_q03 """${sqlstr3}""" 
+        qt_q04 """${sqlstr4}""" 
+
+        explain {
+            sql("""${sqlstr1}""")
+            contains """pushdown agg=COUNT (1000)"""
+        }
+        explain {
+            sql("""${sqlstr2}""")
+            contains """pushdown agg=COUNT (1000)"""
+        }
+        explain {
+            sql("""${sqlstr3}""")
+            contains """pushdown agg=COUNT (1000)"""
+        }
+        explain {
+            sql("""${sqlstr4}""")
+            contains """pushdown agg=COUNT (1000)"""
+        }
+
+        // don't use push down count
+        sql """ set enable_count_push_down_for_external_table=false; """
+
+        qt_q05 """${sqlstr1}""" 
+        qt_q06 """${sqlstr2}""" 
+        qt_q07 """${sqlstr3}""" 
+        qt_q08 """${sqlstr4}""" 
+
+        explain {
+            sql("""${sqlstr1}""")
+            contains """pushdown agg=NONE"""
+        }
+        explain {
+            sql("""${sqlstr2}""")
+            contains """pushdown agg=NONE"""
+        }
+        explain {
+            sql("""${sqlstr3}""")
+            contains """pushdown agg=NONE"""
+        }
+        explain {
+            sql("""${sqlstr4}""")
+            contains """pushdown agg=NONE"""
+        }
+
+    } finally {
+        sql """ set enable_count_push_down_for_external_table=true; """
+        sql """drop catalog if exists ${catalog_name}"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to