924060929 commented on code in PR #50895:
URL: https://github.com/apache/doris/pull/50895#discussion_r2220962329
##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java:
##########
@@ -123,35 +125,46 @@ public static MTMVCache from(String defSql,
// No need cost for performance
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY,
ExplainLevel.REWRITTEN_PLAN);
}
- originPlan = planner.getCascadesContext().getRewritePlan();
- // Eliminate result sink because sink operator is useless in query
rewrite by materialized view
- // and the top sort can also be removed
- mvPlan = originPlan.rewriteDownShortCircuit(p -> {
- if (p instanceof LogicalResultSink) {
- return p.child(0);
- }
- return p;
- });
- // Optimize by rules to remove top sort
- CascadesContext parentCascadesContext =
CascadesContext.initContext(mvSqlStatementContext, mvPlan,
- PhysicalProperties.ANY);
- mvPlan =
MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> {
- Rewriter.getCteChildrenRewriter(childContext,
-
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT,
EliminateSort::new))).execute();
- return childContext.getRewritePlan();
- }, mvPlan, originPlan);
- // Construct structInfo once for use later
- structInfoOptional =
MaterializationContext.constructStructInfo(mvPlan, originPlan,
- planner.getCascadesContext(),
- new BitSet());
+ CascadesContext cascadesContext = planner.getCascadesContext();
+ Pair<Plan, StructInfo> finalPlanStructInfoPair =
constructPlanAndStructInfo(
+ cascadesContext.getRewritePlan(), cascadesContext);
+ List<Pair<Plan, StructInfo>> tmpPlanUsedForRewrite = new
ArrayList<>();
+ for (Plan plan :
cascadesContext.getStatementContext().getTmpPlanForMvRewrite()) {
+ tmpPlanUsedForRewrite.add(constructPlanAndStructInfo(plan,
cascadesContext));
+ }
+ return new MTMVCache(finalPlanStructInfoPair,
cascadesContext.getRewritePlan(), needCost
+ ? cascadesContext.getMemo().getRoot().getStatistics() :
null, tmpPlanUsedForRewrite);
} finally {
+
createCacheContext.getStatementContext().setForceRecordTmpPlan(false);
+ mvSqlStatementContext.setForceRecordTmpPlan(false);
createCacheContext.getSessionVariable().enableMaterializedViewRewrite =
originalRewriteFlag;
if (currentContext != null) {
currentContext.setThreadLocalInfo();
}
}
- return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(),
needCost
- ?
planner.getCascadesContext().getMemo().getRoot().getStatistics() : null,
- structInfoOptional.orElse(null));
+ }
+
+ // Eliminate result sink because sink operator is useless in query rewrite
by materialized view
+ // and the top sort can also be removed
+ private static Pair<Plan, StructInfo> constructPlanAndStructInfo(Plan
plan, CascadesContext cascadesContext) {
+ Plan mvPlan = plan.accept(new DefaultPlanRewriter<Object>() {
+ @Override
+ public Plan visitLogicalResultSink(LogicalResultSink<? extends
Plan> logicalResultSink,
+ Object context) {
+ return new LogicalProject(logicalResultSink.getOutput(),
+ false, logicalResultSink.children());
+ }
+ }, null);
+ // Optimize by rules to remove top sort
+ mvPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
+ Rewriter.getCteChildrenRewriter(childContext, ImmutableList.of(
+ Rewriter.custom(RuleType.ELIMINATE_SORT,
EliminateSort::new),
+ Rewriter.bottomUp(new MergeProjects()))).execute();
Review Comment:
It would be better to use `MergeProjectable` instead of `MergeProjects`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java:
##########
@@ -195,7 +197,10 @@ public enum TableFrom {
// if query is: select * from t2 join t5
// mtmvRelatedTables is mv1, mv2, mv3, t1, t2, t3, t4, t5
private final Map<List<String>, TableIf> mtmvRelatedTables =
Maps.newHashMap();
+ // collected async mvs
private final Set<MTMV> candidateMTMVs = Sets.newHashSet();
+ // collected synv mvs
Review Comment:
Is `synv` a typo? It should probably be `sync`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java:
##########
@@ -207,6 +215,30 @@ public Pair<Integer, Integer> countGroupJoin(Group group) {
return Pair.of(continuousJoinCount, Math.max(continuousJoinCount,
maxJoinCount));
}
+ private static Pair<Integer, Integer> countLogicalJoin(Plan plan) {
+ List<Pair<Integer, Integer>> children = new ArrayList<>();
+ for (Plan child : plan.children()) {
+ children.add(countLogicalJoin(child));
+ }
+ if (plan instanceof LogicalProject) {
+ return children.get(0);
+ }
+ int maxJoinCount = 0;
+ int continuousJoinCount = 0;
+ for (Pair<Integer, Integer> child : children) {
+ maxJoinCount = Math.max(maxJoinCount, child.second);
+ }
+ if (plan instanceof LogicalJoin) {
+ for (Pair<Integer, Integer> child : children) {
+ continuousJoinCount += child.first;
+ }
+ continuousJoinCount += 1;
+ } else if (plan instanceof LogicalProject) {
+ return children.get(0);
+ }
Review Comment:
Is this branch ever reached? The same `plan instanceof LogicalProject` check already exists at line 223 and returns there.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Optimizer.java:
##########
@@ -47,37 +48,45 @@ public void execute() {
// init memo
cascadesContext.toMemo();
// stats derive
- cascadesContext.pushJob(new
DeriveStatsJob(cascadesContext.getMemo().getRoot().getLogicalExpression(),
- cascadesContext.getCurrentJobContext()));
+
cascadesContext.getMemo().getRoot().getLogicalExpressions().forEach(groupExpression
->
+ cascadesContext.pushJob(new DeriveStatsJob(groupExpression,
cascadesContext.getCurrentJobContext())));
cascadesContext.getJobScheduler().executeJobPool(cascadesContext);
+ if (cascadesContext.getStatementContext().isDpHyp() ||
isDpHyp(cascadesContext)) {
Review Comment:
can we only use `isDpHyp(cascadesContext)` and remove
`getStatementContext().isDpHyp()`?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java:
##########
@@ -251,6 +257,22 @@ public enum TableFrom {
private boolean prepareStage = false;
+ // this record the tmp plan in RBO for later pre materialized view rewrite
+ private final List<Plan> tmpPlanForMvRewrite = new ArrayList<>();
+ // this record the rewritten plan by mv in RBO phase
+ private final List<Plan> rewrittenPlansByMv = new ArrayList<>();
+ private boolean forceRecordTmpPlan = false;
+ // this record the rule in
PreMaterializedViewRewriter.NEED_PRE_REWRITE_RULE_TYPES if is applied
successfully
+ // or not, if success and in PreRewriteStrategy.FOR_IN_ROB or
PreRewriteStrategy.TRY_IN_ROB, mv
+ // would be written in RBO phase
+ private final BitSet needPreMvRewriteRuleMasks = new
BitSet(RuleType.SENTINEL.ordinal());
+ // if needed to rewrite in RBO phase, this would be set true
+ private boolean needPreRewrite = false;
+ // mark is rewritten in RBO phase, if rewritten in RBO phase should set
true
+ private boolean preRewritten = false;
Review Comment:
rename to xxxPreMvRewritxxx
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PreMaterializedViewRewriter.java:
##########
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.exploration.mv;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.jobs.executor.Optimizer;
+import org.apache.doris.nereids.memo.Group;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Individual materialized view rewriter based CBO
+ */
+public class PreMaterializedViewRewriter {
+ public static BitSet NEED_PRE_REWRITE_RULE_TYPES = new BitSet();
+ private static final Logger LOG =
LogManager.getLogger(PreMaterializedViewRewriter.class);
+
+ static {
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_TOP_N_THROUGH_JOIN.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_TOP_N_THROUGH_PROJECT_JOIN.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_TOP_N_DISTINCT_THROUGH_JOIN.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_TOP_N_DISTINCT_THROUGH_PROJECT_JOIN.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_TOP_N_THROUGH_PROJECT_WINDOW.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_TOP_N_THROUGH_WINDOW.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_TOP_N_THROUGH_UNION.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_TOP_N_DISTINCT_THROUGH_UNION.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_LIMIT_DISTINCT_THROUGH_JOIN.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_LIMIT_DISTINCT_THROUGH_PROJECT_JOIN.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_DOWN_LIMIT_DISTINCT_THROUGH_UNION.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_LIMIT_THROUGH_JOIN.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_LIMIT_THROUGH_PROJECT_JOIN.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_LIMIT_THROUGH_PROJECT_WINDOW.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_LIMIT_THROUGH_UNION.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PUSH_LIMIT_THROUGH_WINDOW.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.LIMIT_SORT_TO_TOP_N.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.LIMIT_AGG_TO_TOPN_AGG.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.ELIMINATE_CONST_JOIN_CONDITION.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.MERGE_PERCENTILE_TO_ARRAY.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.SUM_LITERAL_REWRITE.ordinal());
+
NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.SPLIT_MULTI_DISTINCT.ordinal());
+ }
+
+ /**
+ * Materialize view pre rewrite
+ */
+ public static Plan rewrite(CascadesContext cascadesContext) {
+ if (cascadesContext.getMaterializationContexts().isEmpty()
+ || !cascadesContext.getStatementContext().isNeedPreRewrite()) {
+ return null;
+ }
+ // Do optimize
+ new Optimizer(cascadesContext).execute();
+ // Chose the best physical plan
+ Group root = cascadesContext.getMemo().getRoot();
+ PhysicalPlan physicalPlan = NereidsPlanner.chooseBestPlan(root,
+
cascadesContext.getCurrentJobContext().getRequiredProperties(),
cascadesContext);
+ Pair<Map<List<String>, MaterializationContext>, BitSet>
chosenMaterializationAndUsedTable
+ =
MaterializedViewUtils.getChosenMaterializationAndUsedTable(physicalPlan,
+ cascadesContext.getAllMaterializationContexts());
+ // Calc the table id set which is used by physical plan
+ cascadesContext.getMemo().incrementAndGetRefreshVersion();
+ // Extract logical plan by table id set by the corresponding best
physical plan
+ StructInfo structInfo =
root.getStructInfoMap().getStructInfo(cascadesContext,
+ chosenMaterializationAndUsedTable.value(), root, null, true);
+ if (structInfo == null) {
+ LOG.error("preMaterializedViewRewriter rewrite structInfo is null,
query id is {}",
+ cascadesContext.getConnectContext().getQueryIdentifier());
+ }
+ if (structInfo != null &&
!chosenMaterializationAndUsedTable.key().isEmpty()) {
+ return structInfo.getOriginalPlan();
+ }
+ return null;
+ }
+
+ public static BitSet getNeedPreRewriteRule() {
+ return NEED_PRE_REWRITE_RULE_TYPES;
+ }
+
+ /**
+ * Calc need to record tmp plan for rewrite or not, this would be
calculated in RBO phase
+ * if needed should return true, or would return false
+ */
+ public static boolean needRecordTmpPlanForRewrite(CascadesContext
cascadesContext) {
+ StatementContext statementContext =
cascadesContext.getStatementContext();
+ PreRewriteStrategy preRewriteStrategy = PreRewriteStrategy.getEnum(
+
cascadesContext.getConnectContext().getSessionVariable().getPreMaterializedViewRewriteStrategy());
+ if (statementContext.isForceRecordTmpPlan()) {
+ return true;
+ }
+ if (PreRewriteStrategy.NOT_IN_RBO.equals(preRewriteStrategy)) {
+ return false;
+ }
+ if
(!MaterializedViewUtils.containMaterializedViewHook(statementContext)) {
+ // current statement context doesn't have hook, doesn't use pre
RBO materialized view rewrite
+ return false;
+ }
+ return !statementContext.getCandidateMVs().isEmpty() ||
!statementContext.getCandidateMTMVs().isEmpty();
+ }
+
+ /**
+ * Calc need pre mv rewrite or not, this would be calculated after RBO
phase
+ */
+ public static boolean needPreRewrite(CascadesContext cascadesContext) {
+ StatementContext statementContext =
cascadesContext.getStatementContext();
+ if (!needRecordTmpPlanForRewrite(cascadesContext)) {
+ LOG.debug("needPreRewrite found not need record tmp plan, query id
is {}",
+ cascadesContext.getConnectContext().getQueryIdentifier());
+ return false;
Review Comment:
add `if (LOG.isDebugEnabled())` to the following logs
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -292,8 +292,8 @@ private LogicalPlan
preAggForRandomDistribution(LogicalOlapScan olapScan) {
if (function == null) {
return olapScan;
}
- Alias alias = new Alias(exprId, ImmutableList.of(function),
col.getName(),
- olapScan.qualified(), true);
+ Alias alias = new Alias(StatementScopeIdGenerator.newExprId(),
Review Comment:
use `exprIdGenerator` to generate exprId
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java:
##########
@@ -286,11 +282,18 @@ protected List<Plan> doRewrite(StructInfo
queryStructInfo, CascadesContext casca
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo,
viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
+ // This is needed whenever by pre tbo mv rewrite or final cbo
rewrite, because the following optimize
Review Comment:
What is a "tbo" rewrite? Is this a typo for "rbo"?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java:
##########
@@ -866,10 +898,70 @@ public void setPartialLoadDictionary(boolean
partialLoadDictionary) {
this.partialLoadDictionary = partialLoadDictionary;
}
+ public List<Plan> getTmpPlanForMvRewrite() {
+ return tmpPlanForMvRewrite;
+ }
+
+ public void addTmpPlanForMvRewrite(Plan tmpPlan) {
+ this.tmpPlanForMvRewrite.add(tmpPlan);
+ }
+
+ public List<Plan> getRewrittenPlansByMv() {
+ return rewrittenPlansByMv;
+ }
+
+ public void addRewrittenPlanByMv(Plan rewrittenPlanByMv) {
+ this.rewrittenPlansByMv.add(rewrittenPlanByMv);
+ }
+
+ public boolean isForceRecordTmpPlan() {
+ return forceRecordTmpPlan;
+ }
+
+ public void setForceRecordTmpPlan(boolean forceRecordTmpPlan) {
+ this.forceRecordTmpPlan = forceRecordTmpPlan;
+ }
+
+ public void ruleSetApplied(RuleType ruleType) {
+ needPreMvRewriteRuleMasks.set(ruleType.ordinal());
+ }
+
+ public BitSet getNeedPreMvRewriteRuleMasks() {
+ return needPreMvRewriteRuleMasks;
+ }
+
+ public boolean isNeedPreRewrite() {
Review Comment:
rename to isNeedRewriteMv
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewAggregateRule.java:
##########
@@ -649,11 +648,9 @@ protected Pair<Set<? extends Expression>, Set<? extends
Expression>> topPlanSpli
Set<Expression> topFunctionExpressions = new HashSet<>();
queryTopPlan.getOutput().forEach(expression -> {
ExpressionLineageReplacer.ExpressionReplaceContext replaceContext =
- new
ExpressionLineageReplacer.ExpressionReplaceContext(ImmutableList.of(expression),
- ImmutableSet.of(), ImmutableSet.of(),
queryStructInfo.getTableBitSet());
+ new
ExpressionLineageReplacer.ExpressionReplaceContext(ImmutableList.of(expression));
queryTopPlan.accept(ExpressionLineageReplacer.INSTANCE,
replaceContext);
- if (!Sets.intersection(bottomAggregateFunctionExprIdSet,
-
replaceContext.getExprIdExpressionMap().keySet()).isEmpty()) {
+ if (!Sets.intersection(bottomAggregateFunctionExprIdSet,
replaceContext.getUsedExprIdSet()).isEmpty()) {
Review Comment:
use `!Collections.disjoint(bottomAggregateFunctionExprIdSet,
replaceContext.getUsedExprIdSet())`
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -280,21 +316,89 @@ public static Plan rewriteByRules(
rewrittenPlanContext.getStatementContext().getConnectContext().getSessionVariable()
.setDisableNereidsRules(String.join(",",
oldDisableRuleNames));
rewrittenPlanContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
+
rewrittenPlanContext.getStatementContext().getPlannerHooks().addAll(removedMaterializedViewHooks);
+ }
+ if (rewrittenPlan == null) {
+ return null;
}
- Map<ExprId, Slot> exprIdToNewRewrittenSlot = Maps.newLinkedHashMap();
+ if (rewrittenPlan instanceof Sink) {
+ // can keep the right column order, no need to adjust
+ return rewrittenPlan;
+ }
+ Map<ExprId, Slot> rewrittenPlanAfterOptimizedExprIdToOutputMap =
Maps.newLinkedHashMap();
for (Slot slot : rewrittenPlan.getOutput()) {
- exprIdToNewRewrittenSlot.put(slot.getExprId(), slot);
+ rewrittenPlanAfterOptimizedExprIdToOutputMap.put(slot.getExprId(),
slot);
}
- List<ExprId> rewrittenPlanExprIds = rewrittenPlan.getOutput().stream()
+ List<ExprId> rewrittenPlanOutputsAfterOptimized =
rewrittenPlan.getOutput().stream()
.map(Slot::getExprId).collect(Collectors.toList());
// If project order doesn't change, return rewrittenPlan directly
- if (originalRewrittenPlanExprIds.equals(rewrittenPlanExprIds)) {
+ if
(rewrittenPlanOutputsBeforeOptimize.equals(rewrittenPlanOutputsAfterOptimized))
{
return rewrittenPlan;
}
+ // the expr id would change for some rule, once happened, not check
result colum order
Review Comment:
colum -> column
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]