This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e837013ef38c038b01cc26c40f5222aeda1e01e3 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Apr 12 10:33:13 2024 +0800 [pipelineX](fix) Fix data pooling judgement for bucket join (#33533) --- be/src/runtime/runtime_state.cpp | 7 +++++-- .../src/main/java/org/apache/doris/planner/OlapScanNode.java | 5 +++++ fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java | 7 ++++++- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 6 ++---- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 2713ee441dd..2d9d939186d 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -531,12 +531,15 @@ Status RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilte bool need_local_merge, doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) { + // If runtime filter need to be local merged, `build_bf_exactly` will lead to bloom filters with + // different size need to be merged which is not allowed. + // So if `need_local_merge` is true, we will disable `build_bf_exactly`. if (desc.has_remote_targets || need_local_merge) { return global_runtime_filter_mgr()->register_local_merge_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly); + desc, query_options(), producer_filter, build_bf_exactly && !need_local_merge); } else { return local_runtime_filter_mgr()->register_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly); + desc, query_options(), producer_filter, build_bf_exactly && !need_local_merge); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 82f1a44a37c..01be11a184b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1775,4 +1775,9 @@ public class OlapScanNode extends ScanNode { public List<SortNode> getTopnFilterSortNodes() { return topnFilterSortNodes; } + + @Override + public int numScanBackends() { + return scanBackendIds.size(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 21f6bb07bd7..f92b44ccf4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -728,7 +728,12 @@ public abstract class ScanNode extends PlanNode { && context.getSessionVariable().getEnablePipelineXEngine() && !fragment.hasNullAwareLeftAntiJoin() && getScanRangeNum() - < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numBackends; + < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() + * (numScanBackends() > 0 ? numScanBackends() : numBackends); + } + + public int numScanBackends() { + return 0; } public int getScanRangeNum() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index bb546d7cb2b..443c8fb5252 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2070,8 +2070,7 @@ public class Coordinator implements CoordInterface { && context.getSessionVariable().isForceToLocalShuffle(); boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context, - fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId()) - .scanRangeAssignment.size())) && useNereids); + addressToBackendID.size())) && useNereids); if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) || ignoreStorageDataDistribution)) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); @@ -2963,8 +2962,7 @@ public class Coordinator implements CoordInterface { && context.getSessionVariable().isForceToLocalShuffle(); boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(node -> node.ignoreStorageDataDistribution(context, - fragmentExecParamsMap.get(node.getFragment().getFragmentId()) - .scanRangeAssignment.size())) + addressToBackendID.size())) && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { return addressScanRange.getValue().size() < parallelExecInstanceNum; }) && useNereids); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org