This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit dbc4eacaa20afbb920f39d7ab79ee5336b6010b0 Author: Pxl <x...@selectdb.com> AuthorDate: Mon Mar 3 18:37:23 2025 +0800 add more test and some fix --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 3 +- be/src/runtime_filter/runtime_filter.cpp | 3 - be/src/runtime_filter/runtime_filter.h | 1 + .../runtime_filter_consumer_helper.cpp | 4 +- .../runtime_filter_consumer_helper.h | 4 +- be/src/runtime_filter/runtime_filter_definitions.h | 3 +- be/src/runtime_filter/runtime_filter_mgr.h | 2 +- be/src/runtime_filter/runtime_filter_producer.h | 4 +- .../runtime_filter_producer_helper.cpp | 7 +- .../runtime_filter_producer_helper.h | 3 +- .../runtime_filter_producer_helper_cross.h | 2 +- be/src/runtime_filter/runtime_filter_wrapper.cpp | 9 +- be/src/runtime_filter/utils.cpp | 106 ---------------- be/src/runtime_filter/utils.h | 6 - be/test/pipeline/thrift_builder.h | 2 +- .../runtime_filter_consumer_helper_test.cpp | 108 +++++++++++++++++ .../runtime_filter_consumer_test.cpp | 94 ++++++++++++-- .../runtime_filter/runtime_filter_merger_test.cpp | 31 +++-- .../runtime_filter_producer_helper_cross_test.cpp | 84 +++++++++++++ .../runtime_filter_producer_helper_test.cpp | 135 +++++++++++++++++++++ .../runtime_filter_producer_test.cpp | 8 +- be/test/runtime_filter/runtime_filter_test_utils.h | 8 +- .../runtime_filter/runtime_filter_wrapper_test.cpp | 2 +- gensrc/proto/internal_service.proto | 2 +- 24 files changed, 475 insertions(+), 156 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index a8f0ce78c41..a064b5d9398 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -226,8 +226,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu try { RETURN_IF_ERROR(_runtime_filter_producer_helper->process( - state, _shared_state->build_block.get(), _finish_dependency, - p._shared_hash_table_context)); + state, 
_shared_state->build_block.get(), p._shared_hash_table_context)); } catch (Exception& e) { bool blocked_by_shared_hash_table_signal = !_should_build_hash_table && p._shared_hashtable_controller && diff --git a/be/src/runtime_filter/runtime_filter.cpp b/be/src/runtime_filter/runtime_filter.cpp index 9ba7324e3fb..cb64fb43af0 100644 --- a/be/src/runtime_filter/runtime_filter.cpp +++ b/be/src/runtime_filter/runtime_filter.cpp @@ -52,9 +52,6 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress pfragment_instance_id->set_hi(BackendOptions::get_local_backend().id); pfragment_instance_id->set_lo((int64_t)this); - auto column_type = _wrapper->column_type(); - RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type))); - merge_filter_callback->cntl_->set_timeout_ms( get_execution_rpc_timeout_ms(_state->get_query_ctx()->execution_timeout())); if (config::execution_ignore_eovercrowded) { diff --git a/be/src/runtime_filter/runtime_filter.h b/be/src/runtime_filter/runtime_filter.h index 660f0ad88ce..91883bbcec0 100644 --- a/be/src/runtime_filter/runtime_filter.h +++ b/be/src/runtime_filter/runtime_filter.h @@ -89,6 +89,7 @@ protected: _has_local_target(desc->has_local_targets), _runtime_filter_type(get_runtime_filter_type(desc)) { DCHECK_NE(desc->has_remote_targets, _has_local_target); + DCHECK_NE(state, nullptr); } virtual Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options); diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp index e3259c03164..c9b0cff475f 100644 --- a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp +++ b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp @@ -20,7 +20,7 @@ #include "pipeline/pipeline_task.h" #include "runtime_filter/runtime_filter_consumer.h" -namespace doris::pipeline { +namespace doris { RuntimeFilterConsumerHelper::RuntimeFilterConsumerHelper( const int32_t 
_node_id, const std::vector<TRuntimeFilterDesc>& runtime_filters, @@ -148,4 +148,4 @@ Status RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter( return Status::OK(); } -} // namespace doris::pipeline \ No newline at end of file +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h b/be/src/runtime_filter/runtime_filter_consumer_helper.h index b1cf99356c2..15c7ef557e1 100644 --- a/be/src/runtime_filter/runtime_filter_consumer_helper.h +++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h @@ -20,7 +20,7 @@ #include "pipeline/dependency.h" #include "vec/exprs/vruntimefilter_wrapper.h" -namespace doris::pipeline { +namespace doris { // this class used in ScanNode or MultiCastDataStreamSource /** @@ -74,4 +74,4 @@ private: std::unique_ptr<RuntimeProfile> _profile; }; -} // namespace doris::pipeline \ No newline at end of file +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime_filter/runtime_filter_definitions.h b/be/src/runtime_filter/runtime_filter_definitions.h index 693e62beea9..28329f65cb7 100644 --- a/be/src/runtime_filter/runtime_filter_definitions.h +++ b/be/src/runtime_filter/runtime_filter_definitions.h @@ -79,7 +79,8 @@ class RuntimeFilterMgr; // There are two types of runtime filters: // 1. Global runtime filter. Managed by QueryContext's RuntimeFilterMgr which is produced by multiple producers and shared by multiple consumers. // 2. Local runtime filter. Managed by RuntimeState's RuntimeFilterMgr which is 1-producer-1-consumer mode. 
-struct RuntimeFilterParamsContext { +class RuntimeFilterParamsContext { +public: static RuntimeFilterParamsContext* create(RuntimeState* state); static RuntimeFilterParamsContext* create(QueryContext* query_ctx); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index a63aca9b68d..896a987fad5 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -52,7 +52,7 @@ class MemTrackerLimiter; class RuntimeState; class RuntimeFilterWrapper; class QueryContext; -struct RuntimeFilterParamsContext; +class RuntimeFilterParamsContext; class ExecEnv; class RuntimeProfile; diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index 022fe6450d8..17107a61d67 100644 --- a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -105,11 +105,11 @@ public: } } - void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { + void copy_to_shared_context(const vectorized::SharedHashTableContextPtr& context) { DCHECK(!context->runtime_filters.contains(_wrapper->filter_id())); context->runtime_filters[_wrapper->filter_id()] = _wrapper; } - void copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) { + void copy_from_shared_context(const vectorized::SharedHashTableContextPtr& context) { DCHECK(context->runtime_filters.contains(_wrapper->filter_id())); _wrapper = context->runtime_filters[_wrapper->filter_id()]; } diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp b/be/src/runtime_filter/runtime_filter_producer_helper.cpp index 53038a7fa61..22c8768b285 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp +++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp @@ -73,6 +73,7 @@ Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size continue; } int result_column_id = 
_filter_expr_contexts[i]->get_last_result_column_id(); + DCHECK_NE(result_column_id, -1); const auto& column = block->get_by_position(result_column_id).column; RETURN_IF_ERROR(filter->insert(column, start)); } @@ -89,8 +90,7 @@ Status RuntimeFilterProducerHelper::_publish(RuntimeState* state) { Status RuntimeFilterProducerHelper::process( RuntimeState* state, const vectorized::Block* block, - std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency, - vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) { + const vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) { if (_skip_runtime_filters_process) { return Status::OK(); } @@ -104,7 +104,8 @@ Status RuntimeFilterProducerHelper::process( uint64_t hash_table_size = block ? block->rows() : 0; RETURN_IF_ERROR(_init_filters(state, hash_table_size)); if (hash_table_size > 1) { - RETURN_IF_ERROR(_insert(block, 1)); + constexpr int HASH_JOIN_INSERT_OFFSET = 1; // the first row is mocked on hash join sink + RETURN_IF_ERROR(_insert(block, HASH_JOIN_INSERT_OFFSET)); } } diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h b/be/src/runtime_filter/runtime_filter_producer_helper.h index f8b7d985f50..dfd6b6bccf9 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper.h @@ -58,8 +58,7 @@ public: // build rf's predicate and publish rf Status process(RuntimeState* state, const vectorized::Block* block, - std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency, - vectorized::SharedHashTableContextPtr& shared_hash_table_ctx); + const vectorized::SharedHashTableContextPtr& shared_hash_table_ctx); protected: virtual void _init_expr(const vectorized::VExprContextSPtrs& build_expr_ctxs, diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h index a7525fba4df..e8aa1a72be3 100644 --- 
a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h @@ -51,7 +51,7 @@ private: for (const auto& vexpr_ctx : _filter_expr_contexts) { int result_column_id = -1; RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id)); - DCHECK(result_column_id != -1); + DCHECK_NE(result_column_id, -1) << vexpr_ctx->root()->debug_string(); block->get_by_position(result_column_id).column = block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp index c3c5217d775..a341021bc1c 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.cpp +++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp @@ -419,6 +419,7 @@ Status RuntimeFilterWrapper::_assign(const PInFilter& in_filter, bool contain_nu Status RuntimeFilterWrapper::_assign(const PBloomFilter& bloom_filter, butil::IOBufAsZeroCopyInputStream* data, bool contain_null) { + DCHECK(_bloom_filter_func); RETURN_IF_ERROR(_bloom_filter_func->assign(data, bloom_filter.filter_length(), contain_null)); return Status::OK(); } @@ -612,12 +613,16 @@ std::string RuntimeFilterWrapper::debug_string() const { } void RuntimeFilterWrapper::to_protobuf(PInFilter* filter) { - filter->set_column_type(to_proto(column_type())); + filter->set_column_type( + PColumnType:: + COLUMN_TYPE_BOOL); // set deprecated field coz it is required and we can't delete it _hybrid_set->to_pb(filter); } void RuntimeFilterWrapper::to_protobuf(PMinMaxFilter* filter) { - filter->set_column_type(to_proto(column_type())); + filter->set_column_type( + PColumnType:: + COLUMN_TYPE_BOOL); // set deprecated field coz it is required and we can't delete it _minmax_func->to_pb(filter); } diff --git a/be/src/runtime_filter/utils.cpp b/be/src/runtime_filter/utils.cpp index 340dd824bb7..b2e4e27fac4 100644 --- a/be/src/runtime_filter/utils.cpp +++ 
b/be/src/runtime_filter/utils.cpp @@ -81,112 +81,6 @@ RuntimeFilterType get_runtime_filter_type(const TRuntimeFilterDesc* desc) { } } -// PrimitiveType-> PColumnType -PColumnType to_proto(PrimitiveType type) { - switch (type) { - case TYPE_BOOLEAN: - return PColumnType::COLUMN_TYPE_BOOL; - case TYPE_TINYINT: - return PColumnType::COLUMN_TYPE_TINY_INT; - case TYPE_SMALLINT: - return PColumnType::COLUMN_TYPE_SMALL_INT; - case TYPE_INT: - return PColumnType::COLUMN_TYPE_INT; - case TYPE_BIGINT: - return PColumnType::COLUMN_TYPE_BIGINT; - case TYPE_LARGEINT: - return PColumnType::COLUMN_TYPE_LARGEINT; - case TYPE_FLOAT: - return PColumnType::COLUMN_TYPE_FLOAT; - case TYPE_DOUBLE: - return PColumnType::COLUMN_TYPE_DOUBLE; - case TYPE_DATE: - return PColumnType::COLUMN_TYPE_DATE; - case TYPE_DATEV2: - return PColumnType::COLUMN_TYPE_DATEV2; - case TYPE_DATETIMEV2: - return PColumnType::COLUMN_TYPE_DATETIMEV2; - case TYPE_DATETIME: - return PColumnType::COLUMN_TYPE_DATETIME; - case TYPE_DECIMALV2: - return PColumnType::COLUMN_TYPE_DECIMALV2; - case TYPE_DECIMAL32: - return PColumnType::COLUMN_TYPE_DECIMAL32; - case TYPE_DECIMAL64: - return PColumnType::COLUMN_TYPE_DECIMAL64; - case TYPE_DECIMAL128I: - return PColumnType::COLUMN_TYPE_DECIMAL128I; - case TYPE_DECIMAL256: - return PColumnType::COLUMN_TYPE_DECIMAL256; - case TYPE_CHAR: - return PColumnType::COLUMN_TYPE_CHAR; - case TYPE_VARCHAR: - return PColumnType::COLUMN_TYPE_VARCHAR; - case TYPE_STRING: - return PColumnType::COLUMN_TYPE_STRING; - case TYPE_IPV4: - return PColumnType::COLUMN_TYPE_IPV4; - case TYPE_IPV6: - return PColumnType::COLUMN_TYPE_IPV6; - default: - throw Exception(ErrorCode::INTERNAL_ERROR, - "runtime filter meet invalid PrimitiveType type {}", int(type)); - } -} - -// PColumnType->PrimitiveType -PrimitiveType to_primitive_type(PColumnType type) { - switch (type) { - case PColumnType::COLUMN_TYPE_BOOL: - return TYPE_BOOLEAN; - case PColumnType::COLUMN_TYPE_TINY_INT: - return TYPE_TINYINT; - case 
PColumnType::COLUMN_TYPE_SMALL_INT: - return TYPE_SMALLINT; - case PColumnType::COLUMN_TYPE_INT: - return TYPE_INT; - case PColumnType::COLUMN_TYPE_BIGINT: - return TYPE_BIGINT; - case PColumnType::COLUMN_TYPE_LARGEINT: - return TYPE_LARGEINT; - case PColumnType::COLUMN_TYPE_FLOAT: - return TYPE_FLOAT; - case PColumnType::COLUMN_TYPE_DOUBLE: - return TYPE_DOUBLE; - case PColumnType::COLUMN_TYPE_DATE: - return TYPE_DATE; - case PColumnType::COLUMN_TYPE_DATEV2: - return TYPE_DATEV2; - case PColumnType::COLUMN_TYPE_DATETIMEV2: - return TYPE_DATETIMEV2; - case PColumnType::COLUMN_TYPE_DATETIME: - return TYPE_DATETIME; - case PColumnType::COLUMN_TYPE_DECIMALV2: - return TYPE_DECIMALV2; - case PColumnType::COLUMN_TYPE_DECIMAL32: - return TYPE_DECIMAL32; - case PColumnType::COLUMN_TYPE_DECIMAL64: - return TYPE_DECIMAL64; - case PColumnType::COLUMN_TYPE_DECIMAL128I: - return TYPE_DECIMAL128I; - case PColumnType::COLUMN_TYPE_DECIMAL256: - return TYPE_DECIMAL256; - case PColumnType::COLUMN_TYPE_VARCHAR: - return TYPE_VARCHAR; - case PColumnType::COLUMN_TYPE_CHAR: - return TYPE_CHAR; - case PColumnType::COLUMN_TYPE_STRING: - return TYPE_STRING; - case PColumnType::COLUMN_TYPE_IPV4: - return TYPE_IPV4; - case PColumnType::COLUMN_TYPE_IPV6: - return TYPE_IPV6; - default: - throw Exception(ErrorCode::INTERNAL_ERROR, - "runtime filter meet invalid PColumnType type {}", int(type)); - } -} - // PFilterType -> RuntimeFilterType RuntimeFilterType get_type(int filter_type) { switch (filter_type) { diff --git a/be/src/runtime_filter/utils.h b/be/src/runtime_filter/utils.h index bd55caf6316..7b24f1f5e83 100644 --- a/be/src/runtime_filter/utils.h +++ b/be/src/runtime_filter/utils.h @@ -82,12 +82,6 @@ std::string filter_type_to_string(RuntimeFilterType type); RuntimeFilterType get_runtime_filter_type(const TRuntimeFilterDesc* desc); -// PrimitiveType-> PColumnType -PColumnType to_proto(PrimitiveType type); - -// PColumnType->PrimitiveType -PrimitiveType to_primitive_type(PColumnType 
type); - // PFilterType -> RuntimeFilterType RuntimeFilterType get_type(int filter_type); // RuntimeFilterType -> PFilterType diff --git a/be/test/pipeline/thrift_builder.h b/be/test/pipeline/thrift_builder.h index b1555a18c42..89eb8ed0fda 100644 --- a/be/test/pipeline/thrift_builder.h +++ b/be/test/pipeline/thrift_builder.h @@ -461,7 +461,7 @@ public: .build()) .build(), 0) - .set_slot_ref(TSlotRefBuilder(1, 1).build()) + .set_slot_ref(TSlotRefBuilder(0, 0).build()) .build()) .build(); } diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp new file mode 100644 index 00000000000..b13036b28f0 --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime_filter/runtime_filter_consumer_helper.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "common/object_pool.h" +#include "pipeline/exec/hashjoin_build_sink.h" +#include "pipeline/exec/mock_operator.h" +#include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" +#include "runtime/descriptors.h" +#include "runtime_filter/runtime_filter_consumer.h" +#include "runtime_filter/runtime_filter_test_utils.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" + +namespace doris { + +class RuntimeFilterConsumerHelperTest : public RuntimeFilterTest { + void SetUp() override { + RuntimeFilterTest::SetUp(); + _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM, INSTANCE_NUM); + _op.reset(new pipeline::MockOperatorX()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2)); + + _sink.reset(new pipeline::HashJoinBuildSinkOperatorX( + &_pool, 0, _op->operator_id(), + TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), _tbl)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink)); + + _task.reset(new pipeline::PipelineTask(_pipeline, 0, _runtime_states[0].get(), nullptr, + &_profile, {}, 0)); + _runtime_states[0]->set_task(_task.get()); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(ExecEnv::GetInstance()->init_pipeline_task_scheduler()); + } + + pipeline::OperatorPtr _op; + pipeline::DataSinkOperatorPtr _sink; + pipeline::PipelinePtr _pipeline; + std::shared_ptr<pipeline::PipelineTask> _task; + ObjectPool _pool; +}; + +TEST_F(RuntimeFilterConsumerHelperTest, basic) { + vectorized::VExprContextSPtr ctx; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree( + TRuntimeFilterDescBuilder::get_default_expr(), ctx)); + ctx->_last_result_column_id = 0; + + vectorized::VExprContextSPtrs build_expr_ctxs = {ctx}; + std::vector<TRuntimeFilterDesc> runtime_filter_descs = { + TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(), + 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build()}; + + std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>> runtime_filter_dependencies; + SlotDescriptor slot_desc; + TupleDescriptor tuple_desc; + tuple_desc.add_slot(&slot_desc); + RowDescriptor row_desc; + _tbl._slot_desc_map[0] = &slot_desc; + const_cast<std::vector<TupleDescriptor*>&>(row_desc._tuple_desc_map).push_back(&tuple_desc); + auto helper = RuntimeFilterConsumerHelper(0, runtime_filter_descs, row_desc); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.init(_runtime_states[0].get(), &_profile, true, + runtime_filter_dependencies, 0, 0, "")); + + vectorized::VExprContextSPtrs conjuncts; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.acquire_runtime_filter(conjuncts)); + ASSERT_EQ(conjuncts.size(), 0); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + RuntimeFilterProducer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), + runtime_filter_descs.data(), &producer, &_profile)); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); + helper._consumers[0]->signal(producer.get()); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.acquire_runtime_filter(conjuncts)); + ASSERT_EQ(conjuncts.size(), 1); + + conjuncts.clear(); + int arrived_rf_num = -1; + helper._consumers[1]->signal(producer.get()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + helper.try_append_late_arrival_runtime_filter(&arrived_rf_num, conjuncts)); + ASSERT_EQ(conjuncts.size(), 1); + ASSERT_EQ(arrived_rf_num, 2); +} + +} // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index 617ce3fdbb0..3e9a456c2a3 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -17,6 +17,7 @@ #include "runtime_filter/runtime_filter_consumer.h" +#include <gen_cpp/PlanNodes_types.h> #include <glog/logging.h> 
#include <gtest/gtest.h> @@ -25,7 +26,35 @@ namespace doris { -class RuntimeFilterConsumerTest : public RuntimeFilterTest {}; +class RuntimeFilterConsumerTest : public RuntimeFilterTest { +public: + void test_signal_aquire(TRuntimeFilterDesc desc) { + std::shared_ptr<RuntimeFilterConsumer> consumer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), + &desc, 0, &consumer, &_profile)); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); + + consumer->signal(producer.get()); + + try { + consumer->signal(producer.get()); + ASSERT_TRUE(false); + } catch (const Exception& e) { + ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR); + } + + std::vector<vectorized::VRuntimeFilterPtr> push_exprs; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs)); + ASSERT_NE(push_exprs.size(), 0); + ASSERT_TRUE(consumer->is_applied()); + } +}; TEST_F(RuntimeFilterConsumerTest, basic) { std::shared_ptr<RuntimeFilterConsumer> consumer; @@ -38,7 +67,53 @@ TEST_F(RuntimeFilterConsumerTest, basic) { desc, true, 0, &registed_consumer, &_profile)); } -TEST_F(RuntimeFilterConsumerTest, signal_aquire) { +TEST_F(RuntimeFilterConsumerTest, signal_aquire_in_or_bloom) { + test_signal_aquire(TRuntimeFilterDescBuilder() + .set_type(TRuntimeFilterType::IN_OR_BLOOM) + .add_planId_to_target_expr(0) + .build()); +} + +TEST_F(RuntimeFilterConsumerTest, signal_aquire_bloom) { + test_signal_aquire(TRuntimeFilterDescBuilder() + .set_type(TRuntimeFilterType::BLOOM) + .add_planId_to_target_expr(0) + .build()); +} + +TEST_F(RuntimeFilterConsumerTest, signal_aquire_in) { + test_signal_aquire(TRuntimeFilterDescBuilder() + 
.set_type(TRuntimeFilterType::IN) + .add_planId_to_target_expr(0) + .build()); +} + +TEST_F(RuntimeFilterConsumerTest, signal_aquire_min_max) { + test_signal_aquire(TRuntimeFilterDescBuilder() + .set_type(TRuntimeFilterType::MIN_MAX) + .add_planId_to_target_expr(0) + .build()); +} + +TEST_F(RuntimeFilterConsumerTest, signal_aquire_min) { + auto desc = TRuntimeFilterDescBuilder() + .set_type(TRuntimeFilterType::MIN_MAX) + .add_planId_to_target_expr(0) + .build(); + desc.__set_min_max_type(TMinMaxRuntimeFilterType::MIN); + test_signal_aquire(desc); +} + +TEST_F(RuntimeFilterConsumerTest, signal_aquire_max) { + auto desc = TRuntimeFilterDescBuilder() + .set_type(TRuntimeFilterType::MIN_MAX) + .add_planId_to_target_expr(0) + .build(); + desc.__set_min_max_type(TMinMaxRuntimeFilterType::MAX); + test_signal_aquire(desc); +} + +TEST_F(RuntimeFilterConsumerTest, timeout_aquire) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( @@ -49,17 +124,14 @@ TEST_F(RuntimeFilterConsumerTest, signal_aquire) { RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); - consumer->signal(producer.get()); - - try { - consumer->signal(producer.get()); - ASSERT_TRUE(false); - } catch (const Exception& e) { - ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR); - } - std::vector<vectorized::VRuntimeFilterPtr> push_exprs; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs)); + ASSERT_EQ(push_exprs.size(), 0); + ASSERT_FALSE(consumer->is_applied()); + ASSERT_EQ(consumer->_rf_state, RuntimeFilterConsumer::State::TIMEOUT); + + consumer->signal(producer.get()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs)); ASSERT_EQ(push_exprs.size(), 1); ASSERT_TRUE(consumer->is_applied()); } diff --git 
a/be/test/runtime_filter/runtime_filter_merger_test.cpp b/be/test/runtime_filter/runtime_filter_merger_test.cpp index 768284879bc..e05e73fad89 100644 --- a/be/test/runtime_filter/runtime_filter_merger_test.cpp +++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp @@ -56,9 +56,10 @@ public: ASSERT_EQ(merger->_wrapper->_state, second_expected_state); } - void test_serialize(RuntimeFilterWrapper::State state) { + void test_serialize(RuntimeFilterWrapper::State state, + TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM) { std::shared_ptr<RuntimeFilterMerger> merger; - auto desc = TRuntimeFilterDescBuilder().build(); + auto desc = TRuntimeFilterDescBuilder().set_type(type).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create( RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &merger, &_profile)); merger->set_expected_producer_num(1); @@ -67,6 +68,7 @@ public: std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); producer->set_wrapper_state_and_ready_to_publish(state); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); ASSERT_TRUE(merger->ready()); @@ -76,12 +78,15 @@ public: int len = 0; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->serialize(&request, &data, &len)); - std::shared_ptr<RuntimeFilterMerger> deserialized_merger; + std::shared_ptr<RuntimeFilterProducer> deserialized_producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterMerger::create(RuntimeFilterParamsContext::create(_query_ctx.get()), - &desc, &deserialized_merger, &_profile)); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(deserialized_merger->assign(request, nullptr)); - ASSERT_EQ(merger->_wrapper->_state, state); + RuntimeFilterProducer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), + &desc, &deserialized_producer, &_profile)); + butil::IOBuf buf; + buf.append(data, 
len); + butil::IOBufAsZeroCopyInputStream stream(buf); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(deserialized_producer->assign(request, &stream)); + ASSERT_EQ(deserialized_producer->_wrapper->_state, state); } }; @@ -176,4 +181,16 @@ TEST_F(RuntimeFilterMergerTest, serialize_ignored) { test_serialize(RuntimeFilterWrapper::State::IGNORED); } +TEST_F(RuntimeFilterMergerTest, serialize_bloom) { + test_serialize(RuntimeFilterWrapper::State::READY, TRuntimeFilterType::type::BLOOM); +} + +TEST_F(RuntimeFilterMergerTest, serialize_min_max) { + test_serialize(RuntimeFilterWrapper::State::READY, TRuntimeFilterType::type::MIN_MAX); +} + +TEST_F(RuntimeFilterMergerTest, serialize_in) { + test_serialize(RuntimeFilterWrapper::State::READY, TRuntimeFilterType::type::IN); +} + } // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp new file mode 100644 index 00000000000..534245f4437 --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime_filter/runtime_filter_producer_helper_cross.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "common/object_pool.h" +#include "pipeline/exec/hashjoin_build_sink.h" +#include "pipeline/exec/mock_operator.h" +#include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" +#include "runtime_filter/runtime_filter_test_utils.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/exprs/vslot_ref.h" + +namespace doris { + +class RuntimeFilterProducerHelperCrossTest : public RuntimeFilterTest { + void SetUp() override { + RuntimeFilterTest::SetUp(); + _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM, INSTANCE_NUM); + _op.reset(new pipeline::MockOperatorX()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2)); + + _sink.reset(new pipeline::HashJoinBuildSinkOperatorX( + &_pool, 0, _op->operator_id(), + TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), _tbl)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink)); + + _task.reset(new pipeline::PipelineTask(_pipeline, 0, _runtime_states[0].get(), nullptr, + &_profile, {}, 0)); + _runtime_states[0]->set_task(_task.get()); + } + + pipeline::OperatorPtr _op; + pipeline::DataSinkOperatorPtr _sink; + pipeline::PipelinePtr _pipeline; + std::shared_ptr<pipeline::PipelineTask> _task; + ObjectPool _pool; +}; + +TEST_F(RuntimeFilterProducerHelperCrossTest, basic) { + auto helper = RuntimeFilterProducerHelperCross(&_profile); + + vectorized::VExprContextSPtr ctx; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree( + TRuntimeFilterDescBuilder::get_default_expr(), ctx)); + ctx->_last_result_column_id = 0; + + assert_cast<vectorized::VSlotRef*>(ctx->root().get())->_column_id = 0; + + vectorized::VExprContextSPtrs build_expr_ctxs = {ctx}; + std::vector<TRuntimeFilterDesc> runtime_filter_descs = {TRuntimeFilterDescBuilder().build()}; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + 
helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs)); + + vectorized::Block block; + auto column = vectorized::ColumnInt32::create(); + column->insert(1); + column->insert(2); + block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"}); + + vectorized::Blocks blocks = {block}; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.process(_runtime_states[0].get(), blocks)); +} + +} // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp new file mode 100644 index 00000000000..433a612d0c7 --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "runtime_filter/runtime_filter_producer_helper.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime_filter/runtime_filter_test_utils.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris {
+
+// Test fixture for RuntimeFilterProducerHelper.
+//
+// On top of RuntimeFilterTest::SetUp() it builds a pipeline with a MockOperatorX operator and a
+// HashJoinBuildSinkOperatorX sink, then creates a PipelineTask and attaches it to
+// _runtime_states[0], the runtime state every test below hands to the helper.
+//
+// SetUp() and the members are protected, following the GoogleTest fixture convention, so that
+// TEST_F bodies (which are generated subclasses) can reach them, e.g. `_task`.
+class RuntimeFilterProducerHelperTest : public RuntimeFilterTest {
+protected:
+    void SetUp() override {
+        RuntimeFilterTest::SetUp();
+        _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM, INSTANCE_NUM);
+        _op = std::make_shared<pipeline::MockOperatorX>();
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2));
+
+        _sink = std::make_shared<pipeline::HashJoinBuildSinkOperatorX>(
+                &_pool, 0, _op->operator_id(),
+                TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), _tbl);
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink));
+
+        // Built with `new` on purpose: the bare `{}` argument has no type of its own and
+        // therefore cannot be forwarded through std::make_shared.
+        _task.reset(new pipeline::PipelineTask(_pipeline, 0, _runtime_states[0].get(), nullptr,
+                                               &_profile, {}, 0));
+        _runtime_states[0]->set_task(_task.get());
+    }
+
+    // Returns the build-side input shared by all tests: a block with a single Int32 column
+    // named "col1" that holds the values 1 and 2.
+    static vectorized::Block make_int32_block() {
+        auto column = vectorized::ColumnInt32::create();
+        column->insert(1);
+        column->insert(2);
+
+        vectorized::Block block;
+        block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"});
+        return block;
+    }
+
+    pipeline::OperatorPtr _op;
+    pipeline::DataSinkOperatorPtr _sink;
+    pipeline::PipelinePtr _pipeline;
+    std::shared_ptr<pipeline::PipelineTask> _task;
+    ObjectPool _pool;
+};
+
+// Smoke test: init() with one default runtime filter descriptor, then process() over the
+// two-row block with a default-constructed shared hash table context. Only the returned
+// statuses are checked; the produced filter contents are not inspected.
+TEST_F(RuntimeFilterProducerHelperTest, basic) {
+    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = {TRuntimeFilterDescBuilder().build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs));
+
+    vectorized::Block block = make_int32_block();
+    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+}
+
+// Same flow as `basic`, but the descriptor enables build_bf_by_runtime_size and the task is
+// flagged through set_wake_up_early() before process() is called; process() must still
+// return OK.
+TEST_F(RuntimeFilterProducerHelperTest, wake_up_early) {
+    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = {
+            TRuntimeFilterDescBuilder().set_build_bf_by_runtime_size(true).build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs));
+
+    vectorized::Block block = make_int32_block();
+    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+    _task->set_wake_up_early();
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+}
+
+// Calls skip_process() right after init() and then still calls process(); both calls must
+// return OK.
+TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
+    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = {
+            TRuntimeFilterDescBuilder().set_build_bf_by_runtime_size(true).build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs));
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.skip_process(_runtime_states[0].get()));
+
+    vectorized::Block block = make_int32_block();
+    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+}
+
+} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp b/be/test/runtime_filter/runtime_filter_producer_test.cpp
index ab04391fe7d..9822e7a0f41 100644
--- a/be/test/runtime_filter/runtime_filter_producer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -20,6 +20,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "runtime_filter/runtime_filter_consumer.h"
 #include "runtime_filter/runtime_filter_test_utils.h"
 
 namespace doris {
@@ -160,6 +161,11 @@ TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) {
     producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED);
     ASSERT_EQ(producer2->_rf_state, RuntimeFilterProducer::State::READY_TO_PUBLISH);
     ASSERT_EQ(producer2->_wrapper->_state, RuntimeFilterWrapper::State::DISABLED);
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->publish(_runtime_states[0].get(), true));
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer2->publish(_runtime_states[1].get(), true));
+    ASSERT_EQ(consumer->_rf_state, RuntimeFilterConsumer::State::READY);
+    ASSERT_EQ(consumer->_wrapper->_state, RuntimeFilterWrapper::State::DISABLED);
 }
 
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/runtime_filter/runtime_filter_test_utils.h b/be/test/runtime_filter/runtime_filter_test_utils.h
index 3141be9d848..ccc6e0b834b 100644
---
a/be/test/runtime_filter/runtime_filter_test_utils.h +++ b/be/test/runtime_filter/runtime_filter_test_utils.h @@ -28,7 +28,9 @@ public: RuntimeFilterTest() = default; ~RuntimeFilterTest() override = default; void SetUp() override { - _query_options = TQueryOptionsBuilder().set_runtime_filter_max_in_num(15).build(); + _tbl._row_tuples.push_back({}); + + _query_options = TQueryOptionsBuilder().build(); auto fe_address = TNetworkAddress(); fe_address.hostname = LOCALHOST; fe_address.port = DUMMY_PORT; @@ -50,6 +52,9 @@ public: TUniqueId(), RuntimeFilterParamsContext::create(_query_ctx.get()), _query_ctx->query_mem_tracker(), false); _runtime_states[i]->set_runtime_filter_mgr(_local_mgrs[i].get()); + _runtime_states[i]->local_runtime_filter_mgr()->_state->set_state( + _runtime_states[i].get()); + _runtime_states[i]->set_desc_tbl(&_tbl); } } void TearDown() override {} @@ -63,6 +68,7 @@ protected: const int INSTANCE_NUM = 2; std::vector<std::unique_ptr<RuntimeState>> _runtime_states; std::vector<std::unique_ptr<RuntimeFilterMgr>> _local_mgrs; + DescriptorTbl _tbl; }; } // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp index 8cd3c1e32b6..d15d78f70ad 100644 --- a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp @@ -228,7 +228,7 @@ TEST_F(RuntimeFilterWrapperTest, TestInAssign) { auto wrapper = std::make_shared<RuntimeFilterWrapper>(&params); \ PMergeFilterRequest valid_request; \ auto* in_filter = valid_request.mutable_in_filter(); \ - in_filter->set_column_type(to_proto(column_return_type)); \ + in_filter->set_column_type(PColumnType::COLUMN_TYPE_BOOL); \ get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(), \ value1); \ get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(), \ diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto index 46b94860701..2242caae2a4 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -591,7 +591,7 @@ message PMergeFilterRequest { optional PInFilter in_filter = 7; optional bool is_pipeline = 8 [deprecated = true]; optional bool opt_remote_rf = 9; // Deprecated - optional PColumnType column_type = 10; + optional PColumnType column_type = 10; // Deprecated optional bool contain_null = 11; optional bool ignored = 12; optional uint64 local_merge_time = 13; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org