This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch tpc_preview4-external2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d9a931a68bc6c8aad86b5bb3a982a8fad6a64b2b Author: Mryange <[email protected]> AuthorDate: Fri Dec 26 17:58:12 2025 +0800 pick 58636 --- be/src/common/config.cpp | 2 +- be/src/olap/column_predicate.h | 5 +- be/src/runtime_filter/runtime_filter_selectivity.h | 96 +++++++++ be/src/vec/exprs/vexpr.cpp | 55 +++++ be/src/vec/exprs/vexpr.h | 10 +- be/src/vec/exprs/vexpr_context.cpp | 90 +-------- be/src/vec/exprs/vexpr_context.h | 9 + be/src/vec/exprs/vruntimefilter_wrapper.cpp | 117 ++++++++--- be/src/vec/exprs/vruntimefilter_wrapper.h | 46 +---- .../runtime_filter_selectivity_test.cpp | 222 +++++++++++++++++++++ 10 files changed, 495 insertions(+), 157 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 29f84798fcf..03a25a6f891 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1037,7 +1037,7 @@ DEFINE_mInt64(big_column_size_buffer, "65535"); DEFINE_mInt64(small_column_size_buffer, "100"); // Perform the always_true check at intervals determined by runtime_filter_sampling_frequency -DEFINE_mInt32(runtime_filter_sampling_frequency, "64"); +DEFINE_mInt32(runtime_filter_sampling_frequency, "32"); DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600"); DEFINE_mBool(execution_ignore_eovercrowded, "true"); // cooldown task configs diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h index 6e6671ff337..7162a96399d 100644 --- a/be/src/olap/column_predicate.h +++ b/be/src/olap/column_predicate.h @@ -25,6 +25,7 @@ #include "olap/rowset/segment_v2/bloom_filter.h" #include "olap/rowset/segment_v2/inverted_index_iterator.h" #include "runtime/define_primitive_type.h" +#include "runtime_filter/runtime_filter_selectivity.h" #include "util/defer_op.h" #include "util/runtime_profile.h" #include "vec/columns/column.h" @@ -372,8 +373,8 @@ protected: if (!_always_true) { _judge_filter_rows += filter_rows; _judge_input_rows += input_rows; - vectorized::VRuntimeFilterWrapper::judge_selectivity( - 
get_ignore_threshold(), _judge_filter_rows, _judge_input_rows, _always_true); + RuntimeFilterSelectivity::judge_selectivity(get_ignore_threshold(), _judge_filter_rows, + _judge_input_rows, _always_true); } } diff --git a/be/src/runtime_filter/runtime_filter_selectivity.h b/be/src/runtime_filter/runtime_filter_selectivity.h new file mode 100644 index 00000000000..1b0a82143de --- /dev/null +++ b/be/src/runtime_filter/runtime_filter_selectivity.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <cstdint> + +#include "common/config.h" +#include "common/logging.h" + +namespace doris { + +// Used to track the selectivity of runtime filters +// If the selectivity of a runtime filter is very low, it is considered ineffective and can be ignored +// Considering that the selectivity of runtime filters may change with data variations +// A dynamic selectivity tracking mechanism is needed +// Note: this is not a thread-safe class + +class RuntimeFilterSelectivity { +public: + RuntimeFilterSelectivity() = default; + + RuntimeFilterSelectivity(const RuntimeFilterSelectivity&) = delete; + void update_judge_counter() { + if ((_judge_counter++) >= config::runtime_filter_sampling_frequency) { + reset_judge_selectivity(); + } + } + + void update_judge_selectivity(int filter_id, uint64_t filter_rows, uint64_t input_rows, + double ignore_thredhold) { + if (!_always_true) { + _judge_filter_rows += filter_rows; + _judge_input_rows += input_rows; + judge_selectivity(ignore_thredhold, _judge_filter_rows, _judge_input_rows, + _always_true); + } + + VLOG_ROW << fmt::format( + "Runtime filter[{}] selectivity update: filter_rows: {}, input_rows: {}, filter " + "rate: {}, " + "ignore_thredhold: {}, counter: {} , always_true: {}", + filter_id, _judge_filter_rows, _judge_input_rows, + static_cast<double>(_judge_filter_rows) / static_cast<double>(_judge_input_rows), + ignore_thredhold, _judge_counter, _always_true); + } + + bool maybe_always_true_can_ignore() const { + /// TODO: maybe we can use session variable to control this behavior ? 
+ if (config::runtime_filter_sampling_frequency <= 0) { + return false; + } else { + return _always_true; + } + } + + static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows, + bool& always_true) { + // if the judged input rows is too small, we think the selectivity is not reliable + if (input_rows > min_judge_input_rows) { + always_true = (static_cast<double>(filter_rows) / static_cast<double>(input_rows)) < + ignore_threshold; + } + } + +private: + void reset_judge_selectivity() { + _always_true = false; + _judge_counter = 0; + _judge_input_rows = 0; + _judge_filter_rows = 0; + } + + int64_t _judge_input_rows = 0; + int64_t _judge_filter_rows = 0; + int _judge_counter = 0; + bool _always_true = false; + + constexpr static int64_t min_judge_input_rows = 4096 * 10; +}; + +} // namespace doris diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 1bafe01ad71..52d4ca01eac 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -1015,5 +1015,60 @@ bool VExpr::ann_dist_is_fulfilled() const { return _virtual_column_is_fulfilled; } +Status VExpr::execute_filter(VExprContext* context, const Block* block, + uint8_t* __restrict result_filter_data, size_t rows, bool accept_null, + bool* can_filter_all) const { + ColumnPtr filter_column; + RETURN_IF_ERROR(execute_column(context, block, filter_column)); + if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { + // const(nullable) or const(bool) + const bool result = accept_null + ? 
(const_column->is_null_at(0) || const_column->get_bool(0)) + : (!const_column->is_null_at(0) && const_column->get_bool(0)); + if (!result) { + // filter all + *can_filter_all = true; + memset(result_filter_data, 0, rows); + return Status::OK(); + } + } else if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { + // nullable(bool) + const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); + const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*nested_column).get_data(); + const auto* __restrict filter_data = filter.data(); + const auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); + + if (accept_null) { + for (size_t i = 0; i < rows; ++i) { + result_filter_data[i] &= (null_map_data[i]) || filter_data[i]; + } + } else { + for (size_t i = 0; i < rows; ++i) { + result_filter_data[i] &= (!null_map_data[i]) & filter_data[i]; + } + } + + if ((memchr(result_filter_data, 0x1, rows) == nullptr)) { + *can_filter_all = true; + return Status::OK(); + } + } else { + // bool + const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data(); + const auto* __restrict filter_data = filter.data(); + + for (size_t i = 0; i < rows; ++i) { + result_filter_data[i] &= filter_data[i]; + } + + if (memchr(result_filter_data, 0x1, rows) == nullptr) { + *can_filter_all = true; + return Status::OK(); + } + } + + return Status::OK(); +} + #include "common/compile_check_end.h" } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 35a0d3733b0..2a0abe439f9 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -147,6 +147,10 @@ public: // Therefore we need a function like this to return the actual type produced by execution. 
virtual DataTypePtr execute_type(const Block* block) const { return _data_type; } + virtual Status execute_filter(VExprContext* context, const Block* block, + uint8_t* __restrict result_filter_data, size_t rows, + bool accept_null, bool* can_filter_all) const; + // `is_blockable` means this expr will be blocked in `execute` (e.g. AI Function, Remote Function) [[nodiscard]] virtual bool is_blockable() const { return std::any_of(_children.begin(), _children.end(), @@ -204,12 +208,6 @@ public: [](VExprSPtr child) { return child->is_rf_wrapper(); }); } - virtual void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) { - for (auto child : _children) { - child->do_judge_selectivity(filter_rows, input_rows); - } - } - static Status create_expr_tree(const TExpr& texpr, VExprContextSPtr& ctx); static Status create_expr_trees(const std::vector<TExpr>& texprs, VExprContextSPtrs& ctxs); diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index a7b71b77646..2a9c049e303 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -199,7 +199,12 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, return execute_conjuncts(ctxs, filters, false, block, result_filter, can_filter_all); } -// TODO: Performance Optimization +Status VExprContext::execute_filter(const Block* block, uint8_t* __restrict result_filter_data, + size_t rows, bool accept_null, bool* can_filter_all) { + return _root->execute_filter(this, block, result_filter_data, rows, accept_null, + can_filter_all); +} + Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, const std::vector<IColumn::Filter*>* filters, bool accept_null, const Block* block, @@ -209,85 +214,10 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, *can_filter_all = false; auto* __restrict result_filter_data = result_filter->data(); for (const auto& ctx : ctxs) { - // Statistics are only required when an rf wrapper 
exists in the expr. - bool is_rf_wrapper = ctx->root()->is_rf_wrapper(); - ColumnPtr filter_column; - RETURN_IF_ERROR(ctx->execute(block, filter_column)); - if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { - size_t column_size = nullable_column->size(); - if (column_size == 0) { - *can_filter_all = true; - return Status::OK(); - } else { - const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); - const IColumn::Filter& filter = - assert_cast<const ColumnUInt8&>(*nested_column).get_data(); - const auto* __restrict filter_data = filter.data(); - const auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); - - size_t input_rows = - rows - (is_rf_wrapper - ? simd::count_zero_num((int8_t*)result_filter_data, rows) - : 0); - - if (accept_null) { - for (size_t i = 0; i < rows; ++i) { - result_filter_data[i] &= (null_map_data[i]) || filter_data[i]; - } - } else { - for (size_t i = 0; i < rows; ++i) { - result_filter_data[i] &= (!null_map_data[i]) & filter_data[i]; - } - } - - size_t output_rows = - rows - (is_rf_wrapper - ? simd::count_zero_num((int8_t*)result_filter_data, rows) - : 0); - - if (is_rf_wrapper) { - ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows); - } - - if ((is_rf_wrapper && output_rows == 0) || - (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) { - *can_filter_all = true; - return Status::OK(); - } - } - } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { - // filter all - if (!const_column->get_bool(0)) { - *can_filter_all = true; - memset(result_filter_data, 0, result_filter->size()); - return Status::OK(); - } - } else { - const IColumn::Filter& filter = - assert_cast<const ColumnUInt8&>(*filter_column).get_data(); - const auto* __restrict filter_data = filter.data(); - - size_t input_rows = - rows - - (is_rf_wrapper ? 
simd::count_zero_num((int8_t*)result_filter_data, rows) : 0); - - for (size_t i = 0; i < rows; ++i) { - result_filter_data[i] &= filter_data[i]; - } - - size_t output_rows = - rows - - (is_rf_wrapper ? simd::count_zero_num((int8_t*)result_filter_data, rows) : 0); - - if (is_rf_wrapper) { - ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows); - } - - if ((is_rf_wrapper && output_rows == 0) || - (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) { - *can_filter_all = true; - return Status::OK(); - } + RETURN_IF_ERROR( + ctx->execute_filter(block, result_filter_data, rows, accept_null, can_filter_all)); + if (*can_filter_all) { + return Status::OK(); } } if (filters != nullptr) { diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 3179526ec54..349f199af23 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -33,6 +33,7 @@ #include "olap/rowset/segment_v2/inverted_index_reader.h" #include "runtime/runtime_state.h" #include "runtime/types.h" +#include "runtime_filter/runtime_filter_selectivity.h" #include "udf/udf.h" #include "vec/columns/column.h" #include "vec/core/block.h" @@ -210,6 +211,9 @@ public: bool all_expr_inverted_index_evaluated(); + Status execute_filter(const Block* block, uint8_t* __restrict result_filter_data, size_t rows, + bool accept_null, bool* can_filter_all); + [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block); [[nodiscard]] static Status filter_block(const VExprContextSPtrs& expr_contexts, Block* block, @@ -246,6 +250,8 @@ public: return _last_result_column_id; } + RuntimeFilterSelectivity& get_runtime_filter_selectivity() { return *_rf_selectivity; } + FunctionContext::FunctionStateScope get_function_state_scope() const { return _is_clone ? 
FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; } @@ -337,5 +343,8 @@ private: segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime; bool _suitable_for_ann_index = true; + + std::unique_ptr<RuntimeFilterSelectivity> _rf_selectivity = + std::make_unique<RuntimeFilterSelectivity>(); }; } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index 8e915ffff67..b24df4860da 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -62,9 +62,7 @@ VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr im _impl(std::move(impl)), _ignore_thredhold(ignore_thredhold), _null_aware(null_aware), - _filter_id(filter_id) { - reset_judge_selectivity(); -} + _filter_id(filter_id) {} Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) { @@ -89,38 +87,105 @@ void VRuntimeFilterWrapper::close(VExprContext* context, Status VRuntimeFilterWrapper::execute_column(VExprContext* context, const Block* block, ColumnPtr& result_column) const { - DCHECK(_open_finished || _getting_const_col); - if (_judge_counter.fetch_sub(1) == 0) { - reset_judge_selectivity(); + return Status::InternalError("Not implement VRuntimeFilterWrapper::execute_column"); +} + +const std::string& VRuntimeFilterWrapper::expr_name() const { + return _expr_name; +} + +Status VRuntimeFilterWrapper::execute_filter(VExprContext* context, const Block* block, + uint8_t* __restrict result_filter_data, size_t rows, + bool accept_null, bool* can_filter_all) const { + DCHECK(_open_finished); + if (accept_null) { + return Status::InternalError( + "Runtime filter does not support accept_null in execute_filter"); } - if (_always_true) { - size_t size = block->rows(); - result_column = create_always_true_column(size, _data_type->is_nullable()); - COUNTER_UPDATE(_always_true_filter_rows, size); + + auto& 
rf_selectivity = context->get_runtime_filter_selectivity(); + Defer auto_update_judge_counter = [&]() { rf_selectivity.update_judge_counter(); }; + + // if always true, skip evaluate runtime filter + if (rf_selectivity.maybe_always_true_can_ignore()) { + COUNTER_UPDATE(_always_true_filter_rows, rows); return Status::OK(); - } else { - if (_getting_const_col) { - _impl->set_getting_const_col(true); + } + + ColumnPtr filter_column; + ColumnPtr arg_column = nullptr; + RETURN_IF_ERROR(_impl->execute_runtime_filter(context, block, filter_column, &arg_column)); + + // bloom filter will handle null aware inside itself + if (_null_aware && TExprNodeType::BLOOM_PRED != node_type()) { + DCHECK(arg_column); + change_null_to_true(filter_column->assume_mutable(), arg_column); + } + + if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { + // const(nullable) or const(bool) + if (!const_column->get_bool(0)) { + // filter all + COUNTER_UPDATE(_rf_filter_rows, rows); + COUNTER_UPDATE(_rf_input_rows, rows); + rf_selectivity.update_judge_selectivity(_filter_id, rows, rows, _ignore_thredhold); + *can_filter_all = true; + memset(result_filter_data, 0, rows); + return Status::OK(); + } else { + // filter none + COUNTER_UPDATE(_rf_input_rows, rows); + rf_selectivity.update_judge_selectivity(_filter_id, 0, rows, _ignore_thredhold); + return Status::OK(); } + } else if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { + // nullable(bool) + const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); + const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*nested_column).get_data(); + const auto* __restrict filter_data = filter.data(); + const auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); + + const size_t input_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows); - ColumnPtr arg_column = nullptr; - RETURN_IF_ERROR(_impl->execute_runtime_filter(context, 
block, result_column, &arg_column)); - if (_getting_const_col) { - _impl->set_getting_const_col(false); + for (size_t i = 0; i < rows; ++i) { + result_filter_data[i] &= (!null_map_data[i]) & filter_data[i]; } - // bloom filter will handle null aware inside itself - if (_null_aware && TExprNodeType::BLOOM_PRED != node_type()) { - DCHECK(arg_column); - change_null_to_true(result_column->assume_mutable(), arg_column); + const size_t output_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows); + + COUNTER_UPDATE(_rf_filter_rows, input_rows - output_rows); + COUNTER_UPDATE(_rf_input_rows, input_rows); + rf_selectivity.update_judge_selectivity(_filter_id, input_rows - output_rows, input_rows, + _ignore_thredhold); + + if (output_rows == 0) { + *can_filter_all = true; + return Status::OK(); } + } else { + // bool + const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data(); + const auto* __restrict filter_data = filter.data(); - return Status::OK(); - } -} + const size_t input_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows); -const std::string& VRuntimeFilterWrapper::expr_name() const { - return _expr_name; + for (size_t i = 0; i < rows; ++i) { + result_filter_data[i] &= filter_data[i]; + } + + const size_t output_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows); + + COUNTER_UPDATE(_rf_filter_rows, input_rows - output_rows); + COUNTER_UPDATE(_rf_input_rows, input_rows); + rf_selectivity.update_judge_selectivity(_filter_id, input_rows - output_rows, input_rows, + _ignore_thredhold); + + if (output_rows == 0) { + *can_filter_all = true; + return Status::OK(); + } + } + return Status::OK(); } #include "common/compile_check_end.h" diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index 3535898915b..09bc8a815c7 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -63,6 +63,10 @@ public: 
const std::string& expr_name() const override; const VExprSPtrs& children() const override { return _impl->children(); } + Status execute_filter(VExprContext* context, const Block* block, + uint8_t* __restrict result_filter_data, size_t rows, bool accept_null, + bool* can_filter_all) const override; + uint64_t get_digest(uint64_t seed) const override { seed = _impl->get_digest(seed); if (seed) { @@ -91,33 +95,10 @@ public: } } - void update_counters(int64_t filter_rows, int64_t input_rows) { - COUNTER_UPDATE(_rf_filter_rows, filter_rows); - COUNTER_UPDATE(_rf_input_rows, input_rows); - } - - template <typename T> - static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows, - T& always_true) { - always_true = static_cast<double>(filter_rows) / static_cast<double>(input_rows) < - ignore_threshold; - } - bool is_rf_wrapper() const override { return true; } int filter_id() const { return _filter_id; } - void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) override { - update_counters(filter_rows, input_rows); - - if (!_always_true) { - _judge_filter_rows += filter_rows; - _judge_input_rows += input_rows; - judge_selectivity(_ignore_thredhold, _judge_filter_rows, _judge_input_rows, - _always_true); - } - } - std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter() const { return _rf_filter_rows; } @@ -129,26 +110,7 @@ public: } private: - void reset_judge_selectivity() const { - _always_true = false; - _judge_counter = config::runtime_filter_sampling_frequency; - _judge_input_rows = 0; - _judge_filter_rows = 0; - } - VExprSPtr _impl; - // VRuntimeFilterWrapper and ColumnPredicate share the same logic, - // but it's challenging to unify them, so the code is duplicated. - // _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true - // are variables used to implement the _always_true logic, calculated periodically - // based on runtime_filter_sampling_frequency. 
During each period, if _always_true - // is evaluated as true, the logic for always_true is applied for the rest of that period - // without recalculating. At the beginning of the next period, - // reset_judge_selectivity is used to reset these variables. - mutable std::atomic_int _judge_counter = 0; - mutable std::atomic_uint64_t _judge_input_rows = 0; - mutable std::atomic_uint64_t _judge_filter_rows = 0; - mutable std::atomic_int _always_true = false; std::shared_ptr<RuntimeProfile::Counter> _rf_input_rows = std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0); diff --git a/be/test/runtime_filter/runtime_filter_selectivity_test.cpp b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp new file mode 100644 index 00000000000..b8504f950c2 --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime_filter/runtime_filter_selectivity.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +namespace doris { + +class RuntimeFilterSelectivityTest : public testing::Test { +protected: + void SetUp() override { + // Save original config value + _original_sampling_frequency = config::runtime_filter_sampling_frequency; + } + + void TearDown() override { + // Restore original config value + config::runtime_filter_sampling_frequency = _original_sampling_frequency; + } + + int _original_sampling_frequency; +}; + +TEST_F(RuntimeFilterSelectivityTest, basic_initialization) { + RuntimeFilterSelectivity selectivity; + // Initially should be false (not always_true) + EXPECT_FALSE(selectivity.maybe_always_true_can_ignore()); +} + +TEST_F(RuntimeFilterSelectivityTest, disabled_sampling_frequency) { + RuntimeFilterSelectivity selectivity; + config::runtime_filter_sampling_frequency = 0; + + // Even if conditions are met, should return false when sampling is disabled + selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1); + EXPECT_FALSE(selectivity.maybe_always_true_can_ignore()); +} + +TEST_F(RuntimeFilterSelectivityTest, negative_sampling_frequency) { + RuntimeFilterSelectivity selectivity; + config::runtime_filter_sampling_frequency = -1; + + selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1); + EXPECT_FALSE(selectivity.maybe_always_true_can_ignore()); +} + +TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_below_threshold) { + bool always_true = false; + // filter_rows/input_rows = 5/50000 = 0.0001 < 0.1 + // input_rows (50000) > min_judge_input_rows (40960) + RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 50000, always_true); + EXPECT_TRUE(always_true); +} + +TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_above_threshold) { + bool always_true = false; + // filter_rows/input_rows = 25000/50000 = 0.5 >= 0.1 + RuntimeFilterSelectivity::judge_selectivity(0.1, 25000, 50000, always_true); + EXPECT_FALSE(always_true); +} + 
+TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_insufficient_input_rows) {
+    bool always_true = false;
+    // Even though 5/100 = 0.05 < 0.1, input_rows (100) < min_judge_input_rows (40960)
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 100, always_true);
+    EXPECT_FALSE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, update_with_low_selectivity) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // filter_rows/input_rows = 2000/50000 = 0.04 < 0.1
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, update_with_high_selectivity) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // filter_rows/input_rows = 45000/50000 = 0.9 >= 0.1
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, once_always_true_stays_true) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // First update: low selectivity
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    // Second update: high selectivity, but should be ignored
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_low) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // Single update with enough input rows: 1000/50000 = 0.02 < 0.1
+    selectivity.update_judge_selectivity(-1, 1000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_high) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // First update: 20000/50000 = 0.4
+    selectivity.update_judge_selectivity(-1, 20000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    // Second update: accumulated (20000+20000)/(50000+50000) = 0.4
+    selectivity.update_judge_selectivity(-1, 20000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, counter_triggers_reset) {
+    config::runtime_filter_sampling_frequency = 3;
+    RuntimeFilterSelectivity selectivity;
+
+    // Mark as always_true
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    selectivity.update_judge_counter(); // 0 -> 1
+    selectivity.update_judge_counter(); // 1 -> 2
+    selectivity.update_judge_counter(); // 2 -> 3, still inside the sampling period
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+    selectivity.update_judge_counter(); // pre-increment value 3 >= 3, triggers reset
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, reset_allows_reevaluation) {
+    config::runtime_filter_sampling_frequency = 2;
+    RuntimeFilterSelectivity selectivity;
+
+    // First cycle: mark as always_true
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    // Trigger reset: the 3rd call sees the pre-increment value 2 >= 2
+    selectivity.update_judge_counter(); // 0 -> 1
+    selectivity.update_judge_counter(); // 1 -> 2
+    selectivity.update_judge_counter(); // triggers reset
+    // Second cycle: high selectivity is re-evaluated, so the filter is no longer ignored
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, edge_case_zero_rows) {
+    bool always_true = false;
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 0, 0, always_true);
+    EXPECT_FALSE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, edge_case_exact_threshold) {
+    bool always_true = false;
+    // Exactly at threshold: 5000/50000 = 0.1, NOT less than 0.1
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 5000, 50000, always_true);
+    EXPECT_FALSE(always_true);
+
+    // Just below threshold: 4999/50000 = 0.09998 < 0.1
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 4999, 50000, always_true);
+    EXPECT_TRUE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, multiple_updates_before_threshold) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // Multiple updates with insufficient rows each time
+    selectivity.update_judge_selectivity(-1, 100, 1000, 0.1); // 100/1000, insufficient
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    selectivity.update_judge_selectivity(-1, 200, 2000, 0.1); // 300/3000, insufficient
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    // Now accumulated rows are sufficient: 300+2000 = 2300, 3000+40000 = 43000
+    selectivity.update_judge_selectivity(-1, 2000, 40000, 0.1); // 2300/43000 = 0.053 < 0.1
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
+    config::runtime_filter_sampling_frequency = 100;
+
+    // Test with threshold 0.05
+    {
+        RuntimeFilterSelectivity selectivity;
+        selectivity.update_judge_selectivity(-1, 2000, 50000, 0.05); // 0.04 < 0.05
+        EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+    }
+
+    // Test with threshold 0.03
+    {
+        RuntimeFilterSelectivity selectivity;
+        selectivity.update_judge_selectivity(-1, 2000, 50000, 0.03); // 0.04 >= 0.03
+        EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+    }
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
