This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 28016c53f0 [profile](rf) refactor profile of runtime filters (#19134) 28016c53f0 is described below commit 28016c53f0c635e74ca7ae081a65b170587ed055 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Apr 28 08:46:42 2023 +0800 [profile](rf) refactor profile of runtime filters (#19134) * [profile](rf) refactor profile of runtime filters --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- be/src/exprs/runtime_filter.cpp | 19 ++++++++----------- be/src/exprs/runtime_filter.h | 9 +++++---- be/src/runtime/fragment_mgr.cpp | 3 ++- be/src/vec/exec/scan/vscan_node.cpp | 3 +++ 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 2a9d7cc24f..e18b49b765 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1210,7 +1210,6 @@ bool IRuntimeFilter::await() { return false; } } else { - SCOPED_TIMER(_await_time_cost); std::unique_lock lock(_inner_mutex); if (_rf_state != RuntimeFilterState::READY) { int64_t ms_since_registration = MonotonicMillis() - registration_time_; @@ -1485,23 +1484,21 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, Obje void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { if (_profile_init) { + parent_profile->add_child(_profile.get(), true, nullptr); return; } { - std::lock_guard guard(_inner_mutex); + std::lock_guard guard(_profile_mutex); if (_profile_init) { return; } DCHECK(parent_profile != nullptr); - _profile.reset( - new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, - ::doris::to_string(_runtime_filter_type)))); + _name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, + ::doris::to_string(_runtime_filter_type)); + _profile.reset(new RuntimeProfile(_name)); _profile_init = true; } 
parent_profile->add_child(_profile.get(), true, nullptr); - if (!_enable_pipeline_exec) { - _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost"); - } _profile->add_info_string("Info", _format_status()); if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { update_runtime_filter_type_to_profile(); @@ -1838,8 +1835,8 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { return Status::OK(); } -Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) { - int64_t start_update = MonotonicMillis(); +Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param, + int64_t start_apply) { if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { set_ignored(); const PInFilter in_filter = param->request->in_filter(); @@ -1858,7 +1855,7 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) { _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms"); _profile->add_info_string("UpdateTime", - std::to_string(MonotonicMillis() - start_update) + " ms"); + std::to_string(MonotonicMillis() - start_apply) + " ms"); return Status::OK(); } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 52191b5182..b1dbbfa4b2 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -281,7 +281,7 @@ public: void change_to_bloom_filter(); Status init_bloom_filter(const size_t build_bf_cardinality); Status update_filter(const UpdateRuntimeFilterParams* param); - Status update_filter(const UpdateRuntimeFilterParamsV2* param); + Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply); void set_ignored() { _is_ignored = true; } @@ -299,6 +299,8 @@ public: void init_profile(RuntimeProfile* parent_profile); + std::string& get_name() { return _name; } + void update_runtime_filter_type_to_profile(); void ready_for_publish(); @@ -394,8 +396,6 @@ 
protected: // parent profile // only effect on consumer std::unique_ptr<RuntimeProfile> _profile; - // unix millis - RuntimeProfile::Counter* _await_time_cost = nullptr; /// Time in ms (from MonotonicMillis()), that the filter was registered. const int64_t registration_time_; @@ -403,7 +403,8 @@ protected: const bool _enable_pipeline_exec; bool _profile_init = false; - + doris::Mutex _profile_mutex; + std::string _name; bool _opt_remote_rf; }; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e2a8802fd9..6fe7079d7f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1160,6 +1160,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, butil::IOBufAsZeroCopyInputStream* attach_data) { bool is_pipeline = request->has_is_pipeline() && request->is_pipeline(); + int64_t start_apply = MonotonicMillis(); const auto& fragment_instance_ids = request->fragment_instance_ids(); if (fragment_instance_ids.size() > 0) { @@ -1205,7 +1206,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, int filter_id = request->filter_id(); IRuntimeFilter* real_filter = nullptr; RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id, &real_filter)); - RETURN_IF_ERROR(real_filter->update_filter(&params)); + RETURN_IF_ERROR(real_filter->update_filter(&params, start_apply)); } return Status::OK(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 5482df2e20..f926ab15c6 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -119,9 +119,12 @@ Status VScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); // init profile for runtime filter + std::stringstream ss; for (auto& rf_ctx : _runtime_filter_ctxs) { rf_ctx.runtime_filter->init_profile(_runtime_profile.get()); + ss << 
rf_ctx.runtime_filter->get_name() << ", "; } + _runtime_profile->add_info_string("RuntimeFilters: ", ss.str()); if (_is_pipeline_scan) { if (_shared_scan_opt) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org