This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 394a9a1 [Feature] Runtime Filtering for Doris (Background, Configuration, FE Implement, Tuning, Test ) (#6121) 394a9a1 is described below commit 394a9a1472cb8bfe91f6649afcbeee7f0178dc62 Author: Xinyi Zou <zouxinyi...@foxmail.com> AuthorDate: Tue Jul 13 11:36:01 2021 +0800 [Feature] Runtime Filtering for Doris (Background, Configuration, FE Implement, Tuning, Test ) (#6121) - `RuntimeFilterGenerator` is used to generate Runtime Filter and assign it to the node that uses Runtime Filter in the query plan. - `RuntimeFilter` represents a filter in the query plan, including the specific properties of the filter, the binding method of expr and tuple slot, etc. - `RuntimeFilterTarget` indicates the filter information provided to ScanNode, including target expr, whether to merge, etc. --- .../java/org/apache/doris/analysis/Analyzer.java | 46 +++ .../org/apache/doris/analysis/BinaryPredicate.java | 2 + .../main/java/org/apache/doris/analysis/Expr.java | 37 ++ .../java/org/apache/doris/analysis/Predicate.java | 5 + .../org/apache/doris/analysis/StringLiteral.java | 30 +- .../org/apache/doris/analysis/SysVariableDesc.java | 20 +- .../doris/analysis/TupleIsNullPredicate.java | 20 + .../util/BitUtil.java} | 23 +- .../apache/doris/planner/DistributedPlanner.java | 3 - .../org/apache/doris/planner/HashJoinNode.java | 15 +- .../org/apache/doris/planner/OlapScanNode.java | 4 + .../org/apache/doris/planner/PlanFragment.java | 30 ++ .../java/org/apache/doris/planner/PlanNode.java | 36 ++ .../java/org/apache/doris/planner/Planner.java | 10 +- .../org/apache/doris/planner/RuntimeFilter.java | 457 +++++++++++++++++++++ .../doris/planner/RuntimeFilterGenerator.java | 400 ++++++++++++++++++ .../org/apache/doris/planner/RuntimeFilterId.java | 56 +++ .../java/org/apache/doris/planner/ScanNode.java | 2 + .../apache/doris/planner/SingleNodePlanner.java | 31 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 74 +++- 
.../apache/doris/qe/RuntimeFilterTypeHelper.java | 115 ++++++ .../java/org/apache/doris/qe/SessionVariable.java | 102 +++++ .../main/java/org/apache/doris/qe/VariableMgr.java | 36 +- .../org/apache/doris/qe/VariableVarConverterI.java | 4 +- .../org/apache/doris/qe/VariableVarConverters.java | 59 ++- .../org/apache/doris/planner/QueryPlanTest.java | 79 ++++ .../doris/planner/RuntimeFilterGeneratorTest.java | 426 +++++++++++++++++++ .../doris/qe/RuntimeFilterTypeHelperTest.java | 77 ++++ .../java/org/apache/doris/qe/VariableMgrTest.java | 6 + gensrc/thrift/PaloInternalService.thrift | 24 +- gensrc/thrift/PlanNodes.thrift | 4 +- 31 files changed, 2131 insertions(+), 102 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 3634768..a879f08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -34,6 +34,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.planner.PlanNode; +import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.BetweenToCompoundRule; import org.apache.doris.rewrite.ExprRewriteRule; @@ -148,6 +149,9 @@ public class Analyzer { private boolean isUDFAllowed = true; // timezone specified for some operation, such as broker load private String timezone = TimeUtils.DEFAULT_TIME_ZONE; + + // The runtime filter that is expected to be used + private final List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>(); public void setIsSubquery() { isSubquery = true; @@ -163,6 +167,10 @@ public class Analyzer { public void setTimezone(String timezone) { this.timezone = timezone; } public String getTimezone() { return timezone; } + public void putAssignedRuntimeFilter(RuntimeFilter rf) 
{ assignedRuntimeFilters.add(rf); } + public List<RuntimeFilter> getAssignedRuntimeFilter() { return assignedRuntimeFilters; } + public void clearAssignedRuntimeFilters() { assignedRuntimeFilters.clear(); } + // state shared between all objects of an Analyzer tree // TODO: Many maps here contain properties about tuples, e.g., whether // a tuple is outer/semi joined, etc. Remove the maps in favor of making @@ -551,6 +559,10 @@ public class Analyzer { return globalState.descTbl.getTupleDesc(id); } + public SlotDescriptor getSlotDesc(SlotId id) { + return globalState.descTbl.getSlotDesc(id); + } + /** * Given a "table alias"."column alias", return the SlotDescriptor * @@ -1809,4 +1821,38 @@ public class Analyzer { } } } + + /** + * Column conduction, can slot a value-transfer to slot b + * + * TODO(zxy) Use value-transfer graph to check + */ + public boolean hasValueTransfer(SlotId a, SlotId b) { + return a.equals(b); + } + + /** + * Returns sorted slot IDs with value transfers from 'srcSid'. + * Time complexity: O(V) where V = number of slots + * + * TODO(zxy) Use value-transfer graph to check + */ + public List<SlotId> getValueTransferTargets(SlotId srcSid) { + List<SlotId> result = new ArrayList<>(); + result.add(srcSid); + return result; + } + + /** + * Returns true if any of the given slot ids or their value-transfer targets belong + * to an outer-joined tuple. 
+ */ + public boolean hasOuterJoinedValueTransferTarget(List<SlotId> sids) { + for (SlotId srcSid: sids) { + for (SlotId dstSid: getValueTransferTargets(srcSid)) { + if (isOuterJoined(getTupleId(dstSid))) return true; + } + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java index 9a98ac5..8c3da5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java @@ -133,6 +133,8 @@ public class BinaryPredicate extends Predicate implements Writable { public boolean isEquivalence() { return this == EQ || this == EQ_FOR_NULL; }; + public boolean isUnNullSafeEquivalence() { return this == EQ; }; + public boolean isUnequivalence() { return this == NE; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 3f73a31..cafd88f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -1704,4 +1704,41 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl final Expr newExpr = ExpressionFunctions.INSTANCE.evalExpr(this); return newExpr != null ? newExpr : this; } + + public String getStringValue() { + if (this instanceof LiteralExpr) { + return ((LiteralExpr) this).getStringValue(); + } + return ""; + } + + public static Expr getFirstBoundChild(Expr expr, List<TupleId> tids) { + for (Expr child: expr.getChildren()) { + if (child.isBoundByTupleIds(tids)) return child; + } + return null; + } + + /** + * Returns true if expr contains specify function, otherwise false. 
+ */ + public boolean isContainsFunction(String functionName) { + if (fn == null) return false; + if (fn.functionName().equalsIgnoreCase(functionName)) return true; + for (Expr child: children) { + if (child.isContainsFunction(functionName)) return true; + } + return false; + } + + /** + * Returns true if expr contains specify className, otherwise false. + */ + public boolean isContainsClass(String className) { + if (this.getClass().getName().equalsIgnoreCase(className)) return true; + for (Expr child: children) { + if (child.isContainsClass(className)) return true; + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java index 0f033b4..cc4f4dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Predicate.java @@ -98,6 +98,11 @@ public abstract class Predicate extends Expr { && ((BinaryPredicate) expr).getOp().isEquivalence(); } + public static boolean isUnNullSafeEquivalencePredicate(Expr expr) { + return (expr instanceof BinaryPredicate) + && ((BinaryPredicate) expr).getOp().isUnNullSafeEquivalence(); + } + public static boolean canPushDownPredicate(Expr expr) { if (!(expr instanceof Predicate)) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java index 8af05d3..ab202e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StringLiteral.java @@ -24,7 +24,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.io.Text; -import org.apache.doris.qe.SqlModeHelper; +import org.apache.doris.qe.VariableVarConverters; import org.apache.doris.thrift.TExprNode; import 
org.apache.doris.thrift.TExprNodeType; import org.apache.doris.thrift.TStringLiteral; @@ -43,18 +43,8 @@ import java.util.Objects; public class StringLiteral extends LiteralExpr { private static final Logger LOG = LogManager.getLogger(StringLiteral.class); private String value; - /** - * the session variable `sql_mode` is a special kind of variable. - * it's real type is int, so when querying `select @@sql_mode`, the return column - * type is "int". but user usually set this variable by string, such as: - * `set @@sql_mode = 'STRICT_TRANS_TABLES'` - * or - * `set @@sql_mode = concat(@@sql_mode, 'STRICT_TRANS_TABLES')'` - * <p> - * So when it need to be cast to int, it means "cast 'STRICT_TRANS_TABLES' to Integer". - * To support this, we set `isSqlMode` to true, so that it can cast sql mode name to integer. - */ - private boolean isSqlMode = false; + // Means the converted session variable need to be cast to int, such as "cast 'STRICT_TRANS_TABLES' to Integer". + private String beConverted = ""; public StringLiteral() { super(); @@ -73,8 +63,8 @@ public class StringLiteral extends LiteralExpr { value = other.value; } - public void setIsSqlMode(boolean val) { - this.isSqlMode = val; + public void setBeConverted(String val) { + this.beConverted = val; } @Override @@ -203,20 +193,18 @@ public class StringLiteral extends LiteralExpr { case SMALLINT: case INT: case BIGINT: - if (isSqlMode) { + if (VariableVarConverters.hasConverter(beConverted)) { try { - long sqlMode = SqlModeHelper.encode(value); - return new IntLiteral(sqlMode, targetType); + return new IntLiteral(VariableVarConverters.encode(beConverted, value), targetType); } catch (DdlException e) { throw new AnalysisException(e.getMessage()); } } return new IntLiteral(value, targetType); case LARGEINT: - if (isSqlMode) { + if (VariableVarConverters.hasConverter(beConverted)) { try { - long sqlMode = SqlModeHelper.encode(value); - return new LargeIntLiteral(String.valueOf(sqlMode)); + return new 
LargeIntLiteral(String.valueOf(VariableVarConverters.encode(beConverted, value))); } catch (DdlException e) { throw new AnalysisException(e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java index 5675f52..c000baf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SysVariableDesc.java @@ -21,9 +21,8 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorReport; -import org.apache.doris.qe.SessionVariable; -import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.qe.VariableVarConverters; import org.apache.doris.thrift.TBoolLiteral; import org.apache.doris.thrift.TExprNode; import org.apache.doris.thrift.TExprNodeType; @@ -72,10 +71,10 @@ public class SysVariableDesc extends Expr { @Override public void analyzeImpl(Analyzer analyzer) throws AnalysisException { VariableMgr.fillValue(analyzer.getContext().getSessionVariable(), this); - if (!Strings.isNullOrEmpty(name) && name.equalsIgnoreCase(SessionVariable.SQL_MODE)) { + if (!Strings.isNullOrEmpty(name) && VariableVarConverters.hasConverter(name)) { setType(Type.VARCHAR); try { - setStringValue(SqlModeHelper.decode(intValue)); + setStringValue(VariableVarConverters.decode(name, intValue)); } catch (DdlException e) { ErrorReport.reportAnalysisException(e.getMessage()); } @@ -117,16 +116,13 @@ public class SysVariableDesc extends Expr { @Override public Expr getResultValue() throws AnalysisException { Expr expr = super.getResultValue(); - if (!Strings.isNullOrEmpty(name) && name.equalsIgnoreCase(SessionVariable.SQL_MODE)) { - // SQL_MODE is a special variable. Its type is int, but it is usually set using a string. 
- // Such as `set sql_mode = concat(@@sql_mode, "STRICT_TRANS_TABLES");` - // So we return the string type here so that it can correctly match the subsequent function signature. - // We will convert the string to int in VariableMgr. - // And we also set `isSqlMode` to true in StringLiteral, so that it can be cast back + if (!Strings.isNullOrEmpty(name) && VariableVarConverters.hasConverter(name)) { + // Return the string type here so that it can correctly match the subsequent function signature. + // And we also set `beConverted` to session variable name in StringLiteral, so that it can be cast back // to Integer when returning value. try { - StringLiteral s = new StringLiteral(SqlModeHelper.decode(intValue)); - s.setIsSqlMode(true); + StringLiteral s = new StringLiteral(VariableVarConverters.decode(name, intValue)); + s.setBeConverted(name); return s; } catch (DdlException e) { throw new AnalysisException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java index 872c18c..c4cb13f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java @@ -153,4 +153,24 @@ public class TupleIsNullPredicate extends Predicate { public String toSqlImpl() { return "TupleIsNull(" + Joiner.on(",").join(tupleIds) + ")"; } + + /** + * Recursive function that replaces all 'IF(TupleIsNull(), NULL, e)' exprs in + * 'expr' with e and returns the modified expr. 
+ */ + public static Expr unwrapExpr(Expr expr) { + if (expr instanceof FunctionCallExpr) { + FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr; + List<Expr> params = fnCallExpr.getParams().exprs(); + if (fnCallExpr.getFnName().getFunction().equals("if") && + params.get(0) instanceof TupleIsNullPredicate && + Expr.IS_NULL_LITERAL.apply(params.get(1))) { + return unwrapExpr(params.get(2)); + } + } + for (int i = 0; i < expr.getChildren().size(); ++i) { + expr.setChild(i, unwrapExpr(expr.getChild(i))); + } + return expr; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java similarity index 53% copy from fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java copy to fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java index 26d7dae..5702755 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java @@ -15,10 +15,25 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.qe; +package org.apache.doris.common.util; -import org.apache.doris.common.DdlException; +public class BitUtil { -public interface VariableVarConverterI { - public String convert(String value) throws DdlException; + // Returns the log2 of 'val'. 'val' must be > 0. + public static int log2Ceiling(long val) { + // Formula is based on the Long.numberOfLeadingZeros() javadoc comment. + return 64 - Long.numberOfLeadingZeros(val - 1); + } + + // Round up 'val' to the nearest power of two. 'val' must be > 0. + public static long roundUpToPowerOf2(long val) { + return 1L << log2Ceiling(val); + } + + // Round up 'val' to the nearest multiple of a power-of-two 'factor'. + // 'val' must be > 0. 
+ public static long roundUpToPowerOf2Factor(long val, long factor) { + return (val + (factor - 1)) & ~(factor - 1); + } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 677bde5..5aa9bb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -427,7 +427,6 @@ public class DistributedPlanner { node.setChild(0, leftChildFragment.getPlanRoot()); connectChildFragment(node, 1, leftChildFragment, rightChildFragment); leftChildFragment.setPlanRoot(node); - return leftChildFragment; } else { node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); @@ -472,8 +471,6 @@ public class DistributedPlanner { rightChildFragment.setDestination(rhsExchange); rightChildFragment.setOutputPartition(rhsJoinPartition); - // TODO: Before we support global runtime filter, only shuffle join do not enable local runtime filter - node.setIsPushDown(false); return joinFragment; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 2360d5c..6b06de2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -60,7 +60,6 @@ public class HashJoinNode extends PlanNode { private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList(); // join conjuncts from the JOIN clause that aren't equi-join predicates private List<Expr> otherJoinConjuncts; - private boolean isPushDown = false; private DistributionMode distrMode; private boolean isColocate = false; //the flag for colocate join private String colocateReason = ""; // if can not do colocate join, set reason here @@ -85,9 +84,6 @@ public class HashJoinNode extends PlanNode { this.otherJoinConjuncts = 
otherJoinConjuncts; children.add(outer); children.add(inner); - if (this.joinOp.isInnerJoin() || this.joinOp.isLeftSemiJoin()) { - this.isPushDown = true; - } // Inherits all the nullable tuple from the children // Mark tuples that form the "nullable" side of the outer join as nullable. @@ -282,10 +278,6 @@ public class HashJoinNode extends PlanNode { } } - public void setIsPushDown(boolean isPushDown) { - this.isPushDown = isPushDown; - } - @Override protected void toThrift(TPlanNode msg) { msg.node_type = TPlanNodeType.HASH_JOIN_NODE; @@ -300,7 +292,6 @@ public class HashJoinNode extends PlanNode { for (Expr e : otherJoinConjuncts) { msg.hash_join_node.addToOtherJoinConjuncts(e.treeToThrift()); } - msg.hash_join_node.setIsPushDown(isPushDown); } @Override @@ -326,6 +317,12 @@ public class HashJoinNode extends PlanNode { if (!conjuncts.isEmpty()) { output.append(detailPrefix).append("other predicates: ").append(getExplainString(conjuncts)).append("\n"); } + if (!runtimeFilters.isEmpty()) { + output.append(detailPrefix).append("runtime filters: "); + output.append(getRuntimeFilterExplainString(true)); + } + output.append(detailPrefix).append(String.format( + "cardinality=%s", cardinality)).append("\n"); return output.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 62948c7..99e158e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -591,6 +591,10 @@ public class OlapScanNode extends ScanNode { output.append(prefix).append("PREDICATES: ").append( getExplainString(conjuncts)).append("\n"); } + if (!runtimeFilters.isEmpty()) { + output.append(prefix).append("runtime filters: "); + output.append(getRuntimeFilterExplainString(false)); + } output.append(prefix).append(String.format( "partitions=%s/%s", diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index f714132..e865cf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -36,7 +36,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -127,6 +129,11 @@ public class PlanFragment extends TreeNode<PlanFragment> { // default value is 1 private int parallelExecNum = 1; + // The runtime filter id that produced + private Set<RuntimeFilterId> builderRuntimeFilterIds; + // The runtime filter id that is expected to be used + private Set<RuntimeFilterId> targetRuntimeFilterIds; + /** * C'tor for fragment with specific partition; the output is by default broadcast. */ @@ -136,6 +143,8 @@ public class PlanFragment extends TreeNode<PlanFragment> { this.dataPartition = partition; this.outputPartition = DataPartition.UNPARTITIONED; this.transferQueryStatisticsWithEveryBatch = false; + this.builderRuntimeFilterIds = new HashSet<>(); + this.targetRuntimeFilterIds = new HashSet<>(); setParallelExecNumIfExists(); setFragmentInPlanTree(planRoot); } @@ -177,6 +186,14 @@ public class PlanFragment extends TreeNode<PlanFragment> { this.outputExprs = Expr.cloneList(outputExprs, null); } + public void setBuilderRuntimeFilterIds(RuntimeFilterId rid) { + this.builderRuntimeFilterIds.add(rid); + } + + public void setTargetRuntimeFilterIds(RuntimeFilterId rid) { + this.targetRuntimeFilterIds.add(rid); + } + /** * Finalize plan tree and create stream sink, if needed. 
*/ @@ -344,6 +361,19 @@ public class PlanFragment extends TreeNode<PlanFragment> { return fragmentId; } + public Set<RuntimeFilterId> getBuilderRuntimeFilterIds() { + return builderRuntimeFilterIds; + } + + public Set<RuntimeFilterId> getTargetRuntimeFilterIds() { + return targetRuntimeFilterIds; + } + + public void clearRuntimeFilters() { + builderRuntimeFilterIds.clear(); + targetRuntimeFilterIds.clear(); + } + public void setTransferQueryStatisticsWithEveryBatch(boolean value) { transferQueryStatisticsWithEveryBatch = value; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index dd7f636..1233f45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -17,6 +17,7 @@ package org.apache.doris.planner; +import com.google.common.base.Joiner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprSubstitutionMap; @@ -40,6 +41,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Set; @@ -117,6 +119,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> { return planNodeName; } + // Runtime filters assigned to this node. 
+ protected List<RuntimeFilter> runtimeFilters = new ArrayList<>(); + protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName) { this.id = id; this.limit = -1; @@ -411,6 +416,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> { for (Expr e : conjuncts) { msg.addToConjuncts(e.treeToThrift()); } + // Serialize any runtime filters + for (RuntimeFilter filter : runtimeFilters) { + msg.addToRuntimeFilters(filter.toThrift()); + } msg.compact_data = compactData; toThrift(msg); container.addToNodes(msg); @@ -659,4 +668,31 @@ abstract public class PlanNode extends TreeNode<PlanNode> { } return null; } + + protected void addRuntimeFilter(RuntimeFilter filter) { runtimeFilters.add(filter); } + + protected Collection<RuntimeFilter> getRuntimeFilters() { return runtimeFilters; } + + public void clearRuntimeFilters() { runtimeFilters.clear(); } + + protected String getRuntimeFilterExplainString(boolean isBuildNode) { + if (runtimeFilters.isEmpty()) return ""; + List<String> filtersStr = new ArrayList<>(); + for (RuntimeFilter filter: runtimeFilters) { + StringBuilder filterStr = new StringBuilder(); + filterStr.append(filter.getFilterId()); + filterStr.append("["); + filterStr.append(filter.getType().toString().toLowerCase()); + filterStr.append("]"); + if (isBuildNode) { + filterStr.append(" <- "); + filterStr.append(filter.getSrcExpr().toSql()); + } else { + filterStr.append(" -> "); + filterStr.append(filter.getTargetExpr(getId()).toSql()); + } + filtersStr.add(filterStr.toString()); + } + return Joiner.on(", ").join(filtersStr) + "\n"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index fa7a101..16c0a37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.PrimitiveType; import 
org.apache.doris.common.UserException; import org.apache.doris.common.profile.PlanTreeBuilder; import org.apache.doris.common.profile.PlanTreePrinter; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TQueryOptions; @@ -38,6 +39,7 @@ import org.apache.doris.thrift.TQueryOptions; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.doris.thrift.TRuntimeFilterMode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -201,7 +203,13 @@ public class Planner { QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer = new QueryStatisticsTransferOptimizer(rootFragment); queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer(); - if (statement instanceof InsertStmt) { + // Create runtime filters. + if (!ConnectContext.get().getSessionVariable().getRuntimeFilterMode().toUpperCase() + .equals(TRuntimeFilterMode.OFF.name())) { + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot()); + } + + if (statement instanceof InsertStmt) { InsertStmt insertStmt = (InsertStmt) statement; rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments); rootFragment.setSink(insertStmt.getDataSink()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java new file mode 100644 index 0000000..a368208 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -0,0 +1,457 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.Predicate; +import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.analysis.TupleIsNullPredicate; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.thrift.TRuntimeFilterDesc; +import org.apache.doris.thrift.TRuntimeFilterType; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Representation of a runtime filter. 
A runtime filter is generated from + * an equi-join predicate of the form <lhs_expr> = <rhs_expr>, where lhs_expr is the + * expr on which the filter is applied and must be bound by a single tuple id from + * the left plan subtree of the associated join node, while rhs_expr is the expr on + * which the filter is built and can be bound by any number of tuple ids from the + * right plan subtree. Every runtime filter must record the join node that constructs + * the filter and the scan nodes that apply the filter (destination nodes). + */ +public final class RuntimeFilter { + private final static Logger LOG = LogManager.getLogger(RuntimeFilter.class); + + // Identifier of the filter (unique within a query) + private final RuntimeFilterId id; + // Join node that builds the filter + private final HashJoinNode builderNode; + // Expr (rhs of join predicate) on which the filter is built + private final Expr srcExpr; + // The position of expr in the join condition + private final int exprOrder; + // Expr (lhs of join predicate) from which the targetExprs_ are generated. + private final Expr origTargetExpr; + // Runtime filter targets + private final List<RuntimeFilterTarget> targets = new ArrayList<>(); + // Slots from base table tuples that have value transfer from the slots + // of 'origTargetExpr'. The slots are grouped by tuple id. + private final Map<TupleId, List<SlotId>> targetSlotsByTid; + // If true, the join node building this filter is executed using a broadcast join; + // set in the DistributedPlanner.createHashJoinFragment() + private boolean isBroadcastJoin; + // Estimate of the number of distinct values that will be inserted into this filter, + // globally across all instances of the source node. Used to compute an optimal size + // for the filter. A value of -1 means no estimate is available, and default filter + // parameters should be used. + private long ndvEstimate = -1; + // Size of the filter (in Bytes). Should be greater than zero for bloom filters. 
+ private long filterSizeBytes = 0; + // If true, the filter is produced by a broadcast join and there is at least one + // destination scan node which is in the same fragment as the join; set in + // DistributedPlanner.createHashJoinFragment(). + private boolean hasLocalTargets = false; + // If true, there is at least one destination scan node which is not in the same + // fragment as the join that produced the filter; set in + // DistributedPlanner.createHashJoinFragment(). + private boolean hasRemoteTargets = false; + // If set, indicates that the filter can't be assigned to another scan node. + // Once set, it can't be unset. + private boolean finalized = false; + // The type of filter to build. + private TRuntimeFilterType runtimeFilterType; + + /** + * Internal representation of a runtime filter target. + */ + public static class RuntimeFilterTarget { + // Scan node that applies the filter + public ScanNode node; + // Expr on which the filter is applied + public Expr expr; + // Indicates if 'expr' is bound only by partition columns + public final boolean isBoundByKeyColumns; + // Indicates if 'node' is in the same fragment as the join that produces the filter + public final boolean isLocalTarget; + + public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr, + boolean isBoundByKeyColumns, boolean isLocalTarget) { + Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds())); + this.node = targetNode; + this.expr = targetExpr; + this.isBoundByKeyColumns = isBoundByKeyColumns; + this.isLocalTarget = isLocalTarget; + } + + @Override + public String toString() { + return "Target Id: " + node.getId() + " " + + "Target expr: " + expr.debugString() + " " + + "Is only Bound By Key: " + isBoundByKeyColumns + + "Is local: " + isLocalTarget; + } + } + + private RuntimeFilter(RuntimeFilterId filterId, HashJoinNode filterSrcNode, Expr srcExpr, int exprOrder, + Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots, + TRuntimeFilterType 
type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) { + this.id = filterId; + this.builderNode = filterSrcNode; + this.srcExpr = srcExpr; + this.exprOrder = exprOrder; + this.origTargetExpr = origTargetExpr; + this.targetSlotsByTid = targetSlots; + this.runtimeFilterType = type; + computeNdvEstimate(); + calculateFilterSize(filterSizeLimits); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RuntimeFilter)) return false; + return ((RuntimeFilter) obj).id.equals(id); + } + + @Override + public int hashCode() { return id.hashCode(); } + + public void markFinalized() { finalized = true; } + public boolean isFinalized() { return finalized; } + + /** + * Serializes a runtime filter to Thrift. + */ + public TRuntimeFilterDesc toThrift() { + TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc(); + tFilter.setFilterId(id.asInt()); + tFilter.setSrcExpr(srcExpr.treeToThrift()); + tFilter.setExprOrder(exprOrder); + tFilter.setIsBroadcastJoin(isBroadcastJoin); + tFilter.setHasLocalTargets(hasLocalTargets); + tFilter.setHasRemoteTargets(hasRemoteTargets); + for (RuntimeFilterTarget target : targets) { + tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift()); + } + tFilter.setType(runtimeFilterType); + tFilter.setBloomFilterSizeBytes(filterSizeBytes); + return tFilter; + } + + public List<RuntimeFilterTarget> getTargets() { return targets; } + public boolean hasTargets() { return !targets.isEmpty(); } + public Expr getSrcExpr() { return srcExpr; } + public Expr getOrigTargetExpr() { return origTargetExpr; } + public Map<TupleId, List<SlotId>> getTargetSlots() { return targetSlotsByTid; } + public RuntimeFilterId getFilterId() { return id; } + public TRuntimeFilterType getType() { return runtimeFilterType; } + public void setType(TRuntimeFilterType type) { runtimeFilterType = type; } + public boolean hasRemoteTargets() { return hasRemoteTargets; } + public HashJoinNode getBuilderNode() { return 
builderNode; } + + /** + * Static function to create a RuntimeFilter from 'joinPredicate' that is assigned + * to the join node 'filterSrcNode'. Returns an instance of RuntimeFilter + * or null if a runtime filter cannot be generated from the specified predicate. + */ + public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer analyzer, + Expr joinPredicate, int exprOrder, HashJoinNode filterSrcNode, + TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) { + Preconditions.checkNotNull(idGen); + Preconditions.checkNotNull(joinPredicate); + Preconditions.checkNotNull(filterSrcNode); + // Only consider binary equality predicates and not contain Null-safe equals. + // The predicate could not be pushed down when there is Null-safe equal operator. Because the runtimeFilter + // will filter the null value in child[0] while it is needed in the Null-safe equal join. + // For example: select * from a join b where a.id<=>b.id + // the null value in table a should be return by scan node instead of filtering it by runtimeFilter. + if (!Predicate.isUnNullSafeEquivalencePredicate(joinPredicate)) return null; + + BinaryPredicate normalizedJoinConjunct = + SingleNodePlanner.getNormalizedEqPred(joinPredicate, + filterSrcNode.getChild(0).getTupleIds(), + filterSrcNode.getChild(1).getTupleIds(), analyzer); + if (normalizedJoinConjunct == null) return null; + + // Ensure that the target expr does not contain TupleIsNull predicates as these + // can't be evaluated at a scan node. 
+ Expr targetExpr = + TupleIsNullPredicate.unwrapExpr(normalizedJoinConjunct.getChild(0).clone()); + Expr srcExpr = normalizedJoinConjunct.getChild(1); + + if (srcExpr.getType().equals(ScalarType.createHllType()) + || srcExpr.getType().equals(ScalarType.createType(PrimitiveType.BITMAP))) return null; + + Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, targetExpr); + Preconditions.checkNotNull(targetSlots); + if (targetSlots.isEmpty()) return null; + + if (LOG.isTraceEnabled()) { + LOG.trace("Generating runtime filter from predicate " + joinPredicate); + } + return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder, + targetExpr, targetSlots, type, filterSizeLimits); + } + + /** + * Returns the ids of base table tuple slots on which a runtime filter expr can be + * applied. Due to the existence of equivalence classes, a filter expr may be + * applicable at multiple scan nodes. The returned slot ids are grouped by tuple id. + * Returns an empty collection if the filter expr cannot be applied at a base table + * or if applying the filter might lead to incorrect results. + * Returns the slot id of the base table expected to use this target expr. + */ + private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer analyzer, Expr expr) { + // 'expr' is not a SlotRef and may contain multiple SlotRefs + List<TupleId> tids = new ArrayList<>(); + List<SlotId> sids = new ArrayList<>(); + expr.getIds(tids, sids); + + /* + If the target expression evaluates to a non-NULL value for outer-join non-matches, then assigning the + filter below the nullable side of an outer join may produce incorrect query results. + This check is conservative but correct to keep the code simple. In particular, it would otherwise be + difficult to identify incorrect runtime filter assignments through outer-joined inline views because + the 'expr' has already been fully resolved. 
+ TODO(zxy) We rely on the value-transfer graph to check whether 'expr' could potentially be assigned + below an outer-joined inline view. + + Queries with the following characteristics may produce wrong results due to an incorrectly assigned + runtime filter: + 1) The query has an outer join + 2) A scan on the nullable side of that outer join has a runtime filter with a NULL-checking + expression such as COALESCE/IFNULL/CASE + 3) The latter point implies that there is another join above the outer join with a NULL-checking + expression in its join condition + + Reproduction: + TPC-DS 1T Benchmarks test + " + select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk + where coalesce(t2.s_store_sk + 100, 100) in (select ifnull(100, s_store_sk) from store); + + select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk + where case when t2.s_store_sk is NULL then 100 else t2.s_store_sk end + in (select ifnull(100, s_store_sk) from store limit 10); + " + We expect a count of 0. A count of 1024 is incorrect. + Query plan: + | 4:HASH JOIN + | | join op: LEFT SEMI JOIN (BROADCAST) + | | equal join conjunct: coalesce(`t2`.`s_store_sk` + 100, 100) = ifnull(100, `s_store_sk`) + | | runtime filters: RF000[in] <- ifnull(100, `s_store_sk`) + | | cardinality=1002 + | |----7:EXCHANGE + | 3:HASH JOIN + | | join op: LEFT OUTER JOIN + | | equal join conjunct: `t1`.`s_store_sk` = `t2`.`s_store_sk` + | |----1:OlapScanNode + | | TABLE: store + | | runtime filters: RF000[in] -> coalesce(`t2`.`s_store_sk` + 100, 100) + | 0:OlapScanNode + | TABLE: store + Explanation: + RF000 filters out all rows in scan 01. + In join 03 there are no join matches since the right-hand side is empty. All rows from the right-hand + side are nulled. + The join condition in join 04 now satisfies all input rows because every "t2.id" is NULL, + so after the COALESCE() the join condition becomes 100 = 100.
+ */ + if (analyzer.hasOuterJoinedValueTransferTarget(sids)) { + // Do not push down when contains NULL-checking expression COALESCE/IFNULL/CASE + // TODO(zxy) Returns true if 'p' evaluates to true when all its referenced slots are NULL, returns false + // otherwise. Throws if backend expression evaluation fails. + if (expr.isContainsFunction("COALESCE") || expr.isContainsFunction("IFNULL") + || expr.isContainsClass("org.apache.doris.analysis.CaseExpr")) + return Collections.emptyMap(); + } + + Map<TupleId, List<SlotId>> slotsByTid = new HashMap<>(); + // We need to iterate over all the slots of 'expr' and check if they have + // equivalent slots that are bound by the same base table tuple(s). + for (SlotId slotId: sids) { + Map<TupleId, List<SlotId>> currSlotsByTid = getBaseTblEquivSlots(analyzer, slotId); + if (currSlotsByTid.isEmpty()) return Collections.emptyMap(); + if (slotsByTid.isEmpty()) { + slotsByTid.putAll(currSlotsByTid); + continue; + } + + // Compute the intersection between tuple ids from 'slotsByTid' and + // 'currSlotsByTid'. If the intersection is empty, an empty collection + // is returned. + Iterator<Map.Entry<TupleId, List<SlotId>>> iter = slotsByTid.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<TupleId, List<SlotId>> entry = iter.next(); + List<SlotId> slotIds = currSlotsByTid.get(entry.getKey()); + // Take the intersection of the tuple ids of all slots in expr to + // form <tupleid, slotid> and return. + // A.a + B.b = C.c, when the tuple IDs of the two slots A.a and B.b are different, at this + // time cannot be pushed down, so remove. 
If A.a can be transferred to B.a, then + // the tuple IDs of A.a and B.b have intersection B, so the target expr is available; the tuple + // ID of this intersection is the scan node that is expected to use this runtime filter + if (slotIds == null) { + iter.remove(); + } else { + entry.getValue().addAll(slotIds); + } + } + if (slotsByTid.isEmpty()) return Collections.emptyMap(); + } + return slotsByTid; + } + + /** + * Static function that returns the ids of slots bound by base table tuples for which + * there is a value transfer from 'srcSid'. The slots are grouped by tuple id. + * That is, srcSid can be calculated from the <tuple id, slot id> of the base table. + */ + private static Map<TupleId, List<SlotId>> getBaseTblEquivSlots(Analyzer analyzer, + SlotId srcSid) { + Map<TupleId, List<SlotId>> slotsByTid = new HashMap<>(); + for (SlotId targetSid: analyzer.getValueTransferTargets(srcSid)) { + TupleDescriptor tupleDesc = analyzer.getSlotDesc(targetSid).getParent(); + if (tupleDesc.getTable() == null) continue; + List<SlotId> sids = slotsByTid.computeIfAbsent(tupleDesc.getId(), k -> new ArrayList<>()); + sids.add(targetSid); + } + return slotsByTid; + } + + /** + * Returns the target expr registered for the scan node whose id equals + * 'targetPlanNodeId', or null if no target of this filter has that id. + */ + public Expr getTargetExpr(PlanNodeId targetPlanNodeId) { + for (RuntimeFilterTarget target: targets) { + // Ids are objects: compare by value, since '!=' would only test reference identity. + if (!target.node.getId().equals(targetPlanNodeId)) continue; + return target.expr; + } + return null; + } + + /** + * Estimates the selectivity of a runtime filter as the cardinality of the + * associated source join node over the cardinality of that join node's left + * child.
+ */ + public double getSelectivity() { + if (builderNode.getCardinality() == -1 + || builderNode.getChild(0).getCardinality() == -1 + || builderNode.getChild(0).getCardinality() == 0) { + return -1; + } + return builderNode.getCardinality() / (double) builderNode.getChild(0).getCardinality(); + } + + public void addTarget(RuntimeFilterTarget target) { targets.add(target); } + + public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin = isBroadcast; } + + public void computeNdvEstimate() { ndvEstimate = builderNode.getChild(1).getCardinality(); } + + public void extractTargetsPosition() { + Preconditions.checkNotNull(builderNode.getFragment()); + Preconditions.checkState(hasTargets()); + for (RuntimeFilterTarget target: targets) { + Preconditions.checkNotNull(target.node.getFragment()); + hasLocalTargets = hasLocalTargets || target.isLocalTarget; + hasRemoteTargets = hasRemoteTargets || !target.isLocalTarget; + } + } + + /** + * Sets the filter size (in bytes) required for a bloom filter to achieve the + * configured maximum false-positive rate based on the expected NDV. Also bounds the + * filter size between the max and minimum filter sizes supplied to it by + * 'filterSizeLimits'. + * Considering that the `IN` filter may be converted to the `Bloom FIlter` when crossing fragments, + * the bloom filter size is always calculated. 
+ */ + private void calculateFilterSize(RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) { + if (ndvEstimate == -1) { + filterSizeBytes = filterSizeLimits.defaultVal; + return; + } + double fpp = FeConstants.default_bloom_filter_fpp; + int logFilterSize = GetMinLogSpaceForBloomFilter(ndvEstimate, fpp); + filterSizeBytes = 1L << logFilterSize; + filterSizeBytes = Math.max(filterSizeBytes, filterSizeLimits.minVal); + filterSizeBytes = Math.min(filterSizeBytes, filterSizeLimits.maxVal); + } + + /** + * Returns the log (base 2) of the minimum number of bytes we need for a Bloom + * filter with 'ndv' unique elements and a false positive probability of less + * than 'fpp'. + */ + public static int GetMinLogSpaceForBloomFilter(long ndv, double fpp) { + if (0 == ndv) return 0; + double k = 8; // BUCKET_WORDS + // m is the number of bits we would need to get the fpp specified + double m = -k * ndv / Math.log(1 - Math.pow(fpp, 1.0 / k)); + + // Handle case where ndv == 1 => ceil(log2(m/8)) < 0. + return Math.max(0, (int)(Math.ceil(Math.log(m / 8)/Math.log(2)))); + } + + /** + * Assigns this runtime filter to the corresponding plan nodes. 
+ */ + public void assignToPlanNodes() { + Preconditions.checkState(hasTargets()); + builderNode.addRuntimeFilter(this); + for (RuntimeFilterTarget target: targets) { + target.node.addRuntimeFilter(this); + // fragment is expected to use this filter id + target.node.fragment_.setTargetRuntimeFilterIds(this.id); + } + } + + public void registerToPlan(Analyzer analyzer) { + setIsBroadcast(getBuilderNode().getDistributionMode() == HashJoinNode.DistributionMode.BROADCAST); + if (LOG.isTraceEnabled()) LOG.trace("Runtime filter: " + debugString()); + assignToPlanNodes(); + analyzer.putAssignedRuntimeFilter(this); + getBuilderNode().fragment_.setBuilderRuntimeFilterIds(getFilterId()); + } + + public String debugString() { + return "FilterID: " + id + " " + + "Source: " + builderNode.getId() + " " + + "SrcExpr: " + getSrcExpr().debugString() + " " + + "Target(s): " + + Joiner.on(", ").join(targets) + " " + + "Selectivity: " + getSelectivity(); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java new file mode 100644 index 0000000..7c53de3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java @@ -0,0 +1,400 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.common.util.BitUtil; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.TRuntimeFilterMode; +import org.apache.doris.thrift.TRuntimeFilterType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Class used for generating and assigning runtime filters to a query plan using + * runtime filter propagation. Runtime filter propagation is an optimization technique + * used to filter scanned tuples or scan ranges based on information collected at + * runtime. A runtime filter is constructed during the build phase of a join node, and is + * applied at, potentially, multiple scan nodes on the probe side of that join node. 
+ * Runtime filters are generated from equal-join predicates but they do not replace the + * original predicates. + * + * MinMax filters are of a fixed size (except for those used for string type) and + * therefore only sizes for bloom filters need to be calculated. These calculations are + * based on the NDV estimates of the associated table columns, the min buffer size that + * can be allocated by the bufferpool, and the query options. Moreover, it is also bound + * by the MIN/MAX_BLOOM_FILTER_SIZE limits which are enforced on the query options before + * this phase of planning. + * + * Example: select * from T1, T2 where T1.a = T2.b and T2.c = '1'; + * Assuming that T1 is a fact table and T2 is a significantly smaller dimension table, a + * runtime filter is constructed at the join node between tables T1 and T2 while building + * the hash table on the values of T2.b (rhs of the join condition) from the tuples of T2 + * that satisfy predicate T2.c = '1'. The runtime filter is subsequently sent to the + * scan node of table T1 and is applied on the values of T1.a (lhs of the join condition) + * to prune tuples of T2 that cannot be part of the join result. + */ +public final class RuntimeFilterGenerator { + private final static Logger LOG = LogManager.getLogger(RuntimeFilterGenerator.class); + + // Map of base table tuple ids to a list of runtime filters that + // can be applied at the corresponding scan nodes. + private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid = new HashMap<>(); + + // Generator for filter ids + private final IdGenerator<RuntimeFilterId> filterIdGenerator = RuntimeFilterId.createGenerator(); + + /** + * Internal class that encapsulates the max, min and default sizes used for creating + * bloom filter objects. + */ + public static class FilterSizeLimits { + // Maximum filter size, in bytes, rounded up to a power of two. + public final long maxVal; + + // Minimum filter size, in bytes, rounded up to a power of two. 
+ public final long minVal; + + // Pre-computed default filter size, in bytes, rounded up to a power of two. + public final long defaultVal; + + public FilterSizeLimits(SessionVariable sessionVariable) { + // Round up all limits to a power of two + long maxLimit = sessionVariable.getRuntimeBloomFilterMaxSize(); + maxVal = BitUtil.roundUpToPowerOf2(maxLimit); + + long minLimit = sessionVariable.getRuntimeBloomFilterMinSize(); + // Make sure minVal <= defaultVal <= maxVal + minVal = BitUtil.roundUpToPowerOf2(Math.min(minLimit, maxVal)); + + long defaultValue = sessionVariable.getRuntimeBloomFilterSize(); + defaultValue = Math.max(defaultValue, minVal); + defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultValue, maxVal)); + } + } + + // Contains size limits for bloom filters. + private final FilterSizeLimits bloomFilterSizeLimits; + + private final Analyzer analyzer; + private final SessionVariable sessionVariable; + + private RuntimeFilterGenerator(Analyzer analyzer) { + this.analyzer = analyzer; + this.sessionVariable = ConnectContext.get().getSessionVariable(); + Preconditions.checkNotNull(this.sessionVariable); + bloomFilterSizeLimits = new FilterSizeLimits(sessionVariable); + } + + /** + * Generates and assigns runtime filters to a query plan tree. 
+ */ + public static void generateRuntimeFilters(Analyzer analyzer, PlanNode plan) { + Preconditions.checkNotNull(analyzer); + int maxNumBloomFilters = ConnectContext.get().getSessionVariable().getRuntimeFiltersMaxNum(); + int runtimeFilterType = ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + Preconditions.checkState(maxNumBloomFilters >= 0); + RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator(analyzer); + Preconditions.checkState(runtimeFilterType >= 0, "runtimeFilterType not expected"); + Preconditions.checkState(runtimeFilterType + <= Arrays.stream(TRuntimeFilterType.values()).mapToInt(TRuntimeFilterType::getValue).sum() + , "runtimeFilterType not expected"); + filterGenerator.generateFilters(plan); + List<RuntimeFilter> filters = filterGenerator.getRuntimeFilters(); + if (filters.size() > maxNumBloomFilters) { + // If more than 'maxNumBloomFilters' were generated, sort them by increasing + // selectivity and keep the 'maxNumBloomFilters' most selective bloom filters. + filters.sort((a, b) -> { + double aSelectivity = + a.getSelectivity() == -1 ? Double.MAX_VALUE : a.getSelectivity(); + double bSelectivity = + b.getSelectivity() == -1 ? Double.MAX_VALUE : b.getSelectivity(); + return Double.compare(aSelectivity, bSelectivity); + }); + } + // We only enforce a limit on the number of bloom filters as they are much more + // heavy-weight than the other filter types. + int numBloomFilters = 0; + for (RuntimeFilter filter: filters) { + filter.extractTargetsPosition(); + // When there is a remote target, the producer and consumer of the filter are not in the same fragment at + // this time, and the filter build by the producer needs to be merged. Currently, the IN filter has + // no merge logic, so replace it with Bloom Filter. 
+ // The reason for this is that in the IN pushdown implemented by early Doris, when OlapScanNode and + // HashJoinNode are not in the same fragment, the IN filter will be pushed down to the nearest + // ExchangeNode, so that although it cannot be pushed down to the storage engine to improve performance, + // In some extreme cases, the number of rows in the hash table constructed by HashJoinNode can be reduced, + // thereby avoiding OOM. To cover the previous case (from tpcds 1T query 17), replace IN with Bloom Filter. + // Only when no Bloom Filter is generated, will IN be converted to Bloom Filter and pushed down. + if (filter.getType() == TRuntimeFilterType.IN && filter.hasRemoteTargets()) { + if ((runtimeFilterType & TRuntimeFilterType.BLOOM.getValue()) == 0) { + filter.setType(TRuntimeFilterType.BLOOM); + } else { + continue; + } + } + if (filter.getType() == TRuntimeFilterType.BLOOM) { + if (numBloomFilters >= maxNumBloomFilters) continue; + ++numBloomFilters; + } + filter.registerToPlan(analyzer); + } + } + + /** + * Returns a list of all the registered runtime filters, ordered by filter ID. + */ + public List<RuntimeFilter> getRuntimeFilters() { + Set<RuntimeFilter> resultSet = new HashSet<>(); + for (List<RuntimeFilter> filters: runtimeFiltersByTid.values()) { + resultSet.addAll(filters); + } + List<RuntimeFilter> resultList = Lists.newArrayList(resultSet); + resultList.sort((a, b) -> a.getFilterId().compareTo(b.getFilterId())); + return resultList; + } + + /** + * Generates the runtime filters for a query by recursively traversing the distributed + * plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate + * runtime filters are generated from equi-join predicates assigned to hash-join nodes. + * In the bottom-up traversal of the plan tree, the filters are assigned to destination + * (scan) nodes. Filters that cannot be assigned to a scan node are discarded. 
+ */ + private void generateFilters(PlanNode root) { + if (root instanceof HashJoinNode) { + HashJoinNode joinNode = (HashJoinNode) root; + List<Expr> joinConjuncts = new ArrayList<>(); + // It's not correct to push runtime filters to the left side of a left outer, + // full outer or anti join if the filter corresponds to an equi-join predicate + // from the ON clause. + if (!joinNode.getJoinOp().isLeftOuterJoin() + && !joinNode.getJoinOp().isFullOuterJoin() + && !joinNode.getJoinOp().isAntiJoin()) { + joinConjuncts.addAll(joinNode.getEqJoinConjuncts()); + } + + // TODO(zxy) supports PlanNode.conjuncts generate runtime filter. + // PlanNode.conjuncts (call joinNode.getConjuncts() here) Different from HashJoinNode.eqJoinConjuncts + // and HashJoinNode.otherJoinConjuncts. + // In previous tests, it was found that using PlanNode.conjuncts to generate runtimeFilter may cause + // incorrect results. For example, When the `in` subquery is converted to join, the join conjunct will be + // saved in PlanNode.conjuncts. At this time, using the automatically generated join conjunct to generate + // a runtimeFilter, some rows may be missing in the result. + // SQL: select * from T as a where k1 = (select count(1) from T as b where a.k1 = b.k1); + // Table T has only one INT column. At this time, `a.k1 = b.k1` is in eqJoinConjuncts, + // `k1` = ifnull(xxx) is in conjuncts, the runtimeFilter generated according to conjuncts will cause + // the result to be empty, but the actual result should have data returned. + + List<RuntimeFilter> filters = new ArrayList<>(); + // Actually all types of Runtime Filter objects generated by the same joinConjunct have the same + // properties except ID. 
Maybe consider avoiding repeated generation + for (TRuntimeFilterType type : TRuntimeFilterType.values()) { + if ((sessionVariable.getRuntimeFilterType() & type.getValue()) == 0) continue; + for (int i = 0; i < joinConjuncts.size(); i++) { + Expr conjunct = joinConjuncts.get(i); + RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator, + analyzer, conjunct, i, joinNode, type, bloomFilterSizeLimits); + if (filter == null) continue; + registerRuntimeFilter(filter); + filters.add(filter); + } + } + generateFilters(root.getChild(0)); + // Finalize every runtime filter of that join. This is to ensure that we don't + // assign a filter to a scan node from the right subtree of joinNode or ancestor + // join nodes in case we don't find a destination node in the left subtree. + for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter); + generateFilters(root.getChild(1)); + } else if (root instanceof ScanNode) { + assignRuntimeFilters((ScanNode) root); + } else { + for (PlanNode childNode: root.getChildren()) { + generateFilters(childNode); + } + } + } + + /** + * Registers a runtime filter with the tuple id of every scan node that is a candidate + * destination node for that filter. + */ + private void registerRuntimeFilter(RuntimeFilter filter) { + Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots(); + Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty()); + for (TupleId tupleId: targetSlotsByTid.keySet()) { + registerRuntimeFilter(filter, tupleId); + } + } + + /** + * Registers a runtime filter with a specific target tuple id. 
+ */ + private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) { + Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid)); + List<RuntimeFilter> filters = runtimeFiltersByTid.computeIfAbsent(targetTid, k -> new ArrayList<>()); + Preconditions.checkState(!filter.isFinalized()); + filters.add(filter); + } + + /** + * Finalizes a runtime filter by disassociating it from all the candidate target scan + * nodes that haven't been used as destinations for that filter. Also sets the + * finalized flag of that filter so that it can't be assigned to any other scan nodes. + */ + private void finalizeRuntimeFilter(RuntimeFilter runtimeFilter) { + Set<TupleId> targetTupleIds = new HashSet<>(); + for (RuntimeFilter.RuntimeFilterTarget target: runtimeFilter.getTargets()) { + targetTupleIds.addAll(target.node.getTupleIds()); + } + for (TupleId tupleId: runtimeFilter.getTargetSlots().keySet()) { + if (!targetTupleIds.contains(tupleId)) { + runtimeFiltersByTid.get(tupleId).remove(runtimeFilter); + } + } + runtimeFilter.markFinalized(); + } + + /** + * Assigns runtime filters to a specific scan node 'scanNode'. + * The assigned filters are the ones for which 'scanNode' can be used as a destination + * node. The following constraints are enforced when assigning filters to 'scanNode': + * 1. If the RUNTIME_FILTER_MODE query option is set to LOCAL, a filter is only assigned + * to 'scanNode' if the filter is produced within the same fragment that contains the + * scan node. + * 2. 
Only olap scan nodes are supported. + */ + private void assignRuntimeFilters(ScanNode scanNode) { + if (!(scanNode instanceof OlapScanNode)) return; + TupleId tid = scanNode.getTupleIds().get(0); + if (!runtimeFiltersByTid.containsKey(tid)) return; + // Normalize the case once so that the validation below and the LOCAL/REMOTE checks in the + // loop agree; otherwise a lower-case setting passes validation but is never matched. + String runtimeFilterMode = sessionVariable.getRuntimeFilterMode().toUpperCase(); + Preconditions.checkState(Arrays.stream(TRuntimeFilterMode.values()).map(Enum::name).anyMatch( + p -> p.equals(runtimeFilterMode)), "runtimeFilterMode not expected"); + for (RuntimeFilter filter: runtimeFiltersByTid.get(tid)) { + if (filter.isFinalized()) continue; + Expr targetExpr = computeTargetExpr(filter, tid); + if (targetExpr == null) continue; + boolean isBoundByKeyColumns = isBoundByKeyColumns(analyzer, targetExpr, scanNode); + boolean isLocalTarget = isLocalTarget(filter, scanNode); + // LOCAL mode only accepts targets in the builder's fragment; REMOTE mode only the others. + if (runtimeFilterMode.equals(TRuntimeFilterMode.LOCAL.name()) && !isLocalTarget) continue; + if (runtimeFilterMode.equals(TRuntimeFilterMode.REMOTE.name()) && isLocalTarget) continue; + + RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget( + scanNode, targetExpr, isBoundByKeyColumns, isLocalTarget); + filter.addTarget(target); + } + } + + /** + * Check if 'targetNode' is local to the source node of 'filter', i.e. both belong to the + * fragment with the same id. + */ + private static boolean isLocalTarget(RuntimeFilter filter, ScanNode targetNode) { + return targetNode.getFragment().getId().equals(filter.getBuilderNode().getFragment().getId()); + } + + /** + * Check if all the slots of 'targetExpr' are key columns.
+ */ + private static boolean isBoundByKeyColumns(Analyzer analyzer, Expr targetExpr, ScanNode targetNode) { + Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds())); + List<SlotId> sids = new ArrayList<>(); + targetExpr.getIds(null, sids); + for (SlotId sid : sids) { + // Take slotDesc from the desc of targetExpr the same + SlotDescriptor slotDesc = analyzer.getSlotDesc(sid); + if (slotDesc.getColumn() == null || !slotDesc.getColumn().isKey()) { + return false; + } + } + return true; + } + + /** + * Computes the target expr for a specified runtime filter 'filter' to be applied at + * the scan node with target tuple descriptor 'targetTid'. + */ + private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid) { + Expr targetExpr = filter.getOrigTargetExpr(); + // if there is a subquery on the left side of join, in order to push to scan in the subquery, + // targetExpr will return false as long as there is a slotref parent node that is not targetTid. + // But when this slotref can be transferred to the targetTid slot, such as Aa + Bb = Cc, + // targetTid is B, if Aa can be transferred to Ba, that is, Aa and Ba are equivalent columns, + // then replace Aa with Ba, and then calculate for targetTid targetExpr + if (!targetExpr.isBound(targetTid)) { + Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid)); + // Modify the filter target expr using the equivalent slots from the scan node + // on which the filter will be applied. + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + List<SlotRef> exprSlots = new ArrayList<>(); + // Get the ids of all slotRef children of targetExpr, which is equal to the deduplication of + // all slots of targetSlotsByTid. 
+ targetExpr.collect(SlotRef.class, exprSlots); + // targetExpr specifies the id of the slotRef node in the `tupleID` + List<SlotId> sids = filter.getTargetSlots().get(targetTid); + for (SlotRef slotRef: exprSlots) { + for (SlotId sid: sids) { + if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) { + SlotRef newSlotRef = new SlotRef(analyzer.getSlotDesc(sid)); + newSlotRef.analyzeNoThrow(analyzer); + smap.put(slotRef, newSlotRef); + break; + } + } + } + Preconditions.checkState(exprSlots.size() == smap.size()); + try { + targetExpr = targetExpr.substitute(smap, analyzer, false); + } catch (Exception e) { + return null; + } + } + Type srcType = filter.getSrcExpr().getType(); + // Types of targetExpr and srcExpr must be exactly the same since runtime filters are + // based on hashing. + if (!targetExpr.getType().equals(srcType)) { + try { + targetExpr = targetExpr.castTo(srcType); + } catch (Exception e) { + return null; + } + } + return targetExpr; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java new file mode 100644 index 0000000..c4339b4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterId.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.common.Id; +import org.apache.doris.common.IdGenerator; + +public class RuntimeFilterId extends Id<RuntimeFilterId> { + // Construction only allowed via an IdGenerator. + protected RuntimeFilterId(int id) { + super(id); + } + + public static IdGenerator<RuntimeFilterId> createGenerator() { + return new IdGenerator<RuntimeFilterId>() { + @Override + public RuntimeFilterId getNextId() { + return new RuntimeFilterId(nextId_++); + } + + @Override + public RuntimeFilterId getMaxId() { + return new RuntimeFilterId(nextId_ - 1); + } + }; + } + + @Override + public String toString() { + return String.format("RF%03d", id); + } + + @Override + public int hashCode() { + return id; + } + + public int compareTo(RuntimeFilterId cmp) { + return Integer.compare(id, cmp.id); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 2a71437..e039881 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -55,6 +55,8 @@ abstract public class ScanNode extends PlanNode { return result; } + public TupleDescriptor getTupleDesc() { return desc; } + public void setColumnFilters(Map<String, PartitionColumnFilter> columnFilters) { this.columnFilters = columnFilters; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index ffd8cc3..9dcadd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -2202,4 +2202,35 @@ public class SingleNodePlanner { } return analyzer.getUnassignedConjuncts(tupleIds); } + + /** + * 
Returns a normalized version of a binary equality predicate 'expr' where the lhs + * child expr is bound by some tuple in 'lhsTids' and the rhs child expr is bound by + * some tuple in 'rhsTids'. Returns 'expr' if this predicate is already normalized. + * Returns null in any of the following cases: + * 1. It is not an equality predicate + * 2. One of the operands is a constant + * 3. Both children of this predicate are the same expr + * The so-called normalization is to ensure that the above conditions are met, and then + * to ensure that the order of expr is consistent with the order of node + */ + public static BinaryPredicate getNormalizedEqPred(Expr expr, List<TupleId> lhsTids, + List<TupleId> rhsTids, Analyzer analyzer) { + if (!(expr instanceof BinaryPredicate)) return null; + BinaryPredicate pred = (BinaryPredicate) expr; + if (!pred.getOp().isEquivalence()) { + return null; + } + if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) return null; + + // Use the child that contains lhsTids as lhsExpr, for example, A join B on B.k = A.k, + // where lhsExpr=A.k, rhsExpr=B.k, changed the order, A.k = B.k + Expr lhsExpr = Expr.getFirstBoundChild(pred, lhsTids); + Expr rhsExpr = Expr.getFirstBoundChild(pred, rhsTids); + if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) return null; + + BinaryPredicate result = new BinaryPredicate(pred.getOp(), lhsExpr, rhsExpr); + result.analyzeNoThrow(analyzer); + return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 10ae31f..11375f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -48,6 +48,8 @@ import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ResultSink; +import 
org.apache.doris.planner.RuntimeFilter; +import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.UnionNode; @@ -72,6 +74,8 @@ import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TRuntimeFilterParams; +import org.apache.doris.thrift.TRuntimeFilterTargetParams; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TScanRangeParams; @@ -201,6 +205,17 @@ public class Coordinator { // parallel execute private final TUniqueId nextInstanceId; + // Runtime filter merge instance address and ID + public TNetworkAddress runtimeFilterMergeAddr; + public TUniqueId runtimeFilterMergeInstanceId; + // Runtime filter ID to the target instance address of the fragment, + // that is expected to use this runtime filter, the instance address is not repeated + public Map<RuntimeFilterId, List<FRuntimeFilterTargetParam>> ridToTargetParam = Maps.newHashMap(); + // The runtime filter that expects the instance to be used + public List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>(); + // Runtime filter ID to the builder instance number + public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap(); + // Used for query/insert public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.isBlockQuery = planner.isBlockQuery(); @@ -224,6 +239,7 @@ public class Coordinator { this.nextInstanceId = new TUniqueId(); nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); + this.assignedRuntimeFilters = analyzer.getAssignedRuntimeFilter(); } // Used for broker load task/export task coordinator @@ -804,6 +820,9 @@ public class Coordinator { } } + // assign runtime filter merge 
addr and target addr + assignRuntimeFilterAddr(); + // compute destinations and # senders per exchange node // (the root fragment doesn't have a destination) for (FragmentExecParams params : fragmentExecParamsMap.values()) { @@ -1117,6 +1136,31 @@ public class Coordinator { } } + // Traverse the expected runtimeFilterID in each fragment, and establish the corresponding relationship + // between runtimeFilterID and fragment instance addr and select the merge instance of runtimeFilter + private void assignRuntimeFilterAddr() throws Exception { + for (PlanFragment fragment: fragments) { + FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId()); + // Transform <fragment, runtimeFilterId> to <runtimeFilterId, fragment> + for (RuntimeFilterId rid: fragment.getTargetRuntimeFilterIds()) { + List<FRuntimeFilterTargetParam> targetFragments = + ridToTargetParam.computeIfAbsent(rid, k -> new ArrayList<>()); + for (final FInstanceExecParam instance : params.instanceExecParams) { + targetFragments.add(new FRuntimeFilterTargetParam(instance.instanceId, toBrpcHost(instance.host))); + } + } + + for (RuntimeFilterId rid: fragment.getBuilderRuntimeFilterIds()) { + ridToBuilderNum.merge(rid, params.instanceExecParams.size(), Integer::sum); + } + } + // Use the uppermost fragment as a merged node, the uppermost fragment has one and only one instance + FragmentExecParams uppermostParams = fragmentExecParamsMap.get(fragments.get(0).getFragmentId()); + runtimeFilterMergeAddr = toBrpcHost(uppermostParams.instanceExecParams.get(0).host); + runtimeFilterMergeInstanceId = uppermostParams.instanceExecParams.get(0).instanceId; + } + + // One fragment could only have one HashJoinNode private boolean isColocateJoin(PlanNode node) { // TODO(cmy): some internal process, such as broker load task, do not have ConnectContext. // Any configurations needed by the Coordinator should be passed in Coordinator initialization. 
@@ -1957,6 +2001,24 @@ public class Coordinator { params.setQueryOptions(queryOptions); params.params.setSendQueryStatisticsWithEveryBatch( fragment.isTransferQueryStatisticsWithEveryBatch()); + params.params.setRuntimeFilterParams(new TRuntimeFilterParams()); + params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr); + if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) { + for (Map.Entry<RuntimeFilterId, List<FRuntimeFilterTargetParam>> entry: ridToTargetParam.entrySet()) { + List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList(); + for (FRuntimeFilterTargetParam targetParam: entry.getValue()) { + targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId, + targetParam.targetFragmentInstanceAddr)); + } + params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(), targetParams); + } + for (Map.Entry<RuntimeFilterId, Integer> entry: ridToBuilderNum.entrySet()) { + params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(entry.getKey().asInt(), entry.getValue()); + } + for (RuntimeFilter rf: assignedRuntimeFilters) { + params.params.runtime_filter_params.putToRidToRuntimeFilter(rf.getFilterId().asInt(), rf.toThrift()); + } + } if (queryOptions.getQueryType() == TQueryType.LOAD) { LoadErrorHub.Param param = Catalog.getCurrentCatalog().getLoadInstance().getLoadErrorHubInfo(); if (param != null) { @@ -1966,7 +2028,6 @@ public class Coordinator { } } } - paramsList.add(params); } return paramsList; @@ -2089,6 +2150,17 @@ public class Coordinator { fragmentProfile.get(backendExecState.profileFragmentId).addChild(backendExecState.profile); } } + + // Runtime filter target fragment instance param + static class FRuntimeFilterTargetParam { + public TUniqueId targetFragmentInstanceId;; + public TNetworkAddress targetFragmentInstanceAddr; + + public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) { + this.targetFragmentInstanceId = id; + 
this.targetFragmentInstanceAddr = host; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java new file mode 100644 index 0000000..8806263 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RuntimeFilterTypeHelper.java @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.thrift.TRuntimeFilterType; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Maps; + +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Used for encoding and decoding of session variable runtime_filter_type + */ +public class RuntimeFilterTypeHelper { + private static final Logger LOG = LogManager.getLogger(RuntimeFilterTypeHelper.class); + + public final static long ALLOWED_MASK = (TRuntimeFilterType.IN.getValue() | + TRuntimeFilterType.BLOOM.getValue() | TRuntimeFilterType.MIN_MAX.getValue()); + + private final static Map<String, Long> varValueSet = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + + static { + varValueSet.put("IN", (long) TRuntimeFilterType.IN.getValue()); + varValueSet.put("BLOOM_FILTER", (long) TRuntimeFilterType.BLOOM.getValue()); + varValueSet.put("MIN_MAX", (long) TRuntimeFilterType.MIN_MAX.getValue()); + } + + // convert long type variable value to string type that user can read + public static String decode(Long varValue) throws DdlException { + // 0 parse to empty string + if (varValue == 0) { + return ""; + } + if ((varValue & ~ALLOWED_MASK) != 0) { + ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, varValue); + } + + List<String> names = new ArrayList<String>(); + for (Map.Entry<String, Long> value : getSupportedVarValue().entrySet()) { + if ((varValue & value.getValue()) != 0) { + names.add(value.getKey()); + } + } + + return Joiner.on(',').join(names); + } + + // convert string type variable value to long type that session can store + public static Long 
encode(String varValue) throws DdlException { + List<String> names = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(varValue); + + // empty string parse to 0 + long resultCode = 0; + for (String key : names) { + long code = 0; + if (StringUtils.isNumeric(key)) { + code |= Long.parseLong(key); + } else { + code = getCodeFromString(key); + if (code == 0) { + ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, key); + } + } + resultCode |= code; + if ((resultCode & ~ALLOWED_MASK) != 0) { + ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, SessionVariable.RUNTIME_FILTER_TYPE, key); + } + } + return resultCode; + } + + // check if this variable value is supported + public static boolean isSupportedVarValue(String varValue) { + return varValue != null && getSupportedVarValue().containsKey(varValue); + } + + // encode variable value from string to long + private static long getCodeFromString(String varValue) { + long code = 0; + if (isSupportedVarValue(varValue)) { + code |= getSupportedVarValue().get(varValue); + } + return code; + } + + public static Map<String, Long> getSupportedVarValue() { + return varValueSet; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 3146ae3..0dcae49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -115,6 +115,24 @@ public class SessionVariable implements Serializable, Writable { // when true, the partition column must be set to NOT NULL. public static final String ALLOW_PARTITION_COLUMN_NULLABLE = "allow_partition_column_nullable"; + // runtime filter run mode + public static final String RUNTIME_FILTER_MODE = "runtime_filter_mode"; + // Size in bytes of Bloom Filters used for runtime filters. 
Actual size of filter will + // be rounded up to the nearest power of two. + public static final String RUNTIME_BLOOM_FILTER_SIZE = "runtime_bloom_filter_size"; + // Minimum runtime bloom filter size, in bytes + public static final String RUNTIME_BLOOM_FILTER_MIN_SIZE = "runtime_bloom_filter_min_size"; + // Maximum runtime bloom filter size, in bytes + public static final String RUNTIME_BLOOM_FILTER_MAX_SIZE = "runtime_bloom_filter_max_size"; + // Time in ms to wait until runtime filters are delivered. + public static final String RUNTIME_FILTER_WAIT_TIME_MS = "runtime_filter_wait_time_ms"; + // Maximum number of bloom runtime filters allowed per query + public static final String RUNTIME_FILTERS_MAX_NUM = "runtime_filters_max_num"; + // Runtime filter type used, For testing, Corresponds to TRuntimeFilterType + public static final String RUNTIME_FILTER_TYPE = "runtime_filter_type"; + // if the right table is greater than this value in the hash join, we will ignore IN filter + public static final String RUNTIME_FILTER_MAX_IN_NUM = "runtime_filter_max_in_num"; + // max ms to wait transaction publish finish when exec insert stmt. 
public static final String INSERT_VISIBLE_TIMEOUT_MS = "insert_visible_timeout_ms"; @@ -310,6 +328,23 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true) public boolean extractWideRangeExpr = true; + @VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE) + private String runtimeFilterMode = "GLOBAL"; + @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_SIZE) + private int runtimeBloomFilterSize = 2097152; + @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_MIN_SIZE) + private int runtimeBloomFilterMinSize = 1048576; + @VariableMgr.VarAttr(name = RUNTIME_BLOOM_FILTER_MAX_SIZE) + private int runtimeBloomFilterMaxSize = 16777216; + @VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS) + private int runtimeFilterWaitTimeMs = 1000; + @VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM) + private int runtimeFiltersMaxNum = 10; + // Set runtimeFilterType to IN filter + @VariableMgr.VarAttr(name = RUNTIME_FILTER_TYPE) + private int runtimeFilterType = 1; + @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM) + private int runtimeFilterMaxInNum = 1024; public long getMaxExecMemByte() { return maxExecMemByte; @@ -585,6 +620,70 @@ public class SessionVariable implements Serializable, Writable { return allowPartitionColumnNullable; } + public String getRuntimeFilterMode() { + return runtimeFilterMode; + } + + public void setRuntimeFilterMode(String runtimeFilterMode) { + this.runtimeFilterMode = runtimeFilterMode; + } + + public int getRuntimeBloomFilterSize() { + return runtimeBloomFilterSize; + } + + public void setRuntimeBloomFilterSize(int runtimeBloomFilterSize) { + this.runtimeBloomFilterSize = runtimeBloomFilterSize; + } + + public int getRuntimeBloomFilterMinSize() { + return runtimeBloomFilterMinSize; + } + + public void setRuntimeBloomFilterMinSize(int runtimeBloomFilterMinSize) { + this.runtimeBloomFilterMinSize = runtimeBloomFilterMinSize; + } + + public int 
getRuntimeBloomFilterMaxSize() { + return runtimeBloomFilterMaxSize; + } + + public void setRuntimeBloomFilterMaxSize(int runtimeBloomFilterMaxSize) { + this.runtimeBloomFilterMaxSize = runtimeBloomFilterMaxSize; + } + + public int getRuntimeFilterWaitTimeMs() { + return runtimeFilterWaitTimeMs; + } + + public void setRuntimeFilterWaitTimeMs(int runtimeFilterWaitTimeMs) { + this.runtimeFilterWaitTimeMs = runtimeFilterWaitTimeMs; + } + + public int getRuntimeFiltersMaxNum() { + return runtimeFiltersMaxNum; + } + + public void setRuntimeFiltersMaxNum(int runtimeFiltersMaxNum) { + this.runtimeFiltersMaxNum = runtimeFiltersMaxNum; + } + + public int getRuntimeFilterType() { + return runtimeFilterType; + } + + public void setRuntimeFilterType(int runtimeFilterType) { + this.runtimeFilterType = runtimeFilterType; + } + + public int getRuntimeFilterMaxInNum() { + return runtimeFilterMaxInNum; + } + + public void setRuntimeFilterMaxInNum(int runtimeFilterMaxInNum) { + this.runtimeFilterMaxInNum = runtimeFilterMaxInNum; + } + public long getInsertVisibleTimeoutMs() { if (insertVisibleTimeoutMs < MIN_INSERT_VISIBLE_TIMEOUT_MS) { return MIN_INSERT_VISIBLE_TIMEOUT_MS; @@ -658,6 +757,9 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableSpilling(enableSpilling); tResult.setEnableEnableExchangeNodeParallelMerge(enableExchangeNodeParallelMerge); + + tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs); + tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum); return tResult; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java index e48811a..b4c85e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -156,42 +156,44 @@ public class VariableMgr { // Set value to a variable private static boolean setValue(Object obj, Field field, String value) throws DdlException 
{ VarAttr attr = field.getAnnotation(VarAttr.class); - String convertedVal = VariableVarConverters.convert(attr.name(), value); + if (VariableVarConverters.hasConverter(attr.name())) { + value = VariableVarConverters.encode(attr.name(), value).toString(); + } try { switch (field.getType().getSimpleName()) { case "boolean": - if (convertedVal.equalsIgnoreCase("ON") - || convertedVal.equalsIgnoreCase("TRUE") - || convertedVal.equalsIgnoreCase("1")) { + if (value.equalsIgnoreCase("ON") + || value.equalsIgnoreCase("TRUE") + || value.equalsIgnoreCase("1")) { field.setBoolean(obj, true); - } else if (convertedVal.equalsIgnoreCase("OFF") - || convertedVal.equalsIgnoreCase("FALSE") - || convertedVal.equalsIgnoreCase("0")) { + } else if (value.equalsIgnoreCase("OFF") + || value.equalsIgnoreCase("FALSE") + || value.equalsIgnoreCase("0")) { field.setBoolean(obj, false); } else { throw new IllegalAccessException(); } break; case "byte": - field.setByte(obj, Byte.valueOf(convertedVal)); + field.setByte(obj, Byte.valueOf(value)); break; case "short": - field.setShort(obj, Short.valueOf(convertedVal)); + field.setShort(obj, Short.valueOf(value)); break; case "int": - field.setInt(obj, Integer.valueOf(convertedVal)); + field.setInt(obj, Integer.valueOf(value)); break; case "long": - field.setLong(obj, Long.valueOf(convertedVal)); + field.setLong(obj, Long.valueOf(value)); break; case "float": - field.setFloat(obj, Float.valueOf(convertedVal)); + field.setFloat(obj, Float.valueOf(value)); break; case "double": - field.setDouble(obj, Double.valueOf(convertedVal)); + field.setDouble(obj, Double.valueOf(value)); break; case "String": - field.set(obj, convertedVal); + field.set(obj, value); break; default: // Unsupported type variable. 
@@ -496,12 +498,12 @@ public class VariableMgr { row.add(getValue(ctx.getObj(), ctx.getField())); } - if (row.size() > 1 && row.get(0).equalsIgnoreCase(SessionVariable.SQL_MODE)) { + if (row.size() > 1 && VariableVarConverters.hasConverter(row.get(0))) { try { - row.set(1, SqlModeHelper.decode(Long.valueOf(row.get(1)))); + row.set(1, VariableVarConverters.decode(row.get(0), Long.valueOf(row.get(1)))); } catch (DdlException e) { row.set(1, ""); - LOG.warn("Decode sql mode failed"); + LOG.warn("Decode session variable failed"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java index 26d7dae..fcc06d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverterI.java @@ -20,5 +20,7 @@ package org.apache.doris.qe; import org.apache.doris.common.DdlException; public interface VariableVarConverterI { - public String convert(String value) throws DdlException; + public Long encode(String value) throws DdlException; + + public String decode(Long value) throws DdlException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java index 39b6155..8cecda1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarConverters.java @@ -22,23 +22,46 @@ import org.apache.doris.common.DdlException; import java.util.Map; -// Helper class to drives the convert of session variables according to the converters. -// You can define your converter that implements interface VariableVarConverterI in here. -// Each converter should put in map (variable name -> converters) and only converts the variable -// with specified name. 
+/** + * Helper class to drives the convert of session variables according to the converters. + * You can define your converter that implements interface VariableVarConverterI in here. + * Each converter should put in map (variable name -> converters) and only converts the variable with specified name. + * + * The converted session variable is a special kind of variable. + * It's real type is int, so for example, when querying `select @@sql_mode`, the return column + * type is "int". + * But user usually set this variable by string, such as: + * `set @@sql_mode = 'STRICT_TRANS_TABLES'` + * or + * `set @@sql_mode = concat(@@sql_mode, 'STRICT_TRANS_TABLES')'` + */ public class VariableVarConverters { public static final Map<String, VariableVarConverterI> converters = Maps.newHashMap(); + static { SqlModeConverter sqlModeConverter = new SqlModeConverter(); converters.put(SessionVariable.SQL_MODE, sqlModeConverter); + RuntimeFilterTypeConverter runtimeFilterTypeConverter = new RuntimeFilterTypeConverter(); + converters.put(SessionVariable.RUNTIME_FILTER_TYPE, runtimeFilterTypeConverter); + } + + public static Boolean hasConverter(String varName) { + return converters.containsKey(varName); } - public static String convert(String varName, String value) throws DdlException { + public static Long encode(String varName, String value) throws DdlException { if (converters.containsKey(varName)) { - return converters.get(varName).convert(value); + return converters.get(varName).encode(value); } - return value; + return 0L; + } + + public static String decode(String varName, Long value) throws DdlException { + if (converters.containsKey(varName)) { + return converters.get(varName).decode(value); + } + return ""; } /* Converters */ @@ -46,8 +69,26 @@ public class VariableVarConverters { // Converter to convert sql mode variable public static class SqlModeConverter implements VariableVarConverterI { @Override - public String convert(String value) throws DdlException { - return 
SqlModeHelper.encode(value).toString(); + public Long encode(String value) throws DdlException { + return SqlModeHelper.encode(value); + } + + @Override + public String decode(Long value) throws DdlException { + return SqlModeHelper.decode(value); + } + } + + // Converter to convert runtime filter type variable + public static class RuntimeFilterTypeConverter implements VariableVarConverterI { + @Override + public Long encode(String value) throws DdlException { + return RuntimeFilterTypeHelper.encode(value); + } + + @Override + public String decode(Long value) throws DdlException { + return RuntimeFilterTypeHelper.decode(value); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 4aac012..5aac2db 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -41,6 +41,7 @@ import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.load.EtlJobType; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.thrift.TRuntimeFilterMode; import org.apache.doris.utframe.UtFrameUtils; import com.google.common.collect.Lists; @@ -1317,6 +1318,84 @@ public class QueryPlanTest { } @Test + public void testRuntimeFilterMode() throws Exception { + connectContext.setDatabase("default_cluster:test"); + + String queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 = t2.k1"; + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "LOCAL"); + String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filter")); + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "REMOTE"); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr); + Assert.assertFalse(explainString.contains("runtime filter")); + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "OFF"); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertFalse(explainString.contains("runtime filter")); + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL"); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filter")); + + queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 <=> t2.k1"; + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "LOCAL"); + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertFalse(explainString.contains("runtime filter")); + + queryStr = "explain select * from jointest as a where k1 = (select count(1) from jointest as b where a.k1 = b.k1);"; + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL"); + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + System.out.println(explainString); + Assert.assertFalse(explainString.contains("runtime filter")); + } + + @Test + public void testRuntimeFilterType() throws Exception { + connectContext.setDatabase("default_cluster:test"); + String queryStr = "explain select * from jointest t2, jointest t1 where t1.k1 = t2.k1"; + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL"); + + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 0); + String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertFalse(explainString.contains("runtime 
filter")); + + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 1); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`")); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`")); + + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 2); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] <- `t1`.`k1`")); + Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] -> `t2`.`k1`")); + + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 3); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`")); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`")); + + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 4); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filters: RF000[min_max] <- `t1`.`k1`")); + Assert.assertTrue(explainString.contains("runtime filters: RF000[min_max] -> `t2`.`k1`")); + + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 5); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`")); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`")); + + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 6); + explainString = 
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] <- `t1`.`k1`, RF001[min_max] <- `t1`.`k1`")); + Assert.assertTrue(explainString.contains("runtime filters: RF000[bloom] -> `t2`.`k1`, RF001[min_max] -> `t2`.`k1`")); + + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] <- `t1`.`k1`, RF001[bloom] <- `t1`.`k1`, RF002[min_max] <- `t1`.`k1`")); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`, RF001[bloom] -> `t2`.`k1`, RF002[min_max] -> `t2`.`k1`")); + } + + @Test public void testEmptyNode() throws Exception { connectContext.setDatabase("default_cluster:test"); String emptyNode = "EMPTYSET"; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java new file mode 100644 index 0000000..09fe44a --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/RuntimeFilterGeneratorTest.java @@ -0,0 +1,426 @@ +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BaseTableRef; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.JoinOperator; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import 
org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartitionType; +import org.apache.doris.thrift.TRuntimeFilterMode; + +import com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; + +import mockit.Expectations; +import mockit.Mocked; + +public class RuntimeFilterGeneratorTest { + private Analyzer analyzer; + private PlanFragment testPlanFragment; + private HashJoinNode hashJoinNode; + private OlapScanNode lhsScanNode; + private OlapScanNode rhsScanNode; + @Mocked + private ConnectContext connectContext; + + @Before + public void setUp() throws AnalysisException { + Catalog catalog = Deencapsulation.newInstance(Catalog.class); + analyzer = new Analyzer(catalog, connectContext); + new Expectations() { + { + analyzer.getClusterName(); + result = "default_cluster"; + } + }; + TableRef tableRef = new TableRef(); + Deencapsulation.setField(tableRef, "isAnalyzed", true); + Deencapsulation.setField(tableRef, "joinOp", JoinOperator.INNER_JOIN); + + TupleDescriptor lhsTupleDescriptor = new TupleDescriptor(new TupleId(0)); + lhsScanNode = new OlapScanNode(new PlanNodeId(0), lhsTupleDescriptor, "LEFT SCAN"); + TableName lhsTableName = new TableName("default_cluster:test_db", "test_lhs_tbl"); + SlotRef lhsExpr = new SlotRef(lhsTableName, "test_lhs_col"); + SlotDescriptor lhsSlotDescriptor = new SlotDescriptor(new SlotId(0), lhsTupleDescriptor); + Column k1 = new Column("test_lhs_col", PrimitiveType.BIGINT); + k1.setIsKey(true); + k1.setIsAllowNull(false); + lhsSlotDescriptor.setColumn(k1); + lhsExpr.setDesc(lhsSlotDescriptor); + Table lhsTable = new Table(0, "test_lhs_tbl", Table.TableType.OLAP, Lists.newArrayList(k1)); + BaseTableRef lhsTableRef = new BaseTableRef(tableRef, lhsTable, lhsTableName); + 
lhsTableRef.analyze(analyzer); + + TupleDescriptor rhsTupleDescriptor = new TupleDescriptor(new TupleId(1)); + rhsScanNode = new OlapScanNode(new PlanNodeId(1), rhsTupleDescriptor, "RIGHT SCAN"); + TableName rhsTableName = new TableName("default_cluster:test_db", "test_rhs_tbl"); + SlotRef rhsExpr = new SlotRef(rhsTableName, "test_rhs_col"); + SlotDescriptor rhsSlotDescriptor = new SlotDescriptor(new SlotId(1), rhsTupleDescriptor); + Column k2 = new Column("test_rhs_col", PrimitiveType.INT); + k2.setIsKey(true); + k2.setIsAllowNull(false); + rhsSlotDescriptor.setColumn(k2); + rhsExpr.setDesc(rhsSlotDescriptor); + Table rhsTable = new Table(0, "test_rhs_tbl", Table.TableType.OLAP, Lists.newArrayList(k2)); + BaseTableRef rhsTableRef = new BaseTableRef(tableRef, rhsTable, rhsTableName); + rhsTableRef.analyze(analyzer); + + ArrayList<Expr> testJoinExprs = new ArrayList<>(); + BinaryPredicate eqJoinConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhsExpr, rhsExpr); + testJoinExprs.add(eqJoinConjunct); + + hashJoinNode = new HashJoinNode(new PlanNodeId(2), lhsScanNode, rhsScanNode, tableRef, testJoinExprs + , new ArrayList<>()); + testPlanFragment = new PlanFragment(new PlanFragmentId(0), hashJoinNode + , new DataPartition(TPartitionType.UNPARTITIONED)); + hashJoinNode.setFragment(testPlanFragment); + lhsScanNode.setFragment(testPlanFragment); + rhsScanNode.setFragment(testPlanFragment); + + new Expectations() { + { + analyzer.getSlotDesc(new SlotId(0)); + result = lhsSlotDescriptor; + analyzer.getSlotDesc(new SlotId(1)); + result = rhsSlotDescriptor; + + ConnectContext.get().getSessionVariable().getRuntimeFiltersMaxNum(); + result = 8; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize(); + result = 16777216; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize(); + result = 1048576; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize(); + result = 2097152; + } + }; + } + + private void 
clearRuntimeFilterState() { + testPlanFragment.clearRuntimeFilters(); + analyzer.clearAssignedRuntimeFilters(); + hashJoinNode.clearRuntimeFilters(); + lhsScanNode.clearRuntimeFilters(); + } + + @Test + public void testGenerateRuntimeFiltersMode() { + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterMode(); + result = "GLOBAL"; + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 7; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 3); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true) + , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" + + ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" + + ", RF002[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n"); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false) + , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" + + ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" + + ", RF002[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n"); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterMode(); + result = "LOCAL"; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 3); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 3); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 
3); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 3); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 3); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true) + , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" + + ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" + + ", RF002[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n"); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false) + , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" + + ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" + + ", RF002[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n"); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterMode(); + result = "REMOTE"; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 0); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 0); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 0); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 0); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 0); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true), ""); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false), ""); + } + + @Test(expected = IllegalStateException.class) + public void testGenerateRuntimeFiltersModeException() { + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 8; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + } + + @Test + public void testGenerateRuntimeFiltersType() { + clearRuntimeFilterState(); + new Expectations() { + { + 
ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 0; + ConnectContext.get().getSessionVariable().getRuntimeFilterMode(); + result = "GLOBAL"; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true), ""); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false), ""); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 0); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 0); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 0); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 0); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 0); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 1; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true) + , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n"); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false) + , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n"); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 1); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 1); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 1); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 1); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 1); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 2; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true) + , "RF000[bloom] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n"); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false) + , "RF000[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n"); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 1); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 1); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 1); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 1); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 1); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 3; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true) + , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" + + ", RF001[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n"); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false) + , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" + + ", RF001[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n"); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 4; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true) + , "RF000[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n"); + 
Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false) + , "RF000[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n"); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 1); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 1); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 1); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 1); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 1); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 5; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true) + , "RF000[in] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" + + ", RF001[min_max] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n"); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false) + , "RF000[in] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" + + ", RF001[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n"); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 6; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(hashJoinNode.getRuntimeFilterExplainString(true) + , "RF000[bloom] <- `default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`" + + ", RF001[min_max] <- 
`default_cluster:test_db`.`test_rhs_tbl`.`test_rhs_col`\n"); + Assert.assertEquals(lhsScanNode.getRuntimeFilterExplainString(false) + , "RF000[bloom] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`" + + ", RF001[min_max] -> `default_cluster:test_db`.`test_lhs_tbl`.`test_lhs_col`\n"); + Assert.assertEquals(testPlanFragment.getTargetRuntimeFilterIds().size(), 2); + Assert.assertEquals(testPlanFragment.getBuilderRuntimeFilterIds().size(), 2); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().size(), 2); + Assert.assertEquals(hashJoinNode.getRuntimeFilters().size(), 2); + Assert.assertEquals(lhsScanNode.getRuntimeFilters().size(), 2); + } + + @Test(expected = IllegalStateException.class) + public void testGenerateRuntimeFiltersTypeExceptionLess() { + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = -1; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + } + + @Test(expected = IllegalStateException.class) + public void testGenerateRuntimeFiltersTypeExceptionMore() { + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 8; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + } + + @Test + public void testGenerateRuntimeFiltersSize() { + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeFilterMode(); + result = "GLOBAL"; + ConnectContext.get().getSessionVariable().getRuntimeFilterType(); + result = 2; + } + }; + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize(); + result = 16777216; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize(); + result = 1048576; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize(); + result = 2097152; + } + }; + 
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes(), 2097152); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize(); + result = 16777216; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize(); + result = 1048576; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize(); + result = 1; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes(), 1048576); + + clearRuntimeFilterState(); + new Expectations() { + { + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMaxSize(); + result = 16777216; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterMinSize(); + result = 1048576; + ConnectContext.get().getSessionVariable().getRuntimeBloomFilterSize(); + result = 999999999; + } + }; + RuntimeFilterGenerator.generateRuntimeFilters(analyzer, hashJoinNode); + Assert.assertEquals(analyzer.getAssignedRuntimeFilter().get(0).toThrift().getBloomFilterSizeBytes(), 16777216); + + // Use ndv and fpp to calculate the minimum space required for bloom filter + Assert.assertEquals(1L << + RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.05), 1048576); + Assert.assertEquals(1L << + RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.1), 1048576); + Assert.assertEquals(1L << + RuntimeFilter.GetMinLogSpaceForBloomFilter(1000000, 0.3), 524288); + Assert.assertEquals(1L << + RuntimeFilter.GetMinLogSpaceForBloomFilter(10000000, 0.1), 8388608); + Assert.assertEquals(1L << + RuntimeFilter.GetMinLogSpaceForBloomFilter(1000, 0.1), 1024); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java new file mode 100644 index 0000000..d0ce41c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/RuntimeFilterTypeHelperTest.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe; + +import org.apache.doris.common.DdlException; + +import org.junit.Assert; +import org.junit.Test; + +public class RuntimeFilterTypeHelperTest { + + @Test + public void testNormal() throws DdlException { + String runtimeFilterType = ""; + Assert.assertEquals(new Long(0L), RuntimeFilterTypeHelper.encode(runtimeFilterType)); + + runtimeFilterType = "IN"; + Assert.assertEquals(new Long(1L), RuntimeFilterTypeHelper.encode(runtimeFilterType)); + + runtimeFilterType = "BLOOM_FILTER"; + Assert.assertEquals(new Long(2L), RuntimeFilterTypeHelper.encode(runtimeFilterType)); + + runtimeFilterType = "IN,BLOOM_FILTER"; + Assert.assertEquals(new Long(3L), RuntimeFilterTypeHelper.encode(runtimeFilterType)); + + runtimeFilterType = "MIN_MAX"; + Assert.assertEquals(new Long(4L), RuntimeFilterTypeHelper.encode(runtimeFilterType)); + + runtimeFilterType = "IN,MIN_MAX"; + Assert.assertEquals(new Long(5L), RuntimeFilterTypeHelper.encode(runtimeFilterType)); + + runtimeFilterType = "MIN_MAX, BLOOM_FILTER"; + Assert.assertEquals(new Long(6L), RuntimeFilterTypeHelper.encode(runtimeFilterType)); + + runtimeFilterType = "IN,BLOOM_FILTER,MIN_MAX"; + Assert.assertEquals(new Long(7L), RuntimeFilterTypeHelper.encode(runtimeFilterType)); + + long runtimeFilterTypeValue = 0L; + Assert.assertEquals("", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); + + runtimeFilterTypeValue = 1L; + Assert.assertEquals("IN", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); + + runtimeFilterTypeValue = 3L; + Assert.assertEquals("BLOOM_FILTER,IN", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly + + runtimeFilterTypeValue = 7L; + Assert.assertEquals("BLOOM_FILTER,IN,MIN_MAX", RuntimeFilterTypeHelper.decode(runtimeFilterTypeValue)); // Orderly + } + + @Test(expected = DdlException.class) + public void testInvalidSqlMode() throws DdlException { + RuntimeFilterTypeHelper.encode("BLOOM,IN"); + Assert.fail("No exception throws"); + } + + @Test(expected 
= DdlException.class) + public void testInvalidDecode() throws DdlException { + RuntimeFilterTypeHelper.decode(10L); + Assert.fail("No exception throws"); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java index a053c38..5cba8c2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java @@ -167,6 +167,12 @@ public class VariableMgrTest { setVar7.analyze(null); VariableMgr.setVar(var, setVar7); Assert.assertEquals("-08:00", VariableMgr.newSessionVariable().getTimeZone()); + + SetVar setVar8 = new SetVar(SetType.SESSION, "runtime_filter_type", new StringLiteral( + RuntimeFilterTypeHelper.encode("BLOOM_FILTER").toString())); + setVar8.analyze(null); + VariableMgr.setVar(var, setVar8); + Assert.assertEquals(2L, var.getRuntimeFilterType()); } @Test(expected = UserException.class) diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 888af72..b2f0c89 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -141,29 +141,11 @@ struct TQueryOptions { // whether enable parallel merge in exchange node 32: optional bool enable_enable_exchange_node_parallel_merge = false; - // runtime filter run mode - 33: optional string runtime_filter_mode = "GLOBAL"; - - // Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will - // be rounded up to the nearest power of two. - 34: optional i32 runtime_bloom_filter_size = 1048576 - - // Minimum runtime bloom filter size, in bytes - 35: optional i32 runtime_bloom_filter_min_size = 1048576 - - // Maximum runtime bloom filter size, in bytes - 36: optional i32 runtime_bloom_filter_max_size = 16777216 - // Time in ms to wait until runtime filters are delivered. 
- 37: optional i32 runtime_filter_wait_time_ms = 1000 - - // Maximum number of bloom runtime filters allowed per query - 38: optional i32 runtime_filters_max_num = 10 - - // Runtime filter type used, For testing, Corresponds to TRuntimeFilterType - 39: optional i32 runtime_filter_type = 1; + 33: optional i32 runtime_filter_wait_time_ms = 1000 - 40: optional i32 runtime_filter_max_in_num = 1024; + // if the right table is greater than this value in the hash join, we will ignore IN filter + 34: optional i32 runtime_filter_max_in_num = 1024; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index d3b2cd0..8d968dc 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -381,11 +381,10 @@ struct THashJoinNode { // anything from the ON or USING clauses (but *not* the WHERE clause) that's not an // equi-join predicate 3: optional list<Exprs.TExpr> other_join_conjuncts - 4: optional bool is_push_down // If true, this join node can (but may choose not to) generate slot filters // after constructing the build side that can be applied to the probe side. - 5: optional bool add_probe_filters + 4: optional bool add_probe_filters } struct TMergeJoinNode { @@ -726,7 +725,6 @@ struct TRuntimeFilterDesc { 9: optional i64 bloom_filter_size_bytes } - // This is essentially a union of all messages corresponding to subclasses // of PlanNode. struct TPlanNode { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org