This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 843f7b5883a [fix](runtime filter) Fix runtime filter producers (#44293) 843f7b5883a is described below commit 843f7b5883ac652a58da3fce2cf8f1e5587c7dfa Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Wed Nov 20 09:20:25 2024 +0800 [fix](runtime filter) Fix runtime filter producers (#44293) A runtime filter producer may have multiple targets some of which are managed in global mgr and others are managed in local mgr. To process it, producer will be shared by both of global mgr and local mgr. In this PR, a producer will be always created by a local mgr and we can always find it by a queryCtx's RF mgr. --- be/src/exprs/runtime_filter.cpp | 14 +++++++------- be/src/exprs/runtime_filter.h | 15 +++++---------- be/src/runtime/runtime_filter_mgr.cpp | 27 ++++++++++++++------------- be/src/runtime/runtime_filter_mgr.h | 3 ++- be/src/runtime/runtime_state.cpp | 9 ++++++--- 5 files changed, 34 insertions(+), 34 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f28cc53dcb8..b7af2561fe0 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -976,8 +976,8 @@ private: Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, std::shared_ptr<IRuntimeFilter>* res, - bool build_bf_exactly, bool need_local_merge) { - *res = std::make_shared<IRuntimeFilter>(state, desc, need_local_merge); + bool build_bf_exactly) { + *res = std::make_shared<IRuntimeFilter>(state, desc); (*res)->set_role(role); return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); } @@ -1311,10 +1311,10 @@ bool IRuntimeFilter::get_ignored() { std::string IRuntimeFilter::formatted_state() const { return fmt::format( - "[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " + "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " "HasLocalTarget = {}, Ignored = {}]", - _is_push_down, _get_explain_state_string(), _has_remote_target, _has_local_target, - _wrapper->_context->ignored); + _filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target, + _has_local_target, _wrapper->_context->ignored); } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, @@ -1505,9 +1505,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { std::string IRuntimeFilter::debug_string() const { return fmt::format( - "RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, " + "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, " "build_bf_cardinality: {}", - _filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join, + _filter_id, to_string(_runtime_filter_type), _is_broadcast_join, _wrapper->get_build_bf_cardinality()); } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 9e0e93433d5..6632c5dc872 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -190,8 +190,7 @@ enum RuntimeFilterState { /// that can be pushed down to node based on the results of the right table. class IRuntimeFilter { public: - IRuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, - bool need_local_merge = false) + IRuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc) : _state(state), _filter_id(desc->filter_id), _is_broadcast_join(true), @@ -204,17 +203,16 @@ public: _wait_infinitely(_state->get_query_ctx()->runtime_filter_wait_infinitely()), _rf_wait_time_ms(_state->get_query_ctx()->runtime_filter_wait_time_ms()), _runtime_filter_type(get_runtime_filter_type(desc)), - _profile( - new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", - _filter_id, to_string(_runtime_filter_type)))), - _need_local_merge(need_local_merge) {} + _profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", + _filter_id, + to_string(_runtime_filter_type)))) {} ~IRuntimeFilter() = default; static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, std::shared_ptr<IRuntimeFilter>* res, - bool build_bf_exactly = false, bool need_local_merge = false); + bool build_bf_exactly = false); RuntimeFilterContextSPtr& get_shared_context_ref(); @@ -414,9 +412,6 @@ protected: // parent profile // only effect on consumer std::unique_ptr<RuntimeProfile> _profile; - // `_need_local_merge` indicates whether this runtime filter is global on this BE. - // All runtime filters should be merged on each BE before push_to_remote or publish. - bool _need_local_merge = false; std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 31b9ec3b0c2..b11e8290d96 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -109,8 +109,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc if (!has_exist) { std::shared_ptr<IRuntimeFilter> filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::CONSUMER, - node_id, &filter, build_bf_exactly, - need_local_merge)); + node_id, &filter, build_bf_exactly)); _consumer_map[key].emplace_back(node_id, filter); *consumer_filter = filter; } else if (!need_local_merge) { @@ -122,7 +121,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc Status RuntimeFilterMgr::register_local_merge_producer_filter( const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options, - std::shared_ptr<IRuntimeFilter>* producer_filter, bool build_bf_exactly) { + std::shared_ptr<IRuntimeFilter> producer_filter, bool build_bf_exactly) { DCHECK(_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -138,21 +137,19 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter( } DCHECK(_state != nullptr); - RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1, - producer_filter, build_bf_exactly, true)); { std::lock_guard<std::mutex> l(*iter->second.lock); if (iter->second.filters.empty()) { std::shared_ptr<IRuntimeFilter> merge_filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1, &merge_filter, - build_bf_exactly, true)); + build_bf_exactly)); merge_filter->set_ignored(); iter->second.filters.emplace_back(merge_filter); } iter->second.merge_time++; iter->second.merge_size_times++; - iter->second.filters.emplace_back(*producer_filter); + iter->second.filters.emplace_back(producer_filter); } return Status::OK(); } @@ -173,6 +170,16 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters( return Status::OK(); } +doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id) { + DCHECK(_is_global); + std::lock_guard<std::mutex> l(_lock); + auto iter = _local_merge_producer_map.find(filter_id); + if (iter == _local_merge_producer_map.end()) { + return nullptr; + } + return &iter->second; +} + Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, std::shared_ptr<IRuntimeFilter>* producer_filter, @@ -378,12 +385,6 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz } Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { - auto filter = try_get_product_filter(request->filter_id()); - if (filter) { - filter->set_synced_size(request->filter_size()); - return Status::OK(); - } - LocalMergeFilters* local_merge_filters = nullptr; RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), &local_merge_filters)); // first filter size merged filter diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 53520e43a55..dce051ab0d6 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -102,10 +102,11 @@ public: Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr<IRuntimeFilter>* producer_filter, + std::shared_ptr<IRuntimeFilter> producer_filter, bool build_bf_exactly = false); Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); + LocalMergeFilters* get_local_merge_producer_filters(int filter_id); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, std::shared_ptr<IRuntimeFilter>* producer_filter, diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 34b3866febf..116ac95bd36 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -518,10 +518,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() { Status RuntimeState::register_producer_runtime_filter( const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* producer_filter, bool build_bf_exactly) { - RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( + // Producers are created by local runtime filter mgr and shared by global runtime filter manager. + // When RF is published, consumers in both global and local RF mgr will be found. + RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter( desc, query_options(), producer_filter, build_bf_exactly)); - return local_runtime_filter_mgr()->register_producer_filter(desc, query_options(), - producer_filter, build_bf_exactly); + RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( + desc, query_options(), *producer_filter, build_bf_exactly)); + return Status::OK(); } Status RuntimeState::register_consumer_runtime_filter( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org