This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0a7c2fc928c [refactor](profile) Refactor of RuntimeFilter profile (#49777) 0a7c2fc928c is described below commit 0a7c2fc928ce41dc84e25c1af3e337fcfaf57327 Author: zhiqiang <hezhiqi...@selectdb.com> AuthorDate: Tue Apr 8 18:24:24 2025 +0800 [refactor](profile) Refactor of RuntimeFilter profile (#49777) ### What problem does this PR solve? Refactor of runtime filter profile. 1. Counter is shared by related objects. 2. Update RuntimeProfile only when pipeline task is closed. 3. A new Counter type, which is similar to info_string but could be added as a Counter. ``` use ssb; SELECT c_city, s_city, d_year, SUM(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, dates WHERE lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_orderdate = d_datekey AND ( c_city = 'UNITED KI1' OR c_city = 'UNITED KI5' ) AND ( s_city = 'UNITED KI1' OR s_city = 'UNITED KI5' ) AND d_year >= 1992 AND d_year <= 1997 GROUP BY c_city, s_city, d_year ORDER BY d_year ASC, REVENUE DESC ``` We will have a structured counter in execution profile like ```text - RuntimeFilterInfo: - AcquireRuntimeFilter: 6.972ms - RF0 AlwaysTrueFilterRows: 0 - RF0 FilterRows: 41 - RF0 Info: Consumer: ([id: 0, state: [READY], type: MINMAX_FILTER, column_type: INT], mode: LOCAL, state: APPLIED) - RF0 InputRows: 48.757K (48757) - RF0 WaitTime: 161.0ms - RF1 AlwaysTrueFilterRows: 0 - RF1 FilterRows: 11.288919M (11288919) - RF1 Info: Consumer: ([id: 1, state: [READY], type: IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576, build_bf_by_runtime_size: true], mode: LOCAL, state: APPLIED) - RF1 InputRows: 11.381544M (11381544) - RF1 WaitTime: 160.0ms - RF2 AlwaysTrueFilterRows: 0 - RF2 FilterRows: 0 - RF2 Info: Consumer: ([id: 2, state: [READY], type: MINMAX_FILTER, column_type: INT], mode: LOCAL, state: APPLIED) - RF2 InputRows: 48.686K (48686) - RF2 WaitTime: 224.0ms - RF3 AlwaysTrueFilterRows: 0 - RF3 FilterRows: 91.841K 
(91841) - RF3 Info: Consumer: ([id: 3, state: [READY], type: IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576, build_bf_by_runtime_size: true], mode: LOCAL, state: APPLIED) - RF3 InputRows: 92.625K (92625) - RF3 WaitTime: 223.0ms - RF4 AlwaysTrueFilterRows: 0 - RF4 FilterRows: 0 - RF4 Info: Consumer: ([id: 4, state: [READY], type: MINMAX_FILTER, column_type: INT], mode: LOCAL, state: APPLIED) - RF4 InputRows: 0 - RF4 WaitTime: 149.0ms - RF5 AlwaysTrueFilterRows: 0 - RF5 FilterRows: 0 - RF5 Info: Consumer: ([id: 5, state: [READY], type: IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576, build_bf_by_runtime_size: true], mode: LOCAL, state: APPLIED) - RF5 InputRows: 7 - RF5 WaitTime: 149.0ms ``` Merged profile will be like ``` - RuntimeFilterInfo: sum , avg , max , min - RF0 FilterRows: sum 2.139K (2139), avg 44, max 59, min 30 - RF0 InputRows: sum 2.340004M (2340004), avg 48.75K (48750), max 48.76K (48760), min 48.741K (48741) - RF1 FilterRows: sum 542.210415M (542210415), avg 11.29605M (11296050), max 11.307756M (11307756), min 11.281476M (11281476) - RF1 InputRows: sum 546.667366M (546667366), avg 11.388903M (11388903), max 11.400674M (11400674), min 11.374343M (11374343) - RF2 FilterRows: sum 109, avg 2, max 12, min 0 - RF2 InputRows: sum 2.336525M (2336525), avg 48.677K (48677), max 48.708K (48708), min 48.645K (48645) - RF3 FilterRows: sum 4.421298M (4421298), avg 92.11K (92110), max 92.878K (92878), min 91.634K (91634) - RF3 InputRows: sum 4.456951M (4456951), avg 92.853K (92853), max 93.618K (93618), min 92.338K (92338) - RF4 FilterRows: sum 0, avg 0, max 0, min 0 - RF4 InputRows: sum 0, avg 0, max 0, min 0 - RF5 FilterRows: sum 0, avg 0, max 0, min 0 - RF5 InputRows: sum 340, avg 7, max 9, min 6 ``` --- be/src/exec/olap_common.h | 28 +++++-- be/src/olap/bitmap_filter_predicate.h | 2 - be/src/olap/column_predicate.h | 45 +++++------ be/src/olap/comparison_predicate.h | 6 +- be/src/olap/filter_olap_param.h | 24 ++++-- 
be/src/olap/olap_common.h | 9 +-- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 12 --- be/src/olap/tablet_reader.cpp | 15 ++-- be/src/pipeline/exec/datagen_operator.cpp | 4 +- .../exec/multi_cast_data_stream_source.cpp | 6 +- be/src/pipeline/exec/olap_scan_operator.cpp | 32 -------- be/src/pipeline/exec/olap_scan_operator.h | 6 -- be/src/pipeline/exec/scan_operator.cpp | 13 +-- be/src/runtime/runtime_state.cpp | 4 +- be/src/runtime/runtime_state.h | 7 +- be/src/runtime_filter/runtime_filter_consumer.cpp | 49 ++++++++---- be/src/runtime_filter/runtime_filter_consumer.h | 52 ++++++------ .../runtime_filter_consumer_helper.cpp | 25 ++++-- .../runtime_filter_consumer_helper.h | 13 ++- be/src/runtime_filter/runtime_filter_mgr.cpp | 9 +-- be/src/runtime_filter/runtime_filter_mgr.h | 3 +- be/src/util/runtime_profile.cpp | 38 ++++----- be/src/util/runtime_profile.h | 51 ++++++------ be/src/vec/exec/scan/olap_scanner.cpp | 3 - be/src/vec/exprs/vruntimefilter_wrapper.cpp | 6 +- be/src/vec/exprs/vruntimefilter_wrapper.h | 56 +++++++------ .../runtime_filter_consumer_helper_test.cpp | 4 +- .../runtime_filter_consumer_test.cpp | 41 +++++----- be/test/runtime_filter/runtime_filter_mgr_test.cpp | 3 +- .../runtime_filter_producer_test.cpp | 12 +-- .../runtime_profile_counter_tree_node_test.cpp | 92 +++++++++++++++------- .../org/apache/doris/common/profile/Counter.java | 16 +++- .../doris/common/profile/RuntimeProfile.java | 8 +- gensrc/thrift/RuntimeProfile.thrift | 1 + 34 files changed, 368 insertions(+), 327 deletions(-) diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h index 65469b6c968..62604bd1bc9 100644 --- a/be/src/exec/olap_common.h +++ b/be/src/exec/olap_common.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/Metrics_types.h> #include <gen_cpp/PaloInternalService_types.h> #include <glog/logging.h> #include <stddef.h> @@ -41,6 +42,7 @@ #include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" #include "runtime/type_limit.h" 
+#include "util/runtime_profile.h" #include "vec/core/types.h" #include "vec/io/io_helper.h" #include "vec/runtime/ipv4_value.h" @@ -301,12 +303,21 @@ public: _contain_null = _is_nullable_col && contain_null; } - void set_runtime_filter_info(int runtime_filter_id, - RuntimeProfile::Counter* predicate_filtered_rows_counter, - RuntimeProfile::Counter* predicate_input_rows_counter) { + void attach_profile_counter( + int runtime_filter_id, + std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter, + std::shared_ptr<RuntimeProfile::Counter> predicate_input_rows_counter) { + DCHECK(predicate_filtered_rows_counter != nullptr); + DCHECK(predicate_input_rows_counter != nullptr); + _runtime_filter_id = runtime_filter_id; - _predicate_filtered_rows_counter = predicate_filtered_rows_counter; - _predicate_input_rows_counter = predicate_input_rows_counter; + + if (predicate_filtered_rows_counter != nullptr) { + _predicate_filtered_rows_counter = predicate_filtered_rows_counter; + } + if (predicate_input_rows_counter != nullptr) { + _predicate_input_rows_counter = predicate_input_rows_counter; + } } int precision() const { return _precision; } @@ -370,8 +381,11 @@ private: primitive_type == PrimitiveType::TYPE_DATETIMEV2; int _runtime_filter_id = -1; - RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr; - RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr; + + std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); + std::shared_ptr<RuntimeProfile::Counter> _predicate_input_rows_counter = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); }; class OlapScanKeys { diff --git a/be/src/olap/bitmap_filter_predicate.h b/be/src/olap/bitmap_filter_predicate.h index 12cbd94ec8d..c1488869a04 100644 --- a/be/src/olap/bitmap_filter_predicate.h +++ b/be/src/olap/bitmap_filter_predicate.h @@ -113,8 +113,6 @@ uint16_t 
BitmapFilterColumnPredicate<T>::_evaluate_inner(const vectorized::IColu } else { new_size = evaluate<false>(column, nullptr, sel, size); } - _evaluated_rows += size; - _passed_rows += new_size; update_filter_info(size - new_size, size); return new_size; } diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h index 628b49c6213..92e5dea4537 100644 --- a/be/src/olap/column_predicate.h +++ b/be/src/olap/column_predicate.h @@ -17,6 +17,7 @@ #pragma once +#include <memory> #include <roaring/roaring.hh> #include "common/exception.h" @@ -24,6 +25,7 @@ #include "olap/rowset/segment_v2/bloom_filter.h" #include "olap/rowset/segment_v2/inverted_index_reader.h" #include "runtime/define_primitive_type.h" +#include "util/runtime_profile.h" #include "vec/columns/column.h" #include "vec/exprs/vruntimefilter_wrapper.h" @@ -184,8 +186,6 @@ public: } uint16_t new_size = _evaluate_inner(column, sel, size); - _evaluated_rows += size; - _passed_rows += new_size; if (_can_ignore()) { do_judge_selectivity(size - new_size, size); } @@ -255,31 +255,25 @@ public: int get_runtime_filter_id() const { return _runtime_filter_id; } - void set_runtime_filter_info(int filter_id, - RuntimeProfile::Counter* predicate_filtered_rows_counter, - RuntimeProfile::Counter* predicate_input_rows_counter) { - if (filter_id >= 0) { - DCHECK(predicate_filtered_rows_counter != nullptr); - DCHECK(predicate_input_rows_counter != nullptr); - } + void attach_profile_counter( + int filter_id, std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter, + std::shared_ptr<RuntimeProfile::Counter> predicate_input_rows_counter) { _runtime_filter_id = filter_id; - _predicate_filtered_rows_counter = predicate_filtered_rows_counter; - _predicate_input_rows_counter = predicate_input_rows_counter; - } + DCHECK(predicate_filtered_rows_counter != nullptr); + DCHECK(predicate_input_rows_counter != nullptr); - /// TODO: Currently we only record statistics for runtime filters, in the future we 
should record for all predicates - void update_filter_info(int64_t filter_rows, int64_t input_rows) const { - if (_predicate_input_rows_counter) { - COUNTER_UPDATE(_predicate_input_rows_counter, input_rows); + if (predicate_filtered_rows_counter != nullptr) { + _predicate_filtered_rows_counter = predicate_filtered_rows_counter; } - if (_predicate_filtered_rows_counter) { - COUNTER_UPDATE(_predicate_filtered_rows_counter, filter_rows); + if (predicate_input_rows_counter != nullptr) { + _predicate_input_rows_counter = predicate_input_rows_counter; } } - PredicateFilterInfo get_filtered_info() const { - return PredicateFilterInfo {static_cast<int>(type()), _evaluated_rows - 1, - _evaluated_rows - 1 - _passed_rows}; + /// TODO: Currently we only record statistics for runtime filters, in the future we should record for all predicates + void update_filter_info(int64_t filter_rows, int64_t input_rows) const { + COUNTER_UPDATE(_predicate_input_rows_counter, input_rows); + COUNTER_UPDATE(_predicate_filtered_rows_counter, filter_rows); } static std::string pred_type_string(PredicateType type) { @@ -353,8 +347,6 @@ protected: // TODO: the value is only in delete condition, better be template value bool _opposite; int _runtime_filter_id = -1; - mutable uint64_t _evaluated_rows = 1; - mutable uint64_t _passed_rows = 0; // VRuntimeFilterWrapper and ColumnPredicate share the same logic, // but it's challenging to unify them, so the code is duplicated. 
// _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true @@ -368,8 +360,11 @@ protected: mutable uint64_t _judge_filter_rows = 0; mutable bool _always_true = false; - RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr; - RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr; + std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); + + std::shared_ptr<RuntimeProfile::Counter> _predicate_input_rows_counter = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); }; } //namespace doris diff --git a/be/src/olap/comparison_predicate.h b/be/src/olap/comparison_predicate.h index d7bf38a6c6a..1fddd4b1046 100644 --- a/be/src/olap/comparison_predicate.h +++ b/be/src/olap/comparison_predicate.h @@ -267,9 +267,12 @@ public: } else { current_evaluated_rows += size; } - _evaluated_rows += current_evaluated_rows; } + // defer is created after its reference args are created. + // so defer will be destroyed BEFORE the reference args. + // so reference here is safe. 
+ // https://stackoverflow.com/questions/14688285/c-local-variable-destruction-order Defer defer([&]() { update_filter_info(current_evaluated_rows - current_passed_rows, current_evaluated_rows); @@ -359,7 +362,6 @@ public: for (uint16_t i = 0; i < size; i++) { current_passed_rows += flags[i]; } - _passed_rows += current_passed_rows; do_judge_selectivity(current_evaluated_rows - current_passed_rows, current_evaluated_rows); } diff --git a/be/src/olap/filter_olap_param.h b/be/src/olap/filter_olap_param.h index d9aa6386ec6..272fee63fb5 100644 --- a/be/src/olap/filter_olap_param.h +++ b/be/src/olap/filter_olap_param.h @@ -24,18 +24,28 @@ namespace doris { template <typename T> struct FilterOlapParam { FilterOlapParam(std::string column_name, T filter, int runtime_filter_id, - RuntimeProfile::Counter* filtered_counter, - RuntimeProfile::Counter* input_counter) + std::shared_ptr<RuntimeProfile::Counter> filtered_counter, + std::shared_ptr<RuntimeProfile::Counter> input_counter) : column_name(std::move(column_name)), filter(std::move(filter)), - runtime_filter_id(runtime_filter_id), - filtered_rows_counter(filtered_counter), - input_rows_counter(input_counter) {} + runtime_filter_id(runtime_filter_id) { + DCHECK(filtered_rows_counter != nullptr); + DCHECK(input_rows_counter != nullptr); + if (filtered_counter != nullptr) { + filtered_rows_counter = filtered_counter; + } + if (input_counter != nullptr) { + input_rows_counter = input_counter; + } + } + std::string column_name; T filter; int runtime_filter_id; - RuntimeProfile::Counter* filtered_rows_counter = nullptr; - RuntimeProfile::Counter* input_rows_counter = nullptr; + std::shared_ptr<RuntimeProfile::Counter> filtered_rows_counter = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); + std::shared_ptr<RuntimeProfile::Counter> input_rows_counter = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); }; } // namespace doris diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 
616bcf980de..fe92bea5017 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -77,11 +77,7 @@ struct DataDirInfo { DataDirType data_dir_type = DataDirType::OLAP_DATA_DIR; std::string metric_name; }; -struct PredicateFilterInfo { - int type = 0; - uint64_t input_row = 0; - uint64_t filtered_row = 0; -}; + // Sort DataDirInfo by available space. struct DataDirInfoLessAvailability { bool operator()(const DataDirInfo& left, const DataDirInfo& right) const { @@ -337,9 +333,6 @@ struct OlapReaderStatistics { int64_t short_cond_ns = 0; int64_t expr_filter_ns = 0; int64_t output_col_ns = 0; - - std::map<int, PredicateFilterInfo> filter_info; - int64_t rows_key_range_filtered = 0; int64_t rows_stats_filtered = 0; int64_t rows_stats_rp_filtered = 0; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 00306091876..ce4ec01c8ba 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -1877,17 +1877,6 @@ uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro return selected_size; } -void SegmentIterator::_collect_runtime_filter_predicate() { - // collect profile - for (auto* p : _filter_info_id) { - // There is a situation, such as with in or minmax filters, - // where intermediate conversion to a key range or other types - // prevents obtaining the filter id. 
- if (p->is_runtime_filter()) { - _opts.stats->filter_info[p->get_runtime_filter_id()] = p->get_filtered_info(); - } - } -} Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids, std::vector<rowid_t>& rowid_vector, uint16_t* sel_rowid_idx, size_t select_size, @@ -2147,7 +2136,6 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { // In SSB test, it make no difference; So need more scenarios to test selected_size = _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), selected_size); - _collect_runtime_filter_predicate(); if (selected_size > 0) { // step 3.1: output short circuit and predicate column // when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids) diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 4503749e1fe..be2be8626b8 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -520,16 +520,13 @@ Status TabletReader::_init_orderby_keys_param(const ReaderParams& read_params) { Status TabletReader::_init_conditions_param(const ReaderParams& read_params) { SCOPED_RAW_TIMER(&_stats.tablet_reader_init_conditions_param_timer_ns); std::vector<ColumnPredicate*> predicates; - auto emplace_predicate = [&predicates](auto& param, ColumnPredicate* predicate) { - predicate->set_runtime_filter_info(param.runtime_filter_id, param.filtered_rows_counter, - param.input_rows_counter); - predicates.emplace_back(predicate); - }; - auto parse_and_emplace_predicates = [this, &emplace_predicate](auto& params) { + auto parse_and_emplace_predicates = [this, &predicates](auto& params) { for (const auto& param : params) { ColumnPredicate* predicate = _parse_to_predicate({param.column_name, param.filter}); - emplace_predicate(param, predicate); + predicate->attach_profile_counter(param.runtime_filter_id, param.filtered_rows_counter, + param.input_rows_counter); + predicates.emplace_back(predicate); } }; @@ 
-545,7 +542,9 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) { parse_to_predicate(mcolumn, index, tmp_cond, _predicate_arena.get()); // record condition value into predicate_params in order to pushdown segment_iterator, // _gen_predicate_result_sign will build predicate result unique sign with condition value - emplace_predicate(param, predicate); + predicate->attach_profile_counter(param.runtime_filter_id, param.filtered_rows_counter, + param.input_rows_counter); + predicates.emplace_back(predicate); } parse_and_emplace_predicates(read_params.bloom_filters); parse_and_emplace_predicates(read_params.bitmap_filters); diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 5d2c80874bd..c494edd397b 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -96,8 +96,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node. 
for (const auto& filter_desc : p._runtime_filter_descs) { std::shared_ptr<RuntimeFilterConsumer> filter; - RETURN_IF_ERROR(state->register_consumer_runtime_filter( - filter_desc, p.is_serial_operator(), p.node_id(), &filter, _runtime_profile.get())); + RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, p.is_serial_operator(), + p.node_id(), &filter)); } return Status::OK(); } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index fd9788e326a..37aa8fa3b77 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -43,8 +43,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState _filter_timer = ADD_TIMER(_runtime_profile, "FilterTime"); _get_data_timer = ADD_TIMER(_runtime_profile, "GetDataTime"); _materialize_data_timer = ADD_TIMER(_runtime_profile, "MaterializeDataTime"); - RETURN_IF_ERROR(_helper.init(state, profile(), false, _filter_dependencies, p.operator_id(), - p.node_id(), p.get_name() + "_FILTER_DEPENDENCY")); + RETURN_IF_ERROR(_helper.init(state, false, _filter_dependencies, p.operator_id(), p.node_id(), + p.get_name() + "_FILTER_DEPENDENCY")); return Status::OK(); } @@ -81,7 +81,7 @@ Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) { rf_time += dep->watcher_elapse_time(); } COUNTER_SET(_wait_for_rf_timer, rf_time); - + _helper.collect_realtime_profile(profile()); return Base::close(state); } diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index b93c22274d9..97297dd4b6c 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -157,8 +157,6 @@ Status OlapScanLocalState::_init_profile() { _total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT); _tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", 
TUnit::UNIT); _key_range_counter = ADD_COUNTER(_runtime_profile, "KeyRangesNum", TUnit::UNIT); - _runtime_filter_info = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "RuntimeFilterInfo", 1); - _tablet_reader_init_timer = ADD_TIMER(_scanner_profile, "TabletReaderInitTimer"); _tablet_reader_capture_rs_readers_timer = ADD_TIMER(_scanner_profile, "TabletReaderCaptureRsReadersTimer"); @@ -643,36 +641,6 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() { return Status::OK(); } -void OlapScanLocalState::add_filter_info(int id, const PredicateFilterInfo& update_info) { - std::unique_lock lock(_profile_mtx); - // update - _filter_info[id].filtered_row += update_info.filtered_row; - _filter_info[id].input_row += update_info.input_row; - _filter_info[id].type = update_info.type; - // to string - auto& info = _filter_info[id]; - std::string filter_name = "RuntimeFilterInfo id "; - filter_name += std::to_string(id); - std::string info_str; - info_str += "type = " + type_to_string(static_cast<PredicateType>(info.type)) + ", "; - info_str += "input = " + std::to_string(info.input_row) + ", "; - info_str += "filtered = " + std::to_string(info.filtered_row); - info_str = "[" + info_str + "]"; - - // add info - _segment_profile->add_info_string(filter_name, info_str); - - const std::string rf_name = "filter id = " + std::to_string(id) + " "; - - // add counter - auto* input_count = ADD_CHILD_COUNTER_WITH_LEVEL(_runtime_profile, rf_name + "input", - TUnit::UNIT, "RuntimeFilterInfo", 1); - auto* filtered_count = ADD_CHILD_COUNTER_WITH_LEVEL(_runtime_profile, rf_name + "filtered", - TUnit::UNIT, "RuntimeFilterInfo", 1); - COUNTER_SET(input_count, (int64_t)info.input_row); - COUNTER_SET(filtered_count, (int64_t)info.filtered_row); -} - OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs, int parallel_tasks, const TQueryCacheParam& param) diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h index f6275df046e..ee3d995958a 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -87,8 +87,6 @@ private: Status _init_scanners(std::list<vectorized::ScannerSPtr>* scanners) override; - void add_filter_info(int id, const PredicateFilterInfo& info); - Status _build_key_ranges_and_filters(); std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges; @@ -122,7 +120,6 @@ private: RuntimeProfile::Counter* _short_cond_timer = nullptr; RuntimeProfile::Counter* _expr_filter_timer = nullptr; RuntimeProfile::Counter* _output_col_timer = nullptr; - std::map<int, PredicateFilterInfo> _filter_info; RuntimeProfile::Counter* _stats_filtered_counter = nullptr; RuntimeProfile::Counter* _stats_rp_filtered_counter = nullptr; @@ -188,8 +185,6 @@ private: // total number of segment related to this scan node RuntimeProfile::Counter* _total_segment_counter = nullptr; - RuntimeProfile::Counter* _runtime_filter_info = nullptr; - // timer about tablet reader RuntimeProfile::Counter* _tablet_reader_init_timer = nullptr; RuntimeProfile::Counter* _tablet_reader_capture_rs_readers_timer = nullptr; @@ -217,7 +212,6 @@ private: RuntimeProfile::Counter* _segment_create_column_readers_timer = nullptr; RuntimeProfile::Counter* _segment_load_index_timer = nullptr; - std::mutex _profile_mtx; std::vector<TabletWithVersion> _tablets; std::vector<TabletReader::ReadSource> _read_sources; }; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index fdc5678862e..3db2d5ced00 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -18,6 +18,7 @@ #include "scan_operator.h" #include <fmt/format.h> +#include <gen_cpp/Metrics_types.h> #include <cstdint> #include <memory> @@ -73,7 +74,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); 
auto& p = _parent->cast<typename Derived::Parent>(); - RETURN_IF_ERROR(_helper.init(state, profile(), p.is_serial_operator(), _filter_dependencies, + RETURN_IF_ERROR(_helper.init(state, p.is_serial_operator(), _filter_dependencies, p.operator_id(), p.node_id(), p.get_name() + "_FILTER_DEPENDENCY")); RETURN_IF_ERROR(_init_profile()); @@ -287,7 +288,9 @@ Status ScanLocalState<Derived>::_normalize_predicate( if (need_set_runtime_filter_id) { auto* rf_expr = assert_cast<vectorized::VRuntimeFilterWrapper*>( conjunct_expr_root.get()); - value_range.set_runtime_filter_info( + DCHECK(rf_expr->predicate_filtered_rows_counter() != nullptr); + DCHECK(rf_expr->predicate_input_rows_counter() != nullptr); + value_range.attach_profile_counter( rf_expr->filter_id(), rf_expr->predicate_filtered_rows_counter(), rf_expr->predicate_input_rows_counter()); @@ -591,8 +594,8 @@ Status ScanLocalState<Derived>::_normalize_in_and_eq_predicate(vectorized::VExpr iter = hybrid_set->begin(); } else { int runtime_filter_id = -1; - RuntimeProfile::Counter* predicate_filtered_rows_counter = nullptr; - RuntimeProfile::Counter* predicate_input_rows_counter = nullptr; + std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter = nullptr; + std::shared_ptr<RuntimeProfile::Counter> predicate_input_rows_counter = nullptr; if (expr_ctx->root()->is_rf_wrapper()) { auto* rf_expr = assert_cast<vectorized::VRuntimeFilterWrapper*>(expr_ctx->root().get()); @@ -1275,7 +1278,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) { std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners); COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); COUNTER_SET(_wait_for_rf_timer, rf_time); - + _helper.collect_realtime_profile(profile()); return PipelineXLocalState<>::close(state); } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 4a8ccccb9ec..b0600320f0d 100644 --- a/be/src/runtime/runtime_state.cpp +++ 
b/be/src/runtime/runtime_state.cpp @@ -508,10 +508,10 @@ Status RuntimeState::register_producer_runtime_filter( Status RuntimeState::register_consumer_runtime_filter( const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, - std::shared_ptr<RuntimeFilterConsumer>* consumer_filter, RuntimeProfile* parent_profile) { + std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) { bool need_merge = desc.has_remote_targets || need_local_merge; RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : local_runtime_filter_mgr(); - return mgr->register_consumer_filter(desc, node_id, consumer_filter, parent_profile); + return mgr->register_consumer_filter(desc, node_id, consumer_filter); } bool RuntimeState::is_nereids() const { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index a1b3f15c0aa..6db14254a11 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -565,10 +565,9 @@ public: std::shared_ptr<RuntimeFilterProducer>* producer_filter, RuntimeProfile* parent_profile); - Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc, - bool need_local_merge, int node_id, - std::shared_ptr<RuntimeFilterConsumer>* consumer_filter, - RuntimeProfile* parent_profile); + Status register_consumer_runtime_filter( + const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, + std::shared_ptr<RuntimeFilterConsumer>* consumer_filter); bool is_nereids() const; diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp b/be/src/runtime_filter/runtime_filter_consumer.cpp index 33daafca322..5a35c7ca5ad 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.cpp +++ b/be/src/runtime_filter/runtime_filter_consumer.cpp @@ -17,11 +17,11 @@ #include "runtime_filter/runtime_filter_consumer.h" -#include "exprs/create_predicate_function.h" +#include "exprs/minmax_predicate.h" +#include "util/runtime_profile.h" #include "vec/exprs/vbitmap_predicate.h" #include 
"vec/exprs/vbloom_predicate.h" #include "vec/exprs/vdirect_in_predicate.h" -#include "vec/exprs/vexpr_context.h" namespace doris { #include "common/compile_check_begin.h" @@ -38,23 +38,9 @@ Status RuntimeFilterConsumer::_apply_ready_expr( auto origin_size = push_exprs.size(); RETURN_IF_ERROR(_get_push_exprs(push_exprs, _probe_expr)); - // The runtime filter is pushed down, adding filtering information. - auto* expr_filtered_rows_counter = _execution_profile->add_collaboration_counter( - "ExprFilteredRows", TUnit::UNIT, _rf_filter); - auto* expr_input_rows_counter = - _execution_profile->add_collaboration_counter("ExprInputRows", TUnit::UNIT, _rf_input); - auto* expr_always_true_counter = - ADD_COUNTER(_execution_profile, "AlwaysTruePassRows", TUnit::UNIT); - - auto* predicate_filtered_rows_counter = _storage_profile->add_collaboration_counter( - "PredicateFilteredRows", TUnit::UNIT, _rf_filter); - auto* predicate_input_rows_counter = _storage_profile->add_collaboration_counter( - "PredicateInputRows", TUnit::UNIT, _rf_input); for (auto i = origin_size; i < push_exprs.size(); i++) { - push_exprs[i]->attach_profile_counter( - expr_filtered_rows_counter, expr_input_rows_counter, expr_always_true_counter, - predicate_filtered_rows_counter, predicate_input_rows_counter); + push_exprs[i]->attach_profile_counter(_rf_input, _rf_filter, _always_true_counter); } return Status::OK(); } @@ -225,4 +211,33 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi return Status::OK(); } +void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile* parent_operator_profile) { + DCHECK(parent_operator_profile != nullptr); + + // Counter* is owned by RuntimeProfile, so no need to free. 
+ RuntimeProfile::Counter* c = parent_operator_profile->add_counter( + fmt::format("RF{} InputRows", _filter_id), TUnit::UNIT, "RuntimeFilterInfo", 1); + c->update(_rf_input->value()); + + c = parent_operator_profile->add_counter(fmt::format("RF{} FilterRows", _filter_id), + TUnit::UNIT, "RuntimeFilterInfo", 1); + c->update(_rf_filter->value()); + c = parent_operator_profile->add_counter(fmt::format("RF{} WaitTime", _filter_id), + TUnit::TIME_NS, "RuntimeFilterInfo", 2); + c->update(_wait_timer->value()); + + c = parent_operator_profile->add_counter(fmt::format("RF{} AlwaysTrueFilterRows", _filter_id), + TUnit::UNIT, "RuntimeFilterInfo", 2); + c->update(_always_true_counter->value()); + + { + // since debug_string will read from RuntimeFilter::_wrapper + // and it is a shared_ptr, instead of a atomic_shared_ptr + // so it is not thread safe + std::unique_lock<std::mutex> l(_mtx); + parent_operator_profile->add_description(fmt::format("RF{} Info", _filter_id), + debug_string(), "RuntimeFilterInfo"); + } +} + } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_consumer.h b/be/src/runtime_filter/runtime_filter_consumer.h index cc6581fa9fa..eba46b98406 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.h +++ b/be/src/runtime_filter/runtime_filter_consumer.h @@ -17,6 +17,9 @@ #pragma once +#include <gen_cpp/Metrics_types.h> + +#include <memory> #include <string> #include "pipeline/dependency.h" @@ -39,12 +42,10 @@ public: }; static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, - int node_id, std::shared_ptr<RuntimeFilterConsumer>* res, - RuntimeProfile* parent_profile) { + int node_id, std::shared_ptr<RuntimeFilterConsumer>* res) { *res = std::shared_ptr<RuntimeFilterConsumer>( - new RuntimeFilterConsumer(state, desc, node_id, parent_profile)); + new RuntimeFilterConsumer(state, desc, node_id)); RETURN_IF_ERROR((*res)->_init_with_desc(desc, &state->get_query_ctx()->query_options())); - 
(*res)->_profile->add_info_string("Info", ((*res)->debug_string())); return Status::OK(); } @@ -65,6 +66,9 @@ public: bool is_applied() { return _rf_state == State::APPLIED; } + // Called by RuntimeFilterConsumerHelper + void collect_realtime_profile(RuntimeProfile* parent_operator_profile); + static std::string to_string(const State& state) { switch (state) { case State::NOT_READY: @@ -81,31 +85,20 @@ public: } private: + friend class RuntimeFilterProducer; + RuntimeFilterConsumer(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, - int node_id, RuntimeProfile* parent_profile) + int node_id) : RuntimeFilter(state, desc), _probe_expr(desc->planId_to_target_expr.find(node_id)->second), - _profile(new RuntimeProfile(fmt::format("RF{}", desc->filter_id))), - _storage_profile(new RuntimeProfile(fmt::format("Storage", desc->filter_id))), - _execution_profile(new RuntimeProfile(fmt::format("Execution", desc->filter_id))), _registration_time(MonotonicMillis()), - _rf_state(State::NOT_READY) { + _rf_state(State::NOT_READY), + _filter_id(desc->filter_id) { // If bitmap filter is not applied, it will cause the query result to be incorrect bool wait_infinitely = _state->get_query_ctx()->runtime_filter_wait_infinitely() || _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER; _rf_wait_time_ms = wait_infinitely ? 
_state->get_query_ctx()->execution_timeout() * 1000 : _state->get_query_ctx()->runtime_filter_wait_time_ms(); - _profile->add_info_string("TimeoutLimit", std::to_string(_rf_wait_time_ms) + "ms"); - - parent_profile->add_child(_profile.get(), true, nullptr); - _profile->add_child(_storage_profile.get(), true, nullptr); - _profile->add_child(_execution_profile.get(), true, nullptr); - _wait_timer = ADD_TIMER(_profile, "WaitTime"); - - _rf_filter = ADD_COUNTER_WITH_LEVEL( - parent_profile, fmt::format("RF{} FilterRows", desc->filter_id), TUnit::UNIT, 1); - _rf_input = ADD_COUNTER_WITH_LEVEL( - parent_profile, fmt::format("RF{} InputRows", desc->filter_id), TUnit::UNIT, 1); DorisMetrics::instance()->runtime_filter_consumer_num->increment(1); } @@ -142,21 +135,25 @@ private: _check_state({State::NOT_READY, State::TIMEOUT}); } _rf_state = rf_state; - _profile->add_info_string("Info", debug_string()); } TExpr _probe_expr; std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer; - std::unique_ptr<RuntimeProfile> _profile; - std::unique_ptr<RuntimeProfile> _storage_profile; // for storage layer stats - std::unique_ptr<RuntimeProfile> _execution_profile; // for execution layer stats - RuntimeProfile::Counter* _wait_timer = nullptr; + std::shared_ptr<RuntimeProfile::Counter> _wait_timer = + std::make_shared<RuntimeProfile::Counter>(TUnit::TIME_NS, 0); //_rf_filter is used to record the number of rows filtered by the runtime filter. //It aggregates the filtering statistics from both the Storage and Execution. - RuntimeProfile::Counter* _rf_filter = nullptr; - RuntimeProfile::Counter* _rf_input = nullptr; + // Counter will be shared by RuntimeFilterConsumer & VRuntimeFilterWrapper + // OperatorLocalState's close method will collect the statistics from RuntimeFilterConsumer + // VRuntimeFilterWrapper will update the statistics. 
+ std::shared_ptr<RuntimeProfile::Counter> _rf_filter = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1); + std::shared_ptr<RuntimeProfile::Counter> _rf_input = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1); + std::shared_ptr<RuntimeProfile::Counter> _always_true_counter = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1); int32_t _rf_wait_time_ms; const int64_t _registration_time; @@ -169,6 +166,7 @@ private: bool _reached_timeout = false; friend class RuntimeFilterProducer; + int _filter_id = -1; }; #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp index 52ceeb59c77..0ec0b2fb38a 100644 --- a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp +++ b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp @@ -19,6 +19,7 @@ #include "pipeline/pipeline_task.h" #include "runtime_filter/runtime_filter_consumer.h" +#include "util/runtime_profile.h" namespace doris { #include "common/compile_check_begin.h" @@ -27,19 +28,16 @@ RuntimeFilterConsumerHelper::RuntimeFilterConsumerHelper( const RowDescriptor& row_descriptor) : _node_id(_node_id), _runtime_filter_descs(runtime_filters), - _row_descriptor_ref(row_descriptor), - _profile(new RuntimeProfile("RuntimeFilterConsumerHelper")) { + _row_descriptor_ref(row_descriptor) { _blocked_by_rf = std::make_shared<std::atomic_bool>(false); } Status RuntimeFilterConsumerHelper::init( - RuntimeState* state, RuntimeProfile* profile, bool need_local_merge, + RuntimeState* state, bool need_local_merge, std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>& dependencies, const int id, const int node_id, const std::string& name) { _state = state; - profile->add_child(_profile.get(), true, nullptr); RETURN_IF_ERROR(_register_runtime_filter(need_local_merge)); - _acquire_runtime_filter_timer = ADD_TIMER(_profile, "AcquireRuntimeFilterTime"); 
_init_dependency(dependencies, id, node_id, name); return Status::OK(); } @@ -49,7 +47,7 @@ Status RuntimeFilterConsumerHelper::_register_runtime_filter(bool need_local_mer for (size_t i = 0; i < filter_size; ++i) { std::shared_ptr<RuntimeFilterConsumer> filter; RETURN_IF_ERROR(_state->register_consumer_runtime_filter( - _runtime_filter_descs[i], need_local_merge, _node_id, &filter, _profile.get())); + _runtime_filter_descs[i], need_local_merge, _node_id, &filter)); _consumers.emplace_back(filter); } return Status::OK(); @@ -84,7 +82,7 @@ void RuntimeFilterConsumerHelper::_init_dependency( Status RuntimeFilterConsumerHelper::acquire_runtime_filter( vectorized::VExprContextSPtrs& conjuncts) { - SCOPED_TIMER(_acquire_runtime_filter_timer); + SCOPED_TIMER(_acquire_runtime_filter_timer.get()); std::vector<vectorized::VRuntimeFilterPtr> vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { RETURN_IF_ERROR(_consumers[i]->acquire_expr(vexprs)); @@ -148,4 +146,17 @@ Status RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter( return Status::OK(); } +void RuntimeFilterConsumerHelper::collect_realtime_profile( + RuntimeProfile* parent_operator_profile) { + std::ignore = parent_operator_profile->add_counter("RuntimeFilterInfo", TUnit::NONE, + RuntimeProfile::ROOT_COUNTER, 1); + RuntimeProfile::Counter* c = parent_operator_profile->add_counter( + "AcquireRuntimeFilter", TUnit::TIME_NS, "RuntimeFilterInfo", 2); + c->update(_acquire_runtime_filter_timer->value()); + + for (auto& consumer : _consumers) { + consumer->collect_realtime_profile(parent_operator_profile); + } +} + } // namespace doris \ No newline at end of file diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h b/be/src/runtime_filter/runtime_filter_consumer_helper.h index 55005b6da33..644343c431b 100644 --- a/be/src/runtime_filter/runtime_filter_consumer_helper.h +++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h @@ -17,7 +17,10 @@ #pragma once +#include <mutex> 
+ #include "pipeline/dependency.h" +#include "util/runtime_profile.h" #include "vec/exprs/vruntimefilter_wrapper.h" namespace doris { @@ -33,7 +36,7 @@ public: const RowDescriptor& row_descriptor); ~RuntimeFilterConsumerHelper() = default; - Status init(RuntimeState* state, RuntimeProfile* profile, bool need_local_merge, + Status init(RuntimeState* state, bool need_local_merge, std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>& runtime_filter_dependencies, const int id, const int node_id, const std::string& name); @@ -46,6 +49,10 @@ public: Status try_append_late_arrival_runtime_filter(int* arrived_rf_num, vectorized::VExprContextSPtrs& conjuncts); + // Called by XXXLocalState::close() + // parent_operator_profile is owned by LocalState so update it is safe at here. + void collect_realtime_profile(RuntimeProfile* parent_operator_profile); + private: // Register and get all runtime filters at Init phase. Status _register_runtime_filter(bool need_local_merge); @@ -70,8 +77,8 @@ private: bool _is_all_rf_applied = true; std::shared_ptr<std::atomic_bool> _blocked_by_rf; - RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr; - std::unique_ptr<RuntimeProfile> _profile; + std::unique_ptr<RuntimeProfile::Counter> _acquire_runtime_filter_timer = + std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0); }; #include "common/compile_check_end.h" } // namespace doris \ No newline at end of file diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index 4b718620997..a5c7ab71b19 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -70,15 +70,14 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> RuntimeFilterMgr::get_consum return iter->second; } -Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, int node_id, - std::shared_ptr<RuntimeFilterConsumer>* consumer, - RuntimeProfile* parent_profile) { +Status 
RuntimeFilterMgr::register_consumer_filter( + const TRuntimeFilterDesc& desc, int node_id, + std::shared_ptr<RuntimeFilterConsumer>* consumer) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; std::lock_guard<std::mutex> l(_lock); - RETURN_IF_ERROR( - RuntimeFilterConsumer::create(_state, &desc, node_id, consumer, parent_profile)); + RETURN_IF_ERROR(RuntimeFilterConsumer::create(_state, &desc, node_id, consumer)); _consumer_map[key].push_back(*consumer); return Status::OK(); } diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index b6e2c89e820..edaf100a7f2 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -89,8 +89,7 @@ public: // get/set consumer std::vector<std::shared_ptr<RuntimeFilterConsumer>> get_consume_filters(int filter_id); Status register_consumer_filter(const TRuntimeFilterDesc& desc, int node_id, - std::shared_ptr<RuntimeFilterConsumer>* consumer_filter, - RuntimeProfile* parent_profile); + std::shared_ptr<RuntimeFilterConsumer>* consumer_filter); Status register_local_merger_producer_filter( const TRuntimeFilterDesc& desc, std::shared_ptr<RuntimeFilterProducer> producer_filter, diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index cf684304697..d9fb7183321 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -397,9 +397,6 @@ RuntimeProfile::Counter* RuntimeProfile::add_counter(const std::string& name, TU std::lock_guard<std::mutex> l(_counter_map_lock); if (_counter_map.find(name) != _counter_map.end()) { - // TODO: FIX DUPLICATE COUNTERS - // In production, we will return the existing counter. - // This will not make be crash, but the result may be wrong. 
return _counter_map[name]; } @@ -434,19 +431,16 @@ RuntimeProfile::NonZeroCounter* RuntimeProfile::add_nonzero_counter( return counter; } -RuntimeProfile::CollaborationCounter* RuntimeProfile::add_collaboration_counter( - const std::string& name, TUnit::type type, Counter* other_counter, - const std::string& parent_counter_name, int64_t level) { +RuntimeProfile::DerivedCounter* RuntimeProfile::add_derived_counter( + const std::string& name, TUnit::type type, const DerivedCounterFunction& counter_fn, + const std::string& parent_counter_name) { std::lock_guard<std::mutex> l(_counter_map_lock); + if (_counter_map.find(name) != _counter_map.end()) { - DCHECK(dynamic_cast<CollaborationCounter*>(_counter_map[name])); - return static_cast<CollaborationCounter*>(_counter_map[name]); + return nullptr; } - DCHECK(parent_counter_name == ROOT_COUNTER || - _counter_map.find(parent_counter_name) != _counter_map.end()); - CollaborationCounter* counter = - _pool->add(new CollaborationCounter(type, level, other_counter)); + DerivedCounter* counter = _pool->add(new DerivedCounter(type, counter_fn)); _counter_map[name] = counter; std::set<std::string>* child_counters = find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>()); @@ -454,21 +448,29 @@ RuntimeProfile::CollaborationCounter* RuntimeProfile::add_collaboration_counter( return counter; } -RuntimeProfile::DerivedCounter* RuntimeProfile::add_derived_counter( - const std::string& name, TUnit::type type, const DerivedCounterFunction& counter_fn, - const std::string& parent_counter_name) { +void RuntimeProfile::add_description(const std::string& name, const std::string& description, + std::string parent_counter_name) { std::lock_guard<std::mutex> l(_counter_map_lock); if (_counter_map.find(name) != _counter_map.end()) { - return nullptr; + Counter* counter = _counter_map[name]; + if (dynamic_cast<DescriptionEntry*>(counter) != nullptr) { + // Do replace instead of update to avoid data race. 
+ _counter_map.erase(name); + } else { + DCHECK(false) << "Counter type mismatch, name: " << name + << ", type: " << counter->type() << ", description: " << description; + } } - DerivedCounter* counter = _pool->add(new DerivedCounter(type, counter_fn)); + // Parent counter must already exist. + DCHECK(parent_counter_name == ROOT_COUNTER || + _counter_map.find(parent_counter_name) != _counter_map.end()); + DescriptionEntry* counter = _pool->add(new DescriptionEntry(name, description)); _counter_map[name] = counter; std::set<std::string>* child_counters = find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>()); child_counters->insert(name); - return counter; } RuntimeProfile::Counter* RuntimeProfile::get_counter(const std::string& name) { diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index cae1a7cefe7..ee9a71ee900 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -295,41 +295,36 @@ public: const std::string _parent_name; }; - // When the collaboration Counter modifies itself, it also modifies the other counter. 
- - class CollaborationCounter : public Counter { + class DescriptionEntry : public Counter { public: - CollaborationCounter(TUnit::type type, int64_t level, Counter* other_counter, - int64_t value = 0) - : Counter(type, value, level), _other_counter(other_counter) {} + DescriptionEntry(const std::string& name, const std::string& description) + : Counter(TUnit::NONE, 0, 2), _description(description), _name(name) {} virtual Counter* clone() const override { - return new CollaborationCounter(type(), level(), _other_counter, value()); - } - - void update(int64_t delta) override { - if (_other_counter != nullptr) { - _other_counter->update(delta); - } - Counter::update(delta); + return new DescriptionEntry(_name, _description); } void set(int64_t value) override { - if (_other_counter != nullptr) { - _other_counter->set(value); - } - Counter::set(value); + // Do nothing } - void set(double value) override { - if (_other_counter != nullptr) { - _other_counter->set(value); - } - Counter::set(value); + // Do nothing + } + void update(int64_t delta) override { + // Do nothing + } + + TCounter to_thrift(const std::string& name) const override { + TCounter counter; + counter.name = name; + counter.__set_level(2); + counter.__set_description(_description); + return counter; } private: - Counter* _other_counter = nullptr; // Pointer to the other counter to be modified + const std::string _description; + const std::string _name; }; // Create a runtime profile object with 'name'. @@ -385,11 +380,9 @@ public: const std::string& parent_counter_name = RuntimeProfile::ROOT_COUNTER, int64_t level = 2); - CollaborationCounter* add_collaboration_counter( - const std::string& name, TUnit::type type, Counter* other_counter, - const std::string& parent_counter_name = RuntimeProfile::ROOT_COUNTER, - int64_t level = 2); - + // Add a description entry under target counter. 
+ void add_description(const std::string& name, const std::string& description, + std::string parent_counter_name); // Add a derived counter with 'name'/'type'. The counter is owned by the // RuntimeProfile object. // If parent_counter_name is a non-empty string, the counter is added as a child of diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index d623a54f4f9..605cfe92c7b 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -592,9 +592,6 @@ void OlapScanner::_collect_profile_before_close() { COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter, stats.short_circuit_cond_input_rows); COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, stats.expr_cond_input_rows); - for (const auto& [id, info] : stats.filter_info) { - local_state->add_filter_info(id, info); - } COUNTER_UPDATE(local_state->_stats_filtered_counter, stats.rows_stats_filtered); COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, stats.rows_stats_rp_filtered); COUNTER_UPDATE(local_state->_dict_filtered_counter, stats.rows_dict_filtered); diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index d864dbd833a..41d2a6d76f4 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -89,8 +89,6 @@ void VRuntimeFilterWrapper::close(VExprContext* context, Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) { DCHECK(_open_finished || _getting_const_col); - DCHECK(_expr_filtered_rows_counter && _expr_input_rows_counter && _always_true_counter) - << "rf counter must be initialized"; if (_judge_counter.fetch_sub(1) == 0) { reset_judge_selectivity(); } @@ -99,9 +97,7 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* block->insert({create_always_true_column(size, _data_type->is_nullable()), _data_type, expr_name()}); 
*result_column_id = block->columns() - 1; - if (_always_true_counter) { - COUNTER_UPDATE(_always_true_counter, size); - } + COUNTER_UPDATE(_always_true_filter_rows, size); return Status::OK(); } else { if (_getting_const_col) { diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index 72627bb6cd9..84b5538e130 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -17,8 +17,11 @@ #pragma once +#include <gen_cpp/Metrics_types.h> + #include <atomic> #include <cstdint> +#include <memory> #include <string> #include "common/config.h" @@ -61,21 +64,27 @@ public: VExprSPtr get_impl() const override { return _impl; } - void attach_profile_counter(RuntimeProfile::Counter* expr_filtered_rows_counter, - RuntimeProfile::Counter* expr_input_rows_counter, - RuntimeProfile::Counter* always_true_counter, - RuntimeProfile::Counter* predicate_filtered_rows_counter, - RuntimeProfile::Counter* predicate_input_rows_counter) { - _expr_filtered_rows_counter = expr_filtered_rows_counter; - _expr_input_rows_counter = expr_input_rows_counter; - _always_true_counter = always_true_counter; - _predicate_filtered_rows_counter = predicate_filtered_rows_counter; - _predicate_input_rows_counter = predicate_input_rows_counter; + void attach_profile_counter(std::shared_ptr<RuntimeProfile::Counter> rf_input_rows, + std::shared_ptr<RuntimeProfile::Counter> rf_filter_rows, + std::shared_ptr<RuntimeProfile::Counter> always_true_filter_rows) { + DCHECK(rf_input_rows != nullptr); + DCHECK(rf_filter_rows != nullptr); + DCHECK(always_true_filter_rows != nullptr); + + if (rf_input_rows != nullptr) { + _rf_input_rows = rf_input_rows; + } + if (rf_filter_rows != nullptr) { + _rf_filter_rows = rf_filter_rows; + } + if (always_true_filter_rows != nullptr) { + _always_true_filter_rows = always_true_filter_rows; + } } void update_counters(int64_t filter_rows, int64_t input_rows) { - 
COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows); - COUNTER_UPDATE(_expr_input_rows_counter, input_rows); + COUNTER_UPDATE(_rf_filter_rows, filter_rows); + COUNTER_UPDATE(_rf_input_rows, input_rows); } template <typename T> @@ -100,8 +109,12 @@ public: } } - auto* predicate_filtered_rows_counter() const { return _predicate_filtered_rows_counter; } - auto* predicate_input_rows_counter() const { return _predicate_input_rows_counter; } + std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter() const { + return _rf_filter_rows; + } + std::shared_ptr<RuntimeProfile::Counter> predicate_input_rows_counter() const { + return _rf_input_rows; + } private: void reset_judge_selectivity() { @@ -125,15 +138,12 @@ private: std::atomic_uint64_t _judge_filter_rows = 0; std::atomic_int _always_true = false; - RuntimeProfile::Counter* _expr_filtered_rows_counter = nullptr; - RuntimeProfile::Counter* _expr_input_rows_counter = nullptr; - RuntimeProfile::Counter* _always_true_counter = nullptr; - - // Used to record filtering information on predicates. 
- // The transfer relationship of these counters is: - // RuntimeFilterConsumer(create) ==> VRuntimeFilterWrapper(pass) ==> FilterOlapParam(pass) ==> ColumnPredicate(record) - RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr; - RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr; + std::shared_ptr<RuntimeProfile::Counter> _rf_input_rows = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); + std::shared_ptr<RuntimeProfile::Counter> _rf_filter_rows = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); + std::shared_ptr<RuntimeProfile::Counter> _always_true_filter_rows = + std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); std::string _expr_name; double _ignore_thredhold; diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp index 46a50c3f363..215de7d8358 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp @@ -78,8 +78,8 @@ TEST_F(RuntimeFilterConsumerHelperTest, basic) { const_cast<std::vector<TupleDescriptor*>&>(row_desc._tuple_desc_map).push_back(&tuple_desc); auto helper = RuntimeFilterConsumerHelper(0, runtime_filter_descs, row_desc); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.init(_runtime_states[0].get(), &_profile, true, - runtime_filter_dependencies, 0, 0, "")); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + helper.init(_runtime_states[0].get(), true, runtime_filter_dependencies, 0, 0, "")); vectorized::VExprContextSPtrs conjuncts; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.acquire_runtime_filter(conjuncts)); diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index de6882b5aca..215edc78a30 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -30,9 +30,8 @@ class 
RuntimeFilterConsumerTest : public RuntimeFilterTest { public: void test_signal_aquire(TRuntimeFilterDesc desc) { std::shared_ptr<RuntimeFilterConsumer> consumer; - FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), - &desc, 0, &consumer, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( @@ -60,11 +59,11 @@ TEST_F(RuntimeFilterConsumerTest, basic) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( - RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterConsumer> registed_consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( - desc, true, 0, &registed_consumer, &_profile)); + desc, true, 0, &registed_consumer)); } TEST_F(RuntimeFilterConsumerTest, signal_aquire_in_or_bloom) { @@ -117,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( - RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( @@ -142,18 +141,18 @@ TEST_F(RuntimeFilterConsumerTest, wait_infinity) { const_cast<TQueryOptions&>(_query_ctx->_query_options) 
.__set_runtime_filter_wait_infinitely(true); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( - RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterConsumer> registed_consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( - desc, true, 0, &registed_consumer, &_profile)); + desc, true, 0, &registed_consumer)); } TEST_F(RuntimeFilterConsumerTest, aquire_disabled) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( - RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( @@ -171,7 +170,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_ignored) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( - RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( @@ -193,9 +192,8 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { std::shared_ptr<RuntimeFilterConsumer> consumer; { - auto st = - RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), - &desc, 0, &consumer, &_profile); + auto st = RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); } 
desc.__set_src_expr( @@ -216,17 +214,15 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { .build()); { - auto st = - RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), - &desc, 0, &consumer, &_profile); + auto st = RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); } { desc.__set_has_local_targets(false); desc.__set_has_remote_targets(true); - auto st = - RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), - &desc, 0, &consumer, &_profile); + auto st = RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); desc.__set_has_local_targets(true); desc.__set_has_remote_targets(false); @@ -234,16 +230,15 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr()); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( - RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer)); } TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { for (int i = 0; i < 100; i++) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), - &desc, 0, &consumer, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/runtime_filter/runtime_filter_mgr_test.cpp index 253631dd45a..0e1bdea1469 100644 --- 
a/be/test/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp @@ -65,8 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { // Get / Register consumer EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); std::shared_ptr<RuntimeFilterConsumer> consumer_filter; - EXPECT_TRUE(global_runtime_filter_mgr - ->register_consumer_filter(desc, 0, &consumer_filter, profile.get()) + EXPECT_TRUE(global_runtime_filter_mgr->register_consumer_filter(desc, 0, &consumer_filter) .ok()); EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); } diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp b/be/test/runtime_filter/runtime_filter_producer_test.cpp index 8122004ed58..4a367677bf4 100644 --- a/be/test/runtime_filter/runtime_filter_producer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp @@ -106,8 +106,8 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_merge) { _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); std::shared_ptr<RuntimeFilterConsumer> consumer; - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( - desc, true, 0, &consumer, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[1]->register_consumer_runtime_filter(desc, true, 0, &consumer)); ASSERT_EQ(producer->_need_sync_filter_size, true); ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); @@ -141,8 +141,8 @@ TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) { _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); std::shared_ptr<RuntimeFilterConsumer> consumer; - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( - desc, true, 0, &consumer, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[1]->register_consumer_runtime_filter(desc, true, 0, &consumer)); 
ASSERT_EQ(producer->_need_sync_filter_size, true); ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); @@ -183,8 +183,8 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_merge_with_ignored) { _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); std::shared_ptr<RuntimeFilterConsumer> consumer; - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( - desc, true, 0, &consumer, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[1]->register_consumer_runtime_filter(desc, true, 0, &consumer)); ASSERT_EQ(producer->_need_sync_filter_size, true); ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); diff --git a/be/test/util/runtime_profile_counter_tree_node_test.cpp b/be/test/util/runtime_profile_counter_tree_node_test.cpp index 0dcb34768aa..5226841b4dd 100644 --- a/be/test/util/runtime_profile_counter_tree_node_test.cpp +++ b/be/test/util/runtime_profile_counter_tree_node_test.cpp @@ -279,34 +279,70 @@ TEST_F(RuntimeProfileCounterTreeNodeTest, NonZeroCounterToThrfit) { ASSERT_EQ(child_counter_map[RuntimeProfile::ROOT_COUNTER].size(), 0); } -TEST_F(RuntimeProfileCounterTreeNodeTest, CollaborationCounterTest) { - auto root_counter = std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT); - auto child_counter1 = std::make_unique<RuntimeProfile::CollaborationCounter>( - TUnit::UNIT, 2, root_counter.get()); - auto child_counter2 = std::make_unique<RuntimeProfile::CollaborationCounter>( - TUnit::UNIT, 2, root_counter.get()); - - auto c1 = std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2, - child_counter1.get()); - auto c2 = std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2, - child_counter1.get()); - auto c3 = std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2, - child_counter2.get()); - auto c4 = std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2, 
- child_counter2.get()); - - c1->update(1); - c2->update(10); - c3->update(100); - c4->update(1000); - - ASSERT_EQ(root_counter->value(), 1111); - ASSERT_EQ(child_counter1->value(), 11); - ASSERT_EQ(child_counter2->value(), 1100); - ASSERT_EQ(c1->value(), 1); - ASSERT_EQ(c2->value(), 10); - ASSERT_EQ(c3->value(), 100); - ASSERT_EQ(c4->value(), 1000); +TEST_F(RuntimeProfileCounterTreeNodeTest, DescriptionCounter) { + RuntimeProfile::CounterMap counterMap; + RuntimeProfile::ChildCounterMap childCounterMap; + /* + "" + "root" + "description_entry" + */ + + auto rootCounter = std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT); + auto descriptionEntry = std::make_unique<RuntimeProfile::DescriptionEntry>( + "description_entry", "Updated description"); + + counterMap["root"] = rootCounter.get(); + counterMap["description_entry"] = descriptionEntry.get(); + + childCounterMap[RuntimeProfile::ROOT_COUNTER].insert("root"); + childCounterMap["root"].insert("description_entry"); + + RuntimeProfileCounterTreeNode rootNode = RuntimeProfileCounterTreeNode::from_map( + counterMap, childCounterMap, RuntimeProfile::ROOT_COUNTER); + + std::vector<TCounter> tcounter; + std::map<std::string, std::set<std::string>> child_counter_map; + + rootNode.to_thrift(tcounter, child_counter_map); + + /* + ROOT_COUNTER + root + description_entry + */ + + /* + tcounter: root, description_entry + child_counter_map: + ROOT_COUNTER -> {root} + root -> {description_entry} + */ + + for (const auto& counter : tcounter) { + std::cout << "Counter: " << counter.name; + if (counter.name == "description_entry") { + EXPECT_TRUE(counter.__isset.description); + EXPECT_EQ(counter.description, "Updated description"); + } + if (counter.__isset.description) { + std::cout << ", Description: " << counter.description; + } + std::cout << std::endl; + } + + ASSERT_EQ(tcounter.size(), 2); + EXPECT_EQ(tcounter[0].name, "root"); + EXPECT_EQ(tcounter[1].name, "description_entry"); + + 
ASSERT_TRUE(tcounter[1].__isset.description); + EXPECT_EQ(tcounter[1].description, "Updated description"); + EXPECT_EQ(tcounter[1].level, 2); + + ASSERT_EQ(child_counter_map.size(), 2); + ASSERT_EQ(child_counter_map[RuntimeProfile::ROOT_COUNTER].size(), 1); + ASSERT_EQ(child_counter_map["root"].size(), 1); + ASSERT_EQ(*child_counter_map["root"].begin(), "description_entry"); } } // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java index f306d7c73fb..d4a5e6e88dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java @@ -21,6 +21,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TUnit; +import com.google.common.base.Strings; import com.google.gson.annotations.SerializedName; import java.io.DataInput; @@ -35,6 +36,8 @@ public class Counter { private volatile int type; @SerializedName(value = "level") private volatile long level; + @SerializedName(value = "description") + private volatile String description; public static Counter read(DataInput input) throws IOException { return GsonUtils.GSON.fromJson(Text.readString(input), Counter.class); @@ -77,6 +80,13 @@ public class Counter { this.level = level; } + public Counter(String description) { + this.description = description; + this.value = 0; + // Make sure not merge. 
+ this.level = 2; + } + public void addValue(Counter other) { if (other == null) { return; @@ -115,7 +125,11 @@ public class Counter { } public String print() { - return RuntimeProfile.printCounter(value, getType()); + if (Strings.isNullOrEmpty(description)) { + return RuntimeProfile.printCounter(value, getType()); + } else { + return description; + } } public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java index c607dc570d0..cfd253f2faa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java @@ -267,12 +267,18 @@ public class RuntimeProfile { // If different node has counter with the same name, it will lead to chaos. Counter counter = this.counterMap.get(tcounter.name); if (counter == null) { - counterMap.put(tcounter.name, new Counter(tcounter.type, tcounter.value, tcounter.level)); + if (tcounter.isSetDescription()) { + counterMap.put(tcounter.name, new Counter(tcounter.description)); + } else { + counterMap.put(tcounter.name, new Counter(tcounter.type, tcounter.value, tcounter.level)); + } } else { counter.setLevel(tcounter.level); if (counter.getType() != tcounter.type) { LOG.error("Cannot update counters with the same name but different types" + " type=" + tcounter.type); + } else if (tcounter.isSetDescription()) { + continue; } else { counter.setValue(tcounter.type, tcounter.value); } diff --git a/gensrc/thrift/RuntimeProfile.thrift b/gensrc/thrift/RuntimeProfile.thrift index 764db39f7d2..28be3fab060 100644 --- a/gensrc/thrift/RuntimeProfile.thrift +++ b/gensrc/thrift/RuntimeProfile.thrift @@ -26,6 +26,7 @@ struct TCounter { 2: required Metrics.TUnit type 3: required i64 value 4: optional i64 level + 5: optional string description } // A single runtime profile 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org