This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a8535e91af [Improvement](runtimefilter) DO NOT allocate memory for bbf in prepare phase (#13207) a8535e91af is described below commit a8535e91af5b7ee69e8a2f283b5d82852175e5d1 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Oct 10 14:19:33 2022 +0800 [Improvement](runtimefilter) DO NOT allocate memory for bbf in prepare phase (#13207) --- be/src/exprs/bloomfilter_predicate.h | 13 +++++++++++-- be/src/exprs/runtime_filter.cpp | 31 +++++++++++++++++++++++++------ be/src/exprs/runtime_filter.h | 5 ++++- be/src/runtime/fragment_mgr.cpp | 33 +++++++++++++++++++++++++++++++++ be/src/runtime/fragment_mgr.h | 1 + be/src/runtime/runtime_filter_mgr.h | 16 ++++++++++------ 6 files changed, 84 insertions(+), 15 deletions(-) diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h index 2540dc657d..ddc9d8a884 100644 --- a/be/src/exprs/bloomfilter_predicate.h +++ b/be/src/exprs/bloomfilter_predicate.h @@ -101,7 +101,13 @@ public: } Status init_with_fixed_length(int64_t bloom_filter_length) { - DCHECK(!_inited); + if (_inited) { + return Status::OK(); + } + std::lock_guard<std::mutex> l(_lock); + if (_inited) { + return Status::OK(); + } DCHECK(bloom_filter_length >= 0); DCHECK_EQ((bloom_filter_length & (bloom_filter_length - 1)), 0); _bloom_filter_alloced = bloom_filter_length; @@ -117,7 +123,9 @@ public: _bloom_filter.reset(BloomFilterAdaptor::create()); } if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { - LOG(WARNING) << "bloom filter size not the same"; + LOG(WARNING) << "bloom filter size not the same: already allocated bytes = " + << _bloom_filter_alloced + << ", expected allocated bytes = " << other_func->_bloom_filter_alloced; return Status::InvalidArgument("bloom filter size invalid"); } return _bloom_filter->merge(other_func->_bloom_filter.get()); @@ -166,6 +174,7 @@ protected: int32_t _bloom_filter_alloced; 
std::shared_ptr<BloomFilterAdaptor> _bloom_filter; bool _inited; + std::mutex _lock; }; template <class T> diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 1311f2b928..24e8e41a84 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -418,7 +418,7 @@ public: _filter_id(filter_id) {} // init runtime filter wrapper // alloc memory to init runtime filter function - Status init(const RuntimeFilterParams* params) { + Status init(const RuntimeFilterParams* params, bool init_bloom_filter) { _max_in_num = params->max_in_num; switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { @@ -432,12 +432,20 @@ public: case RuntimeFilterType::BLOOM_FILTER: { _is_bloomfilter = true; _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); - return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size); + if (init_bloom_filter) { + return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size); + } else { + return Status::OK(); + } } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { _hybrid_set.reset(create_set(_column_return_type)); _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); - return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size); + if (init_bloom_filter) { + return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size); + } else { + return Status::OK(); + } } default: return Status::InvalidArgument("Unknown Filter type"); @@ -461,6 +469,12 @@ public: } } + BloomFilterFuncBase* get_bloomfilter() const { + DCHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER || + _filter_type == RuntimeFilterType::BLOOM_FILTER); + return _bloomfilter_func.get(); + } + void insert(const void* data) { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { @@ -1059,7 +1073,7 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt *res = pool->add(new IRuntimeFilter(state, pool)); 
(*res)->set_role(role); UniqueId fragment_instance_id(state->fragment_instance_id()); - return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id); + return (*res)->init_with_desc(desc, query_options, fragment_instance_id, true, node_id); } void IRuntimeFilter::insert(const void* data) { @@ -1186,8 +1200,13 @@ void IRuntimeFilter::signal() { _effect_timer.reset(); } +BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { + return _wrapper->get_bloomfilter(); +} + Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - UniqueId fragment_instance_id, int node_id) { + UniqueId fragment_instance_id, bool init_bloom_filter, + int node_id) { // if node_id == -1 , it shouldn't be a consumer DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer())); @@ -1235,7 +1254,7 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue } _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params)); - return _wrapper->init(&params); + return _wrapper->init(&params, init_bloom_filter); } Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 60162b98f0..6756e2e70f 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -43,6 +43,7 @@ class PInFilter; class PMinMaxFilter; class HashJoinNode; class RuntimeProfile; +class BloomFilterFuncBase; namespace vectorized { class VExpr; @@ -190,7 +191,9 @@ public: // init filter with desc Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - UniqueId fragment_id = UniqueId(0, 0), int node_id = -1); + UniqueId fragment_id, bool init_bloom_filter = false, int node_id = -1); + + BloomFilterFuncBase* get_bloomfilter() const; // serialize _wrapper to protobuf Status serialize(PMergeFilterRequest* request, void** data, int* len); diff --git a/be/src/runtime/fragment_mgr.cpp
b/be/src/runtime/fragment_mgr.cpp index 7b5c3af432..0471b63d71 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -27,6 +27,7 @@ #include "common/object_pool.h" #include "common/resource_tls.h" #include "common/signal_handler.h" +#include "exprs/bloomfilter_predicate.h" #include "gen_cpp/DataSinks_types.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService.h" @@ -425,6 +426,7 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _exec_env(exec_env), _fragment_map(), _fragments_ctx_map(), + _bf_size_map(), _stop_background_threads_latch(1) { _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count); @@ -467,6 +469,7 @@ FragmentMgr::~FragmentMgr() { std::lock_guard<std::mutex> lock(_lock); _fragment_map.clear(); _fragments_ctx_map.clear(); + _bf_size_map.clear(); } } @@ -508,6 +511,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi _fragment_map.erase(exec_state->fragment_instance_id()); if (all_done && fragments_ctx) { _fragments_ctx_map.erase(fragments_ctx->query_id); + _bf_size_map.erase(fragments_ctx->query_id); } } @@ -666,6 +670,26 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi _runtimefilter_controller.add_entity(params, &handler); exec_state->set_merge_controller_handler(handler); + auto& runtime_filter_params = params.params.runtime_filter_params; + if (!runtime_filter_params.rid_to_runtime_filter.empty()) { + _bf_size_map.insert({fragments_ctx->query_id, {}}); + } + for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { + int filter_id = filterid_to_desc.first; + const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id); + if (target_iter == runtime_filter_params.rid_to_target_param.end()) { + continue; + } + const auto& build_iter = 
runtime_filter_params.runtime_filter_builder_num.find(filter_id); + if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { + continue; + } + if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) { + _bf_size_map[fragments_ctx->query_id].insert( + {filter_id, filterid_to_desc.second.bloom_filter_size_bytes}); + } + } + RETURN_IF_ERROR(exec_state->prepare(params)); { std::lock_guard<std::mutex> lock(_lock); @@ -683,6 +707,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi // Remove the exec state added std::lock_guard<std::mutex> lock(_lock); _fragment_map.erase(params.params.fragment_instance_id); + _bf_size_map.erase(fragments_ctx->query_id); } exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "push plan fragment to thread pool failed"); @@ -954,6 +979,14 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* UniqueId queryid = request->query_id(); std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); + auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift()); + if (bf_size_for_cur_query != _bf_size_map.end()) { + for (auto& iter : bf_size_for_cur_query->second) { + auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter(); + DCHECK(bf != nullptr); + bf->init_with_fixed_length(iter.second); + } + } RETURN_IF_ERROR(filter_controller->merge(request, attach_data)); return Status::OK(); } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 66b06540d4..fd54afacd2 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -112,6 +112,7 @@ private: std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map; // query id -> QueryFragmentsCtx std::unordered_map<TUniqueId, std::shared_ptr<QueryFragmentsCtx>> _fragments_ctx_map; + std::unordered_map<TUniqueId, 
std::unordered_map<int, int64_t>> _bf_size_map; CountDownLatch _stop_background_threads_latch; scoped_refptr<Thread> _cancel_thread; diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 1dbb75471e..0db00f2331 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -117,12 +117,6 @@ public: UniqueId query_id() { return _query_id; } -private: - Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, - const TQueryOptions* query_options, - const std::vector<doris::TRuntimeFilterTargetParams>* target_info, - const int producer_size); - struct RuntimeFilterCntlVal { int64_t create_time; int producer_size; @@ -132,6 +126,16 @@ private: std::unordered_set<std::string> arrive_id; // fragment_instance_id ? std::shared_ptr<ObjectPool> pool; }; + +public: + RuntimeFilterCntlVal* get_filter(int id) { return _filter_map[std::to_string(id)].get(); } + +private: + Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, + const TQueryOptions* query_options, + const std::vector<doris::TRuntimeFilterTargetParams>* target_info, + const int producer_size); + UniqueId _query_id; UniqueId _fragment_instance_id; // protect _filter_map --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org