This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 786f238bff1 [Bug](runtime-filter) fix unknown filter on nested loop 
join sink (#32832)
786f238bff1 is described below

commit 786f238bff1d17567ead1ec1cf3354346a48473a
Author: Pxl <pxl...@qq.com>
AuthorDate: Sat Mar 30 09:37:23 2024 +0800

    [Bug](runtime-filter) fix unknown filter on nested loop join sink (#32832)
    
    fix unknown filter on nested loop join sink
---
 be/src/pipeline/exec/nested_loop_join_build_operator.cpp   | 8 +++++---
 be/src/pipeline/exec/nested_loop_join_build_operator.h     | 3 ++-
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 4 ++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index fec2edc71b8..f074afce374 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -42,8 +42,8 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkSta
     }
     _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
-        
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
 false,
-                                                                
&_runtime_filters[i], false));
+        RETURN_IF_ERROR(state->register_producer_runtime_filter(
+                p._runtime_filter_descs[i], p._need_local_merge, 
&_runtime_filters[i], false));
     }
     return Status::OK();
 }
@@ -51,9 +51,11 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkSta
 NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* 
pool,
                                                                    int 
operator_id,
                                                                    const 
TPlanNode& tnode,
-                                                                   const 
DescriptorTbl& descs)
+                                                                   const 
DescriptorTbl& descs,
+                                                                   bool 
need_local_merge)
         : JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool, 
operator_id, tnode,
                                                                     descs),
+          _need_local_merge(need_local_merge),
           
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only
 &&
                                     
tnode.nested_loop_join_node.is_output_left_side_only),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h 
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 801d4ff88ea..52f723b13ae 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -78,7 +78,7 @@ class NestedLoopJoinBuildSinkOperatorX final
         : public JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState> {
 public:
     NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
-                                     const DescriptorTbl& descs);
+                                     const DescriptorTbl& descs, bool 
need_local_merge);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError(
                 "{} should not init with TDataSink",
@@ -105,6 +105,7 @@ private:
 
     vectorized::VExprContextSPtrs _filter_src_expr_ctxs;
 
+    bool _need_local_merge;
     const bool _is_output_left_side_only;
     RowDescriptor _row_descriptor;
 };
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 55f65061157..6e321f6ca7e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1092,8 +1092,8 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(
-                new NestedLoopJoinBuildSinkOperatorX(pool, 
next_sink_operator_id(), tnode, descs));
+        sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, 
next_sink_operator_id(), tnode, descs,
+                                                        _need_local_merge));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, 
_runtime_state.get()));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to