This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a149a5c748 [Feature](inverted index) push count on index down to scan node #22687 (#23055)
a149a5c748 is described below

commit a149a5c7481afea8028efab14853552dbaa855dc
Author: airborne12 <airborn...@gmail.com>
AuthorDate: Fri Aug 18 18:28:14 2023 +0800

    [Feature](inverted index) push count on index down to scan node #22687 (#23055)
---
 be/src/olap/rowset/segment_v2/segment.cpp          |  3 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 14 ++++-
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |  3 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  3 +
 .../org/apache/doris/nereids/rules/RuleType.java   |  1 +
 .../rules/implementation/AggregateStrategies.java  | 73 ++++++++++++++++++++++
 .../physical/PhysicalStorageLayerAggregate.java    |  3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  | 17 ++++-
 gensrc/thrift/PlanNodes.thrift                     |  3 +-
 9 files changed, 113 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index c6d472d035..f70d1bd1d5 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -160,7 +160,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const 
StorageReadOptions& read_o
 
     RETURN_IF_ERROR(load_index());
     if (read_options.delete_condition_predicates->num_of_column_predicate() == 
0 &&
-        read_options.push_down_agg_type_opt != TPushAggOp::NONE) {
+        read_options.push_down_agg_type_opt != TPushAggOp::NONE &&
+        read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
         
iter->reset(vectorized::new_vstatistics_iterator(this->shared_from_this(), 
*schema));
     } else {
         iter->reset(new SegmentIterator(this->shared_from_this(), schema));
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index f1c5b15bce..cc5d4b9fdb 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -956,9 +956,19 @@ bool SegmentIterator::_need_read_data(ColumnId cid) {
         // occurring, return true here that column data needs to be read
         return true;
     }
+    // Check the following conditions:
+    // 1. If the column represented by the unique ID is an inverted index column (indicated by '_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id]')
+    //    and it's not marked for projection in '_output_columns'.
+    // 2. Or, if the column is an inverted index column and it's marked for projection in '_output_columns',
+    //    and the operation is a push down of the 'COUNT_ON_INDEX' aggregation function.
+    // If any of the above conditions are met, log a debug message indicating that there's no need to read data for the indexed column.
+    // Then, return false.
     int32_t unique_id = _opts.tablet_schema->column(cid).unique_id();
-    if (_need_read_data_indices.count(unique_id) > 0 && 
!_need_read_data_indices[unique_id] &&
-        _output_columns.count(unique_id) < 1) {
+    if ((_need_read_data_indices.count(unique_id) > 0 && 
!_need_read_data_indices[unique_id] &&
+         _output_columns.count(unique_id) < 1) ||
+        (_need_read_data_indices.count(unique_id) > 0 && 
!_need_read_data_indices[unique_id] &&
+         _output_columns.count(unique_id) == 1 &&
+         _opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX)) {
         VLOG_DEBUG << "SegmentIterator no need read data for column: "
                    << _opts.tablet_schema->column_by_uid(unique_id).name();
         return false;
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 3af5bb9f89..c5f9e75586 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -249,7 +249,8 @@ Status NewOlapScanNode::_process_conjuncts() {
 }
 
 Status NewOlapScanNode::_build_key_ranges_and_filters() {
-    if (_push_down_agg_type == TPushAggOp::NONE) {
+    if (_push_down_agg_type == TPushAggOp::NONE ||
+        _push_down_agg_type == TPushAggOp::COUNT_ON_INDEX) {
         const std::vector<std::string>& column_names = 
_olap_scan_node.key_column_name;
         const std::vector<TPrimitiveType::type>& column_types = 
_olap_scan_node.key_column_type;
         DCHECK(column_types.size() == column_names.size());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 9fda9b99f9..ed9b30f93e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -787,6 +787,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             case COUNT:
                 pushAggOp = TPushAggOp.COUNT;
                 break;
+            case COUNT_ON_MATCH:
+                pushAggOp = TPushAggOp.COUNT_ON_INDEX;
+                break;
             case MIN_MAX:
                 pushAggOp = TPushAggOp.MINMAX;
                 break;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 4015a30ef6..3be748fe0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -319,6 +319,7 @@ public enum RuleType {
     STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
     STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION),
     
STORAGE_LAYER_AGGREGATE_WITH_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION),
+    COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION),
     ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
     TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
     
TWO_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 03962ff752..48e975b00c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.IsNull;
+import org.apache.doris.nereids.trees.expressions.Match;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
@@ -56,6 +57,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.Project;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
@@ -97,6 +99,28 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
         PatternDescriptor<LogicalAggregate<GroupPlan>> basePattern = 
logicalAggregate();
 
         return ImmutableList.of(
+            RuleType.COUNT_ON_INDEX.build(
+                logicalAggregate(
+                    logicalProject(
+                        logicalFilter(
+                            logicalOlapScan()
+                        ).when(filter -> 
containsMatchExpression(filter.getExpressions())
+                                && filter.getExpressions().size() == 1)
+                    ))
+                    .when(agg -> enablePushDownCountOnIndex())
+                    .when(agg -> agg.getGroupByExpressions().size() == 0)
+                    .when(agg -> {
+                        Set<AggregateFunction> funcs = 
agg.getAggregateFunctions();
+                        return !funcs.isEmpty() && funcs.stream().allMatch(f 
-> f instanceof Count && !f.isDistinct());
+                    })
+                    .thenApply(ctx -> {
+                        
LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
+                        LogicalProject<LogicalFilter<LogicalOlapScan>> project 
= agg.child();
+                        LogicalFilter<LogicalOlapScan> filter = 
project.child();
+                        LogicalOlapScan olapScan = filter.child();
+                        return pushdownCountOnIndex(agg, project, filter, 
olapScan, ctx.cascadesContext);
+                    })
+            ),
             RuleType.STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT.build(
                 logicalAggregate(
                     logicalOlapScan()
@@ -188,6 +212,55 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
         );
     }
 
+    private boolean containsMatchExpression(List<Expression> expressions) {
+        return expressions.stream().allMatch(expr -> expr instanceof Match);
+    }
+
+    private boolean enablePushDownCountOnIndex() {
+        ConnectContext connectContext = ConnectContext.get();
+        return connectContext != null && 
connectContext.getSessionVariable().isEnablePushDownCountOnIndex();
+    }
+
+    /**
+     * sql: select count(*) from tbl where column match 'token'
+     * <p>
+     * Applies only when the filter consists of a single MATCH expression, the aggregate has no
+     * group-by expressions, and every aggregate function is a non-distinct count.
+     * <p>
+     * before:
+     * <p>
+     *               LogicalAggregate(groupBy=[], output=[count(*)])
+     *                                |
+     *                          LogicalProject
+     *                                |
+     *                     LogicalFilter(column match 'token')
+     *                                |
+     *                       LogicalOlapScan(table=tbl)
+     * <p>
+     * after:
+     * <p>
+     *               LogicalAggregate(groupBy=[], output=[count(*)])
+     *                                |
+     *                          LogicalProject
+     *                                |
+     *                    LogicalFilter(column match 'token')
+     *                                |
+     *        PhysicalStorageLayerAggregate(pushAggOp=COUNT_ON_MATCH, table=PhysicalOlapScan(table=tbl))
+     *
+     */
+    private LogicalAggregate<? extends Plan> pushdownCountOnIndex(
+            LogicalAggregate<? extends Plan> agg,
+            LogicalProject<? extends Plan> project,
+            LogicalFilter<? extends Plan> filter,
+            LogicalOlapScan olapScan,
+            CascadesContext cascadesContext) {
+        PhysicalOlapScan physicalOlapScan
+                = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
+                .build()
+                .transform(olapScan, cascadesContext)
+                .get(0);
+        return agg.withChildren(ImmutableList.of(
+                project.withChildren(ImmutableList.of(
+                        filter.withChildren(ImmutableList.of(
+                                new PhysicalStorageLayerAggregate(
+                                        physicalOlapScan,
+                                        PushDownAggOp.COUNT_ON_MATCH)))))
+        ));
+    }
+
     /**
      * sql: select count(*) from tbl
      * <p>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java
index 7a9550adc3..73fdfa7305 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java
@@ -108,8 +108,9 @@ public class PhysicalStorageLayerAggregate extends 
PhysicalCatalogRelation {
 
     /** PushAggOp */
     public enum PushDownAggOp {
-        COUNT, MIN_MAX, MIX;
+        COUNT, MIN_MAX, MIX, COUNT_ON_MATCH;
 
+        /** supportedFunctions */
         public static Map<Class<? extends AggregateFunction>, PushDownAggOp> 
supportedFunctions() {
             return ImmutableMap.<Class<? extends AggregateFunction>, 
PushDownAggOp>builder()
                     .put(Count.class, PushDownAggOp.COUNT)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 36afda4d3b..d570dc3f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -318,6 +318,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_INVERTED_INDEX_QUERY = 
"enable_inverted_index_query";
 
+    public static final String ENABLE_PUSHDOWN_COUNT_ON_INDEX = 
"enable_count_on_index_pushdown";
+
     public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = 
"group_by_and_having_use_alias_first";
     public static final String DROP_TABLE_IF_CTAS_FAILED = 
"drop_table_if_ctas_failed";
 
@@ -958,9 +960,14 @@ public class SessionVariable implements Serializable, 
Writable {
 
     // Whether enable query with inverted index.
     @VariableMgr.VarAttr(name = ENABLE_INVERTED_INDEX_QUERY, needForward = 
true, description = {
-            "是否启用inverted index query。", "Set wether to use inverted index 
query."})
+            "是否启用inverted index query。", "Set whether to use inverted index 
query."})
     public boolean enableInvertedIndexQuery = true;
 
+    // Whether enable pushdown count agg to scan node when using inverted 
index match.
+    @VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_COUNT_ON_INDEX, needForward = 
true, description = {
+            "是否启用count_on_index pushdown。", "Set whether to pushdown 
count_on_index."})
+    public boolean enablePushDownCountOnIndex = true;
+
     // Whether drop table when create table as select insert data appear error.
     @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
     public boolean dropTableIfCtasFailed = true;
@@ -2019,6 +2026,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableInvertedIndexQuery = enableInvertedIndexQuery;
     }
 
+    public boolean isEnablePushDownCountOnIndex() {
+        return enablePushDownCountOnIndex;
+    }
+
+    public void setEnablePushDownCountOnIndex(boolean 
enablePushDownCountOnIndex) {
+        this.enablePushDownCountOnIndex = enablePushDownCountOnIndex;
+    }
+
     public int getMaxTableCountUseCascadesJoinReorder() {
         return this.maxTableCountUseCascadesJoinReorder;
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 614017d66a..72cd772cfc 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -622,7 +622,8 @@ enum TPushAggOp {
        NONE = 0,
        MINMAX = 1,
        COUNT = 2,
-       MIX = 3
+       MIX = 3,
+       COUNT_ON_INDEX = 4
 }
 
 struct TOlapScanNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to