This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e2b79e92d1414f94b81778ad10a0d698a0b04da3
Author: yiguolei <676222...@qq.com>
AuthorDate: Sun Jul 23 13:04:33 2023 +0800

    [bugfix](runtimefilter) runtime filter is shared between multiple instances 
with the same node id, so exprs should not be cached (#22114)
    
    
    A runtime filter is shared among multiple instances.
    In the past, we cached the pushdown exprs (generated by the runtime filter).
    Every scan node (runtime filter consumer) will try to call prepare on the expr,
    but the expr may have been generated with a different fn_context_id.
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/exprs/runtime_filter.cpp             | 28 +++++-----------------------
 be/src/exprs/runtime_filter.h               |  7 +------
 be/src/vec/exec/runtime_filter_consumer.cpp |  6 +++---
 3 files changed, 9 insertions(+), 32 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 6932ad9c6b..02f5e7b514 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1152,35 +1152,17 @@ Status IRuntimeFilter::publish() {
     }
 }
 
-Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* 
push_exprs) {
+Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* 
push_exprs,
+                                          bool is_late_arrival) {
     DCHECK(is_consumer());
-    if (!_is_ignored) {
-        _set_push_down();
-        _profile->add_info_string("Info", _format_status());
-        return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx);
-    } else {
-        _profile->add_info_string("Info", _format_status());
-        return Status::OK();
-    }
-}
-
-Status IRuntimeFilter::get_prepared_exprs(std::vector<vectorized::VExprSPtr>* 
vexprs,
-                                          const RowDescriptor& desc, 
RuntimeState* state) {
     _profile->add_info_string("Info", _format_status());
     if (_is_ignored) {
         return Status::OK();
     }
-    DCHECK((!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) 
||
-           (_enable_pipeline_exec &&
-            _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY));
-    DCHECK(is_consumer());
-    std::lock_guard guard(_inner_mutex);
-
-    if (_push_down_vexprs.empty()) {
-        RETURN_IF_ERROR(_wrapper->get_push_exprs(&_push_down_vexprs, 
_vprobe_ctx));
+    if (!is_late_arrival) {
+        _set_push_down();
     }
-    vexprs->insert(vexprs->end(), _push_down_vexprs.begin(), 
_push_down_vexprs.end());
-    return Status::OK();
+    return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx);
 }
 
 bool IRuntimeFilter::await() {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index a4fd241a28..fb5e43d177 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -221,10 +221,7 @@ public:
 
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
-    Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs);
-
-    Status get_prepared_exprs(std::vector<doris::vectorized::VExprSPtr>* 
push_exprs,
-                              const RowDescriptor& desc, RuntimeState* state);
+    Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs, 
bool is_late_arrival);
 
     bool is_broadcast_join() const { return _is_broadcast_join; }
 
@@ -385,8 +382,6 @@ protected:
     bool _is_ignored;
     std::string _ignored_msg;
 
-    std::vector<doris::vectorized::VExprSPtr> _push_down_vexprs;
-
     struct RPCContext;
 
     std::shared_ptr<RPCContext> _rpc_context;
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index b05ebf0476..2af841749b 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -95,7 +95,7 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() {
             ready = runtime_filter->await();
         }
         if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
+            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs, 
false));
             _runtime_filter_ctxs[i].apply_mark = true;
         } else if (runtime_filter->current_state() == 
RuntimeFilterState::NOT_READY &&
                    !_runtime_filter_ctxs[i].apply_mark) {
@@ -151,8 +151,8 @@ Status 
RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive
             ++current_arrived_rf_num;
             continue;
         } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
-            
RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
-                    &exprs, _row_descriptor_ref, _state));
+            RETURN_IF_ERROR(
+                    
_runtime_filter_ctxs[i].runtime_filter->get_push_expr_ctxs(&exprs, true));
             ++current_arrived_rf_num;
             _runtime_filter_ctxs[i].apply_mark = true;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to