This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d77bfa09d85 [Improvement](shuffle) Use a knob to decide whether a serial exchange… (#44676) d77bfa09d85 is described below commit d77bfa09d85d9759965c109cc50ee89e8fbde499 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Thu Nov 28 16:33:16 2024 +0800 [Improvement](shuffle) Use a knob to decide whether a serial exchange… (#44676) … should be used This improvement was first completed in #43199 and then reverted by #44075 because of a performance regression. Now that the regression is fixed, this improvement is being resubmitted. A new knob controls whether an exchange node should be serial. For example, a partitioned hash join is normally executed as shown below: ``` ┌────────────────────────────┐ ┌────────────────────────────┐ │ │ │ │ │Exchange(HASH PARTITIONED N)│ │Exchange(HASH PARTITIONED N)│ │ │ │ │ └────────────────────────────┴─────────┬────────┴────────────────────────────┘ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ HASH JOIN │ │ │ └─────────────┘ ``` After turning on this knob, the actual plan becomes: ``` ┌──────────────────────────────┐ ┌──────────────────────────────┐ │ │ │ │ │ Exchange (HASH PARTITIONED 1)│ │ Exchange (HASH PARTITIONED 1)│ │ │ │ │ └────────────┬─────────────────┘ └────────────┬─────────────────┘ │ │ │ │ │ │ │ │ │ │ ┌──────────────▼─────────────────────┐ ┌──────────────▼─────────────────────┐ │ │ │ │ │ Local Exchange(HASH PARTITIONED N)│ │ Local Exchange(HASH PARTITIONED N)│ │ 1 -> N │ │ 1 -> N │ └────────────────────────────────────┴─────────┬────────┴────────────────────────────────────┴ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ HASH JOIN │ │ │ └─────────────┘ ``` For a large cluster, this reduces the number of RPC channels from X (mappers) * Y (reducers) to X (mappers) * Z (BEs). 
--- .../main/java/org/apache/doris/planner/ExchangeNode.java | 13 ++++++++++++- .../main/java/org/apache/doris/planner/PlanFragment.java | 14 ++++++-------- .../src/main/java/org/apache/doris/planner/PlanNode.java | 7 +++++++ .../src/main/java/org/apache/doris/planner/ScanNode.java | 5 +++++ .../src/main/java/org/apache/doris/qe/Coordinator.java | 10 ++-------- .../src/main/java/org/apache/doris/qe/SessionVariable.java | 11 +++++++++++ 6 files changed, 43 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 1ca1db56bfc..cb6628b01c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.SortInfo; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExchangeNode; @@ -169,6 +170,10 @@ public class ExchangeNode extends PlanNode { @Override protected void toThrift(TPlanNode msg) { + // If this fragment has another scan node, this exchange node is serial or not should be decided by the scan + // node. 
+ msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode()) + && fragment.useSerialSource(ConnectContext.get())); msg.node_type = TPlanNodeType.EXCHANGE_NODE; msg.exchange_node = new TExchangeNode(); for (TupleId tid : tupleIds) { @@ -228,11 +233,17 @@ public class ExchangeNode extends PlanNode { */ @Override public boolean isSerialOperator() { - return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null; + return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange() + || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null; } @Override public boolean hasSerialChildren() { return isSerialOperator(); } + + @Override + public boolean hasSerialScanChildren() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index fef3de9b696..ab5307c07e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -515,15 +515,13 @@ public class PlanFragment extends TreeNode<PlanFragment> { && !hasNullAwareLeftAntiJoin() // If planRoot is not a serial operator and has serial children, we can use serial source and improve // parallelism of non-serial operators. - && sink instanceof DataStreamSink && !planRoot.isSerialOperator() - && planRoot.hasSerialChildren(); + // For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are + // serial. 
+ && (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator() + && planRoot.hasSerialChildren())); } - public int getNumBackends() { - return numBackends; - } - - public void setNumBackends(int numBackends) { - this.numBackends = numBackends; + public boolean hasSerialScanNode() { + return planRoot.hasSerialScanChildren(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 14bd34e93e1..73768435154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1388,4 +1388,11 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { } return children.stream().allMatch(PlanNode::hasSerialChildren); } + + public boolean hasSerialScanChildren() { + if (children.isEmpty()) { + return false; + } + return children.stream().anyMatch(PlanNode::hasSerialScanChildren); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index a2583868346..b4033a0535e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -861,4 +861,9 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends() || (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle()); } + + @Override + public boolean hasSerialScanChildren() { + return isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 3a6f6e4f840..262b5836689 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1886,17 +1886,11 @@ public class Coordinator implements CoordInterface { return scanNode.getId().asInt() == planNodeId; }).findFirst(); - /** - * Ignore storage data distribution iff: - * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. - * 2. Use Nereids planner. - */ boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); - boolean ignoreStorageDataDistribution = node.isPresent() - && fragment.useSerialSource(context); - if (node.isPresent() && ignoreStorageDataDistribution) { + boolean ignoreStorageDataDistribution = fragment.useSerialSource(context); + if (ignoreStorageDataDistribution) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and no conjuncts, only need 1 instance to save cpu and // mem resource diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 71b746c7907..115614a4187 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -268,6 +268,8 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution"; + public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange"; + public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan"; // Limit the max count of scanners to prevent generate too many scanners. 
@@ -1112,6 +1114,10 @@ public class SessionVariable implements Serializable, Writable { varType = VariableAnnotation.EXPERIMENTAL, needForward = true) private boolean ignoreStorageDataDistribution = true; + @VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private boolean useSerialExchange = false; + @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = {"是否在pipelineX引擎上开启local shuffle优化", @@ -2353,6 +2359,7 @@ public class SessionVariable implements Serializable, Writable { this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); + this.useSerialExchange = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean(); @@ -4563,6 +4570,10 @@ public class SessionVariable implements Serializable, Writable { return enableCooldownReplicaAffinity; } + public boolean isUseSerialExchange() { + return useSerialExchange && getEnableLocalExchange(); + } + public void setDisableInvertedIndexV1ForVaraint(boolean disableInvertedIndexV1ForVaraint) { this.disableInvertedIndexV1ForVaraint = disableInvertedIndexV1ForVaraint; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org