This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e837013ef38c038b01cc26c40f5222aeda1e01e3
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Apr 12 10:33:13 2024 +0800

    [pipelineX](fix) Fix data pooling judgement for bucket join (#33533)
---
 be/src/runtime/runtime_state.cpp                                   | 7 +++++--
 .../src/main/java/org/apache/doris/planner/OlapScanNode.java       | 5 +++++
 fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java    | 7 ++++++-
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java      | 6 ++----
 4 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 2713ee441dd..2d9d939186d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -531,12 +531,15 @@ Status 
RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilte
                                                       bool need_local_merge,
                                                       doris::IRuntimeFilter** 
producer_filter,
                                                       bool build_bf_exactly) {
+    // If the runtime filter needs to be merged locally, `build_bf_exactly` would produce bloom
+    // filters of different sizes, and merging filters of different sizes is not allowed.
+    // So if `need_local_merge` is true, we disable `build_bf_exactly`.
     if (desc.has_remote_targets || need_local_merge) {
         return 
global_runtime_filter_mgr()->register_local_merge_producer_filter(
-                desc, query_options(), producer_filter, build_bf_exactly);
+                desc, query_options(), producer_filter, build_bf_exactly && 
!need_local_merge);
     } else {
         return local_runtime_filter_mgr()->register_producer_filter(
-                desc, query_options(), producer_filter, build_bf_exactly);
+                desc, query_options(), producer_filter, build_bf_exactly && 
!need_local_merge);
     }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 82f1a44a37c..01be11a184b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1775,4 +1775,9 @@ public class OlapScanNode extends ScanNode {
     public List<SortNode> getTopnFilterSortNodes() {
         return topnFilterSortNodes;
     }
+
+    @Override
+    public int numScanBackends() {
+        return scanBackendIds.size();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 21f6bb07bd7..f92b44ccf4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -728,7 +728,12 @@ public abstract class ScanNode extends PlanNode {
                 && context.getSessionVariable().getEnablePipelineXEngine()
                 && !fragment.hasNullAwareLeftAntiJoin()
                 && getScanRangeNum()
-                < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * 
numBackends;
+                < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum()
+                * (numScanBackends() > 0 ? numScanBackends() : numBackends);
+    }
+
+    public int numScanBackends() {
+        return 0;
     }
 
     public int getScanRangeNum() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index bb546d7cb2b..443c8fb5252 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2070,8 +2070,7 @@ public class Coordinator implements CoordInterface {
                                 && 
context.getSessionVariable().isForceToLocalShuffle();
                         boolean ignoreStorageDataDistribution = 
forceToLocalShuffle || (scanNodes.stream()
                                 .allMatch(scanNode -> 
scanNode.ignoreStorageDataDistribution(context,
-                                        
fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId())
-                                                .scanRangeAssignment.size())) 
&& useNereids);
+                                        addressToBackendID.size())) && 
useNereids);
                         if (node.isPresent() && 
(!node.get().shouldDisableSharedScan(context)
                                 || ignoreStorageDataDistribution)) {
                             expectedInstanceNum = 
Math.max(expectedInstanceNum, 1);
@@ -2963,8 +2962,7 @@ public class Coordinator implements CoordInterface {
                 && context.getSessionVariable().isForceToLocalShuffle();
         boolean ignoreStorageDataDistribution = forceToLocalShuffle || 
(scanNodes.stream()
                 .allMatch(node -> node.ignoreStorageDataDistribution(context,
-                        
fragmentExecParamsMap.get(node.getFragment().getFragmentId())
-                                .scanRangeAssignment.size()))
+                        addressToBackendID.size()))
                 && 
addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
                     return addressScanRange.getValue().size() < 
parallelExecInstanceNum;
                 }) && useNereids);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to