This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit 74bcdbc464c510eda8c1d6183971716871189a3b Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Tue Mar 4 15:30:49 2025 +0800 test rf utils (#48615) --- be/src/runtime_filter/runtime_filter_mgr.cpp | 4 + be/src/runtime_filter/runtime_filter_wrapper.cpp | 4 +- be/src/runtime_filter/utils.cpp | 20 ++-- .../runtime_filter_definitions_test.cpp | 70 ++++++++++++ .../runtime_filter/runtime_filter_wrapper_test.cpp | 25 ++++- be/test/runtime_filter/utils_test.cpp | 119 +++++++++++++++++++++ gensrc/proto/internal_service.proto | 2 +- 7 files changed, 229 insertions(+), 15 deletions(-) diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index c712a0865a8..a9603285f3f 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -411,6 +411,10 @@ RuntimeFilterMgr* RuntimeFilterParamsContext::global_runtime_filter_mgr() { } RuntimeFilterMgr* RuntimeFilterParamsContext::local_runtime_filter_mgr() { + if (_state == nullptr) { + throw Exception(Status::InternalError("Can not get a local runtime filter mgr by query {}", + print_id(_query_ctx->query_id()))); + } return _state->local_runtime_filter_mgr(); } diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp index 8204cc0dd37..df850312150 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.cpp +++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp @@ -164,9 +164,6 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { DCHECK(_state != State::IGNORED); DCHECK(other->_state == State::READY); - - set_state(State::READY); - DCHECK(_filter_type == other->_filter_type) << debug_string(); switch (_filter_type) { @@ -231,6 +228,7 @@ Status RuntimeFilterWrapper::merge(const RuntimeFilterWrapper* other) { default: return Status::InternalError("unknown runtime filter"); } + set_state(State::READY); return Status::OK(); } diff --git a/be/src/runtime_filter/utils.cpp b/be/src/runtime_filter/utils.cpp index b2e4e27fac4..b041c7d1efa 100644 --- a/be/src/runtime_filter/utils.cpp +++ b/be/src/runtime_filter/utils.cpp @@ -25,28 +25,28 @@ namespace doris { std::string filter_type_to_string(RuntimeFilterType type) { switch (type) { case RuntimeFilterType::IN_FILTER: { - return "in"; + return "IN_FILTER"; } case RuntimeFilterType::BLOOM_FILTER: { - return "bloomfilter"; + return "BLOOM_FILTER"; } case RuntimeFilterType::MIN_FILTER: { - return "only_min"; + return "MIN_FILTER"; } case RuntimeFilterType::MAX_FILTER: { - return "only_max"; + return "MAX_FILTER"; } case RuntimeFilterType::MINMAX_FILTER: { - return "minmax"; + return "MINMAX_FILTER"; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - return "in_or_bloomfilter"; + return "IN_OR_BLOOM_FILTER"; } case RuntimeFilterType::BITMAP_FILTER: { - return "bitmapfilter"; + return "BITMAP_FILTER"; } default: - return "unknown"; + return "UNKNOWN_FILTER"; } } @@ -96,6 +96,8 @@ RuntimeFilterType get_type(int filter_type) { return RuntimeFilterType::MIN_FILTER; case PFilterType::MAX_FILTER: return RuntimeFilterType::MAX_FILTER; + case PFilterType::IN_OR_BLOOM_FILTER: + return RuntimeFilterType::IN_OR_BLOOM_FILTER; default: return RuntimeFilterType::UNKNOWN_FILTER; } @@ -117,7 +119,7 @@ PFilterType get_type(RuntimeFilterType type) { case RuntimeFilterType::IN_OR_BLOOM_FILTER: return PFilterType::IN_OR_BLOOM_FILTER; default: - return PFilterType::UNKNOW_FILTER; + return PFilterType::UNKNOWN_FILTER; } } diff --git a/be/test/runtime_filter/runtime_filter_definitions_test.cpp b/be/test/runtime_filter/runtime_filter_definitions_test.cpp new file mode 100644 index 00000000000..073a90763be --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_definitions_test.cpp @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime_filter/runtime_filter_definitions.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "pipeline/thrift_builder.h" +#include "runtime/query_context.h" + +namespace doris { + +class RuntimeFilterDefinitionTest : public testing::Test { +public: + RuntimeFilterDefinitionTest() = default; + ~RuntimeFilterDefinitionTest() override = default; + void SetUp() override {} + void TearDown() override {} +}; + +TEST_F(RuntimeFilterDefinitionTest, TestErrorPath) { + RuntimeState state; + auto query_options = TQueryOptionsBuilder().build(); + auto fe_address = TNetworkAddress(); + fe_address.hostname = BackendOptions::get_localhost(); + fe_address.port = config::brpc_port; + auto ctx = QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), query_options, fe_address, + true, fe_address, QuerySource::INTERNAL_FRONTEND); + state._query_ctx = ctx.get(); + { + auto* res = RuntimeFilterParamsContext::create(&state); + EXPECT_NE(res->get_query_ctx(), nullptr); + EXPECT_NE(res->get_runtime_state(), nullptr); + EXPECT_NE(res->global_runtime_filter_mgr(), nullptr); + EXPECT_EQ(res->local_runtime_filter_mgr(), nullptr); + } + { + auto* res = RuntimeFilterParamsContext::create(ctx.get()); + EXPECT_NE(res->get_query_ctx(), nullptr); + EXPECT_EQ(res->get_runtime_state(), nullptr); + EXPECT_NE(res->global_runtime_filter_mgr(), nullptr); + bool ex = false; + try { + [[maybe_unused]] auto* local = res->local_runtime_filter_mgr(); + } catch (std::exception) { + ex = true; + } + EXPECT_TRUE(ex); + res->set_state(&state); + EXPECT_NE(res->get_runtime_state(), nullptr); + EXPECT_EQ(res->local_runtime_filter_mgr(), nullptr); + } +} + +} // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp index 7511015c722..5ca0443bfb9 100644 --- a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp @@ -620,7 +620,7 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { { wrapper = std::make_shared<RuntimeFilterWrapper>(¶ms); EXPECT_TRUE(wrapper->init(80).ok()); - EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), type_limit<int>::min()); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), type_limit<int>::max()); } @@ -628,7 +628,7 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { // Insert auto col = vectorized::ColumnHelper::create_column<DataType>(data_vector); EXPECT_TRUE(wrapper->insert(col, 0).ok()); - EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), min_val + num_vals - 1); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), min_val); } @@ -651,11 +651,13 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(¶ms); EXPECT_TRUE(new_wrapper->init(12312).ok()); EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); // Insert std::vector<int> new_data_vector {-100, 100}; auto col = vectorized::ColumnHelper::create_column<DataType>(new_data_vector); EXPECT_TRUE(new_wrapper->insert(col, 0).ok()); EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); new_wrapper->_state = RuntimeFilterWrapper::State::READY; // Merge EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok()); @@ -688,6 +690,7 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { wrapper = std::make_shared<RuntimeFilterWrapper>(¶ms); EXPECT_TRUE(wrapper->init(80).ok()); EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), type_limit<int>::min()); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), type_limit<int>::max()); } @@ -696,6 +699,7 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { auto col = vectorized::ColumnHelper::create_column<DataType>(data_vector); EXPECT_TRUE(wrapper->insert(col, 0).ok()); EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), min_val + num_vals - 1); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), min_val); } @@ -716,11 +720,13 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(¶ms); EXPECT_TRUE(new_wrapper->init(12312).ok()); EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); // Insert std::vector<int> new_data_vector {-100, 100}; auto col = vectorized::ColumnHelper::create_column<DataType>(new_data_vector); EXPECT_TRUE(new_wrapper->insert(col, 0).ok()); EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); new_wrapper->_state = RuntimeFilterWrapper::State::READY; // Merge EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok()); @@ -753,6 +759,7 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { wrapper = std::make_shared<RuntimeFilterWrapper>(¶ms); EXPECT_TRUE(wrapper->init(80).ok()); EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), type_limit<int>::min()); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), type_limit<int>::max()); } @@ -761,6 +768,7 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { auto col = vectorized::ColumnHelper::create_column<DataType>(data_vector); EXPECT_TRUE(wrapper->insert(col, 0).ok()); EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), min_val + num_vals - 1); EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), min_val); } @@ -781,11 +789,13 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) { auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(¶ms); EXPECT_TRUE(new_wrapper->init(12312).ok()); EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); // Insert std::vector<int> new_data_vector {-100, 100}; auto col = vectorized::ColumnHelper::create_column<DataType>(new_data_vector); EXPECT_TRUE(new_wrapper->insert(col, 0).ok()); EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); new_wrapper->_state = RuntimeFilterWrapper::State::READY; // Merge EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok()); @@ -836,6 +846,7 @@ TEST_F(RuntimeFilterWrapperTest, TestBitMap) { wrapper = std::make_shared<RuntimeFilterWrapper>(¶ms); EXPECT_TRUE(wrapper->init(80).ok()); EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); EXPECT_EQ(wrapper->bitmap_filter_func()->size(), 0); } { @@ -1311,6 +1322,16 @@ TEST_F(RuntimeFilterWrapperTest, TestErrorPath) { EXPECT_EQ(wrapper->bloom_filter_func(), nullptr); EXPECT_EQ(wrapper->bitmap_filter_func(), nullptr); EXPECT_EQ(wrapper->hybrid_set(), nullptr); + wrapper->check_state({RuntimeFilterWrapper::State::UNINITED}); + bool ex = false; + try { + wrapper->check_state({RuntimeFilterWrapper::State::READY, + RuntimeFilterWrapper::State::IGNORED, + RuntimeFilterWrapper::State::DISABLED}); + } catch (std::exception) { + ex = true; + } + EXPECT_TRUE(ex); } } // namespace doris diff --git a/be/test/runtime_filter/utils_test.cpp b/be/test/runtime_filter/utils_test.cpp new file mode 100644 index 00000000000..ca7a91a1fc7 --- /dev/null +++ b/be/test/runtime_filter/utils_test.cpp @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime_filter/utils.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vliteral.h" + +namespace doris { + +class RuntimeFilterUtilsTest : public testing::Test { +public: + RuntimeFilterUtilsTest() = default; + ~RuntimeFilterUtilsTest() override = default; + void SetUp() override {} + void TearDown() override {} +}; + +TEST_F(RuntimeFilterUtilsTest, TestConvertorException) { + class TestClass {}; + bool exception = false; + try { + get_convertor<TestClass>(); + } catch (std::exception) { + exception = true; + } + EXPECT_TRUE(exception); +} + +TEST_F(RuntimeFilterUtilsTest, TestFilterTypeFunction) { +#define CHECK_FOR_FILTER(TYPE) \ + EXPECT_EQ(filter_type_to_string(RuntimeFilterType::TYPE), #TYPE); \ + EXPECT_EQ(get_type(RuntimeFilterType::TYPE), PFilterType::TYPE); \ + EXPECT_EQ(get_type(PFilterType::TYPE), RuntimeFilterType::TYPE); + CHECK_FOR_FILTER(UNKNOWN_FILTER); + CHECK_FOR_FILTER(IN_FILTER); + CHECK_FOR_FILTER(MINMAX_FILTER); + CHECK_FOR_FILTER(BLOOM_FILTER); + CHECK_FOR_FILTER(IN_OR_BLOOM_FILTER); + CHECK_FOR_FILTER(MIN_FILTER); + CHECK_FOR_FILTER(MAX_FILTER); +#undef CHECK_FOR_FILTER + + EXPECT_EQ(filter_type_to_string(RuntimeFilterType::BITMAP_FILTER), "BITMAP_FILTER"); + EXPECT_EQ(get_type(RuntimeFilterType::BITMAP_FILTER), PFilterType::UNKNOWN_FILTER); +} + +TEST_F(RuntimeFilterUtilsTest, TestRuntimeFilterFromThrift) { + TRuntimeFilterDesc desc; + desc.__set_type(TRuntimeFilterType::BLOOM); + EXPECT_EQ(get_runtime_filter_type(&desc), RuntimeFilterType::BLOOM_FILTER); + + desc.__set_type(TRuntimeFilterType::MIN_MAX); + desc.__set_min_max_type(TMinMaxRuntimeFilterType::MIN); + EXPECT_EQ(get_runtime_filter_type(&desc), RuntimeFilterType::MIN_FILTER); + desc.__set_min_max_type(TMinMaxRuntimeFilterType::MAX); + EXPECT_EQ(get_runtime_filter_type(&desc), RuntimeFilterType::MAX_FILTER); + desc.__set_min_max_type(TMinMaxRuntimeFilterType::MIN_MAX); + EXPECT_EQ(get_runtime_filter_type(&desc), RuntimeFilterType::MINMAX_FILTER); + + desc.__set_type(TRuntimeFilterType::IN_OR_BLOOM); + EXPECT_EQ(get_runtime_filter_type(&desc), RuntimeFilterType::IN_OR_BLOOM_FILTER); + desc.__set_type(TRuntimeFilterType::BITMAP); + EXPECT_EQ(get_runtime_filter_type(&desc), RuntimeFilterType::BITMAP_FILTER); + desc.__set_type(TRuntimeFilterType::IN); + EXPECT_EQ(get_runtime_filter_type(&desc), RuntimeFilterType::IN_FILTER); +} + +TEST_F(RuntimeFilterUtilsTest, TestCreateLiteral) { + vectorized::VExprSPtr literal; + TypeDescriptor type(PrimitiveType::TYPE_INT); + const int value = 1; + EXPECT_TRUE(create_literal(type, (const void*)&value, literal).ok()); + EXPECT_TRUE(literal->is_literal()); + EXPECT_EQ(((vectorized::VLiteral*)literal.get())->value(), std::to_string(value)); +} + +TEST_F(RuntimeFilterUtilsTest, TestCreateBinaryPredicate) { + { + vectorized::VExprSPtr expr; + TExprNode pred_node; + TypeDescriptor type(PrimitiveType::TYPE_INT); + auto op = TExprOpcode::EQ; + EXPECT_FALSE(create_vbin_predicate(type, op, expr, &pred_node, false).ok()); + } + { + vectorized::VExprSPtr expr; + TExprNode pred_node; + TypeDescriptor type(PrimitiveType::TYPE_INT); + auto op = TExprOpcode::GE; + EXPECT_TRUE(create_vbin_predicate(type, op, expr, &pred_node, false).ok()); + } + { + vectorized::VExprSPtr expr; + TExprNode pred_node; + TypeDescriptor type(PrimitiveType::TYPE_INT); + auto op = TExprOpcode::LE; + EXPECT_TRUE(create_vbin_predicate(type, op, expr, &pred_node, false).ok()); + } +} + +} // namespace doris diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 2242caae2a4..402f1044568 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -551,7 +551,7 @@ message PInFilter { } enum PFilterType { - UNKNOW_FILTER = 0; + UNKNOWN_FILTER = 0; BLOOM_FILTER = 1; MINMAX_FILTER = 2; IN_FILTER = 3; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org