This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 397bf354db [WIP](optional) using hash set to distinct single value (#11246)
397bf354db is described below

commit 397bf354db7af43885618c78b486cb1ac0588b92
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Aug 4 15:52:58 2022 +0800

    [WIP](optional) using hash set to distinct single value (#11246)
    
    * [WIP](optional) using hash set to distinct single value
    
    
    Co-authored-by: wangb...@meituan.com <wangb...@meituan.com>
---
 .../org/apache/doris/analysis/AggregateInfo.java   | 58 +++++++++++++++-------
 .../java/org/apache/doris/analysis/SelectStmt.java |  4 +-
 .../apache/doris/planner/DistributedPlanner.java   |  6 +--
 .../java/org/apache/doris/qe/SessionVariable.java  | 12 +++++
 4 files changed, 57 insertions(+), 23 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index 1ddab852ee..cfba7226da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -22,7 +22,9 @@ package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.planner.DataPartition;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TPartitionType;
 
 import com.google.common.base.MoreObjects;
@@ -113,8 +115,7 @@ public final class AggregateInfo extends AggregateInfoBase {
     private ArrayList<Integer> materializedAggregateSlots = Lists.newArrayList();
     // if true, this AggregateInfo is the first phase of a 2-phase DISTINCT computation
     private boolean isDistinctAgg = false;
-    // If true, the sql has MultiDistinct
-    private boolean isMultiDistinct;
+    private boolean isUsingSetForDistinct;
 
     // the multi distinct's begin pos  and end pos in groupby exprs
     private ArrayList<Integer> firstIdx = Lists.newArrayList();
@@ -127,10 +128,10 @@ public final class AggregateInfo extends AggregateInfoBase {
     }
 
     private AggregateInfo(ArrayList<Expr> groupingExprs,
-                          ArrayList<FunctionCallExpr> aggExprs, AggPhase aggPhase, boolean isMultiDistinct)  {
+                          ArrayList<FunctionCallExpr> aggExprs, AggPhase aggPhase, boolean isUsingSetForDistinct)  {
         super(groupingExprs, aggExprs);
         this.aggPhase = aggPhase;
-        this.isMultiDistinct = isMultiDistinct;
+        this.isUsingSetForDistinct = isUsingSetForDistinct;
     }
 
     /**
@@ -197,11 +198,11 @@ public final class AggregateInfo extends AggregateInfoBase {
         // 1: if aggExprs don't have distinct or have multi distinct , create aggregate info for
         // one stage aggregation.
         // 2: if aggExprs have one distinct , create aggregate info for two stage aggregation
-        boolean isMultiDistinct = estimateIfContainsMultiDistinct(distinctAggExprs);
-        if (distinctAggExprs.isEmpty() || isMultiDistinct) {
+        boolean isUsingSetForDistinct = estimateIfUsingSetForDistinct(distinctAggExprs);
+        if (distinctAggExprs.isEmpty() || isUsingSetForDistinct) {
             // It is used to map new aggr expr to old expr to help create an external
             // reference to the aggregation node tuple
-            result.setIsMultiDistinct(isMultiDistinct);
+            result.setIsUsingSetForDistinct(isUsingSetForDistinct);
             if (tupleDesc == null) {
                 result.createTupleDescs(analyzer);
                 result.createSmaps(analyzer);
@@ -241,6 +242,26 @@ public final class AggregateInfo extends AggregateInfoBase {
         return result;
     }
 
+
+    // note(wb): in some cases, using hashset for distinct is better
+    public static boolean isSetUsingSetForDistinct(List<FunctionCallExpr> distinctAggExprs) {
+        boolean isSetUsingSetForDistinct = false;
+        // for vectorized execution, we force it to use a hash set
+        if (distinctAggExprs.size() == 1
+                && distinctAggExprs.get(0).getFnParams().isDistinct()
+                && VectorizedUtil.isVectorized()
+                && ConnectContext.get().getSessionVariable().enableSingleDistinctColumnOpt()) {
+            isSetUsingSetForDistinct = true;
+        }
+        return isSetUsingSetForDistinct;
+    }
+
+    public static boolean estimateIfUsingSetForDistinct(List<FunctionCallExpr> distinctAggExprs)
+            throws AnalysisException {
+        return estimateIfContainsMultiDistinct(distinctAggExprs)
+                || isSetUsingSetForDistinct(distinctAggExprs);
+    }
+
     /**
      * estimate if functions contains multi distinct
      * @param distinctAggExprs
@@ -283,6 +304,7 @@ public final class AggregateInfo extends AggregateInfoBase {
                 hasMultiDistinct = true;
             }
         }
+
         return hasMultiDistinct;
     }
 
@@ -340,11 +362,11 @@ public final class AggregateInfo extends AggregateInfoBase {
             }
         }
 
-        this.isMultiDistinct = estimateIfContainsMultiDistinct(distinctAggExprs);
+        this.isUsingSetForDistinct = estimateIfUsingSetForDistinct(distinctAggExprs);
         isDistinctAgg = true;
 
         // add DISTINCT parameters to grouping exprs
-        if (!isMultiDistinct) {
+        if (!isUsingSetForDistinct) {
             groupingExprs.addAll(expr0Children);
         }
 
@@ -399,12 +421,12 @@ public final class AggregateInfo extends AggregateInfoBase {
                 && !secondPhaseDistinctAggInfo.getAggregateExprs().isEmpty());
     }
 
-    public void setIsMultiDistinct(boolean value) {
-        this.isMultiDistinct = value;
+    public void setIsUsingSetForDistinct(boolean value) {
+        this.isUsingSetForDistinct = value;
     }
 
-    public boolean isMultiDistinct() {
-        return isMultiDistinct;
+    public boolean isUsingSetForDistinct() {
+        return isUsingSetForDistinct;
     }
 
     public AggregateInfo getSecondPhaseDistinctAggInfo() {
@@ -547,7 +569,7 @@ public final class AggregateInfo extends AggregateInfoBase {
 
         AggPhase aggPhase =
                 (this.aggPhase == AggPhase.FIRST) ? AggPhase.FIRST_MERGE : AggPhase.SECOND_MERGE;
-        mergeAggInfo = new AggregateInfo(groupingExprs, aggExprs, aggPhase, isMultiDistinct);
+        mergeAggInfo = new AggregateInfo(groupingExprs, aggExprs, aggPhase, isUsingSetForDistinct);
         mergeAggInfo.intermediateTupleDesc = intermediateTupleDesc;
         mergeAggInfo.outputTupleDesc = outputTupleDesc;
         mergeAggInfo.intermediateTupleSmap = intermediateTupleSmap;
@@ -616,7 +638,7 @@ public final class AggregateInfo extends AggregateInfoBase {
         for (FunctionCallExpr inputExpr : distinctAggExprs) {
             Preconditions.checkState(inputExpr.isAggregateFunction());
             FunctionCallExpr aggExpr = null;
-            if (!isMultiDistinct) {
+            if (!isUsingSetForDistinct) {
                 if (inputExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.COUNT)) {
                     // COUNT(DISTINCT ...) ->
                     // COUNT(IF(IsNull(<agg slot 1>), NULL, IF(IsNull(<agg slot 2>), NULL, ...)))
@@ -682,7 +704,7 @@ public final class AggregateInfo extends AggregateInfoBase {
         ArrayList<Expr> substGroupingExprs =
                 Expr.substituteList(origGroupingExprs, intermediateTupleSmap, analyzer, false);
         secondPhaseDistinctAggInfo =
-                new AggregateInfo(substGroupingExprs, secondPhaseAggExprs, AggPhase.SECOND, isMultiDistinct);
+                new AggregateInfo(substGroupingExprs, secondPhaseAggExprs, AggPhase.SECOND, isUsingSetForDistinct);
         secondPhaseDistinctAggInfo.createTupleDescs(analyzer);
         secondPhaseDistinctAggInfo.createSecondPhaseAggSMap(this, distinctAggExprs);
         secondPhaseDistinctAggInfo.createMergeAggInfo(analyzer);
@@ -699,7 +721,7 @@ public final class AggregateInfo extends AggregateInfoBase {
         ArrayList<SlotDescriptor> slotDescs = outputTupleDesc.getSlots();
 
         int numDistinctParams = 0;
-        if (!isMultiDistinct) {
+        if (!isUsingSetForDistinct) {
             numDistinctParams = distinctAggExprs.get(0).getChildren().size();
         } else {
             for (int i = 0; i < distinctAggExprs.size(); i++) {
@@ -849,7 +871,7 @@ public final class AggregateInfo extends AggregateInfoBase {
                     outputTupleDesc.getSlots().get(groupExprsSize + i);
             SlotDescriptor intermediateSlotDesc =
                     intermediateTupleDesc.getSlots().get(groupExprsSize + i);
-            if (isDistinctAgg || isMultiDistinct) {
+            if (isDistinctAgg || isUsingSetForDistinct) {
                 slotDesc.setIsMaterialized(true);
                 intermediateSlotDesc.setIsMaterialized(true);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index 2440dc1956..62062243db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -1173,8 +1173,8 @@ public class SelectStmt extends QueryStmt {
             }
         }
         final ExprSubstitutionMap result = new ExprSubstitutionMap();
-        final boolean hasMultiDistinct = AggregateInfo.estimateIfContainsMultiDistinct(distinctExprs);
-        if (!hasMultiDistinct) {
+        final boolean isUsingSetForDistinct = AggregateInfo.estimateIfUsingSetForDistinct(distinctExprs);
+        if (!isUsingSetForDistinct) {
             return result;
         }
         for (FunctionCallExpr inputExpr : distinctExprs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 5ea7e7524c..6c64d1b600 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -1069,7 +1069,7 @@ public class DistributedPlanner {
 
         AggregateInfo firstPhaseAggInfo = ((AggregationNode) node.getChild(0)).getAggInfo();
         List<Expr> partitionExprs = null;
-        boolean isMultiDistinct = node.getAggInfo().isMultiDistinct();
+        boolean isUsingSetForDistinct = node.getAggInfo().isUsingSetForDistinct();
         if (hasGrouping) {
             // We need to do
             // - child fragment:
@@ -1092,7 +1092,7 @@ public class DistributedPlanner {
             //   * phase 2 agg
             // - merge fragment 2, unpartitioned:
             //   * merge agg of phase 2
-            if (!isMultiDistinct) {
+            if (!isUsingSetForDistinct) {
                 partitionExprs = Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(),
                         firstPhaseAggInfo.getIntermediateSmap(), ctx.getRootAnalyzer(), false);
             }
@@ -1130,7 +1130,7 @@ public class DistributedPlanner {
             mergeFragment.addPlanRoot(node);
         }
 
-        if (!hasGrouping && !isMultiDistinct) {
+        if (!hasGrouping && !isUsingSetForDistinct) {
             // place the merge aggregation of the 2nd phase in an unpartitioned fragment;
             // add preceding merge fragment at end
             if (mergeFragment != childFragment) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index f3281f56ce..bd2ef62fda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -174,6 +174,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_VECTORIZED_ENGINE = "enable_vectorized_engine";
 
+    public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt";
+
     public static final String CPU_RESOURCE_LIMIT = "cpu_resource_limit";
 
     public static final String ENABLE_PARALLEL_OUTFILE = "enable_parallel_outfile";
@@ -234,6 +236,12 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
 
+    // using a hash set instead of group by + count can improve performance,
+    //        but may cause RPC failures when the cluster has few BEs.
+    // Whether this switch should be turned on depends on the number of BEs.
+    @VariableMgr.VarAttr(name = ENABLE_SINGLE_DISTINCT_COLUMN_OPT)
+    public boolean enableSingleDistinctColumnOpt = false;
+
     // Set sqlMode to empty string
     @VariableMgr.VarAttr(name = SQL_MODE, needForward = true)
     public long sqlMode = 0L;
@@ -540,6 +548,10 @@ public class SessionVariable implements Serializable, Writable {
         return enableProfile;
     }
 
+    public boolean enableSingleDistinctColumnOpt() {
+        return enableSingleDistinctColumnOpt;
+    }
+
     public int getWaitTimeoutS() {
         return waitTimeoutS;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to