This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new e97d835ba7 [feature](statistics) Statistics derivation.Step 2:OtherNode implemen… (#9458)
e97d835ba7 is described below

commit e97d835ba73ddfda0ec3a4eb9a65f960999481ff
Author: zhengshiJ <32082872+zhengs...@users.noreply.github.com>
AuthorDate: Tue Jun 7 21:10:28 2022 +0800

    [feature](statistics) Statistics derivation.Step 2:OtherNode implemen… (#9458)

    closed #9644

    Second step of statistics derivation: implementation of nodes other than scan_node.
    The statistics derivation interface for all nodes is placed uniformly in DeriveFactory.java.
    Added a unit test to verify that the derivation is correct.
    The statistics derivation for each node is placed in its own *StatsDerive.java.

    Detailed design: https://docs.google.com/document/d/1u1L6XhyzKShoyYRwFQ6kE1rnvY2iFwauwg289au5Qq0/edit
---
 .../org/apache/doris/planner/AggregationNode.java | 45 +---
 .../org/apache/doris/planner/AnalyticEvalNode.java | 13 +-
 .../apache/doris/planner/AssertNumRowsNode.java | 6 +-
 .../org/apache/doris/planner/CrossJoinNode.java | 18 +-
 .../org/apache/doris/planner/EmptySetNode.java | 13 +-
 .../java/org/apache/doris/planner/ExceptNode.java | 2 +-
 .../org/apache/doris/planner/ExchangeNode.java | 9 +-
 .../org/apache/doris/planner/HashJoinNode.java | 17 +-
 .../org/apache/doris/planner/IcebergScanNode.java | 2 +-
 .../org/apache/doris/planner/MysqlScanNode.java | 10 +-
 .../org/apache/doris/planner/OdbcScanNode.java | 10 +-
 .../org/apache/doris/planner/OlapScanNode.java | 4 +-
 .../java/org/apache/doris/planner/PlanNode.java | 29 ++-
 .../java/org/apache/doris/planner/RepeatNode.java | 12 +-
 .../java/org/apache/doris/planner/ScanNode.java | 3 +-
 .../java/org/apache/doris/planner/SelectNode.java | 13 +-
 .../org/apache/doris/planner/SetOperationNode.java | 27 +-
 .../apache/doris/planner/SingleNodePlanner.java | 4 +-
 .../java/org/apache/doris/planner/SortNode.java | 14 +-
 .../apache/doris/planner/StreamLoadScanNode.java | 2 +-
 .../apache/doris/planner/TableFunctionNode.java | 10 +-
 .../java/org/apache/doris/planner/UnionNode.java | 4 +-
 .../java/org/apache/doris/qe/SessionVariable.java | 8 +
 .../apache/doris/statistics/AggStatsDerive.java | 89 +++++++
 .../doris/statistics/AnalyticEvalStatsDerive.java | 46 ++++
 ...eFactory.java => AssertNumRowsStatsDerive.java} | 26 +-
 .../apache/doris/statistics/BaseStatsDerive.java | 27 +-
 .../doris/statistics/CrossJoinStatsDerive.java | 53 ++++
 .../org/apache/doris/statistics/DeriveFactory.java | 35 ++-
 ...DeriveFactory.java => EmptySetStatsDerive.java} | 27 +-
 ...DeriveFactory.java => ExchangeStatsDerive.java} | 28 ++-
 .../doris/statistics/HashJoinStatsDerive.java | 253 +++++++++++++++++++
 .../{DeriveFactory.java => MysqlStatsDerive.java} | 29 ++-
 .../doris/statistics/OlapScanStatsDerive.java | 18 +-
 .../{DeriveFactory.java => SelectStatsDerive.java} | 31 ++-
 .../apache/doris/statistics/StatisticsManager.java | 1 +
 ...eFactory.java => TableFunctionStatsDerive.java} | 28 ++-
 .../apache/doris/planner/StatisticDeriveTest.java | 276 +++++++++++++++++++++
 38 files changed, 1015 insertions(+), 227 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index 541543bb00..fb16bdebe6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -28,6 +28,7 @@ import org.apache.doris.analysis.SlotId; import
org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TAggregationNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; @@ -49,7 +50,7 @@ import java.util.Set; * Aggregation computation. */ public class AggregationNode extends PlanNode { - private final static Logger LOG = LogManager.getLogger(AggregationNode.class); + private static final Logger LOG = LogManager.getLogger(AggregationNode.class); private final AggregateInfo aggInfo; // Set to true if this aggregation node needs to run the Finalize step. This @@ -64,7 +65,7 @@ public class AggregationNode extends PlanNode { * isIntermediate is true if it is a slave node in a 2-part agg plan. */ public AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo) { - super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE"); + super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE", NodeType.AGG_NODE); this.aggInfo = aggInfo; this.children.add(input); this.needsFinalize = true; @@ -75,7 +76,7 @@ public class AggregationNode extends PlanNode { * Copy c'tor used in clone(). */ private AggregationNode(PlanNodeId id, AggregationNode src) { - super(id, src, "AGGREGATE"); + super(id, src, "AGGREGATE", NodeType.AGG_NODE); aggInfo = src.aggInfo; needsFinalize = src.needsFinalize; } @@ -169,46 +170,14 @@ public class AggregationNode extends PlanNode { } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); if (!analyzer.safeIsEnableJoinReorderBasedCost()) { return; } - List<Expr> groupingExprs = aggInfo.getGroupingExprs(); - cardinality = 1; - // cardinality: product of # of distinct values produced by grouping exprs - for (Expr groupingExpr : groupingExprs) { - long numDistinct = groupingExpr.getNumDistinctValues(); - LOG.debug("grouping expr: " + groupingExpr.toSql() + " #distinct=" + Long.toString( - numDistinct)); - if (numDistinct == -1) { - cardinality = -1; - break; - } - // This is prone to overflow, because we keep multiplying cardinalities, - // even if the grouping exprs are functionally dependent (example: - // group by the primary key of a table plus a number of other columns from that - // same table) - // TODO: try to recognize functional dependencies - // TODO: as a shortcut, instead of recognizing functional dependencies, - // limit the contribution of a single table to the number of rows - // of that table (so that when we're grouping by the primary key col plus - // some others, the estimate doesn't overshoot dramatically) - cardinality *= numDistinct; - } - if (cardinality > 0) { - LOG.debug("sel=" + Double.toString(computeSelectivity())); - applyConjunctsSelectivity(); - } - // if we ended up with an overflow, the estimate is certain to be wrong - if (cardinality < 0) { - cardinality = -1; - } - capCardinalityAtLimit(); - if (LOG.isDebugEnabled()) { - LOG.debug("stats Agg: cardinality={}", cardinality); - } + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java index 5c2c564bf1..a439ed3f39 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java @@ -27,6 +27,7 @@ import org.apache.doris.analysis.ExprSubstitutionMap; import org.apache.doris.analysis.OrderByElement; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TAnalyticNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; @@ -80,7 +81,7 @@ public class AnalyticEvalNode extends PlanNode { AnalyticWindow analyticWindow, TupleDescriptor intermediateTupleDesc, TupleDescriptor outputTupleDesc, ExprSubstitutionMap logicalToPhysicalSmap, Expr partitionByEq, Expr orderByEq, TupleDescriptor bufferedTupleDesc) { - super(id, input.getTupleIds(), "ANALYTIC"); + super(id, input.getTupleIds(), "ANALYTIC", NodeType.ANALYTIC_EVAL_NODE); Preconditions.checkState(!tupleIds.contains(outputTupleDesc.getId())); // we're materializing the input row augmented with the analytic output tuple tupleIds.add(outputTupleDesc.getId()); @@ -135,17 +136,13 @@ public class AnalyticEvalNode extends PlanNode { } @Override - protected void computeStats(Analyzer analyzer) { + protected void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); if (!analyzer.safeIsEnableJoinReorderBasedCost()) { return; } - cardinality = cardinality == -1 ? getChild(0).cardinality : cardinality; - applyConjunctsSelectivity(); - capCardinalityAtLimit(); - if (LOG.isDebugEnabled()) { - LOG.debug("stats AnalyticEval: cardinality={}", cardinality); - } + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java index 388f6fa096..6b8ff8f8ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java @@ -20,6 +20,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.AssertNumRowsElement; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TAssertNumRowsNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; @@ -42,7 +43,7 @@ public class AssertNumRowsNode extends PlanNode { private AssertNumRowsElement.Assertion assertion; public AssertNumRowsNode(PlanNodeId id, PlanNode input, AssertNumRowsElement assertNumRowsElement) { - super(id, "ASSERT NUMBER OF ROWS"); + super(id, "ASSERT NUMBER OF ROWS", NodeType.ASSERT_NUM_ROWS_NODE); this.desiredNumOfRows = assertNumRowsElement.getDesiredNumOfRows(); this.subqueryString = assertNumRowsElement.getSubqueryString(); this.assertion = assertNumRowsElement.getAssertion(); @@ -57,7 +58,8 @@ public class AssertNumRowsNode extends PlanNode { super.init(analyzer); super.computeStats(analyzer); if (analyzer.safeIsEnableJoinReorderBasedCost()) { - cardinality = 1; + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); } if (LOG.isDebugEnabled()) { LOG.debug("stats AssertNumRows: cardinality={}", cardinality); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/CrossJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/CrossJoinNode.java index 0df0fa2803..81accd9810 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/CrossJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/CrossJoinNode.java @@ -19,8 +19,8 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.TableRef; -import org.apache.doris.common.CheckedMath; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -41,7 +41,7 @@ public class CrossJoinNode extends PlanNode { private final TableRef innerRef; public CrossJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef) { - super(id, "CROSS JOIN"); + super(id, "CROSS JOIN", NodeType.CROSS_JOIN_NODE); this.innerRef = innerRef; tupleIds.addAll(outer.getTupleIds()); tupleIds.addAll(inner.getTupleIds()); @@ -68,21 +68,13 @@ public class CrossJoinNode extends PlanNode { } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); if (!analyzer.safeIsEnableJoinReorderBasedCost()) { return; } - if (getChild(0).cardinality == -1 || getChild(1).cardinality == -1) { - cardinality = -1; - } else { - cardinality = CheckedMath.checkedMultiply(getChild(0).cardinality, getChild(1).cardinality); - applyConjunctsSelectivity(); - capCardinalityAtLimit(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("stats CrossJoin: cardinality={}", Long.toString(cardinality)); - } + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java index bee839d430..07856fed1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java @@ -19,6 +19,8 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.TupleId; +import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -35,17 +37,18 @@ import java.util.ArrayList; * construct a valid row empty batch. 
*/ public class EmptySetNode extends PlanNode { - private final static Logger LOG = LogManager.getLogger(EmptySetNode.class); + private static final Logger LOG = LogManager.getLogger(EmptySetNode.class); public EmptySetNode(PlanNodeId id, ArrayList<TupleId> tupleIds) { - super(id, tupleIds, "EMPTYSET"); + super(id, tupleIds, "EMPTYSET", NodeType.EMPTY_SET_NODE); Preconditions.checkArgument(tupleIds.size() > 0); } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); avgRowSize = 0; - cardinality = 0; numNodes = 1; if (LOG.isDebugEnabled()) { LOG.debug("stats EmptySet:" + id + ", cardinality: " + cardinality); @@ -53,7 +56,7 @@ public class EmptySetNode extends PlanNode { } @Override - public void init(Analyzer analyzer) { + public void init(Analyzer analyzer) throws UserException { Preconditions.checkState(conjuncts.isEmpty()); // If the physical output tuple produced by an AnalyticEvalNode wasn't created // the logical output tuple is returned by getMaterializedTupleIds(). It needs diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExceptNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExceptNode.java index c1d031931a..84911cad5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExceptNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExceptNode.java @@ -31,7 +31,7 @@ public class ExceptNode extends SetOperationNode { protected ExceptNode(PlanNodeId id, TupleId tupleId, List<Expr> setOpResultExprs, boolean isInSubplan) { - super(id, tupleId, "EXCEPT", setOpResultExprs, isInSubplan); + super(id, tupleId, "EXCEPT", setOpResultExprs, isInSubplan, NodeType.EXCEPT_NODE); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 6d1125407e..0ce4dd5ba8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.SortInfo; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExchangeNode; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -70,7 +71,7 @@ public class ExchangeNode extends PlanNode { * need to compute the cardinality here. 
*/ public ExchangeNode(PlanNodeId id, PlanNode inputNode, boolean copyConjuncts) { - super(id, inputNode, EXCHANGE_NODE); + super(id, inputNode, EXCHANGE_NODE, NodeType.EXCHANGE_NODE); offset = 0; children.add(inputNode); if (!copyConjuncts) { @@ -109,10 +110,10 @@ public class ExchangeNode extends PlanNode { } @Override - protected void computeStats(Analyzer analyzer) { + protected void computeStats(Analyzer analyzer) throws UserException { Preconditions.checkState(children.size() == 1); - cardinality = children.get(0).cardinality; - capCardinalityAtLimit(); + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); if (LOG.isDebugEnabled()) { LOG.debug("stats Exchange:" + id + ", cardinality: " + cardinality); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 2a13ecbda6..f474e9efd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -38,6 +38,7 @@ import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TEqJoinCondition; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.THashJoinNode; @@ -85,7 +86,7 @@ public class HashJoinNode extends PlanNode { public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef, List<Expr> eqJoinConjuncts, List<Expr> otherJoinConjuncts) { - super(id, "HASH JOIN"); + super(id, "HASH JOIN", NodeType.HASH_JOIN_NODE); Preconditions.checkArgument(eqJoinConjuncts != null && !eqJoinConjuncts.isEmpty()); Preconditions.checkArgument(otherJoinConjuncts != null); tblRefIds.addAll(outer.getTblRefIds()); @@ -452,20 +453,16 @@ public class HashJoinNode extends PlanNode { @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); if (!analyzer.safeIsEnableJoinReorderBasedCost()) { return; } - if (joinOp.isSemiAntiJoin()) { - cardinality = getSemiJoinCardinality(); - } else if (joinOp.isInnerJoin() || joinOp.isOuterJoin()) { - cardinality = getJoinCardinality(); - } else { - Preconditions.checkState(false, "joinOp is not supported"); - } - capCardinalityAtLimit(); + + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); + if (LOG.isDebugEnabled()) { LOG.debug("stats HashJoin:" + id + ", cardinality: " + cardinality); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java index 4af73caf51..3439631f0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java @@ -46,7 +46,7 @@ public class IcebergScanNode extends BrokerScanNode { public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) { - super(id, desc, planNodeName, fileStatusesList, filesAdded, NodeType.ICEBREG_SCAN_NODE); + super(id, desc, planNodeName, fileStatusesList, filesAdded, NodeType.ICEBERG_SCAN_NODE); 
icebergTable = (IcebergTable) desc.getTable(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java index d022edcf78..82ea85da8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TMySQLScanNode; import org.apache.doris.thrift.TPlanNode; @@ -163,13 +164,12 @@ public class MysqlScanNode extends ScanNode { } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); // even if current node scan has no data,at least on backend will be assigned when the fragment actually execute numNodes = numNodes <= 0 ? 1 : numNodes; - // this is just to avoid mysql scan node's cardinality being -1. So that we can calculate the join cost - // normally. - // We assume that the data volume of all mysql tables is very small, so set cardinality directly to 1. - cardinality = cardinality == -1 ? 1 : cardinality; + + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java index 9049ffc505..cacbc48ad5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java @@ -27,6 +27,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TOdbcScanNode; import org.apache.doris.thrift.TOdbcTableType; @@ -215,13 +216,12 @@ public class OdbcScanNode extends ScanNode { } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); // even if current node scan has no data,at least on backend will be assigned when the fragment actually execute numNodes = numNodes <= 0 ? 1 : numNodes; - // this is just to avoid odbc scan node's cardinality being -1. So that we can calculate the join cost - // normally. - // We assume that the data volume of all odbc tables is very small, so set cardinality directly to 1. - cardinality = cardinality == -1 ? 
1 : cardinality; + + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index a9aa49c9bf..ee02af4b44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -349,8 +349,8 @@ public class OlapScanNode extends ScanNode { * - When Join reorder is turned on, the cardinality must be calculated before the reorder algorithm. * - So only an inaccurate cardinality can be calculated here. */ + mockRowCountInStatistic(); if (analyzer.safeIsEnableJoinReorderBasedCost()) { - mockRowCountInStatistic(); computeInaccurateCardinality(); } } @@ -397,7 +397,7 @@ public class OlapScanNode extends ScanNode { } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); if (cardinality > 0) { avgRowSize = totalBytes / (float) cardinality * COMPRESSION_RATIO; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index a68655eb61..140e6d32c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -140,7 +140,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { protected NodeType nodeType = NodeType.DEFAULT; protected StatsDeriveResult statsDeriveResult; - protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName) { + protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName, NodeType nodeType) { this.id = id; this.limit = -1; // make a copy, just to be on the safe side @@ -149,9 +149,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> { this.cardinality = -1; this.planNodeName = VectorizedUtil.isVectorized() ? "V" + planNodeName : planNodeName; this.numInstances = 1; + this.nodeType = nodeType; } - protected PlanNode(PlanNodeId id, String planNodeName) { + protected PlanNode(PlanNodeId id, String planNodeName, NodeType nodeType) { this.id = id; this.limit = -1; this.tupleIds = Lists.newArrayList(); @@ -159,12 +160,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> { this.cardinality = -1; this.planNodeName = VectorizedUtil.isVectorized() ? "V" + planNodeName : planNodeName; this.numInstances = 1; + this.nodeType = nodeType; } /** * Copy ctor. Also passes in new id. 
*/ - protected PlanNode(PlanNodeId id, PlanNode node, String planNodeName) { + protected PlanNode(PlanNodeId id, PlanNode node, String planNodeName, NodeType nodeType) { this.id = id; this.limit = node.limit; this.tupleIds = Lists.newArrayList(node.tupleIds); @@ -181,17 +183,30 @@ abstract public class PlanNode extends TreeNode<PlanNode> { public enum NodeType { DEFAULT, AGG_NODE, + ANALYTIC_EVAL_NODE, + ASSERT_NUM_ROWS_NODE, BROKER_SCAN_NODE, + CROSS_JOIN_NODE, + EMPTY_SET_NODE, + ES_SCAN_NODE, + EXCEPT_NODE, + EXCHANGE_NODE, HASH_JOIN_NODE, HIVE_SCAN_NODE, - MERGE_NODE, - ES_SCAN_NODE, - ICEBREG_SCAN_NODE, + ICEBERG_SCAN_NODE, + INTERSECT_NODE, LOAD_SCAN_NODE, MYSQL_SCAN_NODE, ODBC_SCAN_NODE, OLAP_SCAN_NODE, + REPEAT_NODE, + SELECT_NODE, + SET_OPERATION_NODE, SCHEMA_SCAN_NODE, + SORT_NODE, + STREAM_LOAD_SCAN_NODE, + TABLE_FUNCTION_NODE, + UNION_NODE, } public String getPlanNodeName() { @@ -579,7 +594,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { * from finalize() (to facilitate inserting additional nodes during plan * partitioning w/o the need to call finalize() recursively on the whole tree again). */ - protected void computeStats(Analyzer analyzer) { + protected void computeStats(Analyzer analyzer) throws UserException { avgRowSize = 0.0F; for (TupleId tid : tupleIds) { TupleDescriptor desc = analyzer.getTupleDesc(tid); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java index dab99a0f4d..82e3c82f97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java @@ -30,6 +30,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.analysis.VirtualSlotRef; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -66,7 +67,7 @@ public class RepeatNode extends PlanNode { private GroupByClause groupByClause; protected RepeatNode(PlanNodeId id, PlanNode input, GroupingInfo groupingInfo, GroupByClause groupByClause) { - super(id, input.getTupleIds(), "REPEAT_NODE"); + super(id, input.getTupleIds(), "REPEAT_NODE", NodeType.REPEAT_NODE); this.children.add(input); this.groupingInfo = groupingInfo; this.input = input; @@ -77,7 +78,7 @@ public class RepeatNode extends PlanNode { // only for unittest protected RepeatNode(PlanNodeId id, PlanNode input, List<Set<SlotId>> repeatSlotIdList, TupleDescriptor outputTupleDesc, List<List<Long>> groupingList) { - super(id, input.getTupleIds(), "REPEAT_NODE"); + super(id, input.getTupleIds(), "REPEAT_NODE", NodeType.REPEAT_NODE); this.children.add(input); this.repeatSlotIdList = buildIdSetList(repeatSlotIdList); this.groupingList = groupingList; @@ -99,10 +100,13 @@ public class RepeatNode extends PlanNode { } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { avgRowSize = 0; numNodes = 1; - cardinality = 0; + + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); + if (LOG.isDebugEnabled()) { LOG.debug("stats Sort: cardinality=" + cardinality); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 9837ad1383..e2392ddc7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -64,8 +64,7 @@ abstract public class ScanNode extends PlanNode { protected Analyzer analyzer; public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, NodeType nodeType) { - super(id, desc.getId().asList(), planNodeName); - super.nodeType = nodeType; + super(id, desc.getId().asList(), planNodeName, nodeType); this.desc = desc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java index c732d9ecc6..b56880c889 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java @@ -23,6 +23,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -39,12 +40,12 @@ public class SelectNode extends PlanNode { private final static Logger LOG = LogManager.getLogger(SelectNode.class); protected SelectNode(PlanNodeId id, PlanNode child) { - super(id, child.getTupleIds(), "SELECT"); + super(id, child.getTupleIds(), "SELECT", NodeType.SELECT_NODE); addChild(child); this.nullableTupleIds = child.nullableTupleIds; } protected SelectNode(PlanNodeId id, PlanNode child, List<Expr> conjuncts) { - super(id, child.getTupleIds(), "SELECT"); + super(id, child.getTupleIds(), "SELECT", NodeType.SELECT_NODE); addChild(child); this.tblRefIds = child.tblRefIds; this.nullableTupleIds = child.nullableTupleIds; @@ -64,14 +65,14 @@ public class SelectNode extends PlanNode { } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); if (!analyzer.safeIsEnableJoinReorderBasedCost()) { return; } - cardinality = getChild(0).cardinality; - applyConjunctsSelectivity(); - capCardinalityAtLimit(); + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); + if (LOG.isDebugEnabled()) { LOG.debug("stats Select: cardinality={}", this.cardinality); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java index 901e461b76..e751af42df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -82,16 +82,31 @@ public abstract class SetOperationNode extends PlanNode { protected final TupleId tupleId; + protected SetOperationNode(PlanNodeId id, TupleId tupleId, String planNodeName, NodeType nodeType) { + super(id, tupleId.asList(), planNodeName, nodeType); + this.setOpResultExprs = Lists.newArrayList(); + this.tupleId = tupleId; + this.isInSubplan = false; + } + + protected SetOperationNode(PlanNodeId id, TupleId tupleId, String planNodeName, + List<Expr> setOpResultExprs, boolean isInSubplan, NodeType nodeType) { + super(id, tupleId.asList(), planNodeName, nodeType); + this.setOpResultExprs = setOpResultExprs; + this.tupleId = tupleId; 
+ this.isInSubplan = isInSubplan; + } + protected SetOperationNode(PlanNodeId id, TupleId tupleId, String planNodeName) { - super(id, tupleId.asList(), planNodeName); - setOpResultExprs = Lists.newArrayList(); + super(id, tupleId.asList(), planNodeName, NodeType.SET_OPERATION_NODE); + this.setOpResultExprs = Lists.newArrayList(); this.tupleId = tupleId; - isInSubplan = false; + this.isInSubplan = false; } protected SetOperationNode(PlanNodeId id, TupleId tupleId, String planNodeName, List<Expr> setOpResultExprs, boolean isInSubplan) { - super(id, tupleId.asList(), planNodeName); + super(id, tupleId.asList(), planNodeName, NodeType.SET_OPERATION_NODE); this.setOpResultExprs = setOpResultExprs; this.tupleId = tupleId; this.isInSubplan = isInSubplan; @@ -181,7 +196,7 @@ public abstract class SetOperationNode extends PlanNode { } @Override - public void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); if (!analyzer.safeIsEnableJoinReorderBasedCost()) { return; @@ -314,7 +329,7 @@ public abstract class SetOperationNode extends PlanNode { * been evaluated during registration to set analyzer.hasEmptyResultSet_. */ @Override - public void init(Analyzer analyzer) { + public void init(Analyzer analyzer) throws UserException { Preconditions.checkState(conjuncts.isEmpty()); computeTupleStatAndMemLayout(analyzer); computeStats(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index b57590c1dd..6a0dc17b25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -175,7 +175,7 @@ public class SingleNodePlanner { * they are never unnested, and therefore the corresponding parent scan should not * materialize them. */ - private PlanNode createEmptyNode(PlanNode inputPlan, QueryStmt stmt, Analyzer analyzer) { + private PlanNode createEmptyNode(PlanNode inputPlan, QueryStmt stmt, Analyzer analyzer) throws UserException { ArrayList<TupleId> tupleIds = Lists.newArrayList(); if (inputPlan != null) { tupleIds = inputPlan.tupleIds; @@ -1165,7 +1165,7 @@ public class SingleNodePlanner { * Returns a MergeNode that materializes the exprs of the constant selectStmt. Replaces the resultExprs of the * selectStmt with SlotRefs into the materialized tuple. */ - private PlanNode createConstantSelectPlan(SelectStmt selectStmt, Analyzer analyzer) { + private PlanNode createConstantSelectPlan(SelectStmt selectStmt, Analyzer analyzer) throws UserException { Preconditions.checkState(selectStmt.getTableRefs().isEmpty()); ArrayList<Expr> resultExprs = selectStmt.getResultExprs(); // Create tuple descriptor for materialized tuple. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 3fb3800a05..0e223c4f1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -29,6 +29,7 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SortInfo; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -79,7 +80,7 @@ public class SortNode extends PlanNode { public SortNode(PlanNodeId id, PlanNode input, SortInfo info, boolean useTopN, boolean isDefaultLimit, long offset) { - super(id, useTopN ? "TOP-N" : "SORT"); + super(id, useTopN ? "TOP-N" : "SORT", NodeType.SORT_NODE); this.info = info; this.useTopN = useTopN; this.isDefaultLimit = isDefaultLimit; @@ -95,7 +96,7 @@ public class SortNode extends PlanNode { * Clone 'inputSortNode' for distributed Top-N */ public SortNode(PlanNodeId id, SortNode inputSortNode, PlanNode child) { - super(id, inputSortNode, inputSortNode.useTopN ? "TOP-N" : "SORT"); + super(id, inputSortNode, inputSortNode.useTopN ? "TOP-N" : "SORT", NodeType.SORT_NODE); this.info = inputSortNode.info; this.useTopN = inputSortNode.useTopN; this.isDefaultLimit = inputSortNode.isDefaultLimit; @@ -127,14 +128,15 @@ public class SortNode extends PlanNode { } @Override - protected void computeStats(Analyzer analyzer) { + protected void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); if (!analyzer.safeIsEnableJoinReorderBasedCost()) { return; } - cardinality = getChild(0).cardinality; - applyConjunctsSelectivity(); - capCardinalityAtLimit(); + + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); + if (LOG.isDebugEnabled()) { LOG.debug("stats Sort: cardinality=" + cardinality); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index b8b4d60e02..0049eeaa0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -75,7 +75,7 @@ public class StreamLoadScanNode extends LoadScanNode { // used to construct for streaming loading public StreamLoadScanNode( TUniqueId loadId, PlanNodeId id, TupleDescriptor tupleDesc, Table dstTable, LoadTaskInfo taskInfo) { - super(id, tupleDesc, "StreamLoadScanNode"); + super(id, tupleDesc, "StreamLoadScanNode", NodeType.STREAM_LOAD_SCAN_NODE); this.loadId = loadId; this.dstTable = dstTable; this.taskInfo = taskInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TableFunctionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TableFunctionNode.java index 25be95fd2e..ed61dd85eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/TableFunctionNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TableFunctionNode.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatsRecursiveDerive; 
import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -50,7 +51,7 @@ public class TableFunctionNode extends PlanNode { private List<SlotId> outputSlotIds = Lists.newArrayList(); protected TableFunctionNode(PlanNodeId id, PlanNode inputNode, List<LateralViewRef> lateralViewRefs) { - super(id, "TABLE FUNCTION NODE"); + super(id, "TABLE FUNCTION NODE", NodeType.TABLE_FUNCTION_NODE); tupleIds.addAll(inputNode.getTupleIds()); tblRefIds.addAll(inputNode.getTupleIds()); lateralViewTupleIds = lateralViewRefs.stream().map(e -> e.getDesc().getId()) @@ -131,10 +132,11 @@ public class TableFunctionNode extends PlanNode { } @Override - protected void computeStats(Analyzer analyzer) { + protected void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); - // TODO the cardinality = child cardinality * cardinality of list column - cardinality = children.get(0).cardinality; + + StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this); + cardinality = statsDeriveResult.getRowCount(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java index 01573c100c..1fcf125dd3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java @@ -29,12 +29,12 @@ import java.util.List; public class UnionNode extends SetOperationNode { protected UnionNode(PlanNodeId id, TupleId tupleId) { - super(id, tupleId, "UNION"); + super(id, tupleId, "UNION", NodeType.UNION_NODE); } protected UnionNode(PlanNodeId id, TupleId tupleId, List<Expr> setOpResultExprs, boolean isInSubplan) { - super(id, tupleId, "UNION", setOpResultExprs, isInSubplan); + super(id, tupleId, "UNION", setOpResultExprs, isInSubplan, NodeType.UNION_NODE); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 343f89bc43..6d18557b98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -918,6 +918,14 @@ public class SessionVariable implements Serializable, Writable { this.trimTailingSpacesForExternalTableQuery = trimTailingSpacesForExternalTableQuery; } + public void setEnableJoinReorderBasedCost(boolean enableJoinReorderBasedCost) { + this.enableJoinReorderBasedCost = enableJoinReorderBasedCost; + } + + public void setDisableJoinReorder(boolean disableJoinReorder) { + this.disableJoinReorder = disableJoinReorder; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AggStatsDerive.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AggStatsDerive.java new file mode 100644 index 0000000000..6d626ce21a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AggStatsDerive.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.AggregationNode; +import org.apache.doris.planner.PlanNode; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +/** + * Derive AggNode statistics. + */ +public class AggStatsDerive extends BaseStatsDerive { + private static final Logger LOG = LogManager.getLogger(AggStatsDerive.class); + List<Expr> groupingExprs = new ArrayList<>(); + + @Override + public void init(PlanNode node) throws UserException { + Preconditions.checkState(node instanceof AggregationNode); + super.init(node); + groupingExprs.addAll(((AggregationNode) node).getAggInfo().getGroupingExprs()); + } + + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } + + @Override + protected long deriveRowCount() { + rowCount = 1; + // rowCount: product of # of distinct values produced by grouping exprs + for (Expr groupingExpr : groupingExprs) { + long numDistinct = groupingExpr.getNumDistinctValues(); + LOG.debug("grouping expr: " + groupingExpr.toSql() + " #distinct=" + Long.toString( + numDistinct)); + if (numDistinct == -1) { + rowCount = -1; + break; + } + // This is prone to overflow, because we keep multiplying cardinalities, + // even if the grouping exprs are functionally dependent (example: + // group by the primary key of a table plus a number of other columns from that + // same table) + // TODO: try to recognize functional dependencies + // TODO: as a shortcut, instead of recognizing functional dependencies, + // limit the contribution of a single table to the number of rows + // of that table (so that when we're grouping by the primary key col plus + // some others, the estimate doesn't overshoot dramatically) + rowCount *= numDistinct; + } + if (rowCount > 0) { + LOG.debug("sel=" + Double.toString(computeSelectivity())); + applyConjunctsSelectivity(); + } + // if we ended up with an overflow, the estimate is certain to be wrong + if (rowCount < 0) { + rowCount = -1; + } + + capRowCountAtLimit(); + if (LOG.isDebugEnabled()) { + LOG.debug("stats Agg: rowCount={}", rowCount); + } + return rowCount; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalyticEvalStatsDerive.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalyticEvalStatsDerive.java new file mode 100644 index 0000000000..5c6208d887 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalyticEvalStatsDerive.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Derive AnalyticEvalNode statistics. + */ +public class AnalyticEvalStatsDerive extends BaseStatsDerive { + private static final Logger LOG = LogManager.getLogger(AggStatsDerive.class); + + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } + + @Override + protected long deriveRowCount() { + Preconditions.checkState(!childrenStatsResult.isEmpty()); + rowCount = rowCount == -1 ? childrenStatsResult.get(0).getRowCount() : rowCount; + applyConjunctsSelectivity(); + capRowCountAtLimit(); + if (LOG.isDebugEnabled()) { + LOG.debug("stats AnalyticEval: rowCount={}", rowCount); + } + return rowCount; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AssertNumRowsStatsDerive.java similarity index 65% copy from fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java copy to fe/fe-core/src/main/java/org/apache/doris/statistics/AssertNumRowsStatsDerive.java index 9ff438e7f5..9e747fe5b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AssertNumRowsStatsDerive.java @@ -17,20 +17,18 @@ package org.apache.doris.statistics; -import org.apache.doris.planner.PlanNode; - -public class DeriveFactory { +/** + * Derive AssertNumRowsNode statistics. + */ +public class AssertNumRowsStatsDerive extends BaseStatsDerive { + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } - public BaseStatsDerive getStatsDerive(PlanNode.NodeType nodeType) { - switch (nodeType) { - case OLAP_SCAN_NODE: - return new OlapScanStatsDerive(); - case AGG_NODE: - case HASH_JOIN_NODE: - case MERGE_NODE: - case DEFAULT: - default: - return new BaseStatsDerive(); - } + @Override + protected long deriveRowCount() { + rowCount = 1; + return rowCount; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseStatsDerive.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseStatsDerive.java index 4a8e495895..d2f047c877 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseStatsDerive.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseStatsDerive.java @@ -32,6 +32,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +/** + * Base class for statistics derive. 
+ */ public class BaseStatsDerive { private static final Logger LOG = LogManager.getLogger(BaseStatsDerive.class); // estimate of the output rowCount of this node; @@ -49,8 +52,9 @@ public class BaseStatsDerive { for (PlanNode childNode : node.getChildren()) { StatsDeriveResult result = childNode.getStatsDeriveResult(); if (result == null) { - throw new UserException("childNode statsDeriveResult is null, childNodeType is " + childNode.getNodeType() - + "parentNodeType is " + node.getNodeType()); + throw new UserException( + "childNode statsDeriveResult is null, childNodeType is " + childNode.getNodeType() + + "parentNodeType is " + node.getNodeType()); } childrenStatsResult.add(result); } @@ -92,14 +96,19 @@ public class BaseStatsDerive { /** * Returns the estimated combined selectivity of all conjuncts. Uses heuristics to * address the following estimation challenges: - * 1. The individual selectivities of conjuncts may be unknown. - * 2. Two selectivities, whether known or unknown, could be correlated. Assuming - * independence can lead to significant underestimation. + * * <p> - * The first issue is addressed by using a single default selectivity that is - * representative of all conjuncts with unknown selectivities. - * The second issue is addressed by an exponential backoff when multiplying each - * additional selectivity into the final result. + * * 1. The individual selectivities of conjuncts may be unknown. + * * 2. Two selectivities, whether known or unknown, could be correlated. Assuming + * * independence can lead to significant underestimation. + * </p> + * + * <p> + * * The first issue is addressed by using a single default selectivity that is + * * representative of all conjuncts with unknown selectivities. + * * The second issue is addressed by an exponential backoff when multiplying each + * * additional selectivity into the final result. + * </p> */ protected double computeCombinedSelectivity(List<Expr> conjuncts) { // Collect all estimated selectivities. diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/CrossJoinStatsDerive.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/CrossJoinStatsDerive.java new file mode 100644 index 0000000000..fea8425ff4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/CrossJoinStatsDerive.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.common.CheckedMath; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Derive CrossJoinNode statistics. 
+ */ +public class CrossJoinStatsDerive extends BaseStatsDerive { + private static final Logger LOG = LogManager.getLogger(CrossJoinStatsDerive.class); + + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } + + @Override + protected long deriveRowCount() { + Preconditions.checkState(childrenStatsResult.size() == 2); + if (childrenStatsResult.get(0).getRowCount() == -1 || childrenStatsResult.get(1).getRowCount() == -1) { + rowCount = -1; + } else { + rowCount = CheckedMath.checkedMultiply(childrenStatsResult.get(0).getRowCount(), + childrenStatsResult.get(1).getRowCount()); + applyConjunctsSelectivity(); + capRowCountAtLimit(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("stats CrossJoin: rowCount={}", Long.toString(rowCount)); + } + return rowCount; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java index 9ff438e7f5..7d694ee681 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java @@ -23,11 +23,40 @@ public class DeriveFactory { public BaseStatsDerive getStatsDerive(PlanNode.NodeType nodeType) { switch (nodeType) { - case OLAP_SCAN_NODE: - return new OlapScanStatsDerive(); case AGG_NODE: + return new AggStatsDerive(); + case ANALYTIC_EVAL_NODE: + return new AnalyticEvalStatsDerive(); + case ASSERT_NUM_ROWS_NODE: + return new AssertNumRowsStatsDerive(); + case CROSS_JOIN_NODE: + return new CrossJoinStatsDerive(); + case EMPTY_SET_NODE: + case REPEAT_NODE: + return new EmptySetStatsDerive(); + case EXCHANGE_NODE: + return new ExchangeStatsDerive(); case HASH_JOIN_NODE: - case MERGE_NODE: + return new HashJoinStatsDerive(); + case OLAP_SCAN_NODE: + return new OlapScanStatsDerive(); + case MYSQL_SCAN_NODE: + case ODBC_SCAN_NODE: + return new MysqlStatsDerive(); + case SELECT_NODE: + case SORT_NODE: + return new SelectStatsDerive(); + case TABLE_FUNCTION_NODE: + return new TableFunctionStatsDerive(); + case BROKER_SCAN_NODE: + case EXCEPT_NODE: + case ES_SCAN_NODE: + case HIVE_SCAN_NODE: + case ICEBERG_SCAN_NODE: + case INTERSECT_NODE: + case SCHEMA_SCAN_NODE: + case STREAM_LOAD_SCAN_NODE: + case UNION_NODE: case DEFAULT: default: return new BaseStatsDerive(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/EmptySetStatsDerive.java similarity index 65% copy from fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java copy to fe/fe-core/src/main/java/org/apache/doris/statistics/EmptySetStatsDerive.java index 9ff438e7f5..119c2cfdd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/EmptySetStatsDerive.java @@ -17,20 +17,19 @@ package org.apache.doris.statistics; -import org.apache.doris.planner.PlanNode; - -public class DeriveFactory { +/** + * Derive EmptySetNode statistics. 
+ */ +public class EmptySetStatsDerive extends BaseStatsDerive { + // Current REPEAT_NODE also uses this derivation method + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } - public BaseStatsDerive getStatsDerive(PlanNode.NodeType nodeType) { - switch (nodeType) { - case OLAP_SCAN_NODE: - return new OlapScanStatsDerive(); - case AGG_NODE: - case HASH_JOIN_NODE: - case MERGE_NODE: - case DEFAULT: - default: - return new BaseStatsDerive(); - } + @Override + protected long deriveRowCount() { + rowCount = 0; + return rowCount; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ExchangeStatsDerive.java similarity index 60% copy from fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java copy to fe/fe-core/src/main/java/org/apache/doris/statistics/ExchangeStatsDerive.java index 9ff438e7f5..6288d1f6a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ExchangeStatsDerive.java @@ -17,20 +17,22 @@ package org.apache.doris.statistics; -import org.apache.doris.planner.PlanNode; +import com.google.common.base.Preconditions; -public class DeriveFactory { +/** + * Derive ExchangeNode statistics. + */ +public class ExchangeStatsDerive extends BaseStatsDerive { + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } - public BaseStatsDerive getStatsDerive(PlanNode.NodeType nodeType) { - switch (nodeType) { - case OLAP_SCAN_NODE: - return new OlapScanStatsDerive(); - case AGG_NODE: - case HASH_JOIN_NODE: - case MERGE_NODE: - case DEFAULT: - default: - return new BaseStatsDerive(); - } + @Override + protected long deriveRowCount() { + Preconditions.checkState(!childrenStatsResult.isEmpty()); + rowCount = childrenStatsResult.get(0).getRowCount(); + capRowCountAtLimit(); + return rowCount; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HashJoinStatsDerive.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HashJoinStatsDerive.java new file mode 100644 index 0000000000..649fd1cc7e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HashJoinStatsDerive.java @@ -0,0 +1,253 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.JoinOperator; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.ColumnStats; +import org.apache.doris.common.CheckedMath; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.HashJoinNode; +import org.apache.doris.planner.PlanNode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.List; + +/** + * Derive HashJoinNode statistics. + */ +public class HashJoinStatsDerive extends BaseStatsDerive { + private JoinOperator joinOp; + private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList(); + + @Override + public void init(PlanNode node) throws UserException { + Preconditions.checkState(node instanceof HashJoinNode); + super.init(node); + joinOp = ((HashJoinNode) node).getJoinOp(); + eqJoinConjuncts.addAll(((HashJoinNode) node).getEqJoinConjuncts()); + } + + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } + + @Override + protected long deriveRowCount() { + if (joinOp.isSemiAntiJoin()) { + rowCount = getSemiJoinrowCount(); + } else if (joinOp.isInnerJoin() || joinOp.isOuterJoin()) { + rowCount = getJoinrowCount(); + } else { + Preconditions.checkState(false, "joinOp is not supported"); + } + capRowCountAtLimit(); + return rowCount; + } + + /** + * Returns the estimated rowCount of a semi join node. + * For a left semi join between child(0) and child(1), we look for equality join + * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and use as + * the rowCount estimate the minimum of + * |child(0)| * Min(NDV(L.c), NDV(R.d)) / NDV(L.c) + * over all suitable join conditions. The reasoning is that: + * -each row in child(0) is returned at most once + * -the probability of a row in child(0) having a match in R is + * Min(NDV(L.c), NDV(R.d)) / NDV(L.c) + * + *<p> + * For a left anti join we estimate the rowCount as the minimum of: + * |L| * Max(NDV(L.c) - NDV(R.d), NDV(L.c)) / NDV(L.c) + * over all suitable join conditions. The reasoning is that: + * - each row in child(0) is returned at most once + * - if NDV(L.c) > NDV(R.d) then the probability of row in L having a match + * in child(1) is (NDV(L.c) - NDV(R.d)) / NDV(L.c) + * - otherwise, we conservatively use |L| to avoid underestimation + *</p> + * + *<p> + * We analogously estimate the rowCount for right semi/anti joins, and treat the + * null-aware anti join like a regular anti join + *</p> + */ + private long getSemiJoinrowCount() { + Preconditions.checkState(joinOp.isSemiJoin()); + + // Return -1 if the rowCount of the returned side is unknown. 
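        // Worked example of the estimate documented above, with illustrative numbers that are
        // not taken from this patch: for a LEFT SEMI JOIN with |child(0)| = 1000, NDV(L.c) = 50
        // and NDV(R.d) = 20, the selectivity is min(50, 20) / 50 = 0.4, so the estimated
        // rowCount is 1000 * 0.4 = 400; for a LEFT ANTI JOIN with the same numbers it is
        // (50 - 20) / 50 = 0.6, giving 1000 * 0.6 = 600.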
+ long rowCount; + if (joinOp == JoinOperator.RIGHT_SEMI_JOIN + || joinOp == JoinOperator.RIGHT_ANTI_JOIN) { + if (childrenStatsResult.get(1).getRowCount() == -1) { + return -1; + } + rowCount = childrenStatsResult.get(1).getRowCount(); + } else { + if (childrenStatsResult.get(0).getRowCount() == -1) { + return -1; + } + rowCount = childrenStatsResult.get(0).getRowCount(); + } + double minSelectivity = 1.0; + for (Expr eqJoinPredicate : eqJoinConjuncts) { + long lhsNdv = getNdv(eqJoinPredicate.getChild(0)); + lhsNdv = Math.min(lhsNdv, childrenStatsResult.get(0).getRowCount()); + long rhsNdv = getNdv(eqJoinPredicate.getChild(1)); + rhsNdv = Math.min(rhsNdv, childrenStatsResult.get(1).getRowCount()); + + // Skip conjuncts with unknown NDV on either side. + if (lhsNdv == -1 || rhsNdv == -1) { + continue; + } + + double selectivity = 1.0; + switch (joinOp) { + case LEFT_SEMI_JOIN: { + selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (lhsNdv); + break; + } + case RIGHT_SEMI_JOIN: { + selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (rhsNdv); + break; + } + case LEFT_ANTI_JOIN: + case NULL_AWARE_LEFT_ANTI_JOIN: { + selectivity = (double) (lhsNdv > rhsNdv ? (lhsNdv - rhsNdv) : lhsNdv) / (double) lhsNdv; + break; + } + case RIGHT_ANTI_JOIN: { + selectivity = (double) (rhsNdv > lhsNdv ? (rhsNdv - lhsNdv) : rhsNdv) / (double) rhsNdv; + break; + } + default: + Preconditions.checkState(false); + } + minSelectivity = Math.min(minSelectivity, selectivity); + } + + Preconditions.checkState(rowCount != -1); + return Math.round(rowCount * minSelectivity); + } + + /** + * Unwraps the SlotRef in expr and returns the NDVs of it. + * Returns -1 if the NDVs are unknown or if expr is not a SlotRef. + */ + private long getNdv(Expr expr) { + SlotRef slotRef = expr.unwrapSlotRef(false); + if (slotRef == null) { + return -1; + } + SlotDescriptor slotDesc = slotRef.getDesc(); + if (slotDesc == null) { + return -1; + } + ColumnStats stats = slotDesc.getStats(); + if (!stats.hasNumDistinctValues()) { + return -1; + } + return stats.getNumDistinctValues(); + } + + private long getJoinrowCount() { + Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin()); + Preconditions.checkState(childrenStatsResult.size() == 2); + + long lhsCard = childrenStatsResult.get(0).getRowCount(); + long rhsCard = childrenStatsResult.get(1).getRowCount(); + if (lhsCard == -1 || rhsCard == -1) { + return lhsCard; + } + + // Collect join conjuncts that are eligible to participate in rowCount estimation. + List<HashJoinNode.EqJoinConjunctScanSlots> eqJoinConjunctSlots = new ArrayList<>(); + for (Expr eqJoinConjunct : eqJoinConjuncts) { + HashJoinNode.EqJoinConjunctScanSlots slots = HashJoinNode.EqJoinConjunctScanSlots.create(eqJoinConjunct); + if (slots != null) { + eqJoinConjunctSlots.add(slots); + } + } + + if (eqJoinConjunctSlots.isEmpty()) { + // There are no eligible equi-join conjuncts. + return lhsCard; + } + + return getGenericJoinrowCount(eqJoinConjunctSlots, lhsCard, rhsCard); + } + + /** + * Returns the estimated join rowCount of a generic N:M inner or outer join based + * on the given list of equi-join conjunct slots and the join input cardinalities. + * The returned result is >= 0. + * The list of join conjuncts must be non-empty and the cardinalities must be >= 0. 
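     *
     * <p>
     * A worked example of the estimate described below, with illustrative numbers that are
     * not taken from this patch: for |child(0)| = 1000, |child(1)| = 200, NDV(L.c) = 50 and
     * NDV(R.d) = 20, the estimate is 1000 * 200 / max(50, 20) = 4000 rows.
     * </p>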
+ * + * <p> + * Generic estimation: + * rowCount = |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d)) + * - case A: NDV(L.c) <= NDV(R.d) + * every row from child(0) joins with |child(1)| / NDV(R.d) rows + * - case B: NDV(L.c) > NDV(R.d) + * every row from child(1) joins with |child(0)| / NDV(L.c) rows + * - we adjust the NDVs from both sides to account for predicates that might + * have reduced the rowCount and NDVs + *</p> + */ + private long getGenericJoinrowCount(List<HashJoinNode.EqJoinConjunctScanSlots> eqJoinConjunctSlots, + long lhsCard, + long rhsCard) { + Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin()); + Preconditions.checkState(!eqJoinConjunctSlots.isEmpty()); + Preconditions.checkState(lhsCard >= 0 && rhsCard >= 0); + + long result = -1; + for (HashJoinNode.EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) { + // Adjust the NDVs on both sides to account for predicates. Intuitively, the NDVs + // should only decrease. We ignore adjustments that would lead to an increase. + double lhsAdjNdv = slots.lhsNdv(); + if (slots.lhsNumRows() > lhsCard) { + lhsAdjNdv *= lhsCard / slots.lhsNumRows(); + } + double rhsAdjNdv = slots.rhsNdv(); + if (slots.rhsNumRows() > rhsCard) { + rhsAdjNdv *= rhsCard / slots.rhsNumRows(); + } + // A lower limit of 1 on the max Adjusted Ndv ensures we don't estimate + // rowCount more than the max possible. + long tmpNdv = Math.round(Math.max(1, Math.max(lhsAdjNdv, rhsAdjNdv))); + long joinCard = tmpNdv == rhsCard + ? lhsCard + : CheckedMath.checkedMultiply( + Math.round((lhsCard / Math.max(1, Math.max(lhsAdjNdv, rhsAdjNdv)))), rhsCard); + if (result == -1) { + result = joinCard; + } else { + result = Math.min(result, joinCard); + } + } + Preconditions.checkState(result >= 0); + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/MysqlStatsDerive.java similarity index 54% copy from fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java copy to fe/fe-core/src/main/java/org/apache/doris/statistics/MysqlStatsDerive.java index 9ff438e7f5..c6b8921ce2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/MysqlStatsDerive.java @@ -17,20 +17,23 @@ package org.apache.doris.statistics; -import org.apache.doris.planner.PlanNode; +/** + * Derive MysqlScanNode statistics. + */ +public class MysqlStatsDerive extends BaseStatsDerive { -public class DeriveFactory { + // Currently ODBC_SCAN_NODE also uses this derivation method + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } - public BaseStatsDerive getStatsDerive(PlanNode.NodeType nodeType) { - switch (nodeType) { - case OLAP_SCAN_NODE: - return new OlapScanStatsDerive(); - case AGG_NODE: - case HASH_JOIN_NODE: - case MERGE_NODE: - case DEFAULT: - default: - return new BaseStatsDerive(); - } + @Override + protected long deriveRowCount() { + // This is just to avoid the mysql scan node's rowCount being -1, so that we can calculate the join cost + // normally. + // We assume that the data volume of all mysql tables is very small, so set rowCount directly to 1. + rowCount = rowCount == -1 ?
1 : rowCount; + return rowCount; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapScanStatsDerive.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapScanStatsDerive.java index c4f8a23f76..efa1e1aafb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapScanStatsDerive.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapScanStatsDerive.java @@ -30,6 +30,9 @@ import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.Map; +/** + * Derive OlapScanNode Statistics. + */ public class OlapScanStatsDerive extends BaseStatsDerive { // Currently, due to the structure of doris, // the selected materialized view is not determined when calculating the statistical information of scan, @@ -50,7 +53,7 @@ public class OlapScanStatsDerive extends BaseStatsDerive { @Override public StatsDeriveResult deriveStats() { - /** + /* * Compute InAccurate cardinality before mv selector and tablet pruning. * - Accurate statistical information relies on the selector of materialized views and bucket reduction. * - However, both of those processes occur after the reorder algorithm is completed. @@ -68,9 +71,16 @@ public class OlapScanStatsDerive extends BaseStatsDerive { return new StatsDeriveResult(deriveRowCount(), slotIdToDataSize, slotIdToNdv); } + /** + * Desc: Build OlapScanNode infrastructure. + * + * @param: node + * @return: void + */ public void buildStructure(OlapScanNode node) { slotIdToDataSize = new HashMap<>(); slotIdToNdv = new HashMap<>(); + slotIdToTableIdAndColumnName = new HashMap<>(); if (node.getTupleDesc() != null && node.getTupleDesc().getTable() != null) { long tableId = node.getTupleDesc().getTable().getId(); @@ -90,6 +100,12 @@ public class OlapScanStatsDerive extends BaseStatsDerive { //TODO:Implement the getStatistics interface //now there is nothing in statistics, need to wait for collection finished + /** + * Desc: Get ndv and dataSize from statistics. + * + * @param pair TableId and ColumnName + * @return {@link Pair} + */ public Pair<Long, Float> getNdvAndDataSizeFromStatistics(Pair<Long, String> pair) { long ndv = -1; float dataSize = -1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/SelectStatsDerive.java similarity index 57% copy from fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java copy to fe/fe-core/src/main/java/org/apache/doris/statistics/SelectStatsDerive.java index 9ff438e7f5..09a4c4c5df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/SelectStatsDerive.java @@ -17,20 +17,25 @@ package org.apache.doris.statistics; -import org.apache.doris.planner.PlanNode; +import com.google.common.base.Preconditions; -public class DeriveFactory { +/** + * Derive SelectNode statistics.
+ */ +public class SelectStatsDerive extends BaseStatsDerive { - public BaseStatsDerive getStatsDerive(PlanNode.NodeType nodeType) { - switch (nodeType) { - case OLAP_SCAN_NODE: - return new OlapScanStatsDerive(); - case AGG_NODE: - case HASH_JOIN_NODE: - case MERGE_NODE: - case DEFAULT: - default: - return new BaseStatsDerive(); - } + // Current SORT_NODE also uses this derivation method + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } + + @Override + protected long deriveRowCount() { + Preconditions.checkState(!childrenStatsResult.isEmpty()); + rowCount = childrenStatsResult.get(0).getRowCount(); + applyConjunctsSelectivity(); + capRowCountAtLimit(); + return rowCount; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java index a49e41833f..8e0f3eff46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java @@ -181,4 +181,5 @@ public class StatisticsManager { public Statistics getStatistics() { return statistics; } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableFunctionStatsDerive.java similarity index 58% copy from fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java copy to fe/fe-core/src/main/java/org/apache/doris/statistics/TableFunctionStatsDerive.java index 9ff438e7f5..89abbccf33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableFunctionStatsDerive.java @@ -17,20 +17,22 @@ package org.apache.doris.statistics; -import org.apache.doris.planner.PlanNode; +import com.google.common.base.Preconditions; -public class DeriveFactory { +/** + * Derive TableFunctionNode statistics. + */ +public class TableFunctionStatsDerive extends BaseStatsDerive { + @Override + public StatsDeriveResult deriveStats() { + return new StatsDeriveResult(deriveRowCount(), deriveColumnToDataSize(), deriveColumnToNdv()); + } - public BaseStatsDerive getStatsDerive(PlanNode.NodeType nodeType) { - switch (nodeType) { - case OLAP_SCAN_NODE: - return new OlapScanStatsDerive(); - case AGG_NODE: - case HASH_JOIN_NODE: - case MERGE_NODE: - case DEFAULT: - default: - return new BaseStatsDerive(); - } + @Override + protected long deriveRowCount() { + Preconditions.checkState(!childrenStatsResult.isEmpty()); + // TODO the rowCount = child rowCount * rowCount of list column + rowCount = childrenStatsResult.get(0).getRowCount(); + return rowCount; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java new file mode 100644 index 0000000000..36de4967ed --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java @@ -0,0 +1,276 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +public class StatisticDeriveTest extends TestWithFeService { + @Override + protected void runBeforeAll() throws Exception { + // create database + createDatabase("test"); + + createTable( + "CREATE TABLE test.join1 (\n" + + " `dt` int(11) COMMENT \"\",\n" + + " `id` int(11) COMMENT \"\",\n" + + " `value` bigint(8) COMMENT \"\"\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(`dt`, `id`)\n" + + "PARTITION BY RANGE(`dt`)\n" + + "(PARTITION p1 VALUES LESS THAN (\"10\"))\n" + + "DISTRIBUTED BY HASH(`id`) BUCKETS 10\n" + + "PROPERTIES (\n" + + " \"replication_num\" = \"1\"\n" + + ");"); + + createTable( + "CREATE TABLE test.join2 (\n" + + " `dt` int(11) COMMENT \"\",\n" + + " `id` int(11) COMMENT \"\",\n" + + " `value` varchar(8) COMMENT \"\"\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(`dt`, `id`)\n" + + "PARTITION BY RANGE(`dt`)\n" + + "(PARTITION p1 VALUES LESS THAN (\"10\"))\n" + + "DISTRIBUTED BY HASH(`id`) BUCKETS 10\n" + + "PROPERTIES (\n" + + " \"replication_num\" = \"1\"\n" + + ");"); + + createTable("create external table test.mysql_table\n" + + "(k1 int, k2 int)\n" + + "ENGINE=MYSQL\n" + + "PROPERTIES (\n" + + "\"host\" = \"127.0.0.1\",\n" + + "\"port\" = \"3306\",\n" + + "\"user\" = \"root\",\n" + + "\"password\" = \"123\",\n" + + "\"database\" = \"db1\",\n" + + "\"table\" = \"tbl1\"\n" + + ");"); + + createTable("create external table test.odbc_oracle\n" + + "(k1 float, k2 int)\n" + + "ENGINE=ODBC\n" + + "PROPERTIES (\n" + + "\"host\" = \"127.0.0.1\",\n" + + "\"port\" = \"3306\",\n" + + "\"user\" = \"root\",\n" + + "\"password\" = \"123\",\n" + + "\"database\" = \"db1\",\n" + + "\"table\" = \"tbl1\",\n" + + "\"driver\" = \"Oracle Driver\",\n" + + "\"odbc_type\" = \"oracle\"\n" + + ");"); + + createTable( + "create external table test.odbc_mysql\n" + + "(k1 int, k2 int)\n" + + "ENGINE=ODBC\n" + + "PROPERTIES (\n" + + "\"host\" = \"127.0.0.1\",\n" + + "\"port\" = \"3306\",\n" + + "\"user\" = \"root\",\n" + + "\"password\" = \"123\",\n" + + "\"database\" = \"db1\",\n" + + "\"table\" = \"tbl1\",\n" + + "\"driver\" = \"Oracle Driver\",\n" + + "\"odbc_type\" = \"mysql\"\n" + + ");"); + + } + + @Test + public void testAggStatsDerive() throws Exception { + // contain AggNode/OlapScanNode + String sql = "select dt, max(id), value from test.join1 group by dt, value;"; + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + assertSQLPlanOrErrorMsgContains(sql, "AGGREGATE"); + assertSQLPlanOrErrorMsgContains(sql, "OlapScanNode"); + } + + @Test + 
public void testAnalyticEvalStatsDerive() throws Exception { + // contain SortNode/ExchangeNode/OlapScanNode + String sql = "select dt, min(id) OVER (PARTITION BY dt ORDER BY id) from test.join1"; + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "ANALYTIC"); + assertSQLPlanOrErrorMsgContains(sql, "SORT"); + assertSQLPlanOrErrorMsgContains(sql, "EXCHANGE"); + } + + @Test + public void testAssertNumberRowsStatsDerive() throws Exception { + // contain CrossJoinNode/ExchangeNode/AssertNumberRowsNode/AggNode/OlapScanNode + String sql = "SELECT CASE\n" + + "WHEN (\n" + + "SELECT COUNT(*) / 2\n" + + "FROM test.join1\n" + + ") > id THEN (\n" + + "SELECT AVG(id)\n" + + "FROM test.join1\n" + + ")\n" + + "ELSE (\n" + + "SELECT SUM(id)\n" + + "FROM test.join1\n" + + ")\n" + + "END AS kk4\n" + + "FROM test.join1;"; + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "CROSS JOIN"); + assertSQLPlanOrErrorMsgContains(sql, "ASSERT NUMBER OF ROWS"); + assertSQLPlanOrErrorMsgContains(sql, "EXCHANGE"); + assertSQLPlanOrErrorMsgContains(sql, "AGGREGATE"); + assertSQLPlanOrErrorMsgContains(sql, "OlapScanNode"); + } + + @Test + public void testEmptySetStatsDerive() throws Exception { + String sql = "select * from test.join1 where 1 = 2"; + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "EMPTYSET"); + } + + @Test + public void testRepeatStatsDerive() throws Exception { + String sql = "select dt, id, sum(value) from test.join1 group by rollup (dt, id)"; + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "REPEAT_NODE"); + } 
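    // A possible additional case, sketched here rather than taken from this patch: exercise
    // CrossJoinStatsDerive with an unconditioned comma join, reusing the helpers of the
    // surrounding tests and assuming the planner prints such a join as CROSS JOIN, the same
    // plan string already asserted in testAssertNumberRowsStatsDerive above.
    @Test
    public void testCrossJoinStatsDerive() throws Exception {
        String sql = "select * from test.join1, test.join2";
        SessionVariable sessionVariable = connectContext.getSessionVariable();
        sessionVariable.setEnableJoinReorderBasedCost(true);
        sessionVariable.setDisableJoinReorder(false);
        StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
        stmtExecutor.execute();
        Assert.assertNotNull(stmtExecutor.planner());
        Assert.assertNotNull(stmtExecutor.planner().getFragments());
        Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size());
        System.out.println(getSQLPlanOrErrorMsg("explain " + sql));
        assertSQLPlanOrErrorMsgContains(sql, "CROSS JOIN");
    }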
+ + @Test + public void testHashJoinStatsDerive() throws Exception { + // contain HashJoinNode/ExchangeNode/OlapScanNode + String sql = "select * from test.join1 a inner join test.join2 b on a.dt = b.dt"; + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "HASH JOIN"); + } + + @Test + public void testMysqlScanStatsDerive() throws Exception { + String sql = "select * from test.mysql_table"; + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "SCAN MYSQL"); + } + + @Test + public void testOdbcScanStatsDerive() throws Exception { + String sql = "select * from test.odbc_mysql"; + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "SCAN ODBC"); + } + + @Test + public void testTableFunctionStatsDerive() throws Exception { + String sql = "select * from test.join2 lateral view explode_split(value, \",\") tmp as e1"; + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "TABLE FUNCTION NODE"); + } + + @Test + public void testUnionStatsDerive() throws Exception { + String sql = "select * from test.join1 union select * from test.join2"; + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setEnableJoinReorderBasedCost(true); + sessionVariable.setDisableJoinReorder(false); + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); + stmtExecutor.execute(); + Assert.assertNotNull(stmtExecutor.planner()); + Assert.assertNotNull(stmtExecutor.planner().getFragments()); + Assert.assertNotEquals(0, stmtExecutor.planner().getFragments().size()); + 
System.out.println(getSQLPlanOrErrorMsg("explain " + sql)); + assertSQLPlanOrErrorMsgContains(sql, "UNION"); + } +}