This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 397bf354db [WIP](optional) using hash set to distinct single value (#11246) 397bf354db is described below commit 397bf354db7af43885618c78b486cb1ac0588b92 Author: wangbo <wan...@apache.org> AuthorDate: Thu Aug 4 15:52:58 2022 +0800 [WIP](optional) using hash set to distinct single value (#11246) * [WIP](optional) using hash set to distinct single value Co-authored-by: wangb...@meituan.com <wangb...@meituan.com> --- .../org/apache/doris/analysis/AggregateInfo.java | 58 +++++++++++++++------- .../java/org/apache/doris/analysis/SelectStmt.java | 4 +- .../apache/doris/planner/DistributedPlanner.java | 6 +-- .../java/org/apache/doris/qe/SessionVariable.java | 12 +++++ 4 files changed, 57 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java index 1ddab852ee..cfba7226da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java @@ -22,7 +22,9 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.planner.DataPartition; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.MoreObjects; @@ -113,8 +115,7 @@ public final class AggregateInfo extends AggregateInfoBase { private ArrayList<Integer> materializedAggregateSlots = Lists.newArrayList(); // if true, this AggregateInfo is the first phase of a 2-phase DISTINCT computation private boolean isDistinctAgg = false; - // If true, the sql has MultiDistinct - private boolean isMultiDistinct; + private boolean isUsingSetForDistinct; // the multi distinct's begin pos and end pos in groupby exprs private 
ArrayList<Integer> firstIdx = Lists.newArrayList(); @@ -127,10 +128,10 @@ public final class AggregateInfo extends AggregateInfoBase { } private AggregateInfo(ArrayList<Expr> groupingExprs, - ArrayList<FunctionCallExpr> aggExprs, AggPhase aggPhase, boolean isMultiDistinct) { + ArrayList<FunctionCallExpr> aggExprs, AggPhase aggPhase, boolean isUsingSetForDistinct) { super(groupingExprs, aggExprs); this.aggPhase = aggPhase; - this.isMultiDistinct = isMultiDistinct; + this.isUsingSetForDistinct = isUsingSetForDistinct; } /** @@ -197,11 +198,11 @@ public final class AggregateInfo extends AggregateInfoBase { // 1: if aggExprs don't have distinct or have multi distinct , create aggregate info for // one stage aggregation. // 2: if aggExprs have one distinct , create aggregate info for two stage aggregation - boolean isMultiDistinct = estimateIfContainsMultiDistinct(distinctAggExprs); - if (distinctAggExprs.isEmpty() || isMultiDistinct) { + boolean isUsingSetForDistinct = estimateIfUsingSetForDistinct(distinctAggExprs); + if (distinctAggExprs.isEmpty() || isUsingSetForDistinct) { // It is used to map new aggr expr to old expr to help create an external // reference to the aggregation node tuple - result.setIsMultiDistinct(isMultiDistinct); + result.setIsUsingSetForDistinct(isUsingSetForDistinct); if (tupleDesc == null) { result.createTupleDescs(analyzer); result.createSmaps(analyzer); @@ -241,6 +242,26 @@ public final class AggregateInfo extends AggregateInfoBase { return result; } + + // note(wb): in some cases, using hashset for distinct is better + public static boolean isSetUsingSetForDistinct(List<FunctionCallExpr> distinctAggExprs) { + boolean isSetUsingSetForDistinct = false; + // for vectorized execution, we force it to using hash set to execution + if (distinctAggExprs.size() == 1 + && distinctAggExprs.get(0).getFnParams().isDistinct() + && VectorizedUtil.isVectorized() + && ConnectContext.get().getSessionVariable().enableSingleDistinctColumnOpt()) { + 
isSetUsingSetForDistinct = true; + } + return isSetUsingSetForDistinct; + } + + public static boolean estimateIfUsingSetForDistinct(List<FunctionCallExpr> distinctAggExprs) + throws AnalysisException { + return estimateIfContainsMultiDistinct(distinctAggExprs) + || isSetUsingSetForDistinct(distinctAggExprs); + } + /** * estimate if functions contains multi distinct * @param distinctAggExprs @@ -283,6 +304,7 @@ public final class AggregateInfo extends AggregateInfoBase { hasMultiDistinct = true; } } + return hasMultiDistinct; } @@ -340,11 +362,11 @@ public final class AggregateInfo extends AggregateInfoBase { } } - this.isMultiDistinct = estimateIfContainsMultiDistinct(distinctAggExprs); + this.isUsingSetForDistinct = estimateIfUsingSetForDistinct(distinctAggExprs); isDistinctAgg = true; // add DISTINCT parameters to grouping exprs - if (!isMultiDistinct) { + if (!isUsingSetForDistinct) { groupingExprs.addAll(expr0Children); } @@ -399,12 +421,12 @@ public final class AggregateInfo extends AggregateInfoBase { && !secondPhaseDistinctAggInfo.getAggregateExprs().isEmpty()); } - public void setIsMultiDistinct(boolean value) { - this.isMultiDistinct = value; + public void setIsUsingSetForDistinct(boolean value) { + this.isUsingSetForDistinct = value; } - public boolean isMultiDistinct() { - return isMultiDistinct; + public boolean isUsingSetForDistinct() { + return isUsingSetForDistinct; } public AggregateInfo getSecondPhaseDistinctAggInfo() { @@ -547,7 +569,7 @@ public final class AggregateInfo extends AggregateInfoBase { AggPhase aggPhase = (this.aggPhase == AggPhase.FIRST) ? 
AggPhase.FIRST_MERGE : AggPhase.SECOND_MERGE; - mergeAggInfo = new AggregateInfo(groupingExprs, aggExprs, aggPhase, isMultiDistinct); + mergeAggInfo = new AggregateInfo(groupingExprs, aggExprs, aggPhase, isUsingSetForDistinct); mergeAggInfo.intermediateTupleDesc = intermediateTupleDesc; mergeAggInfo.outputTupleDesc = outputTupleDesc; mergeAggInfo.intermediateTupleSmap = intermediateTupleSmap; @@ -616,7 +638,7 @@ public final class AggregateInfo extends AggregateInfoBase { for (FunctionCallExpr inputExpr : distinctAggExprs) { Preconditions.checkState(inputExpr.isAggregateFunction()); FunctionCallExpr aggExpr = null; - if (!isMultiDistinct) { + if (!isUsingSetForDistinct) { if (inputExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.COUNT)) { // COUNT(DISTINCT ...) -> // COUNT(IF(IsNull(<agg slot 1>), NULL, IF(IsNull(<agg slot 2>), NULL, ...))) @@ -682,7 +704,7 @@ public final class AggregateInfo extends AggregateInfoBase { ArrayList<Expr> substGroupingExprs = Expr.substituteList(origGroupingExprs, intermediateTupleSmap, analyzer, false); secondPhaseDistinctAggInfo = - new AggregateInfo(substGroupingExprs, secondPhaseAggExprs, AggPhase.SECOND, isMultiDistinct); + new AggregateInfo(substGroupingExprs, secondPhaseAggExprs, AggPhase.SECOND, isUsingSetForDistinct); secondPhaseDistinctAggInfo.createTupleDescs(analyzer); secondPhaseDistinctAggInfo.createSecondPhaseAggSMap(this, distinctAggExprs); secondPhaseDistinctAggInfo.createMergeAggInfo(analyzer); @@ -699,7 +721,7 @@ public final class AggregateInfo extends AggregateInfoBase { ArrayList<SlotDescriptor> slotDescs = outputTupleDesc.getSlots(); int numDistinctParams = 0; - if (!isMultiDistinct) { + if (!isUsingSetForDistinct) { numDistinctParams = distinctAggExprs.get(0).getChildren().size(); } else { for (int i = 0; i < distinctAggExprs.size(); i++) { @@ -849,7 +871,7 @@ public final class AggregateInfo extends AggregateInfoBase { outputTupleDesc.getSlots().get(groupExprsSize + i); SlotDescriptor 
intermediateSlotDesc = intermediateTupleDesc.getSlots().get(groupExprsSize + i); - if (isDistinctAgg || isMultiDistinct) { + if (isDistinctAgg || isUsingSetForDistinct) { slotDesc.setIsMaterialized(true); intermediateSlotDesc.setIsMaterialized(true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index 2440dc1956..62062243db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -1173,8 +1173,8 @@ public class SelectStmt extends QueryStmt { } } final ExprSubstitutionMap result = new ExprSubstitutionMap(); - final boolean hasMultiDistinct = AggregateInfo.estimateIfContainsMultiDistinct(distinctExprs); - if (!hasMultiDistinct) { + final boolean isUsingSetForDistinct = AggregateInfo.estimateIfUsingSetForDistinct(distinctExprs); + if (!isUsingSetForDistinct) { return result; } for (FunctionCallExpr inputExpr : distinctExprs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 5ea7e7524c..6c64d1b600 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -1069,7 +1069,7 @@ public class DistributedPlanner { AggregateInfo firstPhaseAggInfo = ((AggregationNode) node.getChild(0)).getAggInfo(); List<Expr> partitionExprs = null; - boolean isMultiDistinct = node.getAggInfo().isMultiDistinct(); + boolean isUsingSetForDistinct = node.getAggInfo().isUsingSetForDistinct(); if (hasGrouping) { // We need to do // - child fragment: @@ -1092,7 +1092,7 @@ public class DistributedPlanner { // * phase 2 agg // - merge fragment 2, unpartitioned: // * merge agg of phase 2 - if (!isMultiDistinct) { + if (!isUsingSetForDistinct) { partitionExprs = 
Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(), firstPhaseAggInfo.getIntermediateSmap(), ctx.getRootAnalyzer(), false); } @@ -1130,7 +1130,7 @@ public class DistributedPlanner { mergeFragment.addPlanRoot(node); } - if (!hasGrouping && !isMultiDistinct) { + if (!hasGrouping && !isUsingSetForDistinct) { // place the merge aggregation of the 2nd phase in an unpartitioned fragment; // add preceding merge fragment at end if (mergeFragment != childFragment) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index f3281f56ce..bd2ef62fda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -174,6 +174,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_VECTORIZED_ENGINE = "enable_vectorized_engine"; + public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt"; + public static final String CPU_RESOURCE_LIMIT = "cpu_resource_limit"; public static final String ENABLE_PARALLEL_OUTFILE = "enable_parallel_outfile"; @@ -234,6 +236,12 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true) public boolean enableProfile = false; + // using hashset instead of group by + count can improve performance + // but may cause rpc failed when cluster has less BE + // Whether this switch is turned on depends on the BE number + @VariableMgr.VarAttr(name = ENABLE_SINGLE_DISTINCT_COLUMN_OPT) + public boolean enableSingleDistinctColumnOpt = false; + // Set sqlMode to empty string @VariableMgr.VarAttr(name = SQL_MODE, needForward = true) public long sqlMode = 0L; @@ -540,6 +548,10 @@ public class SessionVariable implements Serializable, Writable { return enableProfile; } + public boolean enableSingleDistinctColumnOpt() { + return 
enableSingleDistinctColumnOpt; + } + public int getWaitTimeoutS() { return waitTimeoutS; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org