This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f41562e6a63 [Fix](rf) fix multi thread init error in RuntimeFilterMergeControllerEntity (#31337) f41562e6a63 is described below commit f41562e6a63814eccf6dbd2587afd9ec90b1aae3 Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri Feb 23 20:34:13 2024 +0800 [Fix](rf) fix multi thread init error in RuntimeFilterMergeControllerEntity (#31337) --- be/src/runtime/fragment_mgr.cpp | 12 +++++++++--- be/src/runtime/runtime_filter_mgr.cpp | 12 +++++------- be/src/runtime/runtime_filter_mgr.h | 18 +++++++----------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f6176516721..b9a1a6ef594 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -719,9 +719,11 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, static_cast<void>(_runtimefilter_controller.add_entity( params.params, params.params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(fragment_executor->runtime_state()))); - query_ctx->set_merge_controller_handler(handler); { std::lock_guard<std::mutex> lock(_lock); + if (handler) { + query_ctx->set_merge_controller_handler(handler); + } _fragment_instance_map.insert( std::make_pair(params.params.fragment_instance_id, fragment_executor)); _cv.notify_all(); @@ -807,7 +809,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, static_cast<void>(_runtimefilter_controller.add_entity( params.local_params[i], params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); - query_ctx->set_merge_controller_handler(handler); + if (i == 0 and handler) { + query_ctx->set_merge_controller_handler(handler); + } const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id; { std::lock_guard<std::mutex> lock(_lock); @@ -887,7 +891,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, static_cast<void>(_runtimefilter_controller.add_entity( local_params, params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); - query_ctx->set_merge_controller_handler(handler); + if (i == 0 and handler) { + query_ctx->set_merge_controller_handler(handler); + } { std::lock_guard<std::mutex> lock(_lock); _pipeline_map.insert(std::make_pair(fragment_instance_id, context)); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 81d5dc88d54..a3b7ad3ddad 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -486,24 +486,22 @@ Status RuntimeFilterMergeController::acquire( UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) { uint32_t shard = _get_controller_shard_idx(query_id); std::lock_guard<std::mutex> guard(_controller_mutex[shard]); - std::string query_id_str = query_id.to_string(); - auto iter = _filter_controller_map[shard].find(query_id_str); + auto iter = _filter_controller_map[shard].find(query_id); if (iter == _filter_controller_map[shard].end()) { - LOG(WARNING) << "not found entity, query-id:" << query_id_str; + LOG(WARNING) << "not found entity, query-id:" << query_id.to_string(); return Status::InvalidArgument("not found entity"); } - *handle = _filter_controller_map[shard][query_id_str].lock(); + *handle = _filter_controller_map[shard][query_id].lock(); if (*handle == nullptr) { return Status::InvalidArgument("entity is closed"); } return Status::OK(); } -Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) { +void RuntimeFilterMergeController::remove_entity(UniqueId query_id) { uint32_t shard = _get_controller_shard_idx(query_id); std::lock_guard<std::mutex> guard(_controller_mutex[shard]); - _filter_controller_map[shard].erase(query_id.to_string()); - return Status::OK(); + _filter_controller_map[shard].erase(query_id); } RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) { diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 19908166942..d29978d3b60 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -133,7 +133,7 @@ private: class RuntimeFilterMergeControllerEntity { public: RuntimeFilterMergeControllerEntity(RuntimeFilterParamsContext* state) - : _query_id(0, 0), _fragment_instance_id(0, 0), _state(state) {} + : _query_id(0, 0), _state(state) {} ~RuntimeFilterMergeControllerEntity() = default; Status init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params, @@ -145,8 +145,6 @@ public: UniqueId query_id() const { return _query_id; } - UniqueId instance_id() const { return _fragment_instance_id; } - struct RuntimeFilterCntlVal { int64_t merge_time; int producer_size; @@ -170,7 +168,6 @@ private: const int producer_size); UniqueId _query_id; - UniqueId _fragment_instance_id; // protect _filter_map std::shared_mutex _filter_map_mutex; std::shared_ptr<MemTracker> _mem_tracker; @@ -199,22 +196,21 @@ public: } // TODO: why we need string, direct use UniqueId - std::string query_id_str = query_id.to_string(); uint32_t shard = _get_controller_shard_idx(query_id); std::lock_guard<std::mutex> guard(_controller_mutex[shard]); - auto iter = _filter_controller_map[shard].find(query_id_str); + auto iter = _filter_controller_map[shard].find(query_id); if (iter == _filter_controller_map[shard].end()) { *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>( new RuntimeFilterMergeControllerEntity(state), [this](RuntimeFilterMergeControllerEntity* entity) { - static_cast<void>(remove_entity(entity->query_id())); + remove_entity(entity->query_id()); delete entity; }); - _filter_controller_map[shard][query_id_str] = *handle; + _filter_controller_map[shard][query_id] = *handle; const TRuntimeFilterParams& filter_params = params.runtime_filter_params; RETURN_IF_ERROR(handle->get()->init(query_id, filter_params, query_options)); } else { - *handle = _filter_controller_map[shard][query_id_str].lock(); + *handle = _filter_controller_map[shard][query_id].lock(); } return Status::OK(); } @@ -228,7 +224,7 @@ public: // thread safe // remove a entity by query-id // remove_entity will be called automatically by entity when entity is destroyed - Status remove_entity(UniqueId query_id); + void remove_entity(UniqueId query_id); static const int kShardNum = 128; @@ -241,7 +237,7 @@ private: // We store the weak pointer here. // When the external object is destroyed, we need to clear this record using FilterControllerMap = - std::unordered_map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>; + std::unordered_map<UniqueId, std::weak_ptr<RuntimeFilterMergeControllerEntity>>; // str(query-id) -> entity FilterControllerMap _filter_controller_map[kShardNum]; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org