This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 144204fecce872f260c1aedfa514ee1f88c299e2 Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri Jan 26 10:16:17 2024 +0800 [Refactor](Rf) refactor the code of runtime filter (#30268) --- be/src/pipeline/pipeline_fragment_context.cpp | 1 - .../pipeline_x/pipeline_x_fragment_context.cpp | 1 - be/src/runtime/fragment_mgr.cpp | 6 +- be/src/runtime/plan_fragment_executor.cpp | 1 - be/src/runtime/runtime_filter_mgr.cpp | 182 +++++---------------- be/src/runtime/runtime_filter_mgr.h | 46 ++++-- 6 files changed, 75 insertions(+), 162 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 2341071d963..5e8af4940b8 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -243,7 +243,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re // TODO should be combine with plan_fragment_executor.prepare funciton SCOPED_ATTACH_TASK(_runtime_state.get()); - static_cast<void>(_runtime_state->runtime_filter_mgr()->init()); _runtime_state->set_be_number(local_params.backend_num); if (request.__isset.backend_id) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index de0a544f18c..b639603d19e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -533,7 +533,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( if (local_params.__isset.runtime_filter_params) { runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params); } - RETURN_IF_ERROR(runtime_filter_mgr->init()); filterparams->runtime_filter_mgr = runtime_filter_mgr.get(); _runtime_filter_states.push_back(std::move(filterparams)); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 403c2463c2a..c38365d6a0c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -778,7 +778,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; // TODO need check the status, but when I add return_if_error the P0 will not pass static_cast<void>(_runtimefilter_controller.add_entity( - params, &handler, + params.params, params.params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(fragment_executor->runtime_state()))); fragment_executor->set_merge_controller_handler(handler); { @@ -866,7 +866,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, for (size_t i = 0; i < params.local_params.size(); i++) { std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; static_cast<void>(_runtimefilter_controller.add_entity( - params, params.local_params[i], &handler, + params.local_params[i], params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId())))); context->set_merge_controller_handler(handler); const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id; @@ -946,7 +946,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; static_cast<void>(_runtimefilter_controller.add_entity( - params, local_params, &handler, + local_params, params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId())))); context->set_merge_controller_handler(handler); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 2e8735c013d..0e4ecd7f5a7 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -133,7 +133,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); SCOPED_ATTACH_TASK(_runtime_state.get()); - static_cast<void>(_runtime_state->runtime_filter_mgr()->init()); _runtime_state->set_be_number(request.backend_num); if (request.__isset.backend_id) { _runtime_state->set_backend_id(request.backend_id); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 73a043070e3..29bf22535ed 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -54,21 +54,15 @@ struct AsyncRPCContext { RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state) { _state = state; _state->runtime_filter_mgr = this; -} - -Status RuntimeFilterMgr::init() { _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr", ExecEnv::GetInstance()->experimental_mem_tracker()); - return Status::OK(); } Status RuntimeFilterMgr::get_producer_filter(const int filter_id, IRuntimeFilter** target) { - int32_t key = filter_id; - std::lock_guard<std::mutex> l(_lock); - auto iter = _producer_map.find(key); + auto iter = _producer_map.find(filter_id); if (iter == _producer_map.end()) { - return Status::InvalidArgument("unknown filter: {}, role: PRODUCER", key); + return Status::InvalidArgument("unknown filter: {}, role: PRODUCER", filter_id); } *target = iter->second; @@ -94,11 +88,10 @@ Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int node_ Status RuntimeFilterMgr::get_consume_filters(const int filter_id, std::vector<IRuntimeFilter*>& consumer_filters) { - int32_t key = filter_id; std::lock_guard<std::mutex> l(_lock); - auto iter = _consumer_map.find(key); + auto iter = _consumer_map.find(filter_id); if (iter == _consumer_map.end()) { - return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", key); + return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", filter_id); } for (auto& holder : iter->second) { consumer_filters.emplace_back(holder.filter); @@ -114,52 +107,31 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc std::lock_guard<std::mutex> l(_lock); auto iter = _consumer_map.find(key); - if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets && - desc.type == TRuntimeFilterType::BLOOM) { - // if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances - - iter = _consumer_map.find(key); - if (iter != _consumer_map.end()) { - for (auto holder : iter->second) { - if (holder.node_id == node_id) { - return Status::OK(); - } - } - } - IRuntimeFilter* filter; - RETURN_IF_ERROR(IRuntimeFilter::create(_state, _state->obj_pool(), &desc, &options, - RuntimeFilterRole::CONSUMER, node_id, &filter, - build_bf_exactly)); - _consumer_map[key].emplace_back(node_id, filter); - } else if (is_global) { - if (iter != _consumer_map.end()) { - for (auto holder : iter->second) { - if (holder.node_id == node_id) { - return Status::OK(); - } + bool has_exist = false; + if (iter != _consumer_map.end()) { + for (auto holder : iter->second) { + if (holder.node_id == node_id) { + has_exist = true; } } + } - IRuntimeFilter* filter; - RETURN_IF_ERROR(IRuntimeFilter::create(_state, _state->obj_pool(), &desc, &options, - RuntimeFilterRole::CONSUMER, node_id, &filter, - build_bf_exactly, is_global)); - _consumer_map[key].emplace_back(node_id, filter); - } else { - if (iter != _consumer_map.end()) { - for (auto holder : iter->second) { - if (holder.node_id == node_id) { - return Status::InvalidArgument("filter has registered"); - } - } - } + // TODO: make the two case as one case to judge + bool remote_opt_or_global = + (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets && + desc.type == TRuntimeFilterType::BLOOM) || + is_global; + if (!has_exist) { IRuntimeFilter* filter; - RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, - RuntimeFilterRole::CONSUMER, node_id, &filter, - build_bf_exactly)); + RETURN_IF_ERROR(IRuntimeFilter::create( + _state, remote_opt_or_global ? _state->obj_pool() : &_pool, &desc, &options, + RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly, is_global)); _consumer_map[key].emplace_back(node_id, filter); + } else if (!remote_opt_or_global) { + return Status::InvalidArgument("filter has registered"); } + return Status::OK(); } @@ -218,21 +190,21 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( const std::vector<doris::TRuntimeFilterTargetParams>* target_info, const int producer_size) { std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); - std::shared_ptr<RuntimeFilterCntlVal> cntVal = std::make_shared<RuntimeFilterCntlVal>(); + std::shared_ptr<RuntimeFilterCntlVal> cnt_val = std::make_shared<RuntimeFilterCntlVal>(); // runtime_filter_desc and target will be released, - // so we need to copy to cntVal - cntVal->producer_size = producer_size; - cntVal->runtime_filter_desc = *runtime_filter_desc; - cntVal->target_info = *target_info; - cntVal->pool.reset(new ObjectPool()); - cntVal->filter = cntVal->pool->add( + // so we need to copy to cnt_val + cnt_val->producer_size = producer_size; + cnt_val->runtime_filter_desc = *runtime_filter_desc; + cnt_val->target_info = *target_info; + cnt_val->pool.reset(new ObjectPool()); + cnt_val->filter = cnt_val->pool->add( new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; // LOG(INFO) << "entity filter id:" << filter_id; - static_cast<void>( - cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, -1, false)); - _filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<std::mutex>()}); + RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, + -1, false)); + _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, std::make_unique<std::mutex>()}); return Status::OK(); } @@ -241,20 +213,20 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info, const int producer_size) { std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); - std::shared_ptr<RuntimeFilterCntlVal> cntVal = std::make_shared<RuntimeFilterCntlVal>(); + std::shared_ptr<RuntimeFilterCntlVal> cnt_val = std::make_shared<RuntimeFilterCntlVal>(); // runtime_filter_desc and target will be released, - // so we need to copy to cntVal - cntVal->producer_size = producer_size; - cntVal->runtime_filter_desc = *runtime_filter_desc; - cntVal->targetv2_info = *targetv2_info; - cntVal->pool.reset(new ObjectPool()); - cntVal->filter = cntVal->pool->add( + // so we need to copy to cnt_val + cnt_val->producer_size = producer_size; + cnt_val->runtime_filter_desc = *runtime_filter_desc; + cnt_val->targetv2_info = *targetv2_info; + cnt_val->pool.reset(new ObjectPool()); + cnt_val->filter = cnt_val->pool->add( new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; // LOG(INFO) << "entity filter id:" << filter_id; - static_cast<void>(cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options)); - _filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<std::mutex>()}); + RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options)); + _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, std::make_unique<std::mutex>()}); return Status::OK(); } @@ -308,7 +280,6 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data, bool opt_remote_rf) { - _opt_remote_rf = _opt_remote_rf && opt_remote_rf; SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); std::shared_ptr<RuntimeFilterCntlVal> cntVal; int merged_size = 0; @@ -485,70 +456,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ return Status::OK(); } -Status RuntimeFilterMergeController::add_entity( - const TExecPlanFragmentParams& params, - std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, - RuntimeFilterParamsContext* state) { - if (!params.params.__isset.runtime_filter_params || - params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { - return Status::OK(); - } - - runtime_filter_merge_entity_closer entity_closer = - std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1); - - UniqueId query_id(params.params.query_id); - std::string query_id_str = query_id.to_string(); - UniqueId fragment_instance_id = UniqueId(params.params.fragment_instance_id); - uint32_t shard = _get_controller_shard_idx(query_id); - std::lock_guard<std::mutex> guard(_controller_mutex[shard]); - auto iter = _filter_controller_map[shard].find(query_id_str); - - if (iter == _filter_controller_map[shard].end()) { - *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>( - new RuntimeFilterMergeControllerEntity(state), entity_closer); - _filter_controller_map[shard][query_id_str] = *handle; - const TRuntimeFilterParams& filter_params = params.params.runtime_filter_params; - RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, - params.query_options)); - } else { - *handle = _filter_controller_map[shard][query_id_str].lock(); - } - return Status::OK(); -} - -Status RuntimeFilterMergeController::add_entity( - const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params, - std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, - RuntimeFilterParamsContext* state) { - if (!local_params.__isset.runtime_filter_params || - local_params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { - return Status::OK(); - } - - runtime_filter_merge_entity_closer entity_closer = - std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1); - - UniqueId query_id(params.query_id); - std::string query_id_str = query_id.to_string(); - UniqueId fragment_instance_id = UniqueId(local_params.fragment_instance_id); - uint32_t shard = _get_controller_shard_idx(query_id); - std::lock_guard<std::mutex> guard(_controller_mutex[shard]); - auto iter = _filter_controller_map[shard].find(query_id_str); - - if (iter == _filter_controller_map[shard].end()) { - *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>( - new RuntimeFilterMergeControllerEntity(state), entity_closer); - _filter_controller_map[shard][query_id_str] = *handle; - const TRuntimeFilterParams& filter_params = local_params.runtime_filter_params; - RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, - params.query_options)); - } else { - *handle = _filter_controller_map[shard][query_id_str].lock(); - } - return Status::OK(); -} - Status RuntimeFilterMergeController::acquire( UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) { uint32_t shard = _get_controller_shard_idx(query_id); @@ -573,13 +480,6 @@ Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) { return Status::OK(); } -// auto called while call ~std::shared_ptr<RuntimeFilterMergeControllerEntity> -void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller, - RuntimeFilterMergeControllerEntity* entity) { - static_cast<void>(controller->remove_entity(entity->query_id())); - delete entity; -} - RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) { RuntimeFilterParamsContext* params = state->obj_pool()->add(new RuntimeFilterParamsContext()); params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 86e470a706e..24ab78464db 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -71,8 +71,6 @@ public: ~RuntimeFilterMgr() = default; - Status init(); - Status get_consume_filter(const int filter_id, const int node_id, IRuntimeFilter** consumer_filter); @@ -114,7 +112,7 @@ private: TNetworkAddress _merge_addr; - bool _has_merge_addr; + bool _has_merge_addr = false; std::mutex _lock; }; @@ -174,7 +172,6 @@ private: std::pair<std::shared_ptr<RuntimeFilterCntlVal>, std::unique_ptr<std::mutex>>; std::map<int, CntlValwithLock> _filter_map; RuntimeFilterParamsContext* _state = nullptr; - bool _opt_remote_rf = true; }; // RuntimeFilterMergeController has a map query-id -> entity @@ -187,13 +184,37 @@ public: // add a query-id -> entity // If a query-id -> entity already exists // add_entity will return a exists entity - Status add_entity(const TExecPlanFragmentParams& params, - std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, - RuntimeFilterParamsContext* state); - Status add_entity(const TPipelineFragmentParams& params, - const TPipelineInstanceParams& local_params, + Status add_entity(const auto& params, UniqueId query_id, const TQueryOptions& query_options, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, - RuntimeFilterParamsContext* state); + RuntimeFilterParamsContext* state) { + if (!params.__isset.runtime_filter_params || + params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { + return Status::OK(); + } + + // TODO: why we need string, direct use UniqueId + std::string query_id_str = query_id.to_string(); + UniqueId fragment_instance_id = UniqueId(params.fragment_instance_id); + uint32_t shard = _get_controller_shard_idx(query_id); + std::lock_guard<std::mutex> guard(_controller_mutex[shard]); + auto iter = _filter_controller_map[shard].find(query_id_str); + if (iter == _filter_controller_map[shard].end()) { + *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>( + new RuntimeFilterMergeControllerEntity(state), + [this](RuntimeFilterMergeControllerEntity* entity) { + static_cast<void>(remove_entity(entity->query_id())); + delete entity; + }); + _filter_controller_map[shard][query_id_str] = *handle; + const TRuntimeFilterParams& filter_params = params.runtime_filter_params; + RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, + query_options)); + } else { + *handle = _filter_controller_map[shard][query_id_str].lock(); + } + return Status::OK(); + } + // thread safe // increase a reference count // if a query-id is not exist @@ -221,11 +242,6 @@ private: FilterControllerMap _filter_controller_map[kShardNum]; }; -using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMergeControllerEntity*)>; - -void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller, - RuntimeFilterMergeControllerEntity* entity); - //There are two types of runtime filters: // one is global, originating from QueryContext, // and the other is local, originating from RuntimeState. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org