This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch runtimefilter_multi_send in repository https://gitbox.apache.org/repos/asf/doris.git
commit dae170394fd4cc7a31ef6ea46fdf6230e68a6f62 Author: morrySnow <morrys...@126.com> AuthorDate: Thu Jun 29 21:30:50 2023 +0800 [opt](Nereids) support set cte shuffle type for each consumer --- .../glue/translator/PhysicalPlanTranslator.java | 134 ++++++++++----------- .../properties/ChildOutputPropertyDeriver.java | 2 +- .../properties/ChildrenPropertiesRegulator.java | 34 ++++++ .../properties/DistributionSpecMustShuffle.java | 35 ++++++ .../nereids/properties/PhysicalProperties.java | 3 + .../org/apache/doris/planner/DataStreamSink.java | 86 ++++++++++++- .../org/apache/doris/planner/ExchangeNode.java | 10 +- .../doris/planner/MultiCastPlanFragment.java | 5 +- .../org/apache/doris/planner/PlanFragment.java | 2 +- .../java/org/apache/doris/planner/PlanNode.java | 4 +- 10 files changed, 229 insertions(+), 86 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 8210c1a082..860bb624dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -247,27 +247,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla @Override public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute, PlanTranslatorContext context) { - PlanFragment childFragment = distribute.child().accept(this, context); + PlanFragment inputFragment = distribute.child().accept(this, context); // TODO: why need set streaming here? should remove this. - if (childFragment.getPlanRoot() instanceof AggregationNode + if (inputFragment.getPlanRoot() instanceof AggregationNode && distribute.child() instanceof PhysicalHashAggregate - && context.getFirstAggregateInFragment(childFragment) == distribute.child()) { + && context.getFirstAggregateInFragment(inputFragment) == distribute.child()) { PhysicalHashAggregate<?> hashAggregate = (PhysicalHashAggregate<?>) distribute.child(); if (hashAggregate.getAggPhase() == AggPhase.LOCAL && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) { - AggregationNode aggregationNode = (AggregationNode) childFragment.getPlanRoot(); + AggregationNode aggregationNode = (AggregationNode) inputFragment.getPlanRoot(); aggregationNode.setUseStreamingPreagg(hashAggregate.isMaybeUsingStream()); } } - ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), childFragment.getPlanRoot()); + ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), inputFragment.getPlanRoot()); updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute); - exchangeNode.setNumInstances(childFragment.getPlanRoot().getNumInstances()); - if (distribute.getDistributionSpec() instanceof DistributionSpecGather) { - // gather to one instance - exchangeNode.setNumInstances(1); - } - List<ExprId> validOutputIds = distribute.getOutputExprIds(); if (distribute.child() instanceof PhysicalHashAggregate) { // we must add group by keys to output list, @@ -282,8 +276,28 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } DataPartition dataPartition = toDataPartition(distribute.getDistributionSpec(), validOutputIds, context); PlanFragment parentFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition); - childFragment.setDestination(exchangeNode); - childFragment.setOutputPartition(dataPartition); + exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances()); + if (distribute.getDistributionSpec() instanceof DistributionSpecGather) { + // gather to one instance + exchangeNode.setNumInstances(1); + } + + // process multicast sink + if (inputFragment instanceof MultiCastPlanFragment) { + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); + DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( + multiCastDataSink.getDataStreamSinks().size() - 1); + TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context); + exchangeNode.updateTupleIds(tupleDescriptor); + dataStreamSink.setExchNodeId(exchangeNode.getId()); + dataStreamSink.setOutputPartition(dataPartition); + parentFragment.addChild(inputFragment); + ((MultiCastPlanFragment) inputFragment).addToDest(exchangeNode); + } else { + inputFragment.setDestination(exchangeNode); + inputFragment.setOutputPartition(dataPartition); + } + context.addPlanFragment(parentFragment); return parentFragment; } @@ -763,68 +777,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PhysicalCTEProducer cteProducer = context.getCteProduceMap().get(cteId); Preconditions.checkState(cteProducer != null, "invalid cteProducer"); - ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), multiCastFragment.getPlanRoot()); - - DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId()); - streamSink.setPartition(DataPartition.RANDOM); + // set datasink to multicast data sink but do not set target now + // target will be set when translate distribute + DataStreamSink streamSink = new DataStreamSink(); streamSink.setFragment(multiCastFragment); - multiCastDataSink.getDataStreamSinks().add(streamSink); multiCastDataSink.getDestinations().add(Lists.newArrayList()); - exchangeNode.setNumInstances(multiCastFragment.getPlanRoot().getNumInstances()); - - PlanFragment consumeFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, - multiCastFragment.getDataPartition()); - - Map<Slot, Slot> projectMap = Maps.newHashMap(); - projectMap.putAll(cteConsumer.getProducerToConsumerSlotMap()); - - List<NamedExpression> execList = new ArrayList<>(); - PlanNode inputPlanNode = consumeFragment.getPlanRoot(); - List<Slot> cteProjects = cteProducer.getProjects(); - for (Slot slot : cteProjects) { - if (projectMap.containsKey(slot)) { - execList.add(projectMap.get(slot)); - } else { - throw new RuntimeException("could not find slot in cte producer consumer projectMap"); - } + // update expr to slot mapping + for (int i = 0; i < cteConsumer.getOutput().size(); i++) { + Slot producerSlot = cteProducer.getOutput().get(i); + Slot consumerSlot = cteConsumer.getOutput().get(i); + SlotRef slotRef = context.findSlotRef(producerSlot.getExprId()); + context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef); } - - List<Slot> slotList = execList - .stream() - .map(NamedExpression::toSlot) - .collect(Collectors.toList()); - - TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context); - - // update tuple list and tblTupleList - inputPlanNode.getTupleIds().clear(); - inputPlanNode.getTupleIds().add(tupleDescriptor.getId()); - inputPlanNode.getTblRefIds().clear(); - inputPlanNode.getTblRefIds().add(tupleDescriptor.getId()); - inputPlanNode.getNullableTupleIds().clear(); - inputPlanNode.getNullableTupleIds().add(tupleDescriptor.getId()); - - List<Expr> execExprList = execList - .stream() - .map(e -> ExpressionTranslator.translate(e, context)) - .collect(Collectors.toList()); - - inputPlanNode.setProjectList(execExprList); - inputPlanNode.setOutputTupleDesc(tupleDescriptor); - - // update data partition - consumeFragment.setDataPartition(DataPartition.RANDOM); - - SelectNode projectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode); - consumeFragment.setPlanRoot(projectNode); - - multiCastFragment.getDestNodeList().add(exchangeNode); - consumeFragment.addChild(multiCastFragment); - context.getPlanFragments().add(consumeFragment); - - return consumeFragment; + return multiCastFragment; } @Override @@ -859,6 +826,17 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } PlanFragment inputFragment = filter.child(0).accept(this, context); + // process multicast sink + if (inputFragment instanceof MultiCastPlanFragment) { + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); + DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( + multiCastDataSink.getDataStreamSinks().size() - 1); + filter.getConjuncts().stream() + .map(e -> ExpressionTranslator.translate(e, context)) + .forEach(dataStreamSink::addConjunct); + return inputFragment; + } + PlanNode planNode = inputFragment.getPlanRoot(); if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) { // the three nodes don't support conjuncts, need create a SelectNode to filter data @@ -1397,19 +1375,31 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla ((AbstractPhysicalJoin<?, ?>) project.child(0).child(0)).setShouldTranslateOutput(false); } } + PlanFragment inputFragment = project.child(0).accept(this, context); + List<Expr> execExprList = project.getProjects() .stream() .map(e -> ExpressionTranslator.translate(e, context)) .collect(Collectors.toList()); // TODO: fix the project alias of an aliased relation. - - PlanNode inputPlanNode = inputFragment.getPlanRoot(); List<Slot> slotList = project.getProjects() .stream() .map(NamedExpression::toSlot) .collect(Collectors.toList()); + // process multicast sink + if (inputFragment instanceof MultiCastPlanFragment) { + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); + DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( + multiCastDataSink.getDataStreamSinks().size() - 1); + TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context); + dataStreamSink.setProjections(execExprList); + dataStreamSink.setOutputTupleDesc(tupleDescriptor); + return inputFragment; + } + + PlanNode inputPlanNode = inputFragment.getPlanRoot(); List<Expr> predicateList = inputPlanNode.getConjuncts(); Set<SlotId> requiredSlotIdSet = Sets.newHashSet(); for (Expr expr : execExprList) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 2695b67a93..158eaa67f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -114,7 +114,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, public PhysicalProperties visitPhysicalCTEConsumer( PhysicalCTEConsumer cteConsumer, PlanContext context) { Preconditions.checkState(childrenOutputProperties.size() == 0); - return PhysicalProperties.ANY; + return PhysicalProperties.MUST_SHUFFLE; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 862dbb5e2b..86a3f650d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -27,9 +27,11 @@ import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.plans.AggMode; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.JoinUtils; @@ -74,16 +76,33 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> { @Override public Boolean visit(Plan plan, Void context) { + // process must shuffle + for (int i = 0; i < children.size(); i++) { + DistributionSpec distributionSpec = childrenProperties.get(i).getDistributionSpec(); + if (distributionSpec instanceof DistributionSpecMustShuffle) { + updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY); + } + } return true; } @Override public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> agg, Void context) { + // forbid one phase agg on distribute if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && children.get(0).getPlan() instanceof PhysicalDistribute) { // this means one stage gather agg, usually bad pattern return false; } + // process must shuffle + visit(agg, context); + // process agg + return true; + } + + @Override + public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void context) { + // do not process must shuffle return true; } @@ -93,6 +112,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> { Preconditions.checkArgument(children.size() == 2, String.format("children.size() is %d", children.size())); Preconditions.checkArgument(childrenProperties.size() == 2); Preconditions.checkArgument(requiredProperties.size() == 2); + // process must shuffle + visit(hashJoin, context); + // process hash join DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec(); DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec(); @@ -229,6 +251,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> { Preconditions.checkArgument(children.size() == 2, String.format("children.size() is %d", children.size())); Preconditions.checkArgument(childrenProperties.size() == 2); Preconditions.checkArgument(requiredProperties.size() == 2); + // process must shuffle + visit(nestedLoopJoin, context); + // process nlj DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec(); if (rightDistributionSpec instanceof DistributionSpecStorageGather) { updateChildEnforceAndCost(1, PhysicalProperties.GATHER); @@ -236,8 +261,17 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> { return true; } + @Override + public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> project, Void context) { + // do not process must shuffle + return true; + } + @Override public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) { + // process must shuffle + visit(setOperation, context); + // process set operation if (children.isEmpty()) { return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java new file mode 100644 index 0000000000..8f718fbb9d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.properties; + +/** + * present data must use after shuffle + */ +public class DistributionSpecMustShuffle extends DistributionSpec { + + public static final DistributionSpecMustShuffle INSTANCE = new DistributionSpecMustShuffle(); + + public DistributionSpecMustShuffle() { + super(); + } + + @Override + public boolean satisfy(DistributionSpec other) { + return other instanceof DistributionSpecAny; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java index 28bf347977..9596043f67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java @@ -44,6 +44,9 @@ public class PhysicalProperties { public static PhysicalProperties STORAGE_GATHER = new PhysicalProperties(DistributionSpecStorageGather.INSTANCE); + public static PhysicalProperties MUST_SHUFFLE = new PhysicalProperties(DistributionSpecMustShuffle.INSTANCE); + + private final OrderSpec orderSpec; private final DistributionSpec distributionSpec; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index 0f903d69d1..274cdd86f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -20,19 +20,38 @@ package org.apache.doris.planner; +import org.apache.doris.analysis.BitmapFilterPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDataStreamSink; import org.apache.doris.thrift.TExplainLevel; +import com.google.common.collect.Lists; +import org.springframework.util.CollectionUtils; + +import java.util.List; + /** * Data sink that forwards data to an exchange node. */ public class DataStreamSink extends DataSink { - private final PlanNodeId exchNodeId; + + private PlanNodeId exchNodeId; private DataPartition outputPartition; + protected TupleDescriptor outputTupleDesc; + + protected List<Expr> projections; + + protected List<Expr> conjuncts = Lists.newArrayList(); + + public DataStreamSink() { + + } + public DataStreamSink(PlanNodeId exchNodeId) { this.exchNodeId = exchNodeId; } @@ -42,23 +61,65 @@ public class DataStreamSink extends DataSink { return exchNodeId; } + public void setExchNodeId(PlanNodeId exchNodeId) { + this.exchNodeId = exchNodeId; + } + @Override public DataPartition getOutputPartition() { return outputPartition; } - public void setPartition(DataPartition partition) { - outputPartition = partition; + public void setOutputPartition(DataPartition outputPartition) { + this.outputPartition = outputPartition; + } + + public TupleDescriptor getOutputTupleDesc() { + return outputTupleDesc; + } + + public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) { + this.outputTupleDesc = outputTupleDesc; + } + + public List<Expr> getProjections() { + return projections; + } + + public void setProjections(List<Expr> projections) { + this.projections = projections; + } + + public List<Expr> getConjuncts() { + return conjuncts; + } + + public void setConjuncts(List<Expr> conjuncts) { + this.conjuncts = conjuncts; + } + + public void addConjunct(Expr conjunct) { + this.conjuncts.add(conjunct); } @Override public String getExplainString(String prefix, TExplainLevel explainLevel) { StringBuilder strBuilder = new StringBuilder(); - strBuilder.append(prefix + "STREAM DATA SINK\n"); - strBuilder.append(prefix + " EXCHANGE ID: " + exchNodeId + "\n"); + strBuilder.append(prefix).append("STREAM DATA SINK\n"); + strBuilder.append(prefix).append(" EXCHANGE ID: ").append(exchNodeId); if (outputPartition != null) { - strBuilder.append(prefix + " " + outputPartition.getExplainString(explainLevel)); + strBuilder.append("\n").append(prefix).append(" ").append(outputPartition.getExplainString(explainLevel)); + } + if (!conjuncts.isEmpty()) { + Expr expr = PlanNode.convertConjunctsToAndCompoundPredicate(conjuncts); + strBuilder.append(prefix).append(" CONJUNCTS: ").append(expr.toSql()); + } + if (!CollectionUtils.isEmpty(projections)) { + strBuilder.append("\n").append(prefix).append(" PROJECTIONS: ") + .append(PlanNode.getExplainString(projections)).append("\n"); + strBuilder.append(prefix).append(" PROJECTION TUPLE: ").append(outputTupleDesc.getId()); } + strBuilder.append("\n"); return strBuilder.toString(); } @@ -67,6 +128,19 @@ public class DataStreamSink extends DataSink { TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK); TDataStreamSink tStreamSink = new TDataStreamSink(exchNodeId.asInt(), outputPartition.toThrift()); + for (Expr e : conjuncts) { + if (!(e instanceof BitmapFilterPredicate)) { + tStreamSink.addToConjuncts(e.treeToThrift()); + } + } + if (projections != null) { + for (Expr expr : projections) { + tStreamSink.addToOutputExprs(expr.treeToThrift()); + } + } + if (outputTupleDesc != null) { + tStreamSink.setOutputTupleId(outputTupleDesc.getId().asInt()); + } result.setStreamSink(tStreamSink); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 30bf9eb45d..6694a05219 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -108,15 +108,21 @@ public class ExchangeNode extends PlanNode { public final void computeTupleIds() { PlanNode inputNode = getChild(0); TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc(); + updateTupleIds(outputTupleDesc); + } + + public void updateTupleIds(TupleDescriptor outputTupleDesc) { if (outputTupleDesc != null) { tupleIds.clear(); tupleIds.add(outputTupleDesc.getId()); + tblRefIds.add(outputTupleDesc.getId()); + nullableTupleIds.add(outputTupleDesc.getId()); } else { clearTupleIds(); tupleIds.addAll(getChild(0).getTupleIds()); + tblRefIds.addAll(getChild(0).getTblRefIds()); + nullableTupleIds.addAll(getChild(0).getNullableTupleIds()); } - tblRefIds.addAll(getChild(0).getTblRefIds()); - nullableTupleIds.addAll(getChild(0).getNullableTupleIds()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java index 0d5b54b269..e52bbb0557 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java @@ -35,10 +35,11 @@ public class MultiCastPlanFragment extends PlanFragment { this.children.addAll(planFragment.getChildren()); } - public List<ExchangeNode> getDestNodeList() { - return destNodeList; + public void addToDest(ExchangeNode exchangeNode) { + destNodeList.add(exchangeNode); } + public List<PlanFragment> getDestFragmentList() { return destNodeList.stream().map(PlanNode::getFragment).collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 54903beae5..64ac4c3051 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -256,7 +256,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { Preconditions.checkState(sink == null); // we're streaming to an exchange node DataStreamSink streamSink = new DataStreamSink(destNode.getId()); - streamSink.setPartition(outputPartition); + streamSink.setOutputPartition(outputPartition); streamSink.setFragment(this); sink = streamSink; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index ab17c0acec..cf5d14cb5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -415,7 +415,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { } } - protected Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) { + public static Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) { List<Expr> targetConjuncts = Lists.newArrayList(conjuncts); while (targetConjuncts.size() > 1) { List<Expr> newTargetConjuncts = Lists.newArrayList(); @@ -824,7 +824,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { return output.toString(); } - protected String getExplainString(List<? extends Expr> exprs) { + public static String getExplainString(List<? extends Expr> exprs) { if (exprs == null) { return ""; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org