This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new baf6d8ee78c [pipelineX](fix) Fix data pooling judgement for bucket 
join (#33533)
baf6d8ee78c is described below

commit baf6d8ee78c769d75ff9fc6715fc0b429586a4ee
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Apr 12 10:33:13 2024 +0800

    [pipelineX](fix) Fix data pooling judgement for bucket join (#33533)
---
 be/src/runtime/runtime_state.cpp                                   | 7 +++++--
 .../src/main/java/org/apache/doris/planner/OlapScanNode.java       | 5 +++++
 fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java    | 7 ++++++-
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java      | 6 ++----
 4 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 2713ee441dd..2d9d939186d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -531,12 +531,15 @@ Status 
RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilte
                                                       bool need_local_merge,
                                                       doris::IRuntimeFilter** 
producer_filter,
                                                       bool build_bf_exactly) {
+    // If runtime filter need to be local merged, `build_bf_exactly` will lead 
to bloom filters with
+    // different size need to be merged which is not allowed.
+    // So if `need_local_merge` is true, we will disable `build_bf_exactly`.
     if (desc.has_remote_targets || need_local_merge) {
         return 
global_runtime_filter_mgr()->register_local_merge_producer_filter(
-                desc, query_options(), producer_filter, build_bf_exactly);
+                desc, query_options(), producer_filter, build_bf_exactly && 
!need_local_merge);
     } else {
         return local_runtime_filter_mgr()->register_producer_filter(
-                desc, query_options(), producer_filter, build_bf_exactly);
+                desc, query_options(), producer_filter, build_bf_exactly && 
!need_local_merge);
     }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index e96a734151d..61ffc770ae3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1813,4 +1813,9 @@ public class OlapScanNode extends ScanNode {
     public List<SortNode> getTopnFilterSortNodes() {
         return topnFilterSortNodes;
     }
+
+    @Override
+    public int numScanBackends() {
+        return scanBackendIds.size();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 490a72f895b..a0b5d5911cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -734,7 +734,12 @@ public abstract class ScanNode extends PlanNode {
                 && context.getSessionVariable().getEnablePipelineXEngine()
                 && !fragment.hasNullAwareLeftAntiJoin()
                 && getScanRangeNum()
-                < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * 
numBackends;
+                < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum()
+                * (numScanBackends() > 0 ? numScanBackends() : numBackends);
+    }
+
+    public int numScanBackends() {
+        return 0;
     }
 
     public int getScanRangeNum() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 82b7cef3607..c389bd36240 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2072,8 +2072,7 @@ public class Coordinator implements CoordInterface {
                                 && 
context.getSessionVariable().isForceToLocalShuffle();
                         boolean ignoreStorageDataDistribution = 
forceToLocalShuffle || (scanNodes.stream()
                                 .allMatch(scanNode -> 
scanNode.ignoreStorageDataDistribution(context,
-                                        
fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId())
-                                                .scanRangeAssignment.size())) 
&& useNereids);
+                                        addressToBackendID.size())) && 
useNereids);
                         if (node.isPresent() && 
(!node.get().shouldDisableSharedScan(context)
                                 || ignoreStorageDataDistribution)) {
                             expectedInstanceNum = 
Math.max(expectedInstanceNum, 1);
@@ -2965,8 +2964,7 @@ public class Coordinator implements CoordInterface {
                 && context.getSessionVariable().isForceToLocalShuffle();
         boolean ignoreStorageDataDistribution = forceToLocalShuffle || 
(scanNodes.stream()
                 .allMatch(node -> node.ignoreStorageDataDistribution(context,
-                        
fragmentExecParamsMap.get(node.getFragment().getFragmentId())
-                                .scanRangeAssignment.size()))
+                        addressToBackendID.size()))
                 && 
addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
                     return addressScanRange.getValue().size() < 
parallelExecInstanceNum;
                 }) && useNereids);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to