This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e92e392d13 [refactor](coordinator) Refactor local shuffle logics (#42460)
2e92e392d13 is described below

commit 2e92e392d1325b48d58796d2209309c95497c637
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Oct 25 17:27:09 2024 +0800

    [refactor](coordinator) Refactor local shuffle logics (#42460)
    
    Unify `useSerialSource` for fragments that contain no scan nodes with
    `ignoreDataDistribution` for fragments that contain scan nodes.
---
 .../org/apache/doris/planner/AggregationNode.java  |  1 +
 .../org/apache/doris/planner/AnalyticEvalNode.java |  6 ++++
 .../org/apache/doris/planner/EmptySetNode.java     |  5 ---
 .../org/apache/doris/planner/ExchangeNode.java     | 31 +++++++++++++----
 .../apache/doris/planner/NestedLoopJoinNode.java   | 16 ++++-----
 .../org/apache/doris/planner/PlanFragment.java     | 40 +++++++---------------
 .../java/org/apache/doris/planner/PlanNode.java    | 14 +++++---
 .../java/org/apache/doris/planner/RepeatNode.java  |  1 +
 .../java/org/apache/doris/planner/ScanNode.java    |  4 ++-
 .../java/org/apache/doris/planner/SelectNode.java  |  1 +
 .../java/org/apache/doris/planner/SortNode.java    |  1 +
 .../java/org/apache/doris/planner/UnionNode.java   |  2 ++
 .../main/java/org/apache/doris/qe/Coordinator.java | 23 +++----------
 13 files changed, 74 insertions(+), 71 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 55d1b4b50c0..446f49c3782 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -488,6 +488,7 @@ public class AggregationNode extends PlanNode {
         }
     }
 
+    // If `GroupingExprs` is empty and agg need to finalize, the result must 
be output by single instance
     @Override
     public boolean isSerialOperator() {
         return aggInfo.getGroupingExprs().isEmpty() && needsFinalize;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index dce6c3d1b04..7b5998717a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -297,6 +297,12 @@ public class AnalyticEvalNode extends PlanNode {
         return output.toString();
     }
 
+    /**
+     * If `partitionExprs` is empty, the result must be output by single 
instance.
+     *
+     * For example, for `window (colA order by colB)`,
+     * all data should be input in this node to ensure the global ordering by 
colB.
+     */
     @Override
     public boolean isSerialOperator() {
         return partitionExprs.isEmpty();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
index f6ddf23429e..e262797a4fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
@@ -80,9 +80,4 @@ public class EmptySetNode extends PlanNode {
     public int getNumInstances() {
         return 1;
     }
-
-    @Override
-    public boolean isSerialOperator() {
-        return true;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 7af09287191..97d46b109b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -195,11 +195,6 @@ public class ExchangeNode extends PlanNode {
         return prefix + "offset: " + offset + "\n";
     }
 
-    @Override
-    public boolean isMerging() {
-        return mergeInfo != null;
-    }
-
     public boolean isRightChildOfBroadcastHashJoin() {
         return isRightChildOfBroadcastHashJoin;
     }
@@ -208,8 +203,32 @@ public class ExchangeNode extends PlanNode {
         isRightChildOfBroadcastHashJoin = value;
     }
 
+    /**
+     * If table `t1` has unique key `k1` and value column `v1`.
+     * Now use plan below to load data into `t1`:
+     * ```
+     * FRAGMENT 0:
+     *  Merging Exchange (id = 1)
+     *   NL Join (id = 2)
+     *  DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED)
+     *
+     * FRAGMENT 1:
+     *  Exchange (id = 3)
+     *  OlapTableSink (id = 4) ```
+     *
+     * In this plan, `Exchange (id = 1)` needs to do merge sort using column 
`k1` and `v1` so parallelism
+     * of FRAGMENT 0 must be 1 and data will be shuffled to FRAGMENT 1 which 
also has only 1 instance
+     * because this loading job relies on the global ordering of column `k1` 
and `v1`.
+     *
+     * So FRAGMENT 0 should not use serial source.
+     */
     @Override
     public boolean isSerialOperator() {
-        return true;
+        return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != 
null;
+    }
+
+    @Override
+    public boolean hasSerialChildren() {
+        return isSerialOperator();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index c7b3525e4cd..e2a7504a98d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -284,18 +284,16 @@ public class NestedLoopJoinNode extends JoinNodeBase {
 
     /**
      * If joinOp is one of type below:
-     * 1. NULL_AWARE_LEFT_ANTI_JOIN
-     * 2. RIGHT_OUTER_JOIN
-     * 3. RIGHT_ANTI_JOIN
-     * 4. RIGHT_SEMI_JOIN
+     * 1. RIGHT_OUTER_JOIN
+     * 2. RIGHT_ANTI_JOIN
+     * 3. RIGHT_SEMI_JOIN
+     * 4. FULL_OUTER_JOIN
      *
-     * We will
-     * @return
+     * Probe-side must have full data so join is a serial operator.
      */
     @Override
     public boolean isSerialOperator() {
-        return joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN || joinOp == 
JoinOperator.RIGHT_OUTER_JOIN
-                || joinOp == JoinOperator.RIGHT_ANTI_JOIN || joinOp == 
JoinOperator.RIGHT_SEMI_JOIN
-                || joinOp == JoinOperator.FULL_OUTER_JOIN;
+        return joinOp == JoinOperator.RIGHT_OUTER_JOIN || joinOp == 
JoinOperator.RIGHT_ANTI_JOIN
+                || joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp == 
JoinOperator.FULL_OUTER_JOIN;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index 3e3c49bf675..c5a6ec55f63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -162,6 +162,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     public Optional<NereidsSpecifyInstances<ScanSource>> specifyInstances = 
Optional.empty();
 
     public TQueryCacheParam queryCacheParam;
+    private int numBackends = 0;
 
     /**
      * C'tor for fragment with specific partition; the output is by default 
broadcast.
@@ -504,37 +505,22 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         return planRoot.isNullAwareLeftAntiJoin();
     }
 
-    private boolean isMergingFragment() {
-        return planRoot.isMerging();
-    }
-
     public boolean useSerialSource(ConnectContext context) {
         return context != null
                 && 
context.getSessionVariable().isIgnoreStorageDataDistribution()
+                && queryCacheParam == null
                 && !hasNullAwareLeftAntiJoin()
-                // If input data partition is UNPARTITIONED and sink is 
DataStreamSink and root node is not a serial
-                // operator, we use local exchange to improve parallelism
-                && getDataPartition() == DataPartition.UNPARTITIONED && 
!children.isEmpty()
+                // If planRoot is not a serial operator and has serial 
children, we can use serial source and improve
+                // parallelism of non-serial operators.
                 && sink instanceof DataStreamSink && 
!planRoot.isSerialOperator()
-                /**
-                 * If table `t1` has unique key `k1` and value column `v1`.
-                 * Now use plan below to load data into `t1`:
-                 * ```
-                 * FRAGMENT 0:
-                 *  Merging Exchange (id = 1)
-                 *   NL Join (id = 2)
-                 *  DataStreamSender (id = 3, dst_id = 3) 
(TABLET_SINK_SHUFFLE_PARTITIONED)
-                 *
-                 * FRAGMENT 1:
-                 *  Exchange (id = 3)
-                 *  OlapTableSink (id = 4) ```
-                 *
-                 * In this plan, `Exchange (id = 1)` needs to do merge sort 
using column `k1` and `v1` so parallelism
-                 * of FRAGMENT 0 must be 1 and data will be shuffled to 
FRAGMENT 1 which also has only 1 instance
-                 * because this loading job relies on the global ordering of 
column `k1` and `v1`.
-                 *
-                 * So FRAGMENT 0 should not use serial source.
-                 */
-                && !isMergingFragment();
+                && planRoot.hasSerialChildren();
+    }
+
+    public int getNumBackends() {
+        return numBackends;
+    }
+
+    public void setNumBackends(int numBackends) {
+        this.numBackends = numBackends;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index d1ba493682b..14bd34e93e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.TreeNode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.normalize.Normalizer;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.PlanStats;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsDeriveResult;
@@ -279,10 +280,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         return children.stream().anyMatch(PlanNode::isNullAwareLeftAntiJoin);
     }
 
-    public boolean isMerging() {
-        return children.stream().anyMatch(PlanNode::isMerging);
-    }
-
     public PlanFragment getFragment() {
         return fragment;
     }
@@ -643,7 +640,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         TPlanNode msg = new TPlanNode();
         msg.node_id = id.asInt();
         msg.setNereidsId(nereidsId);
-        msg.setIsSerialOperator(isSerialOperator());
+        msg.setIsSerialOperator(isSerialOperator() && 
fragment.useSerialSource(ConnectContext.get()));
         msg.num_children = children.size();
         msg.limit = limit;
         for (TupleId tid : tupleIds) {
@@ -1384,4 +1381,11 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> implements PlanStats {
     public boolean isSerialOperator() {
         return false;
     }
+
+    public boolean hasSerialChildren() {
+        if (children.isEmpty()) {
+            return isSerialOperator();
+        }
+        return children.stream().allMatch(PlanNode::hasSerialChildren);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
index 407d8a6444c..2bc4e847ac3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
@@ -201,6 +201,7 @@ public class RepeatNode extends PlanNode {
         return output.toString();
     }
 
+    // Determined by its child.
     @Override
     public boolean isSerialOperator() {
         return children.get(0).isSerialOperator();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 1681699d651..b392075be9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -851,6 +851,8 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
 
     @Override
     public boolean isSerialOperator() {
-        return true;
+        return numScanBackends() <= 0 || getScanRangeNum()
+                < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * 
numScanBackends()
+                || (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
index b3b088837a6..734e9338352 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
@@ -110,6 +110,7 @@ public class SelectNode extends PlanNode {
         return output.toString();
     }
 
+    // Determined by its child.
     @Override
     public boolean isSerialOperator() {
         return children.get(0).isSerialOperator();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index fc1c50c0bba..e3eb08c3e75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -389,6 +389,7 @@ public class SortNode extends PlanNode {
         return new HashSet<>(result);
     }
 
+    // If it's analytic sort or not merged by a followed exchange node, it 
must output the global ordered data.
     @Override
     public boolean isSerialOperator() {
         return !isAnalyticSort && !mergeByexchange;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index bf48a770f1c..ac66ce718ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -43,6 +43,8 @@ public class UnionNode extends SetOperationNode {
         toThrift(msg, TPlanNodeType.UNION_NODE);
     }
 
+    // If it is a union without children which means it will output some 
constant values, we should use a serial union
+    // to output non-duplicated data.
     @Override
     public boolean isSerialOperator() {
         return children.isEmpty();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 4eda6775b5c..294e9e34056 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1783,8 +1783,7 @@ public class Coordinator implements CoordInterface {
                 // Using serial source means a serial source operator will be 
used in this fragment (e.g. data will be
                 // shuffled to only 1 exchange operator) and then splitted by 
followed local exchanger
                 int expectedInstanceNum = fragment.getParallelExecNum();
-                boolean useSerialSource = fragment.useSerialSource(context) && 
useNereids
-                        && fragment.queryCacheParam == null;
+                boolean useSerialSource = fragment.useSerialSource(context);
                 if (useSerialSource) {
                     for (int j = 1; j < expectedInstanceNum; j++) {
                         params.instanceExecParams.add(new FInstanceExecParam(
@@ -1824,8 +1823,7 @@ public class Coordinator implements CoordInterface {
                 }
                 // Using serial source means a serial source operator will be 
used in this fragment (e.g. data will be
                 // shuffled to only 1 exchange operator) and then splitted by 
followed local exchanger
-                boolean useSerialSource = fragment.useSerialSource(context) && 
useNereids
-                        && fragment.queryCacheParam == null;
+                boolean useSerialSource = fragment.useSerialSource(context);
                 if (exchangeInstances > 0 && 
fragmentExecParamsMap.get(inputFragmentId)
                         .instanceExecParams.size() > exchangeInstances) {
                     // random select some instance
@@ -1898,12 +1896,8 @@ public class Coordinator implements CoordInterface {
                         boolean sharedScan = true;
                         int expectedInstanceNum = 
Math.min(parallelExecInstanceNum,
                                 leftMostNode.getNumInstances());
-                        boolean forceToLocalShuffle = context != null
-                                && 
context.getSessionVariable().isForceToLocalShuffle()
-                                && !fragment.hasNullAwareLeftAntiJoin() && 
useNereids;
-                        boolean ignoreStorageDataDistribution = 
(forceToLocalShuffle || (node.isPresent()
-                                && 
node.get().ignoreStorageDataDistribution(context, addressToBackendID.size())
-                                && useNereids)) && fragment.queryCacheParam == 
null;
+                        boolean ignoreStorageDataDistribution = 
node.isPresent()
+                                && fragment.useSerialSource(context);
                         if (node.isPresent() && ignoreStorageDataDistribution) 
{
                             expectedInstanceNum = 
Math.max(expectedInstanceNum, 1);
                             // if have limit and no conjuncts, only need 1 
instance to save cpu and
@@ -2750,14 +2744,7 @@ public class Coordinator implements CoordInterface {
          * 1. `parallelExecInstanceNum * numBackends` is larger than scan 
ranges.
          * 2. Use Nereids planner.
          */
-        boolean forceToLocalShuffle = context != null
-                && context.getSessionVariable().isForceToLocalShuffle() && 
!hasNullAwareLeftAntiJoin && useNereids;
-        boolean ignoreStorageDataDistribution = (forceToLocalShuffle || 
(scanNodes.stream()
-                .allMatch(node -> node.ignoreStorageDataDistribution(context,
-                        addressToBackendID.size()))
-                && 
addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
-                    return addressScanRange.getValue().size() < 
parallelExecInstanceNum;
-                }) && useNereids)) && params.fragment.queryCacheParam == null;
+        boolean ignoreStorageDataDistribution = params.fragment != null && 
params.fragment.useSerialSource(context);
 
         FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
         for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, 
List<TScanRangeParams>>>>> addressScanRange


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to