This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2e92e392d13 [refactor](coordinator) Refactor local shuffle logics (#42460) 2e92e392d13 is described below commit 2e92e392d1325b48d58796d2209309c95497c637 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Oct 25 17:27:09 2024 +0800 [refactor](coordinator) Refactor local shuffle logics (#42460) Unify `useSerialSource` of fragments that contain no scan nodes and `ignoreDataDistribution` of fragments that contain scan nodes --- .../org/apache/doris/planner/AggregationNode.java | 1 + .../org/apache/doris/planner/AnalyticEvalNode.java | 6 ++++ .../org/apache/doris/planner/EmptySetNode.java | 5 --- .../org/apache/doris/planner/ExchangeNode.java | 31 +++++++++++++---- .../apache/doris/planner/NestedLoopJoinNode.java | 16 ++++----- .../org/apache/doris/planner/PlanFragment.java | 40 +++++++--------------- .../java/org/apache/doris/planner/PlanNode.java | 14 +++++--- .../java/org/apache/doris/planner/RepeatNode.java | 1 + .../java/org/apache/doris/planner/ScanNode.java | 4 ++- .../java/org/apache/doris/planner/SelectNode.java | 1 + .../java/org/apache/doris/planner/SortNode.java | 1 + .../java/org/apache/doris/planner/UnionNode.java | 2 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 23 +++---------- 13 files changed, 74 insertions(+), 71 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index 55d1b4b50c0..446f49c3782 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -488,6 +488,7 @@ public class AggregationNode extends PlanNode { } } + // If `GroupingExprs` is empty and agg need to finalize, the result must be output by single instance @Override public boolean isSerialOperator() { return aggInfo.getGroupingExprs().isEmpty() && needsFinalize; diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java index dce6c3d1b04..7b5998717a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java @@ -297,6 +297,12 @@ public class AnalyticEvalNode extends PlanNode { return output.toString(); } + /** + * If `partitionExprs` is empty, the result must be output by single instance. + * + * For example, for `window (colA order by colB)`, + * all data should be input in this node to ensure the global ordering by colB. + */ @Override public boolean isSerialOperator() { return partitionExprs.isEmpty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java index f6ddf23429e..e262797a4fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java @@ -80,9 +80,4 @@ public class EmptySetNode extends PlanNode { public int getNumInstances() { return 1; } - - @Override - public boolean isSerialOperator() { - return true; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 7af09287191..97d46b109b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -195,11 +195,6 @@ public class ExchangeNode extends PlanNode { return prefix + "offset: " + offset + "\n"; } - @Override - public boolean isMerging() { - return mergeInfo != null; - } - public boolean isRightChildOfBroadcastHashJoin() { return isRightChildOfBroadcastHashJoin; } @@ -208,8 +203,32 @@ public class ExchangeNode extends PlanNode { isRightChildOfBroadcastHashJoin = value; } + /** + 
* If table `t1` has unique key `k1` and value column `v1`. + * Now use plan below to load data into `t1`: + * ``` + * FRAGMENT 0: + * Merging Exchange (id = 1) + * NL Join (id = 2) + * DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED) + * + * FRAGMENT 1: + * Exchange (id = 3) + * OlapTableSink (id = 4) ``` + * + * In this plan, `Exchange (id = 1)` needs to do merge sort using column `k1` and `v1` so parallelism + * of FRAGMENT 0 must be 1 and data will be shuffled to FRAGMENT 1 which also has only 1 instance + * because this loading job relies on the global ordering of column `k1` and `v1`. + * + * So FRAGMENT 0 should not use serial source. + */ @Override public boolean isSerialOperator() { - return true; + return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null; + } + + @Override + public boolean hasSerialChildren() { + return isSerialOperator(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index c7b3525e4cd..e2a7504a98d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -284,18 +284,16 @@ public class NestedLoopJoinNode extends JoinNodeBase { /** * If joinOp is one of type below: - * 1. NULL_AWARE_LEFT_ANTI_JOIN - * 2. RIGHT_OUTER_JOIN - * 3. RIGHT_ANTI_JOIN - * 4. RIGHT_SEMI_JOIN + * 1. RIGHT_OUTER_JOIN + * 2. RIGHT_ANTI_JOIN + * 3. RIGHT_SEMI_JOIN + * 4. FULL_OUTER_JOIN * - * We will - * @return + * Probe-side must have full data so join is a serial operator. 
*/ @Override public boolean isSerialOperator() { - return joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_OUTER_JOIN - || joinOp == JoinOperator.RIGHT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_SEMI_JOIN - || joinOp == JoinOperator.FULL_OUTER_JOIN; + return joinOp == JoinOperator.RIGHT_OUTER_JOIN || joinOp == JoinOperator.RIGHT_ANTI_JOIN + || joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp == JoinOperator.FULL_OUTER_JOIN; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 3e3c49bf675..c5a6ec55f63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -162,6 +162,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { public Optional<NereidsSpecifyInstances<ScanSource>> specifyInstances = Optional.empty(); public TQueryCacheParam queryCacheParam; + private int numBackends = 0; /** * C'tor for fragment with specific partition; the output is by default broadcast. @@ -504,37 +505,22 @@ public class PlanFragment extends TreeNode<PlanFragment> { return planRoot.isNullAwareLeftAntiJoin(); } - private boolean isMergingFragment() { - return planRoot.isMerging(); - } - public boolean useSerialSource(ConnectContext context) { return context != null && context.getSessionVariable().isIgnoreStorageDataDistribution() + && queryCacheParam == null && !hasNullAwareLeftAntiJoin() - // If input data partition is UNPARTITIONED and sink is DataStreamSink and root node is not a serial - // operator, we use local exchange to improve parallelism - && getDataPartition() == DataPartition.UNPARTITIONED && !children.isEmpty() + // If planRoot is not a serial operator and has serial children, we can use serial source and improve + // parallelism of non-serial operators. 
&& sink instanceof DataStreamSink && !planRoot.isSerialOperator() - /** - * If table `t1` has unique key `k1` and value column `v1`. - * Now use plan below to load data into `t1`: - * ``` - * FRAGMENT 0: - * Merging Exchange (id = 1) - * NL Join (id = 2) - * DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED) - * - * FRAGMENT 1: - * Exchange (id = 3) - * OlapTableSink (id = 4) ``` - * - * In this plan, `Exchange (id = 1)` needs to do merge sort using column `k1` and `v1` so parallelism - * of FRAGMENT 0 must be 1 and data will be shuffled to FRAGMENT 1 which also has only 1 instance - * because this loading job relies on the global ordering of column `k1` and `v1`. - * - * So FRAGMENT 0 should not use serial source. - */ - && !isMergingFragment(); + && planRoot.hasSerialChildren(); + } + + public int getNumBackends() { + return numBackends; + } + + public void setNumBackends(int numBackends) { + this.numBackends = numBackends; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index d1ba493682b..14bd34e93e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -41,6 +41,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.TreeNode; import org.apache.doris.common.UserException; import org.apache.doris.planner.normalize.Normalizer; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.PlanStats; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsDeriveResult; @@ -279,10 +280,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { return children.stream().anyMatch(PlanNode::isNullAwareLeftAntiJoin); } - public boolean isMerging() { - return children.stream().anyMatch(PlanNode::isMerging); - } - public PlanFragment getFragment() { return fragment; 
} @@ -643,7 +640,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { TPlanNode msg = new TPlanNode(); msg.node_id = id.asInt(); msg.setNereidsId(nereidsId); - msg.setIsSerialOperator(isSerialOperator()); + msg.setIsSerialOperator(isSerialOperator() && fragment.useSerialSource(ConnectContext.get())); msg.num_children = children.size(); msg.limit = limit; for (TupleId tid : tupleIds) { @@ -1384,4 +1381,11 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { public boolean isSerialOperator() { return false; } + + public boolean hasSerialChildren() { + if (children.isEmpty()) { + return isSerialOperator(); + } + return children.stream().allMatch(PlanNode::hasSerialChildren); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java index 407d8a6444c..2bc4e847ac3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java @@ -201,6 +201,7 @@ public class RepeatNode extends PlanNode { return output.toString(); } + // Determined by its child. 
@Override public boolean isSerialOperator() { return children.get(0).isSerialOperator(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 1681699d651..b392075be9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -851,6 +851,8 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { @Override public boolean isSerialOperator() { - return true; + return numScanBackends() <= 0 || getScanRangeNum() + < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends() + || (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java index b3b088837a6..734e9338352 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java @@ -110,6 +110,7 @@ public class SelectNode extends PlanNode { return output.toString(); } + // Determined by its child. @Override public boolean isSerialOperator() { return children.get(0).isSerialOperator(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index fc1c50c0bba..e3eb08c3e75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -389,6 +389,7 @@ public class SortNode extends PlanNode { return new HashSet<>(result); } + // If it's analytic sort or not merged by a followed exchange node, it must output the global ordered data. 
@Override public boolean isSerialOperator() { return !isAnalyticSort && !mergeByexchange; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java index bf48a770f1c..ac66ce718ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java @@ -43,6 +43,8 @@ public class UnionNode extends SetOperationNode { toThrift(msg, TPlanNodeType.UNION_NODE); } + // If it is a union without children which means it will output some constant values, we should use a serial union + // to output non-duplicated data. @Override public boolean isSerialOperator() { return children.isEmpty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 4eda6775b5c..294e9e34056 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1783,8 +1783,7 @@ public class Coordinator implements CoordInterface { // Using serial source means a serial source operator will be used in this fragment (e.g. data will be // shuffled to only 1 exchange operator) and then splitted by followed local exchanger int expectedInstanceNum = fragment.getParallelExecNum(); - boolean useSerialSource = fragment.useSerialSource(context) && useNereids - && fragment.queryCacheParam == null; + boolean useSerialSource = fragment.useSerialSource(context); if (useSerialSource) { for (int j = 1; j < expectedInstanceNum; j++) { params.instanceExecParams.add(new FInstanceExecParam( @@ -1824,8 +1823,7 @@ public class Coordinator implements CoordInterface { } // Using serial source means a serial source operator will be used in this fragment (e.g. 
data will be // shuffled to only 1 exchange operator) and then splitted by followed local exchanger - boolean useSerialSource = fragment.useSerialSource(context) && useNereids - && fragment.queryCacheParam == null; + boolean useSerialSource = fragment.useSerialSource(context); if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentId) .instanceExecParams.size() > exchangeInstances) { // random select some instance @@ -1898,12 +1896,8 @@ public class Coordinator implements CoordInterface { boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); - boolean forceToLocalShuffle = context != null - && context.getSessionVariable().isForceToLocalShuffle() - && !fragment.hasNullAwareLeftAntiJoin() && useNereids; - boolean ignoreStorageDataDistribution = (forceToLocalShuffle || (node.isPresent() - && node.get().ignoreStorageDataDistribution(context, addressToBackendID.size()) - && useNereids)) && fragment.queryCacheParam == null; + boolean ignoreStorageDataDistribution = node.isPresent() + && fragment.useSerialSource(context); if (node.isPresent() && ignoreStorageDataDistribution) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and no conjuncts, only need 1 instance to save cpu and @@ -2750,14 +2744,7 @@ public class Coordinator implements CoordInterface { * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. * 2. Use Nereids planner. 
*/ - boolean forceToLocalShuffle = context != null - && context.getSessionVariable().isForceToLocalShuffle() && !hasNullAwareLeftAntiJoin && useNereids; - boolean ignoreStorageDataDistribution = (forceToLocalShuffle || (scanNodes.stream() - .allMatch(node -> node.ignoreStorageDataDistribution(context, - addressToBackendID.size())) - && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { - return addressScanRange.getValue().size() < parallelExecInstanceNum; - }) && useNereids)) && params.fragment.queryCacheParam == null; + boolean ignoreStorageDataDistribution = params.fragment != null && params.fragment.useSerialSource(context); FragmentScanRangeAssignment assignment = params.scanRangeAssignment; for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org