This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit dbc4eacaa20afbb920f39d7ab79ee5336b6010b0
Author: Pxl <x...@selectdb.com>
AuthorDate: Mon Mar 3 18:37:23 2025 +0800

    add more test and some fix
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |   3 +-
 be/src/runtime_filter/runtime_filter.cpp           |   3 -
 be/src/runtime_filter/runtime_filter.h             |   1 +
 .../runtime_filter_consumer_helper.cpp             |   4 +-
 .../runtime_filter_consumer_helper.h               |   4 +-
 be/src/runtime_filter/runtime_filter_definitions.h |   3 +-
 be/src/runtime_filter/runtime_filter_mgr.h         |   2 +-
 be/src/runtime_filter/runtime_filter_producer.h    |   4 +-
 .../runtime_filter_producer_helper.cpp             |   7 +-
 .../runtime_filter_producer_helper.h               |   3 +-
 .../runtime_filter_producer_helper_cross.h         |   2 +-
 be/src/runtime_filter/runtime_filter_wrapper.cpp   |   9 +-
 be/src/runtime_filter/utils.cpp                    | 106 ----------------
 be/src/runtime_filter/utils.h                      |   6 -
 be/test/pipeline/thrift_builder.h                  |   2 +-
 .../runtime_filter_consumer_helper_test.cpp        | 108 +++++++++++++++++
 .../runtime_filter_consumer_test.cpp               |  94 ++++++++++++--
 .../runtime_filter/runtime_filter_merger_test.cpp  |  31 +++--
 .../runtime_filter_producer_helper_cross_test.cpp  |  84 +++++++++++++
 .../runtime_filter_producer_helper_test.cpp        | 135 +++++++++++++++++++++
 .../runtime_filter_producer_test.cpp               |   8 +-
 be/test/runtime_filter/runtime_filter_test_utils.h |   8 +-
 .../runtime_filter/runtime_filter_wrapper_test.cpp |   2 +-
 gensrc/proto/internal_service.proto                |   2 +-
 24 files changed, 475 insertions(+), 156 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a8f0ce78c41..a064b5d9398 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -226,8 +226,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
 
     try {
         RETURN_IF_ERROR(_runtime_filter_producer_helper->process(
-                state, _shared_state->build_block.get(), _finish_dependency,
-                p._shared_hash_table_context));
+                state, _shared_state->build_block.get(), 
p._shared_hash_table_context));
     } catch (Exception& e) {
         bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
                                                    
p._shared_hashtable_controller &&
diff --git a/be/src/runtime_filter/runtime_filter.cpp 
b/be/src/runtime_filter/runtime_filter.cpp
index 9ba7324e3fb..cb64fb43af0 100644
--- a/be/src/runtime_filter/runtime_filter.cpp
+++ b/be/src/runtime_filter/runtime_filter.cpp
@@ -52,9 +52,6 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, 
const TNetworkAddress
     pfragment_instance_id->set_hi(BackendOptions::get_local_backend().id);
     pfragment_instance_id->set_lo((int64_t)this);
 
-    auto column_type = _wrapper->column_type();
-    
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
-
     merge_filter_callback->cntl_->set_timeout_ms(
             
get_execution_rpc_timeout_ms(_state->get_query_ctx()->execution_timeout()));
     if (config::execution_ignore_eovercrowded) {
diff --git a/be/src/runtime_filter/runtime_filter.h 
b/be/src/runtime_filter/runtime_filter.h
index 660f0ad88ce..91883bbcec0 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -89,6 +89,7 @@ protected:
               _has_local_target(desc->has_local_targets),
               _runtime_filter_type(get_runtime_filter_type(desc)) {
         DCHECK_NE(desc->has_remote_targets, _has_local_target);
+        DCHECK_NE(state, nullptr);
     }
 
     virtual Status _init_with_desc(const TRuntimeFilterDesc* desc, const 
TQueryOptions* options);
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp 
b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
index e3259c03164..c9b0cff475f 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
@@ -20,7 +20,7 @@
 #include "pipeline/pipeline_task.h"
 #include "runtime_filter/runtime_filter_consumer.h"
 
-namespace doris::pipeline {
+namespace doris {
 
 RuntimeFilterConsumerHelper::RuntimeFilterConsumerHelper(
         const int32_t _node_id, const std::vector<TRuntimeFilterDesc>& 
runtime_filters,
@@ -148,4 +148,4 @@ Status 
RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter(
     return Status::OK();
 }
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h 
b/be/src/runtime_filter/runtime_filter_consumer_helper.h
index b1cf99356c2..15c7ef557e1 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -20,7 +20,7 @@
 #include "pipeline/dependency.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
 
-namespace doris::pipeline {
+namespace doris {
 
 // this class used in ScanNode or MultiCastDataStreamSource
 /**
@@ -74,4 +74,4 @@ private:
     std::unique_ptr<RuntimeProfile> _profile;
 };
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime_filter/runtime_filter_definitions.h 
b/be/src/runtime_filter/runtime_filter_definitions.h
index 693e62beea9..28329f65cb7 100644
--- a/be/src/runtime_filter/runtime_filter_definitions.h
+++ b/be/src/runtime_filter/runtime_filter_definitions.h
@@ -79,7 +79,8 @@ class RuntimeFilterMgr;
 // There are two types of runtime filters:
 // 1. Global runtime filter. Managed by QueryContext's RuntimeFilterMgr which 
is produced by multiple producers and shared by multiple consumers.
 // 2. Local runtime filter. Managed by RuntimeState's RuntimeFilterMgr which 
is 1-producer-1-consumer mode.
-struct RuntimeFilterParamsContext {
+class RuntimeFilterParamsContext {
+public:
     static RuntimeFilterParamsContext* create(RuntimeState* state);
     static RuntimeFilterParamsContext* create(QueryContext* query_ctx);
 
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index a63aca9b68d..896a987fad5 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -52,7 +52,7 @@ class MemTrackerLimiter;
 class RuntimeState;
 class RuntimeFilterWrapper;
 class QueryContext;
-struct RuntimeFilterParamsContext;
+class RuntimeFilterParamsContext;
 class ExecEnv;
 class RuntimeProfile;
 
diff --git a/be/src/runtime_filter/runtime_filter_producer.h 
b/be/src/runtime_filter/runtime_filter_producer.h
index 022fe6450d8..17107a61d67 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -105,11 +105,11 @@ public:
         }
     }
 
-    void copy_to_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
+    void copy_to_shared_context(const vectorized::SharedHashTableContextPtr& 
context) {
         DCHECK(!context->runtime_filters.contains(_wrapper->filter_id()));
         context->runtime_filters[_wrapper->filter_id()] = _wrapper;
     }
-    void copy_from_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
+    void copy_from_shared_context(const vectorized::SharedHashTableContextPtr& 
context) {
         DCHECK(context->runtime_filters.contains(_wrapper->filter_id()));
         _wrapper = context->runtime_filters[_wrapper->filter_id()];
     }
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp 
b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index 53038a7fa61..22c8768b285 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -73,6 +73,7 @@ Status RuntimeFilterProducerHelper::_insert(const 
vectorized::Block* block, size
             continue;
         }
         int result_column_id = 
_filter_expr_contexts[i]->get_last_result_column_id();
+        DCHECK_NE(result_column_id, -1);
         const auto& column = block->get_by_position(result_column_id).column;
         RETURN_IF_ERROR(filter->insert(column, start));
     }
@@ -89,8 +90,7 @@ Status RuntimeFilterProducerHelper::_publish(RuntimeState* 
state) {
 
 Status RuntimeFilterProducerHelper::process(
         RuntimeState* state, const vectorized::Block* block,
-        std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency,
-        vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) {
+        const vectorized::SharedHashTableContextPtr& shared_hash_table_ctx) {
     if (_skip_runtime_filters_process) {
         return Status::OK();
     }
@@ -104,7 +104,8 @@ Status RuntimeFilterProducerHelper::process(
         uint64_t hash_table_size = block ? block->rows() : 0;
         RETURN_IF_ERROR(_init_filters(state, hash_table_size));
         if (hash_table_size > 1) {
-            RETURN_IF_ERROR(_insert(block, 1));
+            constexpr int HASH_JOIN_INSERT_OFFSET = 1; // the first row is 
mocked on hash join sink
+            RETURN_IF_ERROR(_insert(block, HASH_JOIN_INSERT_OFFSET));
         }
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h 
b/be/src/runtime_filter/runtime_filter_producer_helper.h
index f8b7d985f50..dfd6b6bccf9 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.h
@@ -58,8 +58,7 @@ public:
 
     // build rf's predicate and publish rf
     Status process(RuntimeState* state, const vectorized::Block* block,
-                   std::shared_ptr<pipeline::CountedFinishDependency> 
finish_dependency,
-                   vectorized::SharedHashTableContextPtr& 
shared_hash_table_ctx);
+                   const vectorized::SharedHashTableContextPtr& 
shared_hash_table_ctx);
 
 protected:
     virtual void _init_expr(const vectorized::VExprContextSPtrs& 
build_expr_ctxs,
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h 
b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
index a7525fba4df..e8aa1a72be3 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
@@ -51,7 +51,7 @@ private:
         for (const auto& vexpr_ctx : _filter_expr_contexts) {
             int result_column_id = -1;
             RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id));
-            DCHECK(result_column_id != -1);
+            DCHECK_NE(result_column_id, -1) << 
vexpr_ctx->root()->debug_string();
             block->get_by_position(result_column_id).column =
                     block->get_by_position(result_column_id)
                             .column->convert_to_full_column_if_const();
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp 
b/be/src/runtime_filter/runtime_filter_wrapper.cpp
index c3c5217d775..a341021bc1c 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.cpp
+++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp
@@ -419,6 +419,7 @@ Status RuntimeFilterWrapper::_assign(const PInFilter& 
in_filter, bool contain_nu
 
 Status RuntimeFilterWrapper::_assign(const PBloomFilter& bloom_filter,
                                      butil::IOBufAsZeroCopyInputStream* data, 
bool contain_null) {
+    DCHECK(_bloom_filter_func);
     RETURN_IF_ERROR(_bloom_filter_func->assign(data, 
bloom_filter.filter_length(), contain_null));
     return Status::OK();
 }
@@ -612,12 +613,16 @@ std::string RuntimeFilterWrapper::debug_string() const {
 }
 
 void RuntimeFilterWrapper::to_protobuf(PInFilter* filter) {
-    filter->set_column_type(to_proto(column_type()));
+    filter->set_column_type(
+            PColumnType::
+                    COLUMN_TYPE_BOOL); // set deprecated field coz it is 
required and we can't delete it
     _hybrid_set->to_pb(filter);
 }
 
 void RuntimeFilterWrapper::to_protobuf(PMinMaxFilter* filter) {
-    filter->set_column_type(to_proto(column_type()));
+    filter->set_column_type(
+            PColumnType::
+                    COLUMN_TYPE_BOOL); // set deprecated field coz it is 
required and we can't delete it
     _minmax_func->to_pb(filter);
 }
 
diff --git a/be/src/runtime_filter/utils.cpp b/be/src/runtime_filter/utils.cpp
index 340dd824bb7..b2e4e27fac4 100644
--- a/be/src/runtime_filter/utils.cpp
+++ b/be/src/runtime_filter/utils.cpp
@@ -81,112 +81,6 @@ RuntimeFilterType get_runtime_filter_type(const 
TRuntimeFilterDesc* desc) {
     }
 }
 
-// PrimitiveType-> PColumnType
-PColumnType to_proto(PrimitiveType type) {
-    switch (type) {
-    case TYPE_BOOLEAN:
-        return PColumnType::COLUMN_TYPE_BOOL;
-    case TYPE_TINYINT:
-        return PColumnType::COLUMN_TYPE_TINY_INT;
-    case TYPE_SMALLINT:
-        return PColumnType::COLUMN_TYPE_SMALL_INT;
-    case TYPE_INT:
-        return PColumnType::COLUMN_TYPE_INT;
-    case TYPE_BIGINT:
-        return PColumnType::COLUMN_TYPE_BIGINT;
-    case TYPE_LARGEINT:
-        return PColumnType::COLUMN_TYPE_LARGEINT;
-    case TYPE_FLOAT:
-        return PColumnType::COLUMN_TYPE_FLOAT;
-    case TYPE_DOUBLE:
-        return PColumnType::COLUMN_TYPE_DOUBLE;
-    case TYPE_DATE:
-        return PColumnType::COLUMN_TYPE_DATE;
-    case TYPE_DATEV2:
-        return PColumnType::COLUMN_TYPE_DATEV2;
-    case TYPE_DATETIMEV2:
-        return PColumnType::COLUMN_TYPE_DATETIMEV2;
-    case TYPE_DATETIME:
-        return PColumnType::COLUMN_TYPE_DATETIME;
-    case TYPE_DECIMALV2:
-        return PColumnType::COLUMN_TYPE_DECIMALV2;
-    case TYPE_DECIMAL32:
-        return PColumnType::COLUMN_TYPE_DECIMAL32;
-    case TYPE_DECIMAL64:
-        return PColumnType::COLUMN_TYPE_DECIMAL64;
-    case TYPE_DECIMAL128I:
-        return PColumnType::COLUMN_TYPE_DECIMAL128I;
-    case TYPE_DECIMAL256:
-        return PColumnType::COLUMN_TYPE_DECIMAL256;
-    case TYPE_CHAR:
-        return PColumnType::COLUMN_TYPE_CHAR;
-    case TYPE_VARCHAR:
-        return PColumnType::COLUMN_TYPE_VARCHAR;
-    case TYPE_STRING:
-        return PColumnType::COLUMN_TYPE_STRING;
-    case TYPE_IPV4:
-        return PColumnType::COLUMN_TYPE_IPV4;
-    case TYPE_IPV6:
-        return PColumnType::COLUMN_TYPE_IPV6;
-    default:
-        throw Exception(ErrorCode::INTERNAL_ERROR,
-                        "runtime filter meet invalid PrimitiveType type {}", 
int(type));
-    }
-}
-
-// PColumnType->PrimitiveType
-PrimitiveType to_primitive_type(PColumnType type) {
-    switch (type) {
-    case PColumnType::COLUMN_TYPE_BOOL:
-        return TYPE_BOOLEAN;
-    case PColumnType::COLUMN_TYPE_TINY_INT:
-        return TYPE_TINYINT;
-    case PColumnType::COLUMN_TYPE_SMALL_INT:
-        return TYPE_SMALLINT;
-    case PColumnType::COLUMN_TYPE_INT:
-        return TYPE_INT;
-    case PColumnType::COLUMN_TYPE_BIGINT:
-        return TYPE_BIGINT;
-    case PColumnType::COLUMN_TYPE_LARGEINT:
-        return TYPE_LARGEINT;
-    case PColumnType::COLUMN_TYPE_FLOAT:
-        return TYPE_FLOAT;
-    case PColumnType::COLUMN_TYPE_DOUBLE:
-        return TYPE_DOUBLE;
-    case PColumnType::COLUMN_TYPE_DATE:
-        return TYPE_DATE;
-    case PColumnType::COLUMN_TYPE_DATEV2:
-        return TYPE_DATEV2;
-    case PColumnType::COLUMN_TYPE_DATETIMEV2:
-        return TYPE_DATETIMEV2;
-    case PColumnType::COLUMN_TYPE_DATETIME:
-        return TYPE_DATETIME;
-    case PColumnType::COLUMN_TYPE_DECIMALV2:
-        return TYPE_DECIMALV2;
-    case PColumnType::COLUMN_TYPE_DECIMAL32:
-        return TYPE_DECIMAL32;
-    case PColumnType::COLUMN_TYPE_DECIMAL64:
-        return TYPE_DECIMAL64;
-    case PColumnType::COLUMN_TYPE_DECIMAL128I:
-        return TYPE_DECIMAL128I;
-    case PColumnType::COLUMN_TYPE_DECIMAL256:
-        return TYPE_DECIMAL256;
-    case PColumnType::COLUMN_TYPE_VARCHAR:
-        return TYPE_VARCHAR;
-    case PColumnType::COLUMN_TYPE_CHAR:
-        return TYPE_CHAR;
-    case PColumnType::COLUMN_TYPE_STRING:
-        return TYPE_STRING;
-    case PColumnType::COLUMN_TYPE_IPV4:
-        return TYPE_IPV4;
-    case PColumnType::COLUMN_TYPE_IPV6:
-        return TYPE_IPV6;
-    default:
-        throw Exception(ErrorCode::INTERNAL_ERROR,
-                        "runtime filter meet invalid PColumnType type {}", 
int(type));
-    }
-}
-
 // PFilterType -> RuntimeFilterType
 RuntimeFilterType get_type(int filter_type) {
     switch (filter_type) {
diff --git a/be/src/runtime_filter/utils.h b/be/src/runtime_filter/utils.h
index bd55caf6316..7b24f1f5e83 100644
--- a/be/src/runtime_filter/utils.h
+++ b/be/src/runtime_filter/utils.h
@@ -82,12 +82,6 @@ std::string filter_type_to_string(RuntimeFilterType type);
 
 RuntimeFilterType get_runtime_filter_type(const TRuntimeFilterDesc* desc);
 
-// PrimitiveType-> PColumnType
-PColumnType to_proto(PrimitiveType type);
-
-// PColumnType->PrimitiveType
-PrimitiveType to_primitive_type(PColumnType type);
-
 // PFilterType -> RuntimeFilterType
 RuntimeFilterType get_type(int filter_type);
 // RuntimeFilterType -> PFilterType
diff --git a/be/test/pipeline/thrift_builder.h 
b/be/test/pipeline/thrift_builder.h
index b1555a18c42..89eb8ed0fda 100644
--- a/be/test/pipeline/thrift_builder.h
+++ b/be/test/pipeline/thrift_builder.h
@@ -461,7 +461,7 @@ public:
                                                            .build())
                                         .build(),
                                 0)
-                                .set_slot_ref(TSlotRefBuilder(1, 1).build())
+                                .set_slot_ref(TSlotRefBuilder(0, 0).build())
                                 .build())
                 .build();
     }
diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
new file mode 100644
index 00000000000..b13036b28f0
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_consumer_helper.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/descriptors.h"
+#include "runtime_filter/runtime_filter_consumer.h"
+#include "runtime_filter/runtime_filter_test_utils.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris {
+
+class RuntimeFilterConsumerHelperTest : public RuntimeFilterTest { // fixture: pipeline + mock operator + hash-join build sink + task
+    void SetUp() override { // runs the base SetUp, then wires the pipeline objects declared below
+        RuntimeFilterTest::SetUp(); // base fixture state (_runtime_states, _profile, _tbl, ...) is used below
+        _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM, 
INSTANCE_NUM);
+        _op.reset(new pipeline::MockOperatorX()); // mock operator registered on the pipeline on the next line
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2));
+
+        _sink.reset(new pipeline::HashJoinBuildSinkOperatorX(
+                &_pool, 0, _op->operator_id(),
+                TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), 
_tbl));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink)); // the hash-join build sink becomes the pipeline's sink
+
+        _task.reset(new pipeline::PipelineTask(_pipeline, 0, 
_runtime_states[0].get(), nullptr,
+                                               &_profile, {}, 0));
+        _runtime_states[0]->set_task(_task.get()); // attach the task to the first runtime state
+
+        
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(ExecEnv::GetInstance()->init_pipeline_task_scheduler()); // called on the ExecEnv singleton
+    }
+
+    pipeline::OperatorPtr _op; // MockOperatorX created in SetUp
+    pipeline::DataSinkOperatorPtr _sink; // HashJoinBuildSinkOperatorX created in SetUp
+    pipeline::PipelinePtr _pipeline; // pipeline that receives _op and _sink
+    std::shared_ptr<pipeline::PipelineTask> _task; // task built on _pipeline and _runtime_states[0]
+    ObjectPool _pool; // passed by address to the sink operator's constructor
+};
+
+TEST_F(RuntimeFilterConsumerHelperTest, basic) { // acquire before any signal, after one signal, then late-arrival append
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0; // set by hand; this test never executes the expression
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx}; // NOTE(review): not read anywhere below in this test - confirm it is needed
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = {
+            TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(),
+            TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build()}; // two descriptors; _consumers[0] and _consumers[1] are signaled separately below
+
+    std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>> 
runtime_filter_dependencies;
+    SlotDescriptor slot_desc;
+    TupleDescriptor tuple_desc;
+    tuple_desc.add_slot(&slot_desc);
+    RowDescriptor row_desc;
+    _tbl._slot_desc_map[0] = &slot_desc; // register the local slot descriptor under id 0
+    
const_cast<std::vector<TupleDescriptor*>&>(row_desc._tuple_desc_map).push_back(&tuple_desc); // test-only: push the tuple descriptor straight into the row descriptor
+    auto helper = RuntimeFilterConsumerHelper(0, runtime_filter_descs, 
row_desc);
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.init(_runtime_states[0].get(), 
&_profile, true,
+                                                 runtime_filter_dependencies, 
0, 0, ""));
+
+    vectorized::VExprContextSPtrs conjuncts;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.acquire_runtime_filter(conjuncts));
+    ASSERT_EQ(conjuncts.size(), 0); // no consumer has been signaled yet
+
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            
RuntimeFilterProducer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
+                                          runtime_filter_descs.data(), 
&producer, &_profile)); // producer is built from the first descriptor only
+    
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
+    helper._consumers[0]->signal(producer.get()); // signal only the first consumer
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.acquire_runtime_filter(conjuncts));
+    ASSERT_EQ(conjuncts.size(), 1); // only _consumers[0] has been signaled at this point
+
+    conjuncts.clear(); // start from an empty list for the late-arrival step
+    int arrived_rf_num = -1; // out-parameter written by try_append_late_arrival_runtime_filter
+    helper._consumers[1]->signal(producer.get()); // second consumer is signaled with the same producer
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.try_append_late_arrival_runtime_filter(&arrived_rf_num, 
conjuncts));
+    ASSERT_EQ(conjuncts.size(), 1); // one expr appended after the clear above
+    ASSERT_EQ(arrived_rf_num, 2); // expected value once both consumers have been signaled
+}
+
+} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index 617ce3fdbb0..3e9a456c2a3 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -17,6 +17,7 @@
 
 #include "runtime_filter/runtime_filter_consumer.h"
 
+#include <gen_cpp/PlanNodes_types.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -25,7 +26,35 @@
 
 namespace doris {
 
-class RuntimeFilterConsumerTest : public RuntimeFilterTest {};
+class RuntimeFilterConsumerTest : public RuntimeFilterTest { // fixture sharing one signal/acquire scenario across filter types
+public:
+    void test_signal_aquire(TRuntimeFilterDesc desc) { // runs create -> signal -> duplicate signal -> acquire_expr for the given descriptor
+        std::shared_ptr<RuntimeFilterConsumer> consumer;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+                
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
+                                              &desc, 0, &consumer, &_profile));
+
+        std::shared_ptr<RuntimeFilterProducer> producer;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&producer, &_profile));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); // NOTE(review): 123 is presumably a size hint - confirm against RuntimeFilterProducer::init
+        
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
+
+        consumer->signal(producer.get()); // first signal
+
+        try {
+            consumer->signal(producer.get()); // a second signal on the same consumer is expected to throw
+            ASSERT_TRUE(false); // reached only if no exception was thrown
+        } catch (const Exception& e) {
+            ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR);
+        }
+
+        std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
+        ASSERT_NE(push_exprs.size(), 0); // only non-emptiness is checked; the exact count is not pinned
+        ASSERT_TRUE(consumer->is_applied());
+    }
+};
 
 TEST_F(RuntimeFilterConsumerTest, basic) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
@@ -38,7 +67,53 @@ TEST_F(RuntimeFilterConsumerTest, basic) {
             desc, true, 0, &registed_consumer, &_profile));
 }
 
-TEST_F(RuntimeFilterConsumerTest, signal_aquire) {
+TEST_F(RuntimeFilterConsumerTest, signal_aquire_in_or_bloom) { // shared signal/acquire scenario with an IN_OR_BLOOM descriptor
+    test_signal_aquire(TRuntimeFilterDescBuilder()
+                               .set_type(TRuntimeFilterType::IN_OR_BLOOM)
+                               .add_planId_to_target_expr(0)
+                               .build());
+}
+
+TEST_F(RuntimeFilterConsumerTest, signal_aquire_bloom) { // shared signal/acquire scenario with a BLOOM descriptor
+    test_signal_aquire(TRuntimeFilterDescBuilder()
+                               .set_type(TRuntimeFilterType::BLOOM)
+                               .add_planId_to_target_expr(0)
+                               .build());
+}
+
+TEST_F(RuntimeFilterConsumerTest, signal_aquire_in) { // shared signal/acquire scenario with an IN descriptor
+    test_signal_aquire(TRuntimeFilterDescBuilder()
+                               .set_type(TRuntimeFilterType::IN)
+                               .add_planId_to_target_expr(0)
+                               .build());
+}
+
+TEST_F(RuntimeFilterConsumerTest, signal_aquire_min_max) { // shared scenario with a MIN_MAX descriptor; min_max_type is left as the builder produces it
+    test_signal_aquire(TRuntimeFilterDescBuilder()
+                               .set_type(TRuntimeFilterType::MIN_MAX)
+                               .add_planId_to_target_expr(0)
+                               .build());
+}
+
+TEST_F(RuntimeFilterConsumerTest, signal_aquire_min) { // shared scenario with a MIN_MAX descriptor restricted to MIN
+    auto desc = TRuntimeFilterDescBuilder()
+                        .set_type(TRuntimeFilterType::MIN_MAX)
+                        .add_planId_to_target_expr(0)
+                        .build();
+    desc.__set_min_max_type(TMinMaxRuntimeFilterType::MIN); // set on the built descriptor before running the scenario
+    test_signal_aquire(desc);
+}
+
+TEST_F(RuntimeFilterConsumerTest, signal_aquire_max) { // shared scenario with a MIN_MAX descriptor restricted to MAX
+    auto desc = TRuntimeFilterDescBuilder()
+                        .set_type(TRuntimeFilterType::MIN_MAX)
+                        .add_planId_to_target_expr(0)
+                        .build();
+    desc.__set_min_max_type(TMinMaxRuntimeFilterType::MAX); // set on the built descriptor before running the scenario
+    test_signal_aquire(desc);
+}
+
+TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
@@ -49,17 +124,14 @@ TEST_F(RuntimeFilterConsumerTest, signal_aquire) {
             RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&producer, &_profile));
     
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
 
-    consumer->signal(producer.get());
-
-    try {
-        consumer->signal(producer.get());
-        ASSERT_TRUE(false);
-    } catch (const Exception& e) {
-        ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR);
-    }
-
     std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
+    ASSERT_EQ(push_exprs.size(), 0);
+    ASSERT_FALSE(consumer->is_applied());
+    ASSERT_EQ(consumer->_rf_state, RuntimeFilterConsumer::State::TIMEOUT);
+
+    consumer->signal(producer.get());
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
     ASSERT_EQ(push_exprs.size(), 1);
     ASSERT_TRUE(consumer->is_applied());
 }
diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp 
b/be/test/runtime_filter/runtime_filter_merger_test.cpp
index 768284879bc..e05e73fad89 100644
--- a/be/test/runtime_filter/runtime_filter_merger_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp
@@ -56,9 +56,10 @@ public:
         ASSERT_EQ(merger->_wrapper->_state, second_expected_state);
     }
 
-    void test_serialize(RuntimeFilterWrapper::State state) {
+    void test_serialize(RuntimeFilterWrapper::State state,
+                        TRuntimeFilterType::type type = 
TRuntimeFilterType::IN_OR_BLOOM) {
         std::shared_ptr<RuntimeFilterMerger> merger;
-        auto desc = TRuntimeFilterDescBuilder().build();
+        auto desc = TRuntimeFilterDescBuilder().set_type(type).build();
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(
                 RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&merger, &_profile));
         merger->set_expected_producer_num(1);
@@ -67,6 +68,7 @@ public:
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
                 _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
         producer->set_wrapper_state_and_ready_to_publish(state);
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
         ASSERT_TRUE(merger->ready());
@@ -76,12 +78,15 @@ public:
         int len = 0;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->serialize(&request, &data, 
&len));
 
-        std::shared_ptr<RuntimeFilterMerger> deserialized_merger;
+        std::shared_ptr<RuntimeFilterProducer> deserialized_producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                
RuntimeFilterMerger::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
-                                            &desc, &deserialized_merger, 
&_profile));
-        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(deserialized_merger->assign(request, 
nullptr));
-        ASSERT_EQ(merger->_wrapper->_state, state);
+                
RuntimeFilterProducer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
+                                              &desc, &deserialized_producer, 
&_profile));
+        butil::IOBuf buf;
+        buf.append(data, len);
+        butil::IOBufAsZeroCopyInputStream stream(buf);
+        
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(deserialized_producer->assign(request, 
&stream));
+        ASSERT_EQ(deserialized_producer->_wrapper->_state, state);
     }
 };
 
@@ -176,4 +181,16 @@ TEST_F(RuntimeFilterMergerTest, serialize_ignored) {
     test_serialize(RuntimeFilterWrapper::State::IGNORED);
 }
 
+TEST_F(RuntimeFilterMergerTest, serialize_bloom) { // test_serialize with READY state and an explicit BLOOM filter type
+    test_serialize(RuntimeFilterWrapper::State::READY, 
TRuntimeFilterType::type::BLOOM);
+}
+
+TEST_F(RuntimeFilterMergerTest, serialize_min_max) { // test_serialize with READY state and an explicit MIN_MAX filter type
+    test_serialize(RuntimeFilterWrapper::State::READY, 
TRuntimeFilterType::type::MIN_MAX);
+}
+
+TEST_F(RuntimeFilterMergerTest, serialize_in) { // test_serialize with READY state and an explicit IN filter type
+    test_serialize(RuntimeFilterWrapper::State::READY, 
TRuntimeFilterType::type::IN);
+}
+
 } // namespace doris
diff --git 
a/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
new file mode 100644
index 00000000000..534245f4437
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_producer_helper_cross.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime_filter/runtime_filter_test_utils.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris {
+
+class RuntimeFilterProducerHelperCrossTest : public RuntimeFilterTest {
+    void SetUp() override {
+        RuntimeFilterTest::SetUp();
+        _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM, INSTANCE_NUM);
+        _op.reset(new pipeline::MockOperatorX());
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2));
+
+        _sink.reset(new pipeline::HashJoinBuildSinkOperatorX(
+                &_pool, 0, _op->operator_id(),
+                TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), _tbl));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink));
+
+        _task.reset(new pipeline::PipelineTask(_pipeline, 0, _runtime_states[0].get(), nullptr,
+                                               &_profile, {}, 0));
+        _runtime_states[0]->set_task(_task.get());
+    }
+
+    pipeline::OperatorPtr _op;
+    pipeline::DataSinkOperatorPtr _sink;
+    pipeline::PipelinePtr _pipeline;
+    std::shared_ptr<pipeline::PipelineTask> _task;
+    ObjectPool _pool;
+};
+
+TEST_F(RuntimeFilterProducerHelperCrossTest, basic) {
+    auto helper = RuntimeFilterProducerHelperCross(&_profile);
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    assert_cast<vectorized::VSlotRef*>(ctx->root().get())->_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = {TRuntimeFilterDescBuilder().build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs));
+
+    vectorized::Block block;
+    auto column = vectorized::ColumnInt32::create();
+    column->insert(1);
+    column->insert(2);
+    block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"});
+
+    vectorized::Blocks blocks = {block};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.process(_runtime_states[0].get(), blocks));
+}
+
+} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
new file mode 100644
index 00000000000..433a612d0c7
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_producer_helper.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime_filter/runtime_filter_test_utils.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris {
+
+class RuntimeFilterProducerHelperTest : public RuntimeFilterTest {
+    void SetUp() override {
+        RuntimeFilterTest::SetUp();
+        _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM, INSTANCE_NUM);
+        _op.reset(new pipeline::MockOperatorX());
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2));
+
+        _sink.reset(new pipeline::HashJoinBuildSinkOperatorX(
+                &_pool, 0, _op->operator_id(),
+                TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), _tbl));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink));
+
+        _task.reset(new pipeline::PipelineTask(_pipeline, 0, _runtime_states[0].get(), nullptr,
+                                               &_profile, {}, 0));
+        _runtime_states[0]->set_task(_task.get());
+    }
+
+    pipeline::OperatorPtr _op;
+    pipeline::DataSinkOperatorPtr _sink;
+    pipeline::PipelinePtr _pipeline;
+    std::shared_ptr<pipeline::PipelineTask> _task;
+    ObjectPool _pool;
+};
+
+TEST_F(RuntimeFilterProducerHelperTest, basic) {
+    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = {TRuntimeFilterDescBuilder().build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs));
+
+    vectorized::Block block;
+    auto column = vectorized::ColumnInt32::create();
+    column->insert(1);
+    column->insert(2);
+    block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"});
+
+    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+}
+
+TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
+    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = {
+            TRuntimeFilterDescBuilder().set_build_bf_by_runtime_size(true).build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs));
+
+    vectorized::Block block;
+    auto column = vectorized::ColumnInt32::create();
+    column->insert(1);
+    column->insert(2);
+    block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"});
+
+    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+    _task->set_wake_up_early();
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+}
+
+TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
+    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = {
+            TRuntimeFilterDescBuilder().set_build_bf_by_runtime_size(true).build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs));
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.skip_process(_runtime_states[0].get()));
+
+    vectorized::Block block;
+    auto column = vectorized::ColumnInt32::create();
+    column->insert(1);
+    column->insert(2);
+    block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"});
+
+    vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx));
+}
+
+} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp b/be/test/runtime_filter/runtime_filter_producer_test.cpp
index ab04391fe7d..9822e7a0f41 100644
--- a/be/test/runtime_filter/runtime_filter_producer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -20,6 +20,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "runtime_filter/runtime_filter_consumer.h"
 #include "runtime_filter/runtime_filter_test_utils.h"
 
 namespace doris {
@@ -160,6 +161,11 @@ TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) {
     producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED);
     ASSERT_EQ(producer2->_rf_state, RuntimeFilterProducer::State::READY_TO_PUBLISH);
     ASSERT_EQ(producer2->_wrapper->_state, RuntimeFilterWrapper::State::DISABLED);
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->publish(_runtime_states[0].get(), true));
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer2->publish(_runtime_states[1].get(), true));
+    ASSERT_EQ(consumer->_rf_state, RuntimeFilterConsumer::State::READY);
+    ASSERT_EQ(consumer->_wrapper->_state, RuntimeFilterWrapper::State::DISABLED);
 }
 
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/runtime_filter/runtime_filter_test_utils.h b/be/test/runtime_filter/runtime_filter_test_utils.h
index 3141be9d848..ccc6e0b834b 100644
--- a/be/test/runtime_filter/runtime_filter_test_utils.h
+++ b/be/test/runtime_filter/runtime_filter_test_utils.h
@@ -28,7 +28,9 @@ public:
     RuntimeFilterTest() = default;
     ~RuntimeFilterTest() override = default;
     void SetUp() override {
-        _query_options = TQueryOptionsBuilder().set_runtime_filter_max_in_num(15).build();
+        _tbl._row_tuples.push_back({});
+
+        _query_options = TQueryOptionsBuilder().build();
         auto fe_address = TNetworkAddress();
         fe_address.hostname = LOCALHOST;
         fe_address.port = DUMMY_PORT;
@@ -50,6 +52,9 @@ public:
                     TUniqueId(), RuntimeFilterParamsContext::create(_query_ctx.get()),
                     _query_ctx->query_mem_tracker(), false);
             _runtime_states[i]->set_runtime_filter_mgr(_local_mgrs[i].get());
+            _runtime_states[i]->local_runtime_filter_mgr()->_state->set_state(
+                    _runtime_states[i].get());
+            _runtime_states[i]->set_desc_tbl(&_tbl);
         }
     }
     void TearDown() override {}
@@ -63,6 +68,7 @@ protected:
     const int INSTANCE_NUM = 2;
     std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
     std::vector<std::unique_ptr<RuntimeFilterMgr>> _local_mgrs;
+    DescriptorTbl _tbl;
 };
 
 } // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
index 8cd3c1e32b6..d15d78f70ad 100644
--- a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
@@ -228,7 +228,7 @@ TEST_F(RuntimeFilterWrapperTest, TestInAssign) {
         auto wrapper = std::make_shared<RuntimeFilterWrapper>(&params);                            \
         PMergeFilterRequest valid_request;                                                         \
         auto* in_filter = valid_request.mutable_in_filter();                                       \
-        in_filter->set_column_type(to_proto(column_return_type));                                  \
+        in_filter->set_column_type(PColumnType::COLUMN_TYPE_BOOL);                                 \
         get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(), \
                                                                           value1);                 \
         get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(), \
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 46b94860701..2242caae2a4 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -591,7 +591,7 @@ message PMergeFilterRequest {
     optional PInFilter in_filter = 7;
     optional bool is_pipeline = 8 [deprecated = true];
     optional bool opt_remote_rf = 9; // Deprecated
-    optional PColumnType column_type = 10;
+    optional PColumnType column_type = 10; // Deprecated
     optional bool contain_null = 11;
     optional bool ignored = 12;
     optional uint64 local_merge_time = 13;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to