This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b0b7161ad01ab69ef635ba02a53ac100ea669de7 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Mon Mar 11 10:35:56 2024 +0800 [feature](rf) add filter info profile when rf run as expr (#31822) --- be/src/exprs/runtime_filter.cpp | 20 +++++++++++++----- be/src/exprs/runtime_filter.h | 3 ++- be/src/vec/exec/runtime_filter_consumer.cpp | 9 ++++---- be/src/vec/exec/runtime_filter_consumer.h | 6 +++--- be/src/vec/exec/scan/new_olap_scan_node.cpp | 4 ++-- be/src/vec/exprs/vruntimefilter_wrapper.cpp | 32 +++++++++++++++++++++-------- be/src/vec/exprs/vruntimefilter_wrapper.h | 19 ++++++++++++++--- 7 files changed, 67 insertions(+), 26 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index a949969ca65..6e4f57ef8e1 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -444,7 +444,8 @@ public: } Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VExprSPtr>& push_exprs, const TExpr& probe_expr); + std::vector<vectorized::VRuntimeFilterPtr>& push_exprs, + const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { bool can_not_merge_in_or_bloom = @@ -1056,14 +1057,23 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo } Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VExprSPtr>& push_exprs, + std::vector<vectorized::VRuntimeFilterPtr>& push_exprs, bool is_late_arrival) { DCHECK(is_consumer()); + auto origin_size = push_exprs.size(); if (!_wrapper->is_ignored()) { _set_push_down(!is_late_arrival); RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); } _profile->add_info_string("Info", _format_status()); + // The runtime filter is pushed down, adding filtering information. + auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT); + auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT); + auto* always_true_counter = ADD_COUNTER(_profile, "always_true", TUnit::UNIT); + for (auto i = origin_size; i < push_exprs.size(); i++) { + push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter, + always_true_counter); + } return Status::OK(); } @@ -1715,9 +1725,9 @@ void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t mer this->signal(); } -Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VExprSPtr>& container, - const TExpr& probe_expr) { +Status RuntimePredicateWrapper::get_push_exprs( + std::list<vectorized::VExprContextSPtr>& probe_ctxs, + std::vector<vectorized::VRuntimeFilterPtr>& container, const TExpr& probe_expr) { vectorized::VExprContextSPtr probe_ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, probe_ctx)); probe_ctxs.push_back(probe_ctx); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 5cfc88f4ed8..fe5ddd68da6 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -233,7 +233,8 @@ public: PrimitiveType column_type() const; Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, - std::vector<vectorized::VExprSPtr>& push_exprs, bool is_late_arrival); + std::vector<vectorized::VRuntimeFilterPtr>& push_exprs, + bool is_late_arrival); bool is_broadcast_join() const { return _is_broadcast_join; } diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 52caf84e361..097df801615 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -85,7 +85,7 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency( Status RuntimeFilterConsumer::_acquire_runtime_filter() { SCOPED_TIMER(_acquire_runtime_filter_timer); - VExprSPtrs vexprs; + std::vector<vectorized::VRuntimeFilterPtr> vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; bool ready = runtime_filter->is_ready(); @@ -111,12 +111,13 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() { return Status::OK(); } -Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) { +Status RuntimeFilterConsumer::_append_rf_into_conjuncts( + const std::vector<vectorized::VRuntimeFilterPtr>& vexprs) { if (vexprs.empty()) { return Status::OK(); } - for (auto& expr : vexprs) { + for (const auto& expr : vexprs) { VExprContextSPtr conjunct = VExprContext::create_shared(expr); RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref)); RETURN_IF_ERROR(conjunct->open(_state)); @@ -142,7 +143,7 @@ Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive } // 1. Check if are runtime filter ready but not applied. - VExprSPtrs exprs; + std::vector<vectorized::VRuntimeFilterPtr> exprs; int current_arrived_rf_num = 0; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { if (_runtime_filter_ctxs[i].apply_mark) { diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index b8513e666bc..86609624be6 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -46,7 +46,7 @@ protected: // Get all arrived runtime filters at Open phase. Status _acquire_runtime_filter(); // Append late-arrival runtime filters to the vconjunct_ctx. - Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs); + Status _append_rf_into_conjuncts(const std::vector<vectorized::VRuntimeFilterPtr>& vexprs); void _init_profile(RuntimeProfile* profile); @@ -54,9 +54,9 @@ protected: // For runtime filters struct RuntimeFilterContext { - RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {} + RuntimeFilterContext(IRuntimeFilter* rf) : runtime_filter(rf) {} // set to true if this runtime filter is already applied to vconjunct_ctx_ptr - bool apply_mark; + bool apply_mark = false; IRuntimeFilter* runtime_filter = nullptr; }; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 44a435ed2a2..7473820abef 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -757,8 +757,8 @@ void NewOlapScanNode::add_filter_info(int id, const PredicateFilterInfo& update_ filter_name += std::to_string(id); std::string info_str; info_str += "type = " + type_to_string(static_cast<PredicateType>(info.type)) + ", "; - info_str += "input = " + std::to_string(info.input_row) + ", "; - info_str += "filtered = " + std::to_string(info.filtered_row); + info_str += "predicate input = " + std::to_string(info.input_row) + ", "; + info_str += "predicate filtered = " + std::to_string(info.filtered_row); info_str = "[" + info_str + "]"; // add info diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index 075e5194866..dcf3cba72e2 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -20,9 +20,12 @@ #include <fmt/format.h> #include <stddef.h> +#include <cstdint> #include <memory> #include <utility> +#include "util/defer_op.h" +#include "util/runtime_profile.h" #include "util/simd/bits.h" #include "vec/columns/column.h" #include "vec/columns/column_const.h" @@ -84,7 +87,19 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* *result_column_id = num_columns_without_result; return Status::OK(); } else { - _scan_rows += block->rows(); + int64_t input_rows = 0, filter_rows = 0; + Defer statistic_filter_info {[&]() { + if (_expr_filtered_rows_counter) { + COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows); + } + if (_expr_input_rows_counter) { + COUNTER_UPDATE(_expr_input_rows_counter, input_rows); + } + if (_always_true_counter) { + COUNTER_SET(_always_true_counter, (int64_t)_always_true); + } + }}; + input_rows += block->rows(); if (_getting_const_col) { _impl->set_getting_const_col(true); @@ -99,28 +114,29 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* if (is_column_const(*result_column.column)) { auto* constant_val = const_cast<char*>(result_column.column->get_data_at(0).data); if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) { - _filtered_rows += block->rows(); + filter_rows += block->rows(); } } else if (const auto* nullable = check_and_get_column<ColumnNullable>(*result_column.column)) { data = ((ColumnVector<UInt8>*)nullable->get_nested_column_ptr().get()) ->get_data() .data(); - _filtered_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), - nullable->get_null_map_data().data(), - block->rows()); + filter_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), + nullable->get_null_map_data().data(), + block->rows()); } else if (const auto* res_col = check_and_get_column<ColumnVector<UInt8>>(*result_column.column)) { data = const_cast<uint8_t*>(res_col->get_data().data()); - _filtered_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), - block->rows()); + filter_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), + block->rows()); } else { return Status::InternalError( "Invalid type for runtime filters!, and _expr_name is: {}. _data_type is: {}. " "result_column_id is: {}. block structure: {}.", _expr_name, _data_type->get_name(), *result_column_id, block->dump_structure()); } - + _filtered_rows += filter_rows; + _scan_rows += input_rows; calculate_filter(VRuntimeFilterWrapper::EXPECTED_FILTER_RATE, _filtered_rows, _scan_rows, _has_calculate_filter, _always_true); return Status::OK(); diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index 6d50b914beb..30017850028 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -17,9 +17,8 @@ #pragma once -#include <stdint.h> - #include <atomic> +#include <cstdint> #include <string> #include <vector> @@ -27,6 +26,7 @@ #include "common/object_pool.h" #include "common/status.h" #include "udf/udf.h" +#include "util/runtime_profile.h" #include "vec/exprs/vexpr.h" namespace doris { @@ -57,6 +57,14 @@ public: const VExprSPtr get_impl() const override { return _impl; } + void attach_profile_counter(RuntimeProfile::Counter* expr_filtered_rows_counter, + RuntimeProfile::Counter* expr_input_rows_counter, + RuntimeProfile::Counter* always_true_counter) { + _expr_filtered_rows_counter = expr_filtered_rows_counter; + _expr_input_rows_counter = expr_input_rows_counter; + _always_true_counter = always_true_counter; + } + // if filter rate less than this, bloom filter will set always true constexpr static double EXPECTED_FILTER_RATE = 0.4; @@ -74,12 +82,17 @@ private: VExprSPtr _impl; bool _always_true; - /// TODO: statistic filter rate in the profile std::atomic<int64_t> _filtered_rows; std::atomic<int64_t> _scan_rows; + RuntimeProfile::Counter* _expr_filtered_rows_counter = nullptr; + RuntimeProfile::Counter* _expr_input_rows_counter = nullptr; + RuntimeProfile::Counter* _always_true_counter = nullptr; bool _has_calculate_filter = false; std::string _expr_name; }; + +using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>; + } // namespace doris::vectorized \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org