This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e2b79e92d1414f94b81778ad10a0d698a0b04da3 Author: yiguolei <676222...@qq.com> AuthorDate: Sun Jul 23 13:04:33 2023 +0800 [bugfix](runtimefilter) runtime filter is shared between multi instances with same node id, should not cache exprs (#22114) A runtime filter is shared among multiple instances. In the past, we cached the pushdown exprs (generated by the runtime filter); every scan node (runtime filter consumer) would then try to call prepare on the cached exprs, but those exprs may have been generated with a different fn_context_id. --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/exprs/runtime_filter.cpp | 28 +++++----------------------- be/src/exprs/runtime_filter.h | 7 +------ be/src/vec/exec/runtime_filter_consumer.cpp | 6 +++--- 3 files changed, 9 insertions(+), 32 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 6932ad9c6b..02f5e7b514 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1152,35 +1152,17 @@ Status IRuntimeFilter::publish() { } } -Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs) { +Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs, + bool is_late_arrival) { DCHECK(is_consumer()); - if (!_is_ignored) { - _set_push_down(); - _profile->add_info_string("Info", _format_status()); - return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx); - } else { - _profile->add_info_string("Info", _format_status()); - return Status::OK(); - } -} - -Status IRuntimeFilter::get_prepared_exprs(std::vector<vectorized::VExprSPtr>* vexprs, - const RowDescriptor& desc, RuntimeState* state) { _profile->add_info_string("Info", _format_status()); if (_is_ignored) { return Status::OK(); } - DCHECK((!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) || - (_enable_pipeline_exec && - _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY)); - DCHECK(is_consumer()); - std::lock_guard guard(_inner_mutex); - - if 
(_push_down_vexprs.empty()) { - RETURN_IF_ERROR(_wrapper->get_push_exprs(&_push_down_vexprs, _vprobe_ctx)); + if (!is_late_arrival) { + _set_push_down(); } - vexprs->insert(vexprs->end(), _push_down_vexprs.begin(), _push_down_vexprs.end()); - return Status::OK(); + return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx); } bool IRuntimeFilter::await() { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index a4fd241a28..fb5e43d177 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -221,10 +221,7 @@ public: RuntimeFilterType type() const { return _runtime_filter_type; } - Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs); - - Status get_prepared_exprs(std::vector<doris::vectorized::VExprSPtr>* push_exprs, - const RowDescriptor& desc, RuntimeState* state); + Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs, bool is_late_arrival); bool is_broadcast_join() const { return _is_broadcast_join; } @@ -385,8 +382,6 @@ protected: bool _is_ignored; std::string _ignored_msg; - std::vector<doris::vectorized::VExprSPtr> _push_down_vexprs; - struct RPCContext; std::shared_ptr<RPCContext> _rpc_context; diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index b05ebf0476..2af841749b 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -95,7 +95,7 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() { ready = runtime_filter->await(); } if (ready && !_runtime_filter_ctxs[i].apply_mark) { - RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs)); + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs, false)); _runtime_filter_ctxs[i].apply_mark = true; } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && !_runtime_filter_ctxs[i].apply_mark) { @@ -151,8 +151,8 @@ Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* 
arrive ++current_arrived_rf_num; continue; } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { - RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs( - &exprs, _row_descriptor_ref, _state)); + RETURN_IF_ERROR( + _runtime_filter_ctxs[i].runtime_filter->get_push_expr_ctxs(&exprs, true)); ++current_arrived_rf_num; _runtime_filter_ctxs[i].apply_mark = true; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org