This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new baf6d8ee78c [pipelineX](fix) Fix data pooling judgement for bucket join (#33533) baf6d8ee78c is described below commit baf6d8ee78c769d75ff9fc6715fc0b429586a4ee Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Apr 12 10:33:13 2024 +0800 [pipelineX](fix) Fix data pooling judgement for bucket join (#33533) --- be/src/runtime/runtime_state.cpp | 7 +++++-- .../src/main/java/org/apache/doris/planner/OlapScanNode.java | 5 +++++ fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java | 7 ++++++- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 6 ++---- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 2713ee441dd..2d9d939186d 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -531,12 +531,15 @@ Status RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilte bool need_local_merge, doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) { + // If runtime filter need to be local merged, `build_bf_exactly` will lead to bloom filters with + // different size need to be merged which is not allowed. + // So if `need_local_merge` is true, we will disable `build_bf_exactly`. if (desc.has_remote_targets || need_local_merge) { return global_runtime_filter_mgr()->register_local_merge_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly); + desc, query_options(), producer_filter, build_bf_exactly && !need_local_merge); } else { return local_runtime_filter_mgr()->register_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly); + desc, query_options(), producer_filter, build_bf_exactly && !need_local_merge); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index e96a734151d..61ffc770ae3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1813,4 +1813,9 @@ public class OlapScanNode extends ScanNode { public List<SortNode> getTopnFilterSortNodes() { return topnFilterSortNodes; } + + @Override + public int numScanBackends() { + return scanBackendIds.size(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 490a72f895b..a0b5d5911cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -734,7 +734,12 @@ public abstract class ScanNode extends PlanNode { && context.getSessionVariable().getEnablePipelineXEngine() && !fragment.hasNullAwareLeftAntiJoin() && getScanRangeNum() - < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numBackends; + < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() + * (numScanBackends() > 0 ? numScanBackends() : numBackends); + } + + public int numScanBackends() { + return 0; } public int getScanRangeNum() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 82b7cef3607..c389bd36240 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2072,8 +2072,7 @@ public class Coordinator implements CoordInterface { && context.getSessionVariable().isForceToLocalShuffle(); boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context, - fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId()) - .scanRangeAssignment.size())) && useNereids); + addressToBackendID.size())) && useNereids); if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) || ignoreStorageDataDistribution)) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); @@ -2965,8 +2964,7 @@ public class Coordinator implements CoordInterface { && context.getSessionVariable().isForceToLocalShuffle(); boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(node -> node.ignoreStorageDataDistribution(context, - fragmentExecParamsMap.get(node.getFragment().getFragmentId()) - .scanRangeAssignment.size())) + addressToBackendID.size())) && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { return addressScanRange.getValue().size() < parallelExecInstanceNum; }) && useNereids); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org