This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d77bfa09d85 [Improvement](shuffle) Use a knob to decide whether a 
serial exchange… (#44676)
d77bfa09d85 is described below

commit d77bfa09d85d9759965c109cc50ee89e8fbde499
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Nov 28 16:33:16 2024 +0800

    [Improvement](shuffle) Use a knob to decide whether a serial exchange… 
(#44676)
    
    … should be used
    
    This improvement was first merged in #43199 and then reverted by #44075 because
    it caused a performance regression. With that regression fixed, this improvement is re-submitted.
    
    This adds a new knob that controls whether an exchange node should be serial.
    For example, without the knob a partitioned hash join is executed as shown below:
    ```
    ┌────────────────────────────┐                  ┌────────────────────────────┐
    │                            │                  │                            │
    │Exchange(HASH PARTITIONED N)│                  │Exchange(HASH PARTITIONED N)│
    │                            │                  │                            │
    └────────────────────────────┴─────────┬────────┴────────────────────────────┘
                                           │
                                           │
                                           │
                                           │
                                           │
                                           │
                                    ┌──────▼──────┐
                                    │             │
                                    │ HASH  JOIN  │
                                    │             │
                                    └─────────────┘
    ```
    
    After turning on this knob, the actual plan becomes:
    ```
      ┌──────────────────────────────┐                        ┌──────────────────────────────┐
      │                              │                        │                              │
      │ Exchange (HASH PARTITIONED 1)│                        │ Exchange (HASH PARTITIONED 1)│
      │                              │                        │                              │
      └────────────┬─────────────────┘                        └────────────┬─────────────────┘
                   │                                                       │
                   │                                                       │
                   │                                                       │
                   │                                                       │
                   │                                                       │
    ┌──────────────▼─────────────────────┐                  ┌──────────────▼─────────────────────┐
    │                                    │                  │                                    │
    │ Local  Exchange(HASH PARTITIONED N)│                  │ Local  Exchange(HASH PARTITIONED N)│
    │              1 -> N                │                  │              1 -> N                │
    └────────────────────────────────────┴─────────┬────────┴────────────────────────────────────┘
                                                   │
                                                   │
                                                   │
                                                   │
                                                   │
                                                   │
                                            ┌──────▼──────┐
                                            │             │
                                            │ HASH  JOIN  │
                                            │             │
                                            └─────────────┘
    ```
    
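    For large clusters, this reduces the number of RPC channels from X (mappers) * Y (reducers)
    to X (mappers) * Z (BEs): each BE receives data through one serial exchange and then
    redistributes it to its N local instances with a local exchange.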
---
 .../main/java/org/apache/doris/planner/ExchangeNode.java   | 13 ++++++++++++-
 .../main/java/org/apache/doris/planner/PlanFragment.java   | 14 ++++++--------
 .../src/main/java/org/apache/doris/planner/PlanNode.java   |  7 +++++++
 .../src/main/java/org/apache/doris/planner/ScanNode.java   |  5 +++++
 .../src/main/java/org/apache/doris/qe/Coordinator.java     | 10 ++--------
 .../src/main/java/org/apache/doris/qe/SessionVariable.java | 11 +++++++++++
 6 files changed, 43 insertions(+), 17 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 1ca1db56bfc..cb6628b01c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.SortInfo;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsRecursiveDerive;
 import org.apache.doris.thrift.TExchangeNode;
@@ -169,6 +170,10 @@ public class ExchangeNode extends PlanNode {
 
     @Override
     protected void toThrift(TPlanNode msg) {
+        // If this fragment has another scan node, this exchange node is 
serial or not should be decided by the scan
+        // node.
+        msg.setIsSerialOperator((isSerialOperator() || 
fragment.hasSerialScanNode())
+                && fragment.useSerialSource(ConnectContext.get()));
         msg.node_type = TPlanNodeType.EXCHANGE_NODE;
         msg.exchange_node = new TExchangeNode();
         for (TupleId tid : tupleIds) {
@@ -228,11 +233,17 @@ public class ExchangeNode extends PlanNode {
      */
     @Override
     public boolean isSerialOperator() {
-        return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != 
null;
+        return (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().isUseSerialExchange()
+                || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo 
!= null;
     }
 
     @Override
     public boolean hasSerialChildren() {
         return isSerialOperator();
     }
+
+    @Override
+    public boolean hasSerialScanChildren() {
+        return false;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index fef3de9b696..ab5307c07e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -515,15 +515,13 @@ public class PlanFragment extends TreeNode<PlanFragment> {
                 && !hasNullAwareLeftAntiJoin()
                 // If planRoot is not a serial operator and has serial 
children, we can use serial source and improve
                 // parallelism of non-serial operators.
-                && sink instanceof DataStreamSink && 
!planRoot.isSerialOperator()
-                && planRoot.hasSerialChildren();
+                // For bucket shuffle / colocate join fragment, always use 
serial source if the bucket scan nodes are
+                // serial.
+                && (hasSerialScanNode() || (sink instanceof DataStreamSink && 
!planRoot.isSerialOperator()
+                && planRoot.hasSerialChildren()));
     }
 
-    public int getNumBackends() {
-        return numBackends;
-    }
-
-    public void setNumBackends(int numBackends) {
-        this.numBackends = numBackends;
+    public boolean hasSerialScanNode() {
+        return planRoot.hasSerialScanChildren();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 14bd34e93e1..73768435154 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1388,4 +1388,11 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> implements PlanStats {
         }
         return children.stream().allMatch(PlanNode::hasSerialChildren);
     }
+
+    public boolean hasSerialScanChildren() {
+        if (children.isEmpty()) {
+            return false;
+        }
+        return children.stream().anyMatch(PlanNode::hasSerialScanChildren);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index a2583868346..b4033a0535e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -861,4 +861,9 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
                 < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * 
numScanBackends()
                 || (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
     }
+
+    @Override
+    public boolean hasSerialScanChildren() {
+        return isSerialOperator();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3a6f6e4f840..262b5836689 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1886,17 +1886,11 @@ public class Coordinator implements CoordInterface {
                             return scanNode.getId().asInt() == planNodeId;
                         }).findFirst();
 
-                        /**
-                         * Ignore storage data distribution iff:
-                         * 1. `parallelExecInstanceNum * numBackends` is 
larger than scan ranges.
-                         * 2. Use Nereids planner.
-                         */
                         boolean sharedScan = true;
                         int expectedInstanceNum = 
Math.min(parallelExecInstanceNum,
                                 leftMostNode.getNumInstances());
-                        boolean ignoreStorageDataDistribution = 
node.isPresent()
-                                && fragment.useSerialSource(context);
-                        if (node.isPresent() && ignoreStorageDataDistribution) 
{
+                        boolean ignoreStorageDataDistribution = 
fragment.useSerialSource(context);
+                        if (ignoreStorageDataDistribution) {
                             expectedInstanceNum = 
Math.max(expectedInstanceNum, 1);
                             // if have limit and no conjuncts, only need 1 
instance to save cpu and
                             // mem resource
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 71b746c7907..115614a4187 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -268,6 +268,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = 
"ignore_storage_data_distribution";
 
+    public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";
+
     public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan";
 
     // Limit the max count of scanners to prevent generate too many scanners.
@@ -1112,6 +1114,10 @@ public class SessionVariable implements Serializable, 
Writable {
             varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
     private boolean ignoreStorageDataDistribution = true;
 
+    @VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true,
+            varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
+    private boolean useSerialExchange = false;
+
     @VariableMgr.VarAttr(
             name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = 
VariableAnnotation.EXPERIMENTAL,
             description = {"是否在pipelineX引擎上开启local shuffle优化",
@@ -2353,6 +2359,7 @@ public class SessionVariable implements Serializable, 
Writable {
         this.parallelPrepareThreshold = random.nextInt(32) + 1;
         this.enableCommonExprPushdown = random.nextBoolean();
         this.enableLocalExchange = random.nextBoolean();
+        this.useSerialExchange = random.nextBoolean();
         // This will cause be dead loop, disable it first
         // this.disableJoinReorder = random.nextBoolean();
         this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
@@ -4563,6 +4570,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return enableCooldownReplicaAffinity;
     }
 
+    public boolean isUseSerialExchange() {
+        return useSerialExchange && getEnableLocalExchange();
+    }
+
     public void setDisableInvertedIndexV1ForVaraint(boolean 
disableInvertedIndexV1ForVaraint) {
         this.disableInvertedIndexV1ForVaraint = 
disableInvertedIndexV1ForVaraint;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to