This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push: new 786f238bff1 [Bug](runtime-filter) fix unknown filter on nested loop join sink (#32832) 786f238bff1 is described below commit 786f238bff1d17567ead1ec1cf3354346a48473a Author: Pxl <pxl...@qq.com> AuthorDate: Sat Mar 30 09:37:23 2024 +0800 [Bug](runtime-filter) fix unknown filter on nested loop join sink (#32832) fix unknown filter on nested loop join sink --- be/src/pipeline/exec/nested_loop_join_build_operator.cpp | 8 +++++--- be/src/pipeline/exec/nested_loop_join_build_operator.h | 3 ++- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index fec2edc71b8..f074afce374 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -42,8 +42,8 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta } _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], false, - &_runtime_filters[i], false)); + RETURN_IF_ERROR(state->register_producer_runtime_filter( + p._runtime_filter_descs[i], p._need_local_merge, &_runtime_filters[i], false)); } return Status::OK(); } @@ -51,9 +51,11 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool need_local_merge) : JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool, operator_id, tnode, descs), + _need_local_merge(need_local_merge), 
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 801d4ff88ea..52f723b13ae 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -78,7 +78,7 @@ class NestedLoopJoinBuildSinkOperatorX final : public JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState> { public: NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool need_local_merge); Status init(const TDataSink& tsink) override { return Status::InternalError( "{} should not init with TDataSink", @@ -105,6 +105,7 @@ private: vectorized::VExprContextSPtrs _filter_src_expr_ctxs; + bool _need_local_merge; const bool _is_output_left_side_only; RowDescriptor _row_descriptor; }; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 55f65061157..6e321f6ca7e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1092,8 +1092,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset( - new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _need_local_merge)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org