This is an automated email from the ASF dual-hosted git repository. lingmiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 1378e7e05f (Refactor)[Planner] Remove merge node (#9251) 1378e7e05f is described below commit 1378e7e05ff5849976705e70437fd05e2b8fda84 Author: EmmyMiao87 <522274...@qq.com> AuthorDate: Thu Apr 28 15:05:35 2022 +0800 (Refactor)[Planner] Remove merge node (#9251) --- .../apache/doris/planner/DistributedPlanner.java | 63 ------ .../java/org/apache/doris/planner/MergeNode.java | 224 --------------------- 2 files changed, 287 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 215790e7d1..4ee3106057 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -213,8 +213,6 @@ public class DistributedPlanner { result = createSelectNodeFragment((SelectNode) root, childFragments); } else if (root instanceof SetOperationNode) { result = createSetOperationNodeFragment((SetOperationNode) root, childFragments, fragments); - } else if (root instanceof MergeNode) { - result = createMergeNodeFragment((MergeNode) root, childFragments, fragments); } else if (root instanceof AggregationNode) { result = createAggregationFragment((AggregationNode) root, childFragments.get(0), fragments); } else if (root instanceof SortNode) { @@ -692,67 +690,6 @@ public class DistributedPlanner { return leftChildFragment; } - /** - * Creates an unpartitioned fragment that merges the outputs of all of its children (with a single ExchangeNode), - * corresponding to the 'mergeNode' of the non-distributed plan. Each of the child fragments receives a MergeNode as - * a new plan root (with the child fragment's plan tree as its only input), so that each child fragment's output is - * mapped onto the MergeNode's result tuple id. TODO: if this is implementing a UNION DISTINCT, the parent of the - * mergeNode is a duplicate-removing AggregationNode, which might make sense to apply to the children as well, in - * order to reduce the amount of data that needs to be sent to the parent; augment the planner to decide whether - * that would reduce the runtime. TODO: since the fragment that does the merge is unpartitioned, it can absorb all - * child fragments that are also unpartitioned - */ - private PlanFragment createMergeNodeFragment(MergeNode mergeNode, - ArrayList<PlanFragment> childFragments, - ArrayList<PlanFragment> fragments) - throws UserException { - Preconditions.checkState(mergeNode.getChildren().size() == childFragments.size()); - - // If the mergeNode only has constant exprs, return it in an unpartitioned fragment. - if (mergeNode.getChildren().isEmpty()) { - Preconditions.checkState(!mergeNode.getConstExprLists().isEmpty()); - return new PlanFragment(ctx_.getNextFragmentId(), mergeNode, DataPartition.UNPARTITIONED); - } - - // create an ExchangeNode to perform the merge operation of mergeNode; - // the ExchangeNode retains the generic PlanNode parameters of mergeNode - ExchangeNode exchNode = new ExchangeNode(ctx_.getNextNodeId(), mergeNode, true); - exchNode.setNumInstances(1); - exchNode.init(ctx_.getRootAnalyzer()); - PlanFragment parentFragment = - new PlanFragment(ctx_.getNextFragmentId(), exchNode, DataPartition.UNPARTITIONED); - - // we don't expect to be paralleling a MergeNode that was inserted solely - // to evaluate conjuncts (ie, that doesn't explicitly materialize its output) - Preconditions.checkState(mergeNode.getTupleIds().size() == 1); - - for (int i = 0; i < childFragments.size(); ++i) { - PlanFragment childFragment = childFragments.get(i); - // create a clone of mergeNode; we want to keep the limit and conjuncts - MergeNode childMergeNode = new MergeNode(ctx_.getNextNodeId(), mergeNode); - List<Expr> resultExprs = Expr.cloneList(mergeNode.getResultExprLists().get(i), null); - childMergeNode.addChild(childFragment.getPlanRoot(), resultExprs); - childFragment.setPlanRoot(childMergeNode); - childFragment.setDestination(exchNode); - } - - // Add an unpartitioned child fragment with a MergeNode for the constant exprs. - if (!mergeNode.getConstExprLists().isEmpty()) { - MergeNode childMergeNode = new MergeNode(ctx_.getNextNodeId(), mergeNode); - childMergeNode.init(ctx_.getRootAnalyzer()); - childMergeNode.getConstExprLists().addAll(mergeNode.getConstExprLists()); - // Clear original constant exprs to make sure nobody else picks them up. - mergeNode.getConstExprLists().clear(); - PlanFragment childFragment = - new PlanFragment(ctx_.getNextFragmentId(), childMergeNode, DataPartition.UNPARTITIONED); - childFragment.setPlanRoot(childMergeNode); - childFragment.setDestination(exchNode); - childFragments.add(childFragment); - fragments.add(childFragment); - } - return parentFragment; - } - /** * Returns a new fragment with a UnionNode as its root. The data partition of the * returned fragment and how the data of the child fragments is consumed depends on the diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java deleted file mode 100644 index bb307021ef..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java +++ /dev/null @@ -1,224 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotId; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.analysis.TupleId; -import org.apache.doris.common.UserException; -import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TExpr; -import org.apache.doris.thrift.TMergeNode; -import org.apache.doris.thrift.TPlanNode; -import org.apache.doris.thrift.TPlanNodeType; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.List; - -/** - * Node that merges the results of its child plans by materializing - * the corresponding result exprs. - * If no result exprs are specified for a child, it simply passes on the child's - * results. - */ -public class MergeNode extends PlanNode { - private final static Logger LOG = LogManager.getLogger(MergeNode.class); - - // Expr lists corresponding to the input query stmts. - // The ith resultExprList belongs to the ith child. - // All exprs are resolved to base tables. - protected List<List<Expr>> resultExprLists = Lists.newArrayList(); - - // Expr lists that originate from constant select stmts. - // We keep them separate from the regular expr lists to avoid null children. - protected List<List<Expr>> constExprLists = Lists.newArrayList(); - - // Output tuple materialized by this node. - protected final List<TupleDescriptor> tupleDescs = Lists.newArrayList(); - - protected final TupleId tupleId; - - protected MergeNode(PlanNodeId id, MergeNode node) { - super(id, node, "MERGE"); - this.tupleId = node.tupleId; - } - - public void addConstExprList(List<Expr> exprs) { - constExprLists.add(exprs); - } - - public void addChild(PlanNode node, List<Expr> resultExprs) { - addChild(node); - resultExprLists.add(resultExprs); - if (resultExprs != null) { - // if we're materializing output, we can only do that into a single - // output tuple - Preconditions.checkState(tupleIds.size() == 1); - } - } - - /** - * This must be called *after* addChild()/addConstExprList() because it recomputes - * both of them. - * The MergeNode doesn't need an smap: like a ScanNode, it materializes an "original" - * tuple id - */ - @Override - public void init(Analyzer analyzer) throws UserException { - assignConjuncts(analyzer); - //computeMemLayout(analyzer); - computeStats(analyzer); - Preconditions.checkState(resultExprLists.size() == getChildren().size()); - - // drop resultExprs/constExprs that aren't getting materialized (= where the - // corresponding output slot isn't being materialized) - List<SlotDescriptor> slots = analyzer.getDescTbl().getTupleDesc(tupleId).getSlots(); - List<List<Expr>> newResultExprLists = Lists.newArrayList(); - // - // for (int i = 0; i < resultExprLists.size(); ++i) { - // List<Expr> exprList = resultExprLists.get(i); - // List<Expr> newExprList = Lists.newArrayList(); - // for (int j = 0; j < exprList.size(); ++j) { - // if (slots.get(j).isMaterialized()) newExprList.add(exprList.get(j)); - // } - // newResultExprLists.add(newExprList); - // newResultExprLists.add( - // Expr.substituteList(newExprList, getChild(i).getOutputSmap(), analyzer, true)); - // } - // resultExprLists = newResultExprLists; - // - Preconditions.checkState(resultExprLists.size() == getChildren().size()); - - List<List<Expr>> newConstExprLists = Lists.newArrayList(); - for (List<Expr> exprList: constExprLists) { - List<Expr> newExprList = Lists.newArrayList(); - for (int i = 0; i < exprList.size(); ++i) { - if (slots.get(i).isMaterialized()) newExprList.add(exprList.get(i)); - } - newConstExprLists.add(newExprList); - } - constExprLists = newConstExprLists; - } - - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - if (!analyzer.safeIsEnableJoinReorderBasedCost()) { - return; - } - cardinality = constExprLists.size(); - for (PlanNode child : children) { - // ignore missing child cardinality info in the hope it won't matter enough - // to change the planning outcome - if (child.cardinality > 0) { - cardinality += child.cardinality; - } - } - applyConjunctsSelectivity(); - capCardinalityAtLimit(); - if (LOG.isDebugEnabled()) { - LOG.debug("stats Merge: cardinality={}", Long.toString(cardinality)); - } - } - - @Override - protected void computeOldCardinality() { - cardinality = constExprLists.size(); - for (PlanNode child : children) { - // ignore missing child cardinality info in the hope it won't matter enough - // to change the planning outcome - if (child.cardinality > 0) { - cardinality += child.cardinality; - } - } - LOG.debug("stats Merge: cardinality={}", Long.toString(cardinality)); - } - - public List<List<Expr>> getResultExprLists() { - return resultExprLists; - } - - public List<List<Expr>> getConstExprLists() { - return constExprLists; - } - - @Override - protected void toThrift(TPlanNode msg) { - List<List<TExpr>> texprLists = Lists.newArrayList(); - List<List<TExpr>> constTexprLists = Lists.newArrayList(); - for (List<Expr> exprList : resultExprLists) { - if (exprList != null) { - texprLists.add(Expr.treesToThrift(exprList)); - } - } - for (List<Expr> constTexprList : constExprLists) { - constTexprLists.add(Expr.treesToThrift(constTexprList)); - } - msg.merge_node = new TMergeNode(tupleId.asInt(), texprLists, constTexprLists); - msg.node_type = TPlanNodeType.MERGE_NODE; - } - - @Override - public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - if (detailLevel == TExplainLevel.BRIEF) { - return ""; - } - StringBuilder output = new StringBuilder(); - // A MergeNode may have predicates if a union is used inside an inline view, - // and the enclosing select stmt has predicates referring to the inline view. - if (!conjuncts.isEmpty()) { - output.append(prefix + "predicates: " + getExplainString(conjuncts) + "\n"); - } - if (constExprLists.size() > 0) { - output.append(prefix + "merging " + constExprLists.size() + " SELECT CONSTANT\n"); - } - return output.toString(); - } - - @Override - public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) { - super.getMaterializedIds(analyzer, ids); - - for (List<Expr> resultExprs : resultExprLists) { - Expr.getIds(resultExprs, null, ids); - } - - // for now, also mark all of our output slots as materialized - // TODO: fix this, it's not really necessary, but it breaks the logic - // in MergeNode (c++) - for (TupleId tupleId : tupleIds) { - TupleDescriptor tupleDesc = analyzer.getTupleDesc(tupleId); - for (SlotDescriptor slotDesc : tupleDesc.getSlots()) { - ids.add(slotDesc.getId()); - } - } - } - - @Override - public int getNumInstances() { - return 1; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org