This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 52c45e38aff4c88670f789db2f275b6d8c1dede1 Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri Feb 23 14:42:30 2024 +0800 [Refactor](RF) refactor the profile of rf and pipeline-x support local ignore (#31287) * [Refactor](RF) refactor the profile of rf and pipeline-x support local ignore * fix local merge filter --- be/src/agent/heartbeat_server.cpp | 2 +- be/src/agent/heartbeat_server.h | 2 +- be/src/exprs/runtime_filter.cpp | 76 +++++++--------------- be/src/exprs/runtime_filter.h | 2 +- be/src/exprs/runtime_filter_slots.h | 7 +- be/src/pipeline/exec/scan_operator.cpp | 3 +- be/src/pipeline/pipeline_fragment_context.h | 9 --- be/src/pipeline/pipeline_x/dependency.cpp | 43 ++++++------ be/src/pipeline/pipeline_x/dependency.h | 13 ++-- .../pipeline_x/pipeline_x_fragment_context.cpp | 2 - be/src/runtime/fragment_mgr.cpp | 38 ++++------- be/src/runtime/plan_fragment_executor.h | 8 --- be/src/runtime/query_context.h | 9 ++- be/src/runtime/runtime_filter_mgr.cpp | 5 +- be/src/runtime/runtime_filter_mgr.h | 11 +--- be/src/service/backend_options.cpp | 4 ++ be/src/service/backend_options.h | 1 + be/src/service/doris_main.cpp | 1 - be/src/vec/exec/runtime_filter_consumer.cpp | 6 +- be/src/vec/exec/scan/vscan_node.cpp | 3 +- 20 files changed, 92 insertions(+), 153 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index a47a02dc938..a3783a07e07 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -215,8 +215,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { if (master_info.__isset.backend_id) { _master_info->__set_backend_id(master_info.backend_id); + BackendOptions::set_backend_id(master_info.backend_id); } - if (master_info.__isset.frontend_infos) { ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos); } else { diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h index 928efb5c620..ce7d60c5b97 100644 --- a/be/src/agent/heartbeat_server.h +++ b/be/src/agent/heartbeat_server.h @@ -39,7 +39,7 @@ public: explicit HeartbeatServer(TMasterInfo* master_info); ~HeartbeatServer() override = default; - virtual void init_cluster_id(); + void init_cluster_id(); // Master send heartbeat to this server // diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 1ec66bf2a87..786876cc796 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -462,19 +462,6 @@ public: switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - // only in filter can set ignore in merge time - if (_ignored) { - break; - } else if (wrapper->_ignored) { - VLOG_DEBUG << " ignore merge runtime filter(in filter id " << _filter_id - << ") because: " << wrapper->ignored_msg(); - - _ignored = true; - _ignored_msg = wrapper->_ignored_msg; - // release in filter - _context.hybrid_set.reset(); - break; - } // try insert set _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { @@ -1032,8 +1019,8 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo pquery_id->set_lo(_state->query_id.lo()); auto pfragment_instance_id = merge_filter_request->mutable_fragment_instance_id(); - pfragment_instance_id->set_hi(_state->fragment_instance_id().hi()); - pfragment_instance_id->set_lo(_state->fragment_instance_id().lo()); + pfragment_instance_id->set_hi(BackendOptions::get_local_backend().id); + pfragment_instance_id->set_lo((int64_t)this); merge_filter_request->set_filter_id(_filter_id); merge_filter_request->set_opt_remote_rf(opt_remote_rf); @@ -1061,14 +1048,12 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr std::vector<vectorized::VExprSPtr>& push_exprs, bool is_late_arrival) { DCHECK(is_consumer()); - if (_wrapper->is_ignored()) { - return Status::OK(); - } - if (!is_late_arrival) { - _set_push_down(); + if (!_wrapper->is_ignored()) { + _set_push_down(!is_late_arrival); + RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); } _profile->add_info_string("Info", _format_status()); - return _wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr); + return Status::OK(); } bool IRuntimeFilter::await() { @@ -1202,9 +1187,9 @@ void IRuntimeFilter::set_ignored(const std::string& msg) { std::string IRuntimeFilter::_format_status() const { return fmt::format( - "[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, HasRemoteTarget = {}, " + "[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, HasRemoteTarget = {}, " "HasLocalTarget = {}]", - _is_push_down, _get_explain_state_string(), _wrapper->is_ignored(), _has_remote_target, + _is_push_down, _get_explain_state_string(), _wrapper->ignored_msg(), _has_remote_target, _has_local_target); } @@ -1323,11 +1308,7 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, } void IRuntimeFilter::change_to_bloom_filter() { - auto origin_type = _wrapper->get_real_type(); _wrapper->change_to_bloom_filter(); - if (origin_type != _wrapper->get_real_type()) { - update_runtime_filter_type_to_profile(); - } } Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) { @@ -1367,32 +1348,24 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { if (_profile_init) { parent_profile->add_child(_profile.get(), true, nullptr); - return; - } - _profile_init = true; - parent_profile->add_child(_profile.get(), true, nullptr); - _profile->add_info_string("Info", _format_status()); - if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { - update_runtime_filter_type_to_profile(); + } else { + _profile_init = true; + parent_profile->add_child(_profile.get(), true, nullptr); + _profile->add_info_string("Info", _format_status()); } } void IRuntimeFilter::update_runtime_filter_type_to_profile() { - if (_profile != nullptr) { - _profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type())); - } + _profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type())); } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { - if (!_wrapper->is_ignored() && wrapper->is_ignored()) { + if (wrapper->is_ignored()) { set_ignored(wrapper->ignored_msg()); + } else if (!_wrapper->is_ignored()) { + return _wrapper->merge(wrapper); } - auto origin_type = _wrapper->get_real_type(); - Status status = _wrapper->merge(wrapper); - if (origin_type != _wrapper->get_real_type()) { - update_runtime_filter_type_to_profile(); - } - return status; + return Status::OK(); } template <typename T> @@ -1695,12 +1668,10 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { const PInFilter in_filter = param->request->in_filter(); set_ignored(in_filter.ignored_msg()); - } - std::unique_ptr<RuntimePredicateWrapper> wrapper; - RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, &wrapper)); - auto origin_type = _wrapper->get_real_type(); - RETURN_IF_ERROR(_wrapper->merge(wrapper.get())); - if (origin_type != _wrapper->get_real_type()) { + } else { + std::unique_ptr<RuntimePredicateWrapper> wrapper; + RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, &wrapper)); + RETURN_IF_ERROR(_wrapper->merge(wrapper.get())); update_runtime_filter_type_to_profile(); } this->signal(); @@ -1718,11 +1689,8 @@ void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t mer if (_wrapper->column_type() != wrapper->column_type()) { wrapper->_column_return_type = _wrapper->_column_return_type; } - auto origin_type = _wrapper->get_real_type(); _wrapper = wrapper; - if (origin_type != _wrapper->get_real_type()) { - update_runtime_filter_type_to_profile(); - } + update_runtime_filter_type_to_profile(); this->signal(); } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index d853493889c..74b2580a4e6 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -362,7 +362,7 @@ protected: static Status _create_wrapper(const T* param, ObjectPool* pool, std::unique_ptr<RuntimePredicateWrapper>* wrapper); - void _set_push_down() { _is_push_down = true; } + void _set_push_down(bool push_down) { _is_push_down = push_down; } std::string _format_status() const; diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 7f34bf7f2c9..c9c1a996064 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -47,11 +47,8 @@ public: std::map<int, bool> has_in_filter; auto ignore_local_filter = [&](int filter_id) { - // Now pipeline x have bug in ignore, after fix the problem enable ignore logic in pipeline x - if (_need_local_merge) { - return Status::OK(); - } - auto runtime_filter_mgr = state->local_runtime_filter_mgr(); + auto runtime_filter_mgr = _need_local_merge ? state->global_runtime_filter_mgr() + : state->local_runtime_filter_mgr(); std::vector<IRuntimeFilter*> filters; RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters)); diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 84e439c9032..119f5e42a60 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -303,8 +303,7 @@ Status ScanLocalState<Derived>::_normalize_predicate( auto impl = conjunct_expr_root->get_impl(); // If impl is not null, which means this a conjuncts from runtime filter. auto cur_expr = impl ? impl.get() : conjunct_expr_root.get(); - bool _is_runtime_filter_predicate = - _rf_vexpr_set.find(conjunct_expr_root) != _rf_vexpr_set.end(); + bool _is_runtime_filter_predicate = _rf_vexpr_set.contains(conjunct_expr_root); SlotDescriptor* slot = nullptr; ColumnValueRangeType* range = nullptr; vectorized::VScanNode::PushDownType pdt = diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 38db8cbe8ff..4c805b50582 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -112,11 +112,6 @@ public: void close_a_pipeline(); - void set_merge_controller_handler( - std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) { - _merge_controller_handler = handler; - } - virtual void add_merge_controller_handler( std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {} @@ -193,10 +188,6 @@ protected: std::shared_ptr<QueryContext> _query_ctx; - // This shared ptr is never used. It is just a reference to hold the object. - // There is a weak ptr in runtime filter manager to reference this object. - std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler; - MonotonicStopWatch _fragment_watcher; RuntimeProfile::Counter* _start_timer = nullptr; RuntimeProfile::Counter* _prepare_timer = nullptr; diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 631a17d193a..56045118a94 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -126,7 +126,7 @@ void RuntimeFilterTimer::call_timeout() { } _call_timeout = true; if (_parent) { - _parent->sub_filters(); + _parent->sub_filters(_filter_id); } } @@ -137,7 +137,7 @@ void RuntimeFilterTimer::call_ready() { } _call_ready = true; if (_parent) { - _parent->sub_filters(); + _parent->sub_filters(_filter_id); } _is_ready = true; } @@ -146,40 +146,43 @@ void RuntimeFilterTimer::call_has_ready() { std::unique_lock<std::mutex> lc(_lock); DCHECK(!_call_timeout); if (!_call_ready) { - _parent->sub_filters(); + _parent->sub_filters(_filter_id); } } -void RuntimeFilterTimer::call_has_release() { - // When the use count is equal to 1, only the timer queue still holds ownership, - // so there is no need to take any action. -} - void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { + const auto filter_id = runtime_filter->filter_id(); + ; _filters++; + _filter_ready_map[filter_id] = false; int64_t registration_time = runtime_filter->registration_time(); int32 wait_time_ms = runtime_filter->wait_time_ms(); auto filter_timer = std::make_shared<RuntimeFilterTimer>( - registration_time, wait_time_ms, + filter_id, registration_time, wait_time_ms, std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this())); runtime_filter->set_filter_timer(filter_timer); ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); } -void RuntimeFilterDependency::sub_filters() { - auto value = _filters.fetch_sub(1); - if (value == 1) { - _watcher.stop(); - std::vector<PipelineXTask*> local_block_task {}; - { - std::unique_lock<std::mutex> lc(_task_lock); - *_blocked_by_rf = false; - local_block_task.swap(_blocked_task); +void RuntimeFilterDependency::sub_filters(int id) { + std::vector<PipelineXTask*> local_block_task {}; + { + std::lock_guard<std::mutex> lk(_task_lock); + if (!_filter_ready_map[id]) { + _filter_ready_map[id] = true; + _filters--; } - for (auto* task : local_block_task) { - task->wake_up(); + if (_filters == 0) { + _watcher.stop(); + { + *_blocked_by_rf = false; + local_block_task.swap(_blocked_task); + } } } + for (auto* task : local_block_task) { + task->wake_up(); + } } void LocalExchangeSharedState::sub_running_sink_operators() { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index f518ef96d46..ccb919c7edf 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -203,9 +203,10 @@ public: class RuntimeFilterDependency; class RuntimeFilterTimer { public: - RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, + RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t wait_time_ms, std::shared_ptr<RuntimeFilterDependency> parent) - : _parent(std::move(parent)), + : _filter_id(filter_id), + _parent(std::move(parent)), _registration_time(registration_time), _wait_time_ms(wait_time_ms) {} @@ -215,7 +216,9 @@ public: void call_has_ready(); - void call_has_release(); + // When the use count is equal to 1, only the timer queue still holds ownership, + // so there is no need to take any action. + void call_has_release() {}; bool has_ready(); @@ -223,6 +226,7 @@ public: int32_t wait_time_ms() const { return _wait_time_ms; } private: + int _filter_id = -1; bool _call_ready {}; bool _call_timeout {}; std::shared_ptr<RuntimeFilterDependency> _parent; @@ -303,7 +307,7 @@ public: : Dependency(id, node_id, name, query_ctx) {} Dependency* is_blocked_by(PipelineXTask* task) override; void add_filters(IRuntimeFilter* runtime_filter); - void sub_filters(); + void sub_filters(int id); void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) { _blocked_by_rf = blocked_by_rf; } @@ -312,6 +316,7 @@ public: protected: std::atomic_int _filters; + phmap::flat_hash_map<int, bool> _filter_ready_map; std::shared_ptr<std::atomic_bool> _blocked_by_rf; }; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index efca7c068b6..696fcfefba5 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -518,8 +518,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( filterparams->query_id.set_hi(_runtime_state->query_id().hi); filterparams->query_id.set_lo(_runtime_state->query_id().lo); - filterparams->_fragment_instance_id.set_hi(fragment_instance_id.hi); - filterparams->_fragment_instance_id.set_lo(fragment_instance_id.lo); filterparams->be_exec_version = _runtime_state->be_exec_version(); filterparams->query_ctx = _query_ctx.get(); } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 20a78209a36..3c1e64cf6c9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -719,7 +719,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, static_cast<void>(_runtimefilter_controller.add_entity( params.params, params.params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(fragment_executor->runtime_state()))); - fragment_executor->set_merge_controller_handler(handler); + query_ctx->set_merge_controller_handler(handler); { std::lock_guard<std::mutex> lock(_lock); _fragment_instance_map.insert( @@ -807,7 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, static_cast<void>(_runtimefilter_controller.add_entity( params.local_params[i], params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); - context->set_merge_controller_handler(handler); + query_ctx->set_merge_controller_handler(handler); const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id; { std::lock_guard<std::mutex> lock(_lock); @@ -887,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, static_cast<void>(_runtimefilter_controller.add_entity( local_params, params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); - context->set_merge_controller_handler(handler); + query_ctx->set_merge_controller_handler(handler); { std::lock_guard<std::mutex> lock(_lock); _pipeline_map.insert(std::make_pair(fragment_instance_id, context)); @@ -1395,38 +1395,24 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data) { UniqueId queryid = request->query_id(); - bool is_pipeline = request->has_is_pipeline() && request->is_pipeline(); bool opt_remote_rf = request->has_opt_remote_rf() && request->opt_remote_rf(); std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); - auto fragment_instance_id = filter_controller->instance_id(); - TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); - std::shared_ptr<PlanFragmentExecutor> fragment_executor; - std::shared_ptr<pipeline::PipelineFragmentContext> pip_context; - if (is_pipeline) { + std::shared_ptr<QueryContext> query_ctx; + { + TUniqueId query_id; + query_id.__set_hi(queryid.hi); + query_id.__set_lo(queryid.lo); std::lock_guard<std::mutex> lock(_lock); - auto iter = _pipeline_map.find(tfragment_instance_id); - if (iter == _pipeline_map.end()) { - VLOG_CRITICAL << "unknown fragment-id:" << fragment_instance_id; - return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); + auto iter = _query_ctx_map.find(query_id); + if (iter == _query_ctx_map.end()) { + return Status::InvalidArgument("query-id: {}", queryid.to_string()); } // hold reference to pip_context, or else runtime_state can be destroyed // when filter_controller->merge is still in progress - pip_context = iter->second; - } else { - std::unique_lock<std::mutex> lock(_lock); - auto iter = _fragment_instance_map.find(tfragment_instance_id); - if (iter == _fragment_instance_map.end()) { - VLOG_CRITICAL << "unknown fragment instance id:" << print_id(tfragment_instance_id); - return Status::InvalidArgument("fragment instance id: {}", - print_id(tfragment_instance_id)); - } - - // hold reference to fragment_executor, or else runtime_state can be destroyed - // when filter_controller->merge is still in progress - fragment_executor = iter->second; + query_ctx = iter->second; } auto merge_status = filter_controller->merge(request, attach_data, opt_remote_rf); DCHECK(merge_status.ok()); diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 41fa6c2f819..5529d1ba3b5 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -134,11 +134,6 @@ public: void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } - void set_merge_controller_handler( - std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) { - _merge_controller_handler = handler; - } - std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; } TUniqueId fragment_instance_id() const { return _fragment_instance_id; } @@ -219,9 +214,6 @@ private: RuntimeProfile::Counter* _blocks_produced_counter = nullptr; RuntimeProfile::Counter* _fragment_cpu_timer = nullptr; - // This shared ptr is never used. It is just a reference to hold the object. - // There is a weak ptr in runtime filter manager to reference this object. - std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler; // If set the true, this plan fragment will be executed only after FE send execution start rpc. bool _need_wait_execution_trigger = false; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 3db91ba2824..d7b3813dcef 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -214,7 +214,11 @@ public: int64_t mem_limit() { return _bytes_limit; } -public: + void set_merge_controller_handler( + std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) { + _merge_controller_handler = handler; + } + DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; std::string user; @@ -282,6 +286,9 @@ private: std::unique_ptr<pipeline::Dependency> _execution_dependency; std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr; + // This shared ptr is never used. It is just a reference to hold the object. + // There is a weak ptr in runtime filter manager to reference this object. + std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler; }; } // namespace doris diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 95f65c5fc32..81d5dc88d54 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -258,11 +258,10 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( return Status::OK(); } -Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId fragment_instance_id, +Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params, const TQueryOptions& query_options) { _query_id = query_id; - _fragment_instance_id = fragment_instance_id; _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity", ExecEnv::GetInstance()->experimental_mem_tracker()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -518,8 +517,6 @@ RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* sta params->query_id.set_hi(state->query_id().hi); params->query_id.set_lo(state->query_id().lo); - params->_fragment_instance_id.set_hi(state->fragment_instance_id().hi); - params->_fragment_instance_id.set_lo(state->fragment_instance_id().lo); params->be_exec_version = state->be_exec_version(); params->query_ctx = state->get_query_ctx(); return params; diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index c9b455bc107..19908166942 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -136,8 +136,7 @@ public: : _query_id(0, 0), _fragment_instance_id(0, 0), _state(state) {} ~RuntimeFilterMergeControllerEntity() = default; - Status init(UniqueId query_id, UniqueId fragment_instance_id, - const TRuntimeFilterParams& runtime_filter_params, + Status init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params, const TQueryOptions& query_options); // handle merge rpc @@ -201,7 +200,6 @@ public: // TODO: why we need string, direct use UniqueId std::string query_id_str = query_id.to_string(); - UniqueId fragment_instance_id = UniqueId(params.fragment_instance_id); uint32_t shard = _get_controller_shard_idx(query_id); std::lock_guard<std::mutex> guard(_controller_mutex[shard]); auto iter = _filter_controller_map[shard].find(query_id_str); @@ -214,8 +212,7 @@ public: }); _filter_controller_map[shard][query_id_str] = *handle; const TRuntimeFilterParams& filter_params = params.runtime_filter_params; - RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, - query_options)); + RETURN_IF_ERROR(handle->get()->init(query_id, filter_params, query_options)); } else { *handle = _filter_controller_map[shard][query_id_str].lock(); } @@ -254,8 +251,6 @@ private: // and the other is local, originating from RuntimeState. // In practice, we have already distinguished between them through UpdateRuntimeFilterParamsV2/V1. // RuntimeState/QueryContext is only used to store runtime_filter_wait_time_ms and enable_pipeline_exec... - -/// TODO: Consider adding checks for global/local. struct RuntimeFilterParamsContext { RuntimeFilterParamsContext() = default; static RuntimeFilterParamsContext* create(RuntimeState* state); @@ -268,10 +263,8 @@ struct RuntimeFilterParamsContext { RuntimeFilterMgr* runtime_filter_mgr; ExecEnv* exec_env; PUniqueId query_id; - PUniqueId _fragment_instance_id; int be_exec_version; QueryContext* query_ctx; QueryContext* get_query_ctx() const { return query_ctx; } - PUniqueId fragment_instance_id() const { return _fragment_instance_id; } }; } // namespace doris diff --git a/be/src/service/backend_options.cpp b/be/src/service/backend_options.cpp index c8325733368..a8c48fd710e 100644 --- a/be/src/service/backend_options.cpp +++ b/be/src/service/backend_options.cpp @@ -74,6 +74,10 @@ TBackend BackendOptions::get_local_backend() { return _backend; } +void BackendOptions::set_backend_id(int64_t backend_id) { + _backend.__set_id(backend_id); +} + void BackendOptions::set_localhost(const std::string& host) { _s_localhost = host; } diff --git a/be/src/service/backend_options.h b/be/src/service/backend_options.h index 72293373883..8f504ba2ea7 100644 --- a/be/src/service/backend_options.h +++ b/be/src/service/backend_options.h @@ -34,6 +34,7 @@ public: static bool init(); static const std::string& get_localhost(); static TBackend get_local_backend(); + static void set_backend_id(int64_t backend_id); static void set_localhost(const std::string& host); static bool is_bind_ipv6(); static const char* get_service_bind_address(); diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index f013b83e68d..b3962af87e3 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -56,7 +56,6 @@ #include "common/config.h" #include "common/daemon.h" #include "common/logging.h" -#include "common/phdr_cache.h" #include "common/signal_handler.h" #include "common/status.h" #include "io/cache/block/block_file_cache_factory.h" diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index e683c4f2be0..52caf84e361 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -37,12 +37,12 @@ Status RuntimeFilterConsumer::init(RuntimeState* state, bool need_local_merge) { } void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) { - std::stringstream ss; + fmt::memory_buffer buffer; for (auto& rf_ctx : _runtime_filter_ctxs) { rf_ctx.runtime_filter->init_profile(profile); - ss << rf_ctx.runtime_filter->get_name() << ", "; + fmt::format_to(buffer, "{}, ", rf_ctx.runtime_filter->get_name()); } - profile->add_info_string("RuntimeFilters: ", ss.str()); + profile->add_info_string("RuntimeFilters: ", to_string(buffer)); } Status RuntimeFilterConsumer::_register_runtime_filter(bool need_local_merge) { diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 02557644d5a..4ba8f924c00 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -472,8 +472,7 @@ Status VScanNode::_normalize_predicate(const VExprSPtr& conjunct_expr_root, VExp auto impl = conjunct_expr_root->get_impl(); // If impl is not null, which means this a conjuncts from runtime filter. auto cur_expr = impl ? impl.get() : conjunct_expr_root.get(); - bool _is_runtime_filter_predicate = - _rf_vexpr_set.find(conjunct_expr_root) != _rf_vexpr_set.end(); + bool _is_runtime_filter_predicate = _rf_vexpr_set.contains(conjunct_expr_root); SlotDescriptor* slot = nullptr; ColumnValueRangeType* range = nullptr; PushDownType pdt = PushDownType::UNACCEPTABLE; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org