This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4f915129a90920c5936b1b874c7e6710e57f2970
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sat Jan 27 10:24:36 2024 +0800

    [pipelineX](localexchange) Add local exchange before TabletFunction (#30446)
    
    * [pipelineX](localexchange) Add local exchange before TabletFunction
    
    * update
---
 be/src/pipeline/exec/table_function_operator.h           |  4 ++++
 .../src/main/java/org/apache/doris/qe/Coordinator.java   | 12 ++++++++----
 .../main/java/org/apache/doris/qe/SessionVariable.java   | 16 ++++++++++++++++
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/table_function_operator.h 
b/be/src/pipeline/exec/table_function_operator.h
index a6c90e3dec4..c8f64a447a5 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -96,6 +96,10 @@ public:
                local_state._child_source_state != SourceState::FINISHED;
     }
 
+    DataDistribution required_data_distribution() const override {
+        return {ExchangeType::PASSTHROUGH};
+    }
+
     Status push(RuntimeState* state, vectorized::Block* input_block,
                 SourceState source_state) const override {
         auto& local_state = get_local_state(state);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 099aedd7e17..cca9014a715 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2042,10 +2042,12 @@ public class Coordinator implements CoordInterface {
                         boolean sharedScan = true;
                         int expectedInstanceNum = 
Math.min(parallelExecInstanceNum,
                                 leftMostNode.getNumInstances());
-                        boolean ignoreStorageDataDistribution = 
scanNodes.stream()
+                        boolean forceToLocalShuffle = context != null
+                                && 
context.getSessionVariable().isForceToLocalShuffle();
+                        boolean ignoreStorageDataDistribution = 
forceToLocalShuffle || (scanNodes.stream()
                                 .allMatch(scanNode -> 
scanNode.ignoreStorageDataDistribution(context,
                                         
fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId())
-                                                .scanRangeAssignment.size())) 
&& useNereids;
+                                                .scanRangeAssignment.size())) 
&& useNereids);
                         if (node.isPresent() && 
(!node.get().shouldDisableSharedScan(context)
                                 || ignoreStorageDataDistribution)) {
                             expectedInstanceNum = 
Math.max(expectedInstanceNum, 1);
@@ -2917,13 +2919,15 @@ public class Coordinator implements CoordInterface {
          * 1. `parallelExecInstanceNum * numBackends` is larger than scan 
ranges.
          * 2. Use Nereids planner.
          */
-        boolean ignoreStorageDataDistribution = scanNodes.stream()
+        boolean forceToLocalShuffle = context != null
+                && context.getSessionVariable().isForceToLocalShuffle();
+        boolean ignoreStorageDataDistribution = forceToLocalShuffle || 
(scanNodes.stream()
                 .allMatch(node -> node.ignoreStorageDataDistribution(context,
                         
fragmentExecParamsMap.get(node.getFragment().getFragmentId())
                                 .scanRangeAssignment.size()))
                 && 
addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
                     return addressScanRange.getValue().size() < 
parallelExecInstanceNum;
-                }) && useNereids;
+                }) && useNereids);
 
         FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
         for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, 
List<TScanRangeParams>>>>> addressScanRange
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 84809f31b85..cef6babc805 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -235,6 +235,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
 
+    public static final String FORCE_TO_LOCAL_SHUFFLE = 
"force_to_local_shuffle";
+
     public static final String ENABLE_AGG_STATE = "enable_agg_state";
 
     public static final String ENABLE_RPC_OPT_FOR_PIPELINE = 
"enable_rpc_opt_for_pipeline";
@@ -845,6 +847,12 @@ public class SessionVariable implements Serializable, 
Writable {
                     "Whether to enable local shuffle on pipelineX engine."})
     private boolean enableLocalShuffle = true;
 
+    @VariableMgr.VarAttr(
+                name = FORCE_TO_LOCAL_SHUFFLE, fuzzy = false, varType = 
VariableAnnotation.EXPERIMENTAL,
+                description = {"是否在pipelineX引擎上强制开启local shuffle优化",
+                        "Whether to force to local shuffle on pipelineX 
engine."})
+    private boolean forceToLocalShuffle = false;
+
     @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = 
VariableAnnotation.EXPERIMENTAL,
             needForward = true)
     public boolean enableAggState = false;
@@ -3303,4 +3311,12 @@ public class SessionVariable implements Serializable, 
Writable {
     public void setForceJniScanner(boolean force) {
         forceJniScanner = force;
     }
+
+    public boolean isForceToLocalShuffle() {
+        return getEnablePipelineXEngine() && enableLocalShuffle && 
enableNereidsPlanner && forceToLocalShuffle;
+    }
+
+    public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
+        this.forceToLocalShuffle = forceToLocalShuffle;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to