This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d542818fbcb [UT](runtime filter) Add runtime filter test case (#47035)
d542818fbcb is described below

commit d542818fbcbdffb94e44d2f1dc20f0b34f1be95a
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Jan 16 10:12:04 2025 +0800

    [UT](runtime filter) Add runtime filter test case (#47035)
---
 be/src/exprs/runtime_filter.cpp               | 1343 ++++++++++++-------------
 be/src/exprs/runtime_filter.h                 |  111 ++
 be/src/exprs/runtime_filter_slots.h           |   54 +-
 be/src/pipeline/exec/operator.cpp             |    2 +-
 be/src/pipeline/pipeline_fragment_context.cpp |    3 +-
 be/src/pipeline/pipeline_fragment_context.h   |    3 +-
 be/src/pipeline/task_queue.h                  |    9 +-
 be/test/pipeline/dummy_task_queue.h           |   50 +
 be/test/pipeline/pipeline_test.cpp            |  276 ++++-
 be/test/pipeline/thrift_builder.h             |  106 +-
 gensrc/thrift/PaloInternalService.thrift      |    5 +-
 gensrc/thrift/PlanNodes.thrift                |    2 +-
 12 files changed, 1174 insertions(+), 790 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index b85f370165a..33cc155fc6f 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -283,711 +283,6 @@ Status create_vbin_predicate(const TypeDescriptor& type, TExprOpcode::type opcod
     *tnode = node;
     return vectorized::VExpr::create_expr(node, expr);
 }
-// This class is a wrapper of runtime predicate function
-class RuntimePredicateWrapper {
-public:
-    RuntimePredicateWrapper(const RuntimeFilterParams* params)
-            : RuntimePredicateWrapper(params->column_return_type, params->filter_type,
-                                      params->filter_id) {};
-    // for a 'tmp' runtime predicate wrapper
-    // only could called assign method or as a param for merge
-    RuntimePredicateWrapper(PrimitiveType column_type, RuntimeFilterType type, uint32_t filter_id)
-            : _column_return_type(column_type),
-              _filter_type(type),
-              _context(new RuntimeFilterContext()),
-              _filter_id(filter_id) {}
-
-    // init runtime filter wrapper
-    // alloc memory to init runtime filter function
-    Status init(const RuntimeFilterParams* params) {
-        _max_in_num = params->max_in_num;
-        switch (_filter_type) {
-        case RuntimeFilterType::IN_FILTER: {
-            _context->hybrid_set.reset(create_set(_column_return_type));
-            _context->hybrid_set->set_null_aware(params->null_aware);
-            break;
-        }
-        // Only use in nested loop join not need set null aware
-        case RuntimeFilterType::MIN_FILTER:
-        case RuntimeFilterType::MAX_FILTER: {
-            _context->minmax_func.reset(create_minmax_filter(_column_return_type));
-            break;
-        }
-        case RuntimeFilterType::MINMAX_FILTER: {
-            _context->minmax_func.reset(create_minmax_filter(_column_return_type));
-            _context->minmax_func->set_null_aware(params->null_aware);
-            break;
-        }
-        case RuntimeFilterType::BLOOM_FILTER: {
-            _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type));
-            _context->bloom_filter_func->init_params(params);
-            return Status::OK();
-        }
-        case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
-            _context->hybrid_set.reset(create_set(_column_return_type));
-            _context->hybrid_set->set_null_aware(params->null_aware);
-            _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type));
-            _context->bloom_filter_func->init_params(params);
-            return Status::OK();
-        }
-        case RuntimeFilterType::BITMAP_FILTER: {
-            _context->bitmap_filter_func.reset(create_bitmap_filter(_column_return_type));
-            _context->bitmap_filter_func->set_not_in(params->bitmap_filter_not_in);
-            return Status::OK();
-        }
-        default:
-            return Status::InternalError("Unknown Filter type");
-        }
-        return Status::OK();
-    }
-
-    Status change_to_bloom_filter() {
-        if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-            return Status::InternalError(
-                    "Can not change to bloom filter because of runtime filter type is {}",
-                    IRuntimeFilter::to_string(_filter_type));
-        }
-        BloomFilterFuncBase* bf = _context->bloom_filter_func.get();
-
-        if (bf != nullptr) {
-            insert_to_bloom_filter(bf);
-        } else if (_context->hybrid_set != nullptr && _context->hybrid_set->size() != 0) {
-            return Status::InternalError("change to bloom filter need empty set ",
-                                         IRuntimeFilter::to_string(_filter_type));
-        }
-
-        // release in filter
-        _context->hybrid_set.reset();
-        return Status::OK();
-    }
-
-    Status init_bloom_filter(const size_t build_bf_cardinality) {
-        if (_filter_type != RuntimeFilterType::BLOOM_FILTER &&
-            _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-            throw Exception(ErrorCode::INTERNAL_ERROR,
-                            "init_bloom_filter meet invalid input type {}", int(_filter_type));
-        }
-        return _context->bloom_filter_func->init_with_cardinality(build_bf_cardinality);
-    }
-
-    bool get_build_bf_cardinality() const {
-        if (_filter_type == RuntimeFilterType::BLOOM_FILTER ||
-            _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-            return _context->bloom_filter_func->get_build_bf_cardinality();
-        }
-        return false;
-    }
-
-    void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
-        if (_context->hybrid_set->size() > 0) {
-            auto* it = _context->hybrid_set->begin();
-            while (it->has_next()) {
-                bloom_filter->insert(it->get_value());
-                it->next();
-            }
-        }
-        if (_context->hybrid_set->contain_null()) {
-            bloom_filter->set_contain_null_and_null_aware();
-        }
-    }
-
-    BloomFilterFuncBase* get_bloomfilter() const { return _context->bloom_filter_func.get(); }
-
-    void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) {
-        if (is_ignored()) {
-            throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet ignored rf");
-        }
-        switch (_filter_type) {
-        case RuntimeFilterType::IN_FILTER: {
-            _context->hybrid_set->insert_fixed_len(column, start);
-            break;
-        }
-        case RuntimeFilterType::MIN_FILTER:
-        case RuntimeFilterType::MAX_FILTER:
-        case RuntimeFilterType::MINMAX_FILTER: {
-            _context->minmax_func->insert_fixed_len(column, start);
-            break;
-        }
-        case RuntimeFilterType::BLOOM_FILTER: {
-            _context->bloom_filter_func->insert_fixed_len(column, start);
-            break;
-        }
-        case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
-            if (is_bloomfilter()) {
-                _context->bloom_filter_func->insert_fixed_len(column, start);
-            } else {
-                _context->hybrid_set->insert_fixed_len(column, start);
-            }
-            break;
-        }
-        default:
-            DCHECK(false);
-            break;
-        }
-    }
-
-    void insert_batch(const vectorized::ColumnPtr& column, size_t start) {
-        if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
-            bitmap_filter_insert_batch(column, start);
-        } else {
-            insert_fixed_len(column, start);
-        }
-    }
-
-    void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t start) {
-        std::vector<const BitmapValue*> bitmaps;
-        if (column->is_nullable()) {
-            const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get());
-            const auto& col =
-                    assert_cast<const vectorized::ColumnBitmap&>(nullable->get_nested_column());
-            const auto& nullmap =
-                    assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column())
-                            .get_data();
-            for (size_t i = start; i < column->size(); i++) {
-                if (!nullmap[i]) {
-                    bitmaps.push_back(&(col.get_data()[i]));
-                }
-            }
-        } else {
-            const auto* col = assert_cast<const vectorized::ColumnBitmap*>(column.get());
-            for (size_t i = start; i < column->size(); i++) {
-                bitmaps.push_back(&(col->get_data()[i]));
-            }
-        }
-        _context->bitmap_filter_func->insert_many(bitmaps);
-    }
-
-    RuntimeFilterType get_real_type() const {
-        if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-            if (_context->hybrid_set) {
-                return RuntimeFilterType::IN_FILTER;
-            }
-            return RuntimeFilterType::BLOOM_FILTER;
-        }
-        return _filter_type;
-    }
-
-    size_t get_bloom_filter_size() const {
-        return _context->bloom_filter_func ? _context->bloom_filter_func->get_size() : 0;
-    }
-
-    Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
-                          std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
-                          const TExpr& probe_expr);
-
-    Status merge(const RuntimePredicateWrapper* wrapper) {
-        if (wrapper->is_disabled()) {
-            set_disabled();
-            return Status::OK();
-        }
-
-        if (wrapper->is_ignored() || is_disabled()) {
-            return Status::OK();
-        }
-
-        _context->ignored = false;
-
-        bool can_not_merge_in_or_bloom =
-                _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
-                (wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
-                 wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER &&
-                 wrapper->_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER);
-
-        bool can_not_merge_other = _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER &&
-                                   _filter_type != wrapper->_filter_type;
-
-        CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
-                << " can not merge runtime filter(id=" << _filter_id
-                << "), current is filter type is " << IRuntimeFilter::to_string(_filter_type)
-                << ", other filter type is " << IRuntimeFilter::to_string(wrapper->_filter_type);
-
-        switch (_filter_type) {
-        case RuntimeFilterType::IN_FILTER: {
-            _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
-            if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
-                set_disabled();
-            }
-            break;
-        }
-        case RuntimeFilterType::MIN_FILTER:
-        case RuntimeFilterType::MAX_FILTER:
-        case RuntimeFilterType::MINMAX_FILTER: {
-            RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get()));
-            break;
-        }
-        case RuntimeFilterType::BLOOM_FILTER: {
-            RETURN_IF_ERROR(
-                    _context->bloom_filter_func->merge(wrapper->_context->bloom_filter_func.get()));
-            break;
-        }
-        case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
-            auto real_filter_type = get_real_type();
-
-            auto other_filter_type = wrapper->_filter_type;
-            if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                other_filter_type = wrapper->get_real_type();
-            }
-
-            if (real_filter_type == RuntimeFilterType::IN_FILTER) {
-                // when we meet base rf is in-filter, threre only have two case:
-                // case1: all input-filter's build_bf_exactly is true, inited by synced global size
-                // case2: all input-filter's build_bf_exactly is false, inited by default size
-                if (other_filter_type == RuntimeFilterType::IN_FILTER) {
-                    _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
-                    if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
-                        // case2: use default size to init bf
-                        RETURN_IF_ERROR(_context->bloom_filter_func->init_with_fixed_length());
-                        RETURN_IF_ERROR(change_to_bloom_filter());
-                    }
-                } else {
-                    // case1&case2: use input bf directly and insert hybrid set data into bf
-                    _context->bloom_filter_func = wrapper->_context->bloom_filter_func;
-                    RETURN_IF_ERROR(change_to_bloom_filter());
-                }
-            } else {
-                if (other_filter_type == RuntimeFilterType::IN_FILTER) {
-                    // case2: insert data to global filter
-                    wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get());
-                } else {
-                    // case1&case2: all input bf must has same size
-                    RETURN_IF_ERROR(_context->bloom_filter_func->merge(
-                            wrapper->_context->bloom_filter_func.get()));
-                }
-            }
-            break;
-        }
-        case RuntimeFilterType::BITMAP_FILTER: {
-            // use input bitmap directly because we assume bitmap filter join always have full data
-            _context->bitmap_filter_func = wrapper->_context->bitmap_filter_func;
-            break;
-        }
-        default:
-            return Status::InternalError("unknown runtime filter");
-        }
-        return Status::OK();
-    }
-
-    Status assign(const PInFilter* in_filter, bool contain_null) {
-        _context->hybrid_set.reset(create_set(_column_return_type));
-        if (contain_null) {
-            _context->hybrid_set->set_null_aware(true);
-            _context->hybrid_set->insert((const void*)nullptr);
-        }
-
-        switch (_column_return_type) {
-        case TYPE_BOOLEAN: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                bool bool_val = column.boolval();
-                set->insert(&bool_val);
-            });
-            break;
-        }
-        case TYPE_TINYINT: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto int_val = static_cast<int8_t>(column.intval());
-                set->insert(&int_val);
-            });
-            break;
-        }
-        case TYPE_SMALLINT: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto int_val = static_cast<int16_t>(column.intval());
-                set->insert(&int_val);
-            });
-            break;
-        }
-        case TYPE_INT: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                int32_t int_val = column.intval();
-                set->insert(&int_val);
-            });
-            break;
-        }
-        case TYPE_BIGINT: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                int64_t long_val = column.longval();
-                set->insert(&long_val);
-            });
-            break;
-        }
-        case TYPE_LARGEINT: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto string_val = column.stringval();
-                StringParser::ParseResult result;
-                auto int128_val = StringParser::string_to_int<int128_t>(
-                        string_val.c_str(), string_val.length(), &result);
-                DCHECK(result == StringParser::PARSE_SUCCESS);
-                set->insert(&int128_val);
-            });
-            break;
-        }
-        case TYPE_FLOAT: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto float_val = static_cast<float>(column.doubleval());
-                set->insert(&float_val);
-            });
-            break;
-        }
-        case TYPE_DOUBLE: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                double double_val = column.doubleval();
-                set->insert(&double_val);
-            });
-            break;
-        }
-        case TYPE_DATEV2: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto date_v2_val = column.intval();
-                set->insert(&date_v2_val);
-            });
-            break;
-        }
-        case TYPE_DATETIMEV2: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto date_v2_val = column.longval();
-                set->insert(&date_v2_val);
-            });
-            break;
-        }
-        case TYPE_DATETIME:
-        case TYPE_DATE: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                const auto& string_val_ref = column.stringval();
-                VecDateTimeValue datetime_val;
-                datetime_val.from_date_str(string_val_ref.c_str(), string_val_ref.length());
-                set->insert(&datetime_val);
-            });
-            break;
-        }
-        case TYPE_DECIMALV2: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                const auto& string_val_ref = column.stringval();
-                DecimalV2Value decimal_val(string_val_ref);
-                set->insert(&decimal_val);
-            });
-            break;
-        }
-        case TYPE_DECIMAL32: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                int32_t decimal_32_val = column.intval();
-                set->insert(&decimal_32_val);
-            });
-            break;
-        }
-        case TYPE_DECIMAL64: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                int64_t decimal_64_val = column.longval();
-                set->insert(&decimal_64_val);
-            });
-            break;
-        }
-        case TYPE_DECIMAL128I: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto string_val = column.stringval();
-                StringParser::ParseResult result;
-                auto int128_val = StringParser::string_to_int<int128_t>(
-                        string_val.c_str(), string_val.length(), &result);
-                DCHECK(result == StringParser::PARSE_SUCCESS);
-                set->insert(&int128_val);
-            });
-            break;
-        }
-        case TYPE_DECIMAL256: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto string_val = column.stringval();
-                StringParser::ParseResult result;
-                auto int_val = StringParser::string_to_int<wide::Int256>(
-                        string_val.c_str(), string_val.length(), &result);
-                DCHECK(result == StringParser::PARSE_SUCCESS);
-                set->insert(&int_val);
-            });
-            break;
-        }
-        case TYPE_VARCHAR:
-        case TYPE_CHAR:
-        case TYPE_STRING: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                const std::string& string_value = column.stringval();
-                // string_value is std::string, call insert(data, size) function in StringSet will not cast as StringRef
-                // so could avoid some cast error at different class object.
-                set->insert((void*)string_value.data(), string_value.size());
-            });
-            break;
-        }
-        case TYPE_IPV4: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                int32_t tmp = column.intval();
-                set->insert(&tmp);
-            });
-            break;
-        }
-        case TYPE_IPV6: {
-            batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
-                auto string_val = column.stringval();
-                StringParser::ParseResult result;
-                auto int128_val = StringParser::string_to_int<uint128_t>(
-                        string_val.c_str(), string_val.length(), &result);
-                DCHECK(result == StringParser::PARSE_SUCCESS);
-                set->insert(&int128_val);
-            });
-            break;
-        }
-        default: {
-            return Status::InternalError("not support assign to in filter, type: " +
-                                         type_to_string(_column_return_type));
-        }
-        }
-        return Status::OK();
-    }
-
-    void set_enable_fixed_len_to_uint32_v2() {
-        if (_context->bloom_filter_func) {
-            _context->bloom_filter_func->set_enable_fixed_len_to_uint32_v2();
-        }
-    }
-
-    // used by shuffle runtime filter
-    // assign this filter by protobuf
-    Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data,
-                  bool contain_null) {
-        // we won't use this class to insert or find any data
-        // so any type is ok
-        _context->bloom_filter_func.reset(create_bloom_filter(_column_return_type == INVALID_TYPE
-                                                                      ? PrimitiveType::TYPE_INT
-                                                                      : _column_return_type));
-        RETURN_IF_ERROR(_context->bloom_filter_func->assign(data, bloom_filter->filter_length(),
-                                                            contain_null));
-        return Status::OK();
-    }
-
-    // used by shuffle runtime filter
-    // assign this filter by protobuf
-    Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) {
-        _context->minmax_func.reset(create_minmax_filter(_column_return_type));
-
-        if (contain_null) {
-            _context->minmax_func->set_null_aware(true);
-            _context->minmax_func->set_contain_null();
-        }
-
-        switch (_column_return_type) {
-        case TYPE_BOOLEAN: {
-            bool min_val = minmax_filter->min_val().boolval();
-            bool max_val = minmax_filter->max_val().boolval();
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_TINYINT: {
-            auto min_val = static_cast<int8_t>(minmax_filter->min_val().intval());
-            auto max_val = static_cast<int8_t>(minmax_filter->max_val().intval());
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_SMALLINT: {
-            auto min_val = static_cast<int16_t>(minmax_filter->min_val().intval());
-            auto max_val = static_cast<int16_t>(minmax_filter->max_val().intval());
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_INT: {
-            int32_t min_val = minmax_filter->min_val().intval();
-            int32_t max_val = minmax_filter->max_val().intval();
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_BIGINT: {
-            int64_t min_val = minmax_filter->min_val().longval();
-            int64_t max_val = minmax_filter->max_val().longval();
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_LARGEINT: {
-            auto min_string_val = minmax_filter->min_val().stringval();
-            auto max_string_val = minmax_filter->max_val().stringval();
-            StringParser::ParseResult result;
-            auto min_val = StringParser::string_to_int<int128_t>(min_string_val.c_str(),
-                                                                 min_string_val.length(), &result);
-            DCHECK(result == StringParser::PARSE_SUCCESS);
-            auto max_val = StringParser::string_to_int<int128_t>(max_string_val.c_str(),
-                                                                 max_string_val.length(), &result);
-            DCHECK(result == StringParser::PARSE_SUCCESS);
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_FLOAT: {
-            auto min_val = static_cast<float>(minmax_filter->min_val().doubleval());
-            auto max_val = static_cast<float>(minmax_filter->max_val().doubleval());
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DOUBLE: {
-            auto min_val = static_cast<double>(minmax_filter->min_val().doubleval());
-            auto max_val = static_cast<double>(minmax_filter->max_val().doubleval());
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DATEV2: {
-            int32_t min_val = minmax_filter->min_val().intval();
-            int32_t max_val = minmax_filter->max_val().intval();
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DATETIMEV2: {
-            int64_t min_val = minmax_filter->min_val().longval();
-            int64_t max_val = minmax_filter->max_val().longval();
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DATETIME:
-        case TYPE_DATE: {
-            const auto& min_val_ref = minmax_filter->min_val().stringval();
-            const auto& max_val_ref = minmax_filter->max_val().stringval();
-            VecDateTimeValue min_val;
-            VecDateTimeValue max_val;
-            min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
-            max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DECIMALV2: {
-            const auto& min_val_ref = minmax_filter->min_val().stringval();
-            const auto& max_val_ref = minmax_filter->max_val().stringval();
-            DecimalV2Value min_val(min_val_ref);
-            DecimalV2Value max_val(max_val_ref);
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DECIMAL32: {
-            int32_t min_val = minmax_filter->min_val().intval();
-            int32_t max_val = minmax_filter->max_val().intval();
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DECIMAL64: {
-            int64_t min_val = minmax_filter->min_val().longval();
-            int64_t max_val = minmax_filter->max_val().longval();
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DECIMAL128I: {
-            auto min_string_val = minmax_filter->min_val().stringval();
-            auto max_string_val = minmax_filter->max_val().stringval();
-            StringParser::ParseResult result;
-            auto min_val = StringParser::string_to_int<int128_t>(min_string_val.c_str(),
-                                                                 min_string_val.length(), &result);
-            DCHECK(result == StringParser::PARSE_SUCCESS);
-            auto max_val = StringParser::string_to_int<int128_t>(max_string_val.c_str(),
-                                                                 max_string_val.length(), &result);
-            DCHECK(result == StringParser::PARSE_SUCCESS);
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_DECIMAL256: {
-            auto min_string_val = minmax_filter->min_val().stringval();
-            auto max_string_val = minmax_filter->max_val().stringval();
-            StringParser::ParseResult result;
-            auto min_val = StringParser::string_to_int<wide::Int256>(
-                    min_string_val.c_str(), min_string_val.length(), &result);
-            DCHECK(result == StringParser::PARSE_SUCCESS);
-            auto max_val = StringParser::string_to_int<wide::Int256>(
-                    max_string_val.c_str(), max_string_val.length(), &result);
-            DCHECK(result == StringParser::PARSE_SUCCESS);
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        case TYPE_VARCHAR:
-        case TYPE_CHAR:
-        case TYPE_STRING: {
-            auto min_val_ref = minmax_filter->min_val().stringval();
-            auto max_val_ref = minmax_filter->max_val().stringval();
-            return _context->minmax_func->assign(&min_val_ref, &max_val_ref);
-        }
-        case TYPE_IPV4: {
-            int tmp_min = minmax_filter->min_val().intval();
-            int tmp_max = minmax_filter->max_val().intval();
-            return _context->minmax_func->assign(&tmp_min, &tmp_max);
-        }
-        case TYPE_IPV6: {
-            auto min_string_val = minmax_filter->min_val().stringval();
-            auto max_string_val = minmax_filter->max_val().stringval();
-            StringParser::ParseResult result;
-            auto min_val = StringParser::string_to_int<uint128_t>(min_string_val.c_str(),
-                                                                  min_string_val.length(), &result);
-            DCHECK(result == StringParser::PARSE_SUCCESS);
-            auto max_val = StringParser::string_to_int<uint128_t>(max_string_val.c_str(),
-                                                                  max_string_val.length(), &result);
-            DCHECK(result == StringParser::PARSE_SUCCESS);
-            return _context->minmax_func->assign(&min_val, &max_val);
-        }
-        default:
-            break;
-        }
-        return Status::InternalError("not support!");
-    }
-
-    void get_bloom_filter_desc(char** data, int* filter_length) {
-        _context->bloom_filter_func->get_data(data, filter_length);
-    }
-
-    PrimitiveType column_type() { return _column_return_type; }
-
-    bool is_bloomfilter() const { return get_real_type() == RuntimeFilterType::BLOOM_FILTER; }
-
-    bool contain_null() const {
-        if (is_bloomfilter()) {
-            return _context->bloom_filter_func->contain_null();
-        }
-        if (_context->hybrid_set) {
-            if (get_real_type() != RuntimeFilterType::IN_FILTER) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "rf has hybrid_set but real type is {}",
-                                int(get_real_type()));
-            }
-            return _context->hybrid_set->contain_null();
-        }
-        if (_context->minmax_func) {
-            return _context->minmax_func->contain_null();
-        }
-        return false;
-    }
-
-    bool is_ignored() const { return _context->ignored; }
-
-    void set_ignored() { _context->ignored = true; }
-
-    bool is_disabled() const { return _context->disabled; }
-
-    void set_disabled() {
-        _context->disabled = true;
-        _context->minmax_func.reset();
-        _context->hybrid_set.reset();
-        _context->bloom_filter_func.reset();
-        _context->bitmap_filter_func.reset();
-    }
-
-    void batch_assign(const PInFilter* filter,
-                      void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set,
-                                          PColumnValue&)) {
-        for (int i = 0; i < filter->values_size(); ++i) {
-            PColumnValue column = filter->values(i);
-            assign_func(_context->hybrid_set, column);
-        }
-    }
-
-    size_t get_in_filter_size() const {
-        return _context->hybrid_set ? _context->hybrid_set->size() : 0;
-    }
-
-    std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const {
-        return _context->bitmap_filter_func;
-    }
-
-    friend class IRuntimeFilter;
-
-    void set_filter_id(int id) {
-        if (_context->bloom_filter_func) {
-            _context->bloom_filter_func->set_filter_id(id);
-        }
-        if (_context->bitmap_filter_func) {
-            _context->bitmap_filter_func->set_filter_id(id);
-        }
-        if (_context->hybrid_set) {
-            _context->hybrid_set->set_filter_id(id);
-        }
-    }
-
-private:
-    // When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid.
-    PrimitiveType _column_return_type; // column type
-    RuntimeFilterType _filter_type;
-    int32_t _max_in_num = -1;
-
-    RuntimeFilterContextSPtr _context;
-    uint32_t _filter_id;
-};
 
 Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
                               const TQueryOptions* query_options, const RuntimeFilterRole role,
@@ -1794,4 +1089,642 @@ Status RuntimePredicateWrapper::get_push_exprs(
+// NOTE(review): constructor/destructor are defaulted out of line, presumably so the
+// wrapper type held by the holder is complete at this point — confirm.
 RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default;
 RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default;
 
+// Allocates only the filter functions that this filter type needs and records the
+// IN-list size limit. Returns InternalError for an unknown filter type.
+Status RuntimePredicateWrapper::init(const RuntimeFilterParams* params) {
+    _max_in_num = params->max_in_num;
+    const PrimitiveType type = _column_return_type;
+    switch (_filter_type) {
+    case RuntimeFilterType::IN_FILTER:
+        _context->hybrid_set.reset(create_set(type));
+        _context->hybrid_set->set_null_aware(params->null_aware);
+        break;
+    case RuntimeFilterType::MIN_FILTER:
+    case RuntimeFilterType::MAX_FILTER:
+        // Only used by nested loop join, which does not need null awareness.
+        _context->minmax_func.reset(create_minmax_filter(type));
+        break;
+    case RuntimeFilterType::MINMAX_FILTER:
+        _context->minmax_func.reset(create_minmax_filter(type));
+        _context->minmax_func->set_null_aware(params->null_aware);
+        break;
+    case RuntimeFilterType::BLOOM_FILTER:
+        _context->bloom_filter_func.reset(create_bloom_filter(type));
+        _context->bloom_filter_func->init_params(params);
+        break;
+    case RuntimeFilterType::IN_OR_BLOOM_FILTER:
+        // Starts out as an IN set; the bloom filter is prepared up front so the
+        // wrapper can switch to it later (see change_to_bloom_filter()).
+        _context->hybrid_set.reset(create_set(type));
+        _context->hybrid_set->set_null_aware(params->null_aware);
+        _context->bloom_filter_func.reset(create_bloom_filter(type));
+        _context->bloom_filter_func->init_params(params);
+        break;
+    case RuntimeFilterType::BITMAP_FILTER:
+        _context->bitmap_filter_func.reset(create_bitmap_filter(type));
+        _context->bitmap_filter_func->set_not_in(params->bitmap_filter_not_in);
+        break;
+    default:
+        return Status::InternalError("Unknown Filter type");
+    }
+    return Status::OK();
+}
+
+// Switches an IN_OR_BLOOM_FILTER from its IN set to its bloom filter: values already
+// in the set are copied into the bloom filter, then the set is released.
+// Returns InternalError for any other filter type, or when there is no bloom filter
+// but the IN set is non-empty (releasing it would otherwise lose those values).
+Status RuntimePredicateWrapper::change_to_bloom_filter() {
+    if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+        return Status::InternalError(
+                "Can not change to bloom filter because of runtime filter type is {}",
+                IRuntimeFilter::to_string(_filter_type));
+    }
+    BloomFilterFuncBase* bf = _context->bloom_filter_func.get();
+
+    if (bf != nullptr) {
+        insert_to_bloom_filter(bf);
+    } else if (_context->hybrid_set != nullptr && _context->hybrid_set->size() != 0) {
+        // The message needs a `{}` placeholder, otherwise the filter-type argument
+        // is not rendered.
+        return Status::InternalError("change to bloom filter need empty set, filter type is {}",
+                                     IRuntimeFilter::to_string(_filter_type));
+    }
+
+    // release in filter
+    _context->hybrid_set.reset();
+    return Status::OK();
+}
+
+// Propagates `id` to every filter function that currently exists in the context.
+void RuntimePredicateWrapper::set_filter_id(int id) {
+    if (auto* bloom = _context->bloom_filter_func.get(); bloom != nullptr) {
+        bloom->set_filter_id(id);
+    }
+    if (auto* bitmap = _context->bitmap_filter_func.get(); bitmap != nullptr) {
+        bitmap->set_filter_id(id);
+    }
+    if (auto* in_set = _context->hybrid_set.get(); in_set != nullptr) {
+        in_set->set_filter_id(id);
+    }
+}
+
+// Feeds every value of `filter` to `assign_func`, which inserts it into the IN set.
+// assign_func takes a mutable PColumnValue, so each value is copied out of the
+// (const) protobuf message first.
+void RuntimePredicateWrapper::batch_assign(
+        const PInFilter* filter,
+        void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, PColumnValue&)) {
+    for (const auto& value : filter->values()) {
+        PColumnValue copy = value;
+        assign_func(_context->hybrid_set, copy);
+    }
+}
+
+// Sizes the bloom filter from `build_bf_cardinality`. Throws (rather than returning
+// an error Status) when the filter type has no bloom filter.
+Status RuntimePredicateWrapper::init_bloom_filter(const size_t build_bf_cardinality) {
+    const bool has_bloom = _filter_type == RuntimeFilterType::BLOOM_FILTER ||
+                           _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER;
+    if (!has_bloom) {
+        throw Exception(ErrorCode::INTERNAL_ERROR, "init_bloom_filter meet invalid input type {}",
+                        int(_filter_type));
+    }
+    return _context->bloom_filter_func->init_with_cardinality(build_bf_cardinality);
+}
+
+// Forwards to the bloom filter's flag for bloom-capable filter types; false otherwise.
+bool RuntimePredicateWrapper::get_build_bf_cardinality() const {
+    switch (_filter_type) {
+    case RuntimeFilterType::BLOOM_FILTER:
+    case RuntimeFilterType::IN_OR_BLOOM_FILTER:
+        return _context->bloom_filter_func->get_build_bf_cardinality();
+    default:
+        return false;
+    }
+}
+
+// Copies every value of the IN set into `bloom_filter` and propagates the set's
+// null flag. Does nothing when the IN set no longer exists.
+void RuntimePredicateWrapper::insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
+    const auto& hybrid_set = _context->hybrid_set;
+    // change_to_bloom_filter() releases the set, so it can legitimately be null here
+    // (e.g. on a repeated conversion); dereferencing it would crash.
+    if (hybrid_set == nullptr) {
+        return;
+    }
+    if (hybrid_set->size() > 0) {
+        auto* it = hybrid_set->begin();
+        while (it->has_next()) {
+            bloom_filter->insert(it->get_value());
+            it->next();
+        }
+    }
+    if (hybrid_set->contain_null()) {
+        bloom_filter->set_contain_null_and_null_aware();
+    }
+}
+
+// Forwards `column`/`start` to the insert_fixed_len() of whichever filter function
+// is active for this filter type. Throws if the filter has been ignored.
+void RuntimePredicateWrapper::insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) {
+    if (is_ignored()) {
+        throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet ignored rf");
+    }
+    switch (_filter_type) {
+    case RuntimeFilterType::MIN_FILTER:
+    case RuntimeFilterType::MAX_FILTER:
+    case RuntimeFilterType::MINMAX_FILTER:
+        _context->minmax_func->insert_fixed_len(column, start);
+        return;
+    case RuntimeFilterType::IN_FILTER:
+        _context->hybrid_set->insert_fixed_len(column, start);
+        return;
+    case RuntimeFilterType::BLOOM_FILTER:
+        _context->bloom_filter_func->insert_fixed_len(column, start);
+        return;
+    case RuntimeFilterType::IN_OR_BLOOM_FILTER:
+        // is_bloomfilter() becomes true once the IN set has been released.
+        if (!is_bloomfilter()) {
+            _context->hybrid_set->insert_fixed_len(column, start);
+        } else {
+            _context->bloom_filter_func->insert_fixed_len(column, start);
+        }
+        return;
+    default:
+        DCHECK(false);
+        return;
+    }
+}
+
+// Collects pointers to the non-null bitmaps in rows [start, column->size()) and
+// hands them to the bitmap filter in a single insert_many() call.
+void RuntimePredicateWrapper::bitmap_filter_insert_batch(const vectorized::ColumnPtr column,
+                                                         size_t start) {
+    std::vector<const BitmapValue*> bitmaps;
+    const size_t row_count = column->size();
+    if (!column->is_nullable()) {
+        const auto& bitmap_data =
+                assert_cast<const vectorized::ColumnBitmap*>(column.get())->get_data();
+        for (size_t row = start; row < row_count; ++row) {
+            bitmaps.push_back(&bitmap_data[row]);
+        }
+    } else {
+        const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get());
+        const auto& bitmap_data =
+                assert_cast<const vectorized::ColumnBitmap&>(nullable->get_nested_column())
+                        .get_data();
+        const auto& null_map =
+                assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                        .get_data();
+        for (size_t row = start; row < row_count; ++row) {
+            if (null_map[row] == 0) {
+                bitmaps.push_back(&bitmap_data[row]);
+            }
+        }
+    }
+    _context->bitmap_filter_func->insert_many(bitmaps);
+}
+
+// Size reported by the bloom filter, or 0 when there is no bloom filter.
+size_t RuntimePredicateWrapper::get_bloom_filter_size() const {
+    if (!_context->bloom_filter_func) {
+        return 0;
+    }
+    return _context->bloom_filter_func->get_size();
+}
+
+// Merges `wrapper` (a filter of a compatible type) into this filter.
+// - If `wrapper` is disabled, this filter is disabled as well.
+// - If `wrapper` is ignored, or this filter is already disabled, nothing changes.
+// - Otherwise this filter is marked not-ignored and the data is merged. An
+//   IN_OR_BLOOM_FILTER accepts IN, BLOOM or IN_OR_BLOOM inputs; every other type
+//   requires an input of exactly the same type. A mismatch fails the CHECK below,
+//   which aborts instead of returning an error Status.
+// NOTE(review): for BITMAP_FILTER, and for IN_OR_BLOOM while this side is still an
+// IN set and the input is a bloom filter, the input's filter function object is
+// shared rather than copied; in the latter case it is then written to — confirm the
+// input is never reused after merge().
+Status RuntimePredicateWrapper::merge(const RuntimePredicateWrapper* wrapper) {
+    if (wrapper->is_disabled()) {
+        set_disabled();
+        return Status::OK();
+    }
+
+    if (wrapper->is_ignored() || is_disabled()) {
+        return Status::OK();
+    }
+
+    _context->ignored = false;
+
+    bool can_not_merge_in_or_bloom =
+            _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+            (wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
+             wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER &&
+             wrapper->_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER);
+
+    bool can_not_merge_other = _filter_type != 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+                               _filter_type != wrapper->_filter_type;
+
+    CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
+            << " can not merge runtime filter(id=" << _filter_id << "), 
current is filter type is "
+            << IRuntimeFilter::to_string(_filter_type) << ", other filter type 
is "
+            << IRuntimeFilter::to_string(wrapper->_filter_type);
+
+    switch (_filter_type) {
+    case RuntimeFilterType::IN_FILTER: {
+        // Reaching max_in_num disables the filter; a negative limit means unlimited.
+        _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
+        if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
+            set_disabled();
+        }
+        break;
+    }
+    case RuntimeFilterType::MIN_FILTER:
+    case RuntimeFilterType::MAX_FILTER:
+    case RuntimeFilterType::MINMAX_FILTER: {
+        
RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get()));
+        break;
+    }
+    case RuntimeFilterType::BLOOM_FILTER: {
+        RETURN_IF_ERROR(
+                
_context->bloom_filter_func->merge(wrapper->_context->bloom_filter_func.get()));
+        break;
+    }
+    case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+        // Compare effective types: an IN_OR_BLOOM filter is either still an IN set or
+        // has already switched to its bloom filter.
+        auto real_filter_type = get_real_type();
+
+        auto other_filter_type = wrapper->_filter_type;
+        if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+            other_filter_type = wrapper->get_real_type();
+        }
+
+        if (real_filter_type == RuntimeFilterType::IN_FILTER) {
+            // This filter is still an IN set. Only two cases are possible:
+            // case1: every input filter has build_bf_exactly == true, so its bloom
+            //        filter was initialized with the synced global size;
+            // case2: every input filter has build_bf_exactly == false, so its bloom
+            //        filter was initialized with the default size.
+            if (other_filter_type == RuntimeFilterType::IN_FILTER) {
+                
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
+                if (_max_in_num >= 0 && _context->hybrid_set->size() >= 
_max_in_num) {
+                    // case2: use default size to init bf
+                    
RETURN_IF_ERROR(_context->bloom_filter_func->init_with_fixed_length());
+                    RETURN_IF_ERROR(change_to_bloom_filter());
+                }
+            } else {
+                // case1&case2: use the input bloom filter directly, then insert this
+                // filter's IN set data into it.
+                _context->bloom_filter_func = 
wrapper->_context->bloom_filter_func;
+                RETURN_IF_ERROR(change_to_bloom_filter());
+            }
+        } else {
+            if (other_filter_type == RuntimeFilterType::IN_FILTER) {
+                // case2: insert the input IN set's data into this (global) bloom filter
+                
wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get());
+            } else {
+                // case1&case2: all input bloom filters must have the same size
+                RETURN_IF_ERROR(_context->bloom_filter_func->merge(
+                        wrapper->_context->bloom_filter_func.get()));
+            }
+        }
+        break;
+    }
+    case RuntimeFilterType::BITMAP_FILTER: {
+        // Use the input bitmap directly, because a bitmap filter join is assumed to
+        // always have the full data.
+        _context->bitmap_filter_func = wrapper->_context->bitmap_filter_func;
+        break;
+    }
+    default:
+        return Status::InternalError("unknown runtime filter");
+    }
+    return Status::OK();
+}
+
+// Rebuilds the IN set from its protobuf form `in_filter`. A fresh set is always
+// created for `_column_return_type`. When `contain_null` is true, the set is made
+// null-aware and a null entry is inserted.
+// Each value is decoded from the PColumnValue field that matches the column type:
+// numeric fields for fixed-width types, and stringval for LARGEINT, DECIMAL128I,
+// DECIMAL256, IPV6, DATE/DATETIME, DECIMALV2 and string types. Parse failures of the
+// string-encoded wide integers are only checked by DCHECK (debug builds).
+// Returns InternalError for unsupported column types.
+Status RuntimePredicateWrapper::assign(const PInFilter* in_filter, bool 
contain_null) {
+    _context->hybrid_set.reset(create_set(_column_return_type));
+    if (contain_null) {
+        _context->hybrid_set->set_null_aware(true);
+        _context->hybrid_set->insert((const void*)nullptr);
+    }
+
+    switch (_column_return_type) {
+    case TYPE_BOOLEAN: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            bool bool_val = column.boolval();
+            set->insert(&bool_val);
+        });
+        break;
+    }
+    case TYPE_TINYINT: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto int_val = static_cast<int8_t>(column.intval());
+            set->insert(&int_val);
+        });
+        break;
+    }
+    case TYPE_SMALLINT: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto int_val = static_cast<int16_t>(column.intval());
+            set->insert(&int_val);
+        });
+        break;
+    }
+    case TYPE_INT: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            int32_t int_val = column.intval();
+            set->insert(&int_val);
+        });
+        break;
+    }
+    case TYPE_BIGINT: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            int64_t long_val = column.longval();
+            set->insert(&long_val);
+        });
+        break;
+    }
+    case TYPE_LARGEINT: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto string_val = column.stringval();
+            StringParser::ParseResult result;
+            auto int128_val = 
StringParser::string_to_int<int128_t>(string_val.c_str(),
+                                                                    
string_val.length(), &result);
+            DCHECK(result == StringParser::PARSE_SUCCESS);
+            set->insert(&int128_val);
+        });
+        break;
+    }
+    case TYPE_FLOAT: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto float_val = static_cast<float>(column.doubleval());
+            set->insert(&float_val);
+        });
+        break;
+    }
+    case TYPE_DOUBLE: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            double double_val = column.doubleval();
+            set->insert(&double_val);
+        });
+        break;
+    }
+    case TYPE_DATEV2: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto date_v2_val = column.intval();
+            set->insert(&date_v2_val);
+        });
+        break;
+    }
+    case TYPE_DATETIMEV2: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto date_v2_val = column.longval();
+            set->insert(&date_v2_val);
+        });
+        break;
+    }
+    case TYPE_DATETIME:
+    case TYPE_DATE: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            const auto& string_val_ref = column.stringval();
+            VecDateTimeValue datetime_val;
+            datetime_val.from_date_str(string_val_ref.c_str(), 
string_val_ref.length());
+            set->insert(&datetime_val);
+        });
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            const auto& string_val_ref = column.stringval();
+            DecimalV2Value decimal_val(string_val_ref);
+            set->insert(&decimal_val);
+        });
+        break;
+    }
+    case TYPE_DECIMAL32: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            int32_t decimal_32_val = column.intval();
+            set->insert(&decimal_32_val);
+        });
+        break;
+    }
+    case TYPE_DECIMAL64: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            int64_t decimal_64_val = column.longval();
+            set->insert(&decimal_64_val);
+        });
+        break;
+    }
+    case TYPE_DECIMAL128I: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto string_val = column.stringval();
+            StringParser::ParseResult result;
+            auto int128_val = 
StringParser::string_to_int<int128_t>(string_val.c_str(),
+                                                                    
string_val.length(), &result);
+            DCHECK(result == StringParser::PARSE_SUCCESS);
+            set->insert(&int128_val);
+        });
+        break;
+    }
+    case TYPE_DECIMAL256: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto string_val = column.stringval();
+            StringParser::ParseResult result;
+            auto int_val = 
StringParser::string_to_int<wide::Int256>(string_val.c_str(),
+                                                                     
string_val.length(), &result);
+            DCHECK(result == StringParser::PARSE_SUCCESS);
+            set->insert(&int_val);
+        });
+        break;
+    }
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_STRING: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            const std::string& string_value = column.stringval();
+            // Use the insert(data, size) overload: StringSet then does not cast the
+            // pointer to a StringRef, which would be wrong because string_value is a
+            // std::string (a different class).
+            set->insert((void*)string_value.data(), string_value.size());
+        });
+        break;
+    }
+    case TYPE_IPV4: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            int32_t tmp = column.intval();
+            set->insert(&tmp);
+        });
+        break;
+    }
+    case TYPE_IPV6: {
+        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+            auto string_val = column.stringval();
+            StringParser::ParseResult result;
+            auto int128_val = 
StringParser::string_to_int<uint128_t>(string_val.c_str(),
+                                                                     
string_val.length(), &result);
+            DCHECK(result == StringParser::PARSE_SUCCESS);
+            set->insert(&int128_val);
+        });
+        break;
+    }
+    default: {
+        return Status::InternalError("not support assign to in filter, type: " 
+
+                                     type_to_string(_column_return_type));
+    }
+    }
+    return Status::OK();
+}
+
+// Forwards the flag to the bloom filter; a no-op when there is no bloom filter.
+void RuntimePredicateWrapper::set_enable_fixed_len_to_uint32_v2() {
+    auto* bloom = _context->bloom_filter_func.get();
+    if (bloom == nullptr) {
+        return;
+    }
+    bloom->set_enable_fixed_len_to_uint32_v2();
+}
+
+// Rebuilds the bloom filter from `data` (`bloom_filter->filter_length()` bytes).
+// This instance is never used to insert or probe values, so the element type does
+// not matter: a remote bloom filter may arrive with INVALID_TYPE, and TYPE_INT is
+// used in its place.
+Status RuntimePredicateWrapper::assign(const PBloomFilter* bloom_filter,
+                                       butil::IOBufAsZeroCopyInputStream* data, bool contain_null) {
+    const PrimitiveType effective_type =
+            _column_return_type == INVALID_TYPE ? PrimitiveType::TYPE_INT : _column_return_type;
+    _context->bloom_filter_func.reset(create_bloom_filter(effective_type));
+    return _context->bloom_filter_func->assign(data, bloom_filter->filter_length(), contain_null);
+}
+
+// Used by the shuffle runtime filter: rebuilds the min/max filter from its protobuf
+// form. A fresh min/max function is created for `_column_return_type`. When
+// `contain_null` is true, it is made null-aware and marked as containing null.
+// min_val/max_val are decoded from the PColumnValue field that matches the column
+// type (stringval for LARGEINT, DECIMAL128I, DECIMAL256, IPV6, DATE/DATETIME,
+// DECIMALV2 and string types). Parse failures of the string-encoded wide integers
+// are only checked by DCHECK (debug builds).
+// Returns the Status of minmax_func->assign(), or InternalError for unsupported types.
+Status RuntimePredicateWrapper::assign(const PMinMaxFilter* minmax_filter, 
bool contain_null) {
+    _context->minmax_func.reset(create_minmax_filter(_column_return_type));
+
+    if (contain_null) {
+        _context->minmax_func->set_null_aware(true);
+        _context->minmax_func->set_contain_null();
+    }
+
+    switch (_column_return_type) {
+    case TYPE_BOOLEAN: {
+        bool min_val = minmax_filter->min_val().boolval();
+        bool max_val = minmax_filter->max_val().boolval();
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_TINYINT: {
+        auto min_val = static_cast<int8_t>(minmax_filter->min_val().intval());
+        auto max_val = static_cast<int8_t>(minmax_filter->max_val().intval());
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_SMALLINT: {
+        auto min_val = static_cast<int16_t>(minmax_filter->min_val().intval());
+        auto max_val = static_cast<int16_t>(minmax_filter->max_val().intval());
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_INT: {
+        int32_t min_val = minmax_filter->min_val().intval();
+        int32_t max_val = minmax_filter->max_val().intval();
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_BIGINT: {
+        int64_t min_val = minmax_filter->min_val().longval();
+        int64_t max_val = minmax_filter->max_val().longval();
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_LARGEINT: {
+        auto min_string_val = minmax_filter->min_val().stringval();
+        auto max_string_val = minmax_filter->max_val().stringval();
+        StringParser::ParseResult result;
+        auto min_val = 
StringParser::string_to_int<int128_t>(min_string_val.c_str(),
+                                                             
min_string_val.length(), &result);
+        DCHECK(result == StringParser::PARSE_SUCCESS);
+        auto max_val = 
StringParser::string_to_int<int128_t>(max_string_val.c_str(),
+                                                             
max_string_val.length(), &result);
+        DCHECK(result == StringParser::PARSE_SUCCESS);
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_FLOAT: {
+        auto min_val = 
static_cast<float>(minmax_filter->min_val().doubleval());
+        auto max_val = 
static_cast<float>(minmax_filter->max_val().doubleval());
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DOUBLE: {
+        auto min_val = 
static_cast<double>(minmax_filter->min_val().doubleval());
+        auto max_val = 
static_cast<double>(minmax_filter->max_val().doubleval());
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DATEV2: {
+        int32_t min_val = minmax_filter->min_val().intval();
+        int32_t max_val = minmax_filter->max_val().intval();
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DATETIMEV2: {
+        int64_t min_val = minmax_filter->min_val().longval();
+        int64_t max_val = minmax_filter->max_val().longval();
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DATETIME:
+    case TYPE_DATE: {
+        const auto& min_val_ref = minmax_filter->min_val().stringval();
+        const auto& max_val_ref = minmax_filter->max_val().stringval();
+        VecDateTimeValue min_val;
+        VecDateTimeValue max_val;
+        min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
+        max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DECIMALV2: {
+        const auto& min_val_ref = minmax_filter->min_val().stringval();
+        const auto& max_val_ref = minmax_filter->max_val().stringval();
+        DecimalV2Value min_val(min_val_ref);
+        DecimalV2Value max_val(max_val_ref);
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DECIMAL32: {
+        int32_t min_val = minmax_filter->min_val().intval();
+        int32_t max_val = minmax_filter->max_val().intval();
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DECIMAL64: {
+        int64_t min_val = minmax_filter->min_val().longval();
+        int64_t max_val = minmax_filter->max_val().longval();
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DECIMAL128I: {
+        auto min_string_val = minmax_filter->min_val().stringval();
+        auto max_string_val = minmax_filter->max_val().stringval();
+        StringParser::ParseResult result;
+        auto min_val = 
StringParser::string_to_int<int128_t>(min_string_val.c_str(),
+                                                             
min_string_val.length(), &result);
+        DCHECK(result == StringParser::PARSE_SUCCESS);
+        auto max_val = 
StringParser::string_to_int<int128_t>(max_string_val.c_str(),
+                                                             
max_string_val.length(), &result);
+        DCHECK(result == StringParser::PARSE_SUCCESS);
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_DECIMAL256: {
+        auto min_string_val = minmax_filter->min_val().stringval();
+        auto max_string_val = minmax_filter->max_val().stringval();
+        StringParser::ParseResult result;
+        auto min_val = 
StringParser::string_to_int<wide::Int256>(min_string_val.c_str(),
+                                                                 
min_string_val.length(), &result);
+        DCHECK(result == StringParser::PARSE_SUCCESS);
+        auto max_val = 
StringParser::string_to_int<wide::Int256>(max_string_val.c_str(),
+                                                                 
max_string_val.length(), &result);
+        DCHECK(result == StringParser::PARSE_SUCCESS);
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_STRING: {
+        auto min_val_ref = minmax_filter->min_val().stringval();
+        auto max_val_ref = minmax_filter->max_val().stringval();
+        return _context->minmax_func->assign(&min_val_ref, &max_val_ref);
+    }
+    case TYPE_IPV4: {
+        int tmp_min = minmax_filter->min_val().intval();
+        int tmp_max = minmax_filter->max_val().intval();
+        return _context->minmax_func->assign(&tmp_min, &tmp_max);
+    }
+    case TYPE_IPV6: {
+        auto min_string_val = minmax_filter->min_val().stringval();
+        auto max_string_val = minmax_filter->max_val().stringval();
+        StringParser::ParseResult result;
+        auto min_val = 
StringParser::string_to_int<uint128_t>(min_string_val.c_str(),
+                                                              
min_string_val.length(), &result);
+        DCHECK(result == StringParser::PARSE_SUCCESS);
+        auto max_val = 
StringParser::string_to_int<uint128_t>(max_string_val.c_str(),
+                                                              
max_string_val.length(), &result);
+        DCHECK(result == StringParser::PARSE_SUCCESS);
+        return _context->minmax_func->assign(&min_val, &max_val);
+    }
+    default:
+        break;
+    }
+    return Status::InternalError("not support!");
+}
+
+// Exposes the bloom filter's data buffer and length through get_data().
+// NOTE(review): assumes a bloom filter exists; there is no null check.
+void RuntimePredicateWrapper::get_bloom_filter_desc(char** data, int* filter_length) {
+    auto* bloom = _context->bloom_filter_func.get();
+    bloom->get_data(data, filter_length);
+}
+
+// Asks whichever filter function is active (bloom, then IN set, then min/max)
+// whether it contains null; false when none exists. Throws if an IN set exists
+// while the effective type is not IN_FILTER.
+bool RuntimePredicateWrapper::contain_null() const {
+    if (is_bloomfilter()) {
+        return _context->bloom_filter_func->contain_null();
+    }
+    if (const auto& in_set = _context->hybrid_set; in_set != nullptr) {
+        const RuntimeFilterType real_type = get_real_type();
+        if (real_type != RuntimeFilterType::IN_FILTER) {
+            throw Exception(ErrorCode::INTERNAL_ERROR, "rf has hybrid_set but real type is {}",
+                            int(real_type));
+        }
+        return in_set->contain_null();
+    }
+    const auto& minmax = _context->minmax_func;
+    return minmax != nullptr && minmax->contain_null();
+}
+
+// Number of values in the IN set; 0 when the set was never created or was released.
+size_t RuntimePredicateWrapper::get_in_filter_size() const {
+    const auto& in_set = _context->hybrid_set;
+    return in_set == nullptr ? 0 : in_set->size();
+}
+
+// Marks the filter disabled and drops every filter function it may hold.
+void RuntimePredicateWrapper::set_disabled() {
+    auto& ctx = *_context;
+    ctx.disabled = true;
+    ctx.minmax_func = nullptr;
+    ctx.hybrid_set = nullptr;
+    ctx.bloom_filter_func = nullptr;
+    ctx.bitmap_filter_func = nullptr;
+}
+
 } // namespace doris
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 441de7d4da3..e38f6901ef4 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -443,4 +443,115 @@ private:
     WrapperPtr _wrapper;
 };
 
+// This class is a wrapper of runtime predicate function.
+// All filter state lives in a RuntimeFilterContext: the IN set, min/max, bloom and
+// bitmap filter functions, plus the ignored/disabled flags. Which filter functions
+// are populated depends on the filter type (see init()).
+class RuntimePredicateWrapper {
+public:
+    // Takes the column type, filter type and filter id from `params`. No filter
+    // function is allocated until init(params) is called.
+    RuntimePredicateWrapper(const RuntimeFilterParams* params)
+            : RuntimePredicateWrapper(params->column_return_type, params->filter_type,
+                                      params->filter_id) {}
+
+    // Creates a 'tmp' wrapper with an empty context. Such a wrapper may only have one
+    // of the assign() overloads called on it, or be passed as the argument of merge().
+    RuntimePredicateWrapper(PrimitiveType column_type, RuntimeFilterType type, uint32_t filter_id)
+            : _column_return_type(column_type),
+              _filter_type(type),
+              _context(new RuntimeFilterContext()),
+              _filter_id(filter_id) {}
+
+    // Records params->max_in_num and allocates the filter functions required by the
+    // filter type. Returns InternalError for an unknown filter type.
+    Status init(const RuntimeFilterParams* params);
+
+    // IN_OR_BLOOM_FILTER only: copies the IN set's values into the bloom filter and
+    // releases the set. Returns InternalError for other filter types.
+    Status change_to_bloom_filter();
+
+    // Sizes the bloom filter from `build_bf_cardinality`. Throws for filter types
+    // without a bloom filter.
+    Status init_bloom_filter(const size_t build_bf_cardinality);
+
+    // Forwards to the bloom filter's get_build_bf_cardinality() for BLOOM and
+    // IN_OR_BLOOM filters, false otherwise. NOTE(review): despite the name this is a
+    // bool flag, not a cardinality — confirm its meaning in BloomFilterFuncBase.
+    bool get_build_bf_cardinality() const;
+
+    // Copies the IN set's values (and its null flag) into `bloom_filter`.
+    void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const;
+
+    BloomFilterFuncBase* get_bloomfilter() const { return _context->bloom_filter_func.get(); }
+
+    // Forwards `column`/`start` to the active filter function's insert_fixed_len().
+    // Throws if the filter has been ignored.
+    void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start);
+
+    // Bitmap filters take bitmap_filter_insert_batch(); every other type takes
+    // insert_fixed_len().
+    void insert_batch(const vectorized::ColumnPtr& column, size_t start) {
+        if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
+            bitmap_filter_insert_batch(column, start);
+        } else {
+            insert_fixed_len(column, start);
+        }
+    }
+
+    // Adds the non-null bitmaps in rows [start, column->size()) to the bitmap filter.
+    void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t start);
+
+    // The effective filter type. An IN_OR_BLOOM_FILTER reports IN_FILTER while its IN
+    // set exists, and BLOOM_FILTER once the set has been released.
+    RuntimeFilterType get_real_type() const {
+        if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+            if (_context->hybrid_set) {
+                return RuntimeFilterType::IN_FILTER;
+            }
+            return RuntimeFilterType::BLOOM_FILTER;
+        }
+        return _filter_type;
+    }
+
+    // Size reported by the bloom filter, or 0 if there is none.
+    size_t get_bloom_filter_size() const;
+
+    Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
+                          std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
+                          const TExpr& probe_expr);
+
+    // Merges a wrapper of a compatible type into this one. Incompatible types fail
+    // a CHECK.
+    Status merge(const RuntimePredicateWrapper* wrapper);
+
+    // Rebuilds the IN set from its protobuf form. `contain_null` inserts a null entry.
+    Status assign(const PInFilter* in_filter, bool contain_null);
+
+    // Forwards the flag to the bloom filter, if there is one.
+    void set_enable_fixed_len_to_uint32_v2();
+
+    // Used by the shuffle runtime filter: rebuilds the bloom filter from protobuf.
+    Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data,
+                  bool contain_null);
+
+    // Used by the shuffle runtime filter: rebuilds the min/max filter from protobuf.
+    Status assign(const PMinMaxFilter* minmax_filter, bool contain_null);
+
+    // Exposes the bloom filter's data buffer and length.
+    void get_bloom_filter_desc(char** data, int* filter_length);
+
+    PrimitiveType column_type() const { return _column_return_type; }
+
+    bool is_bloomfilter() const { return get_real_type() == RuntimeFilterType::BLOOM_FILTER; }
+
+    // Whether the active filter function contains null (false if none exists).
+    bool contain_null() const;
+
+    bool is_ignored() const { return _context->ignored; }
+
+    void set_ignored() { _context->ignored = true; }
+
+    bool is_disabled() const { return _context->disabled; }
+
+    // Marks the filter disabled and releases all filter functions.
+    void set_disabled();
+
+    // Feeds each value of `filter` to `assign_func`, which inserts it into the IN set.
+    void batch_assign(const PInFilter* filter,
+                      void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set,
+                                          PColumnValue&));
+
+    // Number of values in the IN set, or 0 if there is none.
+    size_t get_in_filter_size() const;
+
+    std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const {
+        return _context->bitmap_filter_func;
+    }
+
+    friend class IRuntimeFilter;
+
+    // Propagates `id` to every filter function that exists.
+    void set_filter_id(int id);
+
+private:
+    // When a runtime filter is received from remote and it is a bloom filter,
+    // _column_return_type will be invalid.
+    PrimitiveType _column_return_type; // column type
+    RuntimeFilterType _filter_type;
+    // IN-list size limit used by merge(); a negative value means unlimited.
+    int32_t _max_in_num = -1;
+
+    RuntimeFilterContextSPtr _context;
+    uint32_t _filter_id;
+};
+
 } // namespace doris
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index a9dd631e358..b732ae155f7 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -35,11 +35,7 @@ public:
+    // Keeps a *reference* to the build-side expression contexts (`_build_expr_context` is a
+    // const reference member, so the caller's vector must outlive this object) and a copy
+    // of the runtime filter list. Each filter's expr_order() is later used as an index into
+    // the build expression contexts (see insert()).
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
             const std::vector<std::shared_ptr<IRuntimeFilter>>& 
runtime_filters)
-            : _build_expr_context(build_expr_ctxs), 
_runtime_filters(runtime_filters) {
-        for (auto runtime_filter : _runtime_filters) {
-            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter.get());
-        }
-    }
+            : _build_expr_context(build_expr_ctxs), 
_runtime_filters(runtime_filters) {}
 
     Status send_filter_size(RuntimeState* state, uint64_t hash_table_size,
                             std::shared_ptr<pipeline::CountedFinishDependency> 
dependency) {
@@ -139,62 +135,48 @@ public:
     }
 
     void insert(const vectorized::Block* block) {
-        for (int i = 0; i < _build_expr_context.size(); ++i) {
-            auto iter = _runtime_filters_map.find(i);
-            if (iter == _runtime_filters_map.end()) {
-                continue;
-            }
-
-            int result_column_id = 
_build_expr_context[i]->get_last_result_column_id();
+        // Feed every runtime filter the column produced by its own build expression
+        // (`_build_expr_context[filter->expr_order()]`), skipping filters that are
+        // ignored or disabled.
+        // NOTE(review): the removed loop only visited expr_order values in
+        // [0, _build_expr_context.size()); this version indexes `_build_expr_context`
+        // unchecked, so it assumes every filter's expr_order() is in range -- confirm.
+        for (auto& filter : _runtime_filters) {
+            int result_column_id =
+                    
_build_expr_context[filter->expr_order()]->get_last_result_column_id();
             const auto& column = 
block->get_by_position(result_column_id).column;
-            for (auto* filter : iter->second) {
-                if (filter->get_ignored() || filter->get_disabled()) {
-                    continue;
-                }
-                filter->insert_batch(column, 1);
+            if (filter->get_ignored() || filter->get_disabled()) {
+                continue;
             }
+            filter->insert_batch(column, 1);
         }
     }
 
     // publish runtime filter
+    // Calls publish(state, publish_local) on every runtime filter in list order and returns
+    // the first non-OK status (via RETURN_IF_ERROR); returns OK when all of them succeed.
     Status publish(RuntimeState* state, bool publish_local) {
-        for (auto& pair : _runtime_filters_map) {
-            for (auto& filter : pair.second) {
-                RETURN_IF_ERROR(filter->publish(state, publish_local));
-            }
+        for (auto& filter : _runtime_filters) {
+            RETURN_IF_ERROR(filter->publish(state, publish_local));
         }
         return Status::OK();
     }
 
     void copy_to_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
-        for (auto& it : _runtime_filters_map) {
-            for (auto& filter : it.second) {
-                context->runtime_filters[filter->filter_id()] = 
filter->get_shared_context_ref();
-            }
+        // Store each filter's shared context in `context->runtime_filters`, keyed by
+        // filter_id() (an existing entry with the same id is overwritten);
+        // copy_from_shared_context() reads the same map back.
+        for (auto& filter : _runtime_filters) {
+            context->runtime_filters[filter->filter_id()] = 
filter->get_shared_context_ref();
         }
     }
 
     Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
-        for (auto& it : _runtime_filters_map) {
-            for (auto& filter : it.second) {
-                auto filter_id = filter->filter_id();
-                auto ret = context->runtime_filters.find(filter_id);
-                if (ret == context->runtime_filters.end()) {
-                    return Status::Aborted("invalid runtime filter id: {}", 
filter_id);
-                }
-                filter->get_shared_context_ref() = ret->second;
+        // Replace each filter's shared context with the entry stored under its filter id
+        // in `context->runtime_filters`. Returns Aborted on the first missing id; filters
+        // visited before that point have already been updated (no rollback).
+        for (auto& filter : _runtime_filters) {
+            auto filter_id = filter->filter_id();
+            auto ret = context->runtime_filters.find(filter_id);
+            if (ret == context->runtime_filters.end()) {
+                return Status::Aborted("invalid runtime filter id: {}", 
filter_id);
             }
+            filter->get_shared_context_ref() = ret->second;
         }
         return Status::OK();
     }
 
-    bool empty() { return _runtime_filters_map.empty(); }
+    // True when no runtime filters were handed to these slots. Read-only, hence const.
+    bool empty() const { return _runtime_filters.empty(); }
 
 private:
     const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
_build_expr_context;
     std::vector<std::shared_ptr<IRuntimeFilter>> _runtime_filters;
-    // prob_contition index -> [IRuntimeFilter]
-    std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map;
 };
 
 } // namespace doris
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index bb254aae72b..9ca0f0fcd40 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -295,7 +295,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, 
vectorized::Block* ori
                                                                        
*_output_row_descriptor);
     if (rows != 0) {
         auto& mutable_columns = mutable_block.mutable_columns();
-        DCHECK(mutable_columns.size() == local_state->_projections.size());
+        DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) << 
debug_string();
         for (int i = 0; i < mutable_columns.size(); ++i) {
             auto result_column_id = -1;
             
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, 
&result_column_id));
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index e8e8ed5d9fe..c8e3429107c 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -363,6 +363,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const 
doris::TPipelineFrag
     const auto target_size = request.local_params.size();
     _tasks.resize(target_size);
     _runtime_filter_states.resize(target_size);
+    _runtime_filter_mgr_map.resize(target_size);
     _task_runtime_states.resize(_pipelines.size());
     for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
         _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
@@ -510,7 +511,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const 
doris::TPipelineFrag
         }
         {
             std::lock_guard<std::mutex> l(_state_map_lock);
-            _runtime_filter_mgr_map[fragment_instance_id] = 
std::move(runtime_filter_mgr);
+            _runtime_filter_mgr_map[i] = std::move(runtime_filter_mgr);
         }
         return Status::OK();
     };
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index d672ad6e923..bd3a350d0a2 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -275,8 +275,7 @@ private:
             _op_id_to_le_state;
 
     std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
-    // UniqueId -> runtime mgr
-    std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> 
_runtime_filter_mgr_map;
+    std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
 
     //Here are two types of runtime states:
     //    - _runtime state is at the Fragment level.
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 1651eb50cac..2218d70fde6 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -107,12 +107,15 @@ class MultiCoreTaskQueue {
 public:
     explicit MultiCoreTaskQueue(int core_size);
 
+#ifndef BE_TEST
     ~MultiCoreTaskQueue();
-
-    void close();
-
     // Get the task by core id.
     PipelineTask* take(int core_id);
+#else
+    virtual ~MultiCoreTaskQueue();
+    virtual PipelineTask* take(int core_id);
+#endif
+    void close();
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
     Status push_back(PipelineTask* task);
diff --git a/be/test/pipeline/dummy_task_queue.h 
b/be/test/pipeline/dummy_task_queue.h
new file mode 100644
index 00000000000..bace0ecae96
--- /dev/null
+++ b/be/test/pipeline/dummy_task_queue.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "pipeline/task_queue.h"
+
+namespace doris::pipeline {
+
+// Test-only MultiCoreTaskQueue used by the pipeline unit tests. It overrides take()
+// (declared virtual only when BE_TEST is defined, see task_queue.h) with a single pass
+// over three sources: the core's own priority queue via try_take(false), work stealing via
+// _steal_take(), and finally _prio_task_queues[core_id].take(1). It returns nullptr if
+// none of them yields a task.
+class DummyTaskQueue final : public MultiCoreTaskQueue {
+public:
+    explicit DummyTaskQueue(int core_size) : MultiCoreTaskQueue(core_size) {}
+    ~DummyTaskQueue() override = default;
+
+    // Returns the next task for `core_id` (after calling pop_out_runnable_queue() on it),
+    // or nullptr when no task was found.
+    PipelineTask* take(int core_id) override {
+        // Compare as size_t: same semantics as the implicit conversion, without the
+        // signed/unsigned comparison warning.
+        DCHECK_LT(static_cast<size_t>(core_id), _prio_task_queues.size())
+                << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
+                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
+        PipelineTask* task = _prio_task_queues[core_id].try_take(false);
+        if (task == nullptr) {
+            task = _steal_take(core_id);
+        }
+        if (task == nullptr) {
+            task = _prio_task_queues[core_id].take(1);
+        }
+        if (task != nullptr) {
+            task->pop_out_runnable_queue();
+        }
+        return task;
+    }
+};
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/pipeline_test.cpp 
b/be/test/pipeline/pipeline_test.cpp
index d1c0af85f58..6466d6c2927 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -22,12 +22,15 @@
 
 #include "common/exception.h"
 #include "common/status.h"
+#include "dummy_task_queue.h"
+#include "exprs/bloom_filter_func.h"
+#include "exprs/hybrid_set.h"
+#include "exprs/runtime_filter.h"
 #include "pipeline/dependency.h"
 #include "pipeline/exec/exchange_source_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/pipeline_fragment_context.h"
-#include "pipeline/task_queue.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "thrift_builder.h"
@@ -45,10 +48,13 @@ class PipelineTest : public testing::Test {
 public:
     PipelineTest()
             : _obj_pool(new ObjectPool()),
-              _mgr(std::make_unique<doris::vectorized::VDataStreamMgr>()) {
+              _mgr(std::make_unique<doris::vectorized::VDataStreamMgr>()) {}
+    ~PipelineTest() override = default;
+    void SetUp() override {
         _query_options = TQueryOptionsBuilder()
                                  .set_enable_local_exchange(true)
                                  .set_enable_local_shuffle(true)
+                                 .set_runtime_filter_max_in_num(15)
                                  .build();
         auto fe_address = TNetworkAddress();
         fe_address.hostname = LOCALHOST;
@@ -56,10 +62,12 @@ public:
         _query_ctx = QueryContext::create_shared(_query_id, 
ExecEnv::GetInstance(), _query_options,
                                                  fe_address, true, fe_address,
                                                  
QuerySource::INTERNAL_FRONTEND);
+        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                TRuntimeFilterParamsBuilder().build());
         ExecEnv::GetInstance()->set_stream_mgr(_mgr.get());
-        _task_queue = std::make_unique<MultiCoreTaskQueue>(1);
+        _task_queue = std::make_unique<DummyTaskQueue>(1);
     }
-    ~PipelineTest() override = default;
+    void TearDown() override {}
 
 private:
     std::shared_ptr<Pipeline> _build_pipeline(int num_instances, Pipeline* 
parent = nullptr) {
@@ -111,6 +119,7 @@ private:
         _pipeline_profiles.clear();
         _pipeline_tasks.clear();
         _runtime_states.clear();
+        _runtime_filter_mgrs.clear();
     }
     int _next_fragment_id() { return _fragment_id++; }
     int _next_node_id() { return _next_node_idx++; }
@@ -125,7 +134,7 @@ private:
     std::shared_ptr<QueryContext> _query_ctx;
     TUniqueId _query_id = TUniqueId();
     TQueryOptions _query_options;
-    std::unique_ptr<MultiCoreTaskQueue> _task_queue;
+    std::unique_ptr<DummyTaskQueue> _task_queue;
 
     // Fragment level
     // Fragment0 -> Fragment1
@@ -149,6 +158,9 @@ private:
     std::vector<std::vector<std::unique_ptr<PipelineTask>>> _pipeline_tasks;
     std::vector<std::vector<std::unique_ptr<RuntimeState>>> _runtime_states;
 
+    // Instance level
+    std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgrs;
+
     const std::string LOCALHOST = BackendOptions::get_localhost();
     const int DUMMY_PORT = config::brpc_port;
 };
@@ -184,7 +196,7 @@ TEST_F(PipelineTest, HAPPY_PATH) {
                                                            
.set_scalar_type(TPrimitiveType::INT)
                                                            .build())
                                         .build())
-                        .set_nullIndicatorBit(0)
+                        .set_nullIndicatorBit(-1)
                         .set_byteOffset(0)
                         .set_slotIdx(0)
                         .set_isMaterialized(true)
@@ -489,7 +501,7 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
                                                            
.set_scalar_type(TPrimitiveType::INT)
                                                            .build())
                                         .build())
-                        .set_nullIndicatorBit(0)
+                        .set_nullIndicatorBit(-1)
                         .set_byteOffset(0)
                         .set_slotIdx(0)
                         .set_isMaterialized(true)
@@ -585,7 +597,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                                                            
.set_scalar_type(TPrimitiveType::INT)
                                                            .build())
                                         .build())
-                        .set_nullIndicatorBit(0)
+                        .set_nullIndicatorBit(-1)
                         .set_byteOffset(0)
                         .set_slotIdx(0)
                         .set_isMaterialized(true)
@@ -604,7 +616,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                                                            
.set_scalar_type(TPrimitiveType::INT)
                                                            .build())
                                         .build())
-                        .set_nullIndicatorBit(0)
+                        .set_nullIndicatorBit(-1)
                         .set_byteOffset(0)
                         .set_slotIdx(0)
                         .set_isMaterialized(true)
@@ -623,7 +635,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                                                            
.set_scalar_type(TPrimitiveType::INT)
                                                            .build())
                                         .build())
-                        .set_nullIndicatorBit(0)
+                        .set_nullIndicatorBit(-1)
                         .set_byteOffset(0)
                         .set_slotIdx(0)
                         .set_isMaterialized(true)
@@ -640,7 +652,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                                                            
.set_scalar_type(TPrimitiveType::INT)
                                                            .build())
                                         .build())
-                        .set_nullIndicatorBit(0)
+                        .set_nullIndicatorBit(-1)
                         .set_byteOffset(4)
                         .set_slotIdx(1)
                         .set_isMaterialized(true)
@@ -720,6 +732,73 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                                     .append_vintermediate_tuple_id_list(1)
                                     .build())
                     .append_row_tuples(2, false)
+                    .append_projections(
+                            TExprBuilder()
+                                    .append_nodes(
+                                            TExprNodeBuilder(
+                                                    TExprNodeType::SLOT_REF,
+                                                    TTypeDescBuilder()
+                                                            .set_types(
+                                                                    
TTypeNodeBuilder()
+                                                                            
.set_type(
+                                                                               
     TTypeNodeType::
+                                                                               
             SCALAR)
+                                                                            
.set_scalar_type(
+                                                                               
     TPrimitiveType::
+                                                                               
             INT)
+                                                                            
.build())
+                                                            .build(),
+                                                    0)
+                                                    
.set_slot_ref(TSlotRefBuilder(0, 0).build())
+                                                    .build())
+                                    .build())
+                    .append_projections(
+                            TExprBuilder()
+                                    .append_nodes(
+                                            TExprNodeBuilder(
+                                                    TExprNodeType::SLOT_REF,
+                                                    TTypeDescBuilder()
+                                                            .set_types(
+                                                                    
TTypeNodeBuilder()
+                                                                            
.set_type(
+                                                                               
     TTypeNodeType::
+                                                                               
             SCALAR)
+                                                                            
.set_scalar_type(
+                                                                               
     TPrimitiveType::
+                                                                               
             INT)
+                                                                            
.build())
+                                                            .build(),
+                                                    0)
+                                                    
.set_slot_ref(TSlotRefBuilder(1, 1).build())
+                                                    .build())
+                                    .build())
+                    .append_runtime_filters(
+                            TRuntimeFilterDescBuilder(
+                                    0,
+                                    TExprBuilder()
+                                            .append_nodes(
+                                                    TExprNodeBuilder(
+                                                            
TExprNodeType::SLOT_REF,
+                                                            TTypeDescBuilder()
+                                                                    .set_types(
+                                                                            
TTypeNodeBuilder()
+                                                                               
     .set_type(
+                                                                               
             TTypeNodeType::
+                                                                               
                     SCALAR)
+                                                                               
     .set_scalar_type(
+                                                                               
             TPrimitiveType::
+                                                                               
                     INT)
+                                                                               
     .build())
+                                                                    .build(),
+                                                            0)
+                                                            .set_slot_ref(
+                                                                    
TSlotRefBuilder(1, 1).build())
+                                                            .build())
+                                            .build(),
+                                    0, std::map<TPlanNodeId, TExpr> {})
+                                    .set_bloom_filter_size_bytes(1048576)
+                                    .set_build_bf_exactly(false)
+                                    .build())
                     .build();
 
     {
@@ -850,6 +929,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
     {
         // Build pipeline task
         int task_id = 0;
+        _runtime_filter_mgrs.resize(parallelism);
+        for (int j = 0; j < parallelism; j++) {
+            auto runtime_filter_state = 
RuntimeFilterParamsContext::create(_query_ctx.get());
+            _runtime_filter_mgrs[j] = std::make_unique<RuntimeFilterMgr>(
+                    _query_id, runtime_filter_state, 
_query_ctx->query_mem_tracker, false);
+        }
         for (size_t i = 0; i < _pipelines.size(); i++) {
             EXPECT_EQ(_pipelines[i]->id(), i);
             _pipeline_profiles[_pipelines[i]->id()] = 
std::make_shared<RuntimeProfile>(
@@ -871,6 +956,8 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                 local_runtime_state->set_task_num(_pipelines[i]->num_tasks());
                 local_runtime_state->set_task_execution_context(
                         
std::static_pointer_cast<TaskExecutionContext>(_context.back()));
+                
local_runtime_state->set_runtime_filter_mgr(_runtime_filter_mgrs[j].get());
+                
_runtime_filter_mgrs[j]->_state->set_state(local_runtime_state.get());
                 std::map<int, 
std::pair<std::shared_ptr<LocalExchangeSharedState>,
                                         std::shared_ptr<Dependency>>>
                         le_state_map;
@@ -891,6 +978,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
     }
 
     std::shared_ptr<vectorized::VDataStreamRecvr> downstream_recvr;
+    auto downstream_pipeline_profile = 
std::make_shared<RuntimeProfile>("Downstream Pipeline");
     {
         // Build downstream recvr
         auto context = _build_fragment_context();
@@ -900,13 +988,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
         downstream_runtime_state->set_task_execution_context(
                 std::static_pointer_cast<TaskExecutionContext>(context));
 
-        auto downstream_pipeline_profile = 
std::make_shared<RuntimeProfile>("Downstream Pipeline");
         auto* memory_used_counter = 
downstream_pipeline_profile->AddHighWaterMarkCounter(
                 "MemoryUsage", TUnit::BYTES, "", 1);
         downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr(
                 downstream_runtime_state.get(), memory_used_counter,
                 _pipelines.front()->operators().back()->row_desc(), 
dest_ins_id, dest_node_id,
-                parallelism, downstream_pipeline_profile.get(), false, 20480);
+                parallelism, downstream_pipeline_profile.get(), false, 
2048000);
     }
     for (size_t i = 0; i < _pipelines.size(); i++) {
         for (int j = 0; j < parallelism; j++) {
@@ -914,21 +1001,16 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
             
EXPECT_EQ(_pipeline_tasks[_pipelines[i]->id()][j]->prepare(scan_ranges, j, 
tsink,
                                                                        
_query_ctx.get()),
                       Status::OK());
+            if (i == 1) {
+                auto& local_state = _runtime_states[i][j]
+                                            ->get_sink_local_state()
+                                            
->cast<HashJoinBuildSinkLocalState>();
+                EXPECT_EQ(local_state._runtime_filters.size(), 1);
+                EXPECT_EQ(local_state._should_build_hash_table, true);
+            }
         }
     }
 
-    // Construct input block
-    vectorized::Block block;
-    {
-        vectorized::DataTypePtr int_type = 
std::make_shared<vectorized::DataTypeInt32>();
-
-        auto int_col0 = vectorized::ColumnInt32::create();
-        int_col0->insert_many_vals(1, 10);
-        block.insert({std::move(int_col0), int_type, "test_int_col0"});
-    }
-    auto block_mem_usage = block.allocated_bytes();
-    EXPECT_GT(block_mem_usage - 1, 0);
-
     {
         for (size_t i = 0; i < _pipelines.size(); i++) {
             for (int j = 0; j < parallelism; j++) {
@@ -960,24 +1042,146 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
     {
         for (int i = _pipelines.size() - 1; i >= 0; i--) {
             for (int j = 0; j < parallelism; j++) {
+                bool eos = false;
+                EXPECT_EQ(_pipeline_tasks[i][j]->execute(&eos), Status::OK());
+                EXPECT_EQ(_pipeline_tasks[i][j]->_opened, true);
+                EXPECT_EQ(eos, false);
+            }
+        }
+    }
+    for (int i = _pipelines.size() - 1; i >= 0; i--) {
+        for (int j = 0; j < parallelism; j++) {
+            {
+                vectorized::Block block;
+                {
+                    vectorized::DataTypePtr int_type =
+                            std::make_shared<vectorized::DataTypeInt32>();
+
+                    auto int_col0 = vectorized::ColumnInt32::create();
+                    if (j == 0 || i == 0) {
+                        int_col0->insert_many_vals(j, 10);
+                    } else {
+                        size_t ndv = 16;
+                        for (size_t n = 0; n < ndv; n++) {
+                            int_col0->insert_many_vals(n, 1);
+                        }
+                    }
+
+                    block.insert({std::move(int_col0), int_type, 
"test_int_col0"});
+                }
                 auto& local_state =
                         _runtime_states[i][j]
                                 
->get_local_state(_pipelines[i]->operators().front()->operator_id())
                                 ->cast<ExchangeLocalState>();
-                
local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
-
-                bool eos = false;
-                EXPECT_EQ(_pipeline_tasks[i][j]->execute(&eos), Status::OK());
-                EXPECT_EQ(_pipeline_tasks[i][j]->_is_blocked(), false);
-                EXPECT_EQ(eos, true);
-                EXPECT_EQ(_pipeline_tasks[i][j]->is_pending_finish(), false);
-                EXPECT_EQ(_pipeline_tasks[i][j]->close(Status::OK()), 
Status::OK());
+                
EXPECT_EQ(local_state.stream_recvr->_sender_queues[0]->_source_dependency->ready(),
+                          false);
+                EXPECT_EQ(local_state.stream_recvr->_sender_queues[0]
+                                  ->_source_dependency->_blocked_task.size(),
+                          i == 1 ? 1 : 0);
+                local_state.stream_recvr->_sender_queues[0]->add_block(&block, 
true);
+            }
+        }
+    }
+    {
+        // Pipeline 1 is blocked on its exchange dependency, so its tasks become ready once
+        // data arrives. Pipeline 0 is blocked on the hash join dependency and keeps waiting
+        // until the upstream (build) tasks are done.
+        for (int j = 0; j < parallelism; j++) {
+            // The task is ready and has been pushed into the runnable task queue.
+            EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+        }
+        EXPECT_EQ(_task_queue->take(0), nullptr);
+        for (int j = 0; j < parallelism; j++) {
+            EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false);
+        }
+    }
+    {
+        // Pipeline 1 runs first and builds the hash table in the join build operator.
+        for (int j = 0; j < parallelism; j++) {
+            bool eos = false;
+            EXPECT_EQ(_pipeline_tasks[1][j]->execute(&eos), Status::OK());
+            EXPECT_EQ(eos, false);
+        }
+        for (int j = 0; j < parallelism; j++) {
+            auto& local_state =
+                    _runtime_states[1][j]
+                            
->get_local_state(_pipelines[1]->operators().front()->operator_id())
+                            ->cast<ExchangeLocalState>();
+            local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
+
+            bool eos = false;
+            EXPECT_EQ(_pipeline_tasks[1][j]->execute(&eos), Status::OK());
+            EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false);
+            EXPECT_EQ(eos, true);
+            auto& sink_local_state = _runtime_states[1][j]
+                                             ->get_sink_local_state()
+                                             
->cast<HashJoinBuildSinkLocalState>();
+            EXPECT_EQ(sink_local_state._runtime_filters_disabled, false);
+            
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters.size(), 1);
+            
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+                              ->need_sync_filter_size(),
+                      false);
+            
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+                              ->_runtime_filter_type,
+                      RuntimeFilterType::IN_OR_BLOOM_FILTER);
+            EXPECT_EQ(_pipeline_tasks[1][j]->is_pending_finish(), false);
+            EXPECT_EQ(_pipeline_tasks[1][j]->close(Status::OK()), 
Status::OK());
+            
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]->get_real_type(),
+                      j == 0 ? RuntimeFilterType::IN_FILTER : 
RuntimeFilterType::BLOOM_FILTER)
+                    << "  " << j << " "
+                    << IRuntimeFilter::to_string(
+                               
sink_local_state._runtime_filter_slots->_runtime_filters[0]
+                                       ->get_real_type());
+            
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+                              ->_wrapper->is_ignored(),
+                      false);
+            if (j == 0) {
+                
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+                                  ->_wrapper->_context->hybrid_set->size(),
+                          1);
+            } else {
+                
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+                                  
->_wrapper->_context->bloom_filter_func->_build_bf_exactly,
+                          false);
+
+                
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+                                  
->_wrapper->_context->bloom_filter_func->_bloom_filter_length,
+                          1048576);
             }
         }
-        {
-            
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 0);
-            
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 0);
+    }
+    {
+        // Pipeline 0 runs once the hash table has been built.
+        for (int j = 0; j < parallelism; j++) {
+            EXPECT_EQ(_pipeline_tasks[0][j]->_is_blocked(), false);
         }
+        for (int j = 0; j < parallelism; j++) {
+            bool eos = false;
+            EXPECT_EQ(_pipeline_tasks[0][j]->execute(&eos), Status::OK());
+            EXPECT_EQ(eos, false);
+        }
+        for (int j = 0; j < parallelism; j++) {
+            auto& local_state =
+                    _runtime_states[0][j]
+                            
->get_local_state(_pipelines[0]->operators().front()->operator_id())
+                            ->cast<ExchangeLocalState>();
+            local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
+
+            bool eos = false;
+            EXPECT_EQ(_pipeline_tasks[0][j]->execute(&eos), Status::OK());
+            EXPECT_EQ(_pipeline_tasks[0][j]->_is_blocked(), false);
+            EXPECT_EQ(eos, true);
+            EXPECT_EQ(_pipeline_tasks[0][j]->is_pending_finish(), false);
+            EXPECT_EQ(_pipeline_tasks[0][j]->close(Status::OK()), 
Status::OK());
+        }
+    }
+    {
+        // Instance 0: probe [0 x 10] join build [0 x 10] produces 10 * 10 = 100 rows.
+        // Instance 1: probe [1 x 10] join build [0, 1, ..., 15] produces 10 * 1 = 10 rows.
+        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 2);
+        
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.front()._block->rows(),
+                  10 * 10);
+        
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.back()._block->rows(),
 10);
+        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 
0);
     }
     downstream_recvr->close();
 }
diff --git a/be/test/pipeline/thrift_builder.h 
b/be/test/pipeline/thrift_builder.h
index bdcead88616..1af1ca760bd 100644
--- a/be/test/pipeline/thrift_builder.h
+++ b/be/test/pipeline/thrift_builder.h
@@ -58,7 +58,7 @@ public:
         return *this;
     }
     TQueryOptionsBuilder& set_enable_new_shuffle_hash_method(bool 
enable_new_shuffle_hash_method) {
-        _query_options.enable_new_shuffle_hash_method = 
enable_new_shuffle_hash_method;
+        
_query_options.__set_enable_new_shuffle_hash_method(enable_new_shuffle_hash_method);
         return *this;
     }
     TQueryOptionsBuilder& set_enable_local_shuffle(bool enable_local_shuffle) {
@@ -66,11 +66,23 @@ public:
         return *this;
     }
     TQueryOptionsBuilder& set_runtime_filter_wait_infinitely(bool 
runtime_filter_wait_infinitely) {
-        _query_options.runtime_filter_wait_infinitely = 
runtime_filter_wait_infinitely;
+        
_query_options.__set_runtime_filter_wait_infinitely(runtime_filter_wait_infinitely);
         return *this;
     }
     TQueryOptionsBuilder& set_enable_local_merge_sort(bool 
enable_local_merge_sort) {
-        _query_options.enable_local_merge_sort = enable_local_merge_sort;
+        _query_options.__set_enable_local_merge_sort(enable_local_merge_sort);
+        return *this;
+    }
+    // Stores runtime_filter_max_in_num on the wrapped TQueryOptions through its
+    // generated __set_ helper.
+    TQueryOptionsBuilder& set_runtime_filter_max_in_num(int64_t max_in_num) {
+        _query_options.__set_runtime_filter_max_in_num(max_in_num);
+        return *this;
+    }
+    // Stores runtime_bloom_filter_min_size on the wrapped TQueryOptions.
+    TQueryOptionsBuilder& set_runtime_bloom_filter_min_size(int64_t min_size) {
+        _query_options.__set_runtime_bloom_filter_min_size(min_size);
+        return *this;
+    }
+    // Stores runtime_bloom_filter_max_size on the wrapped TQueryOptions.
+    TQueryOptionsBuilder& set_runtime_bloom_filter_max_size(int64_t max_size) {
+        _query_options.__set_runtime_bloom_filter_max_size(max_size);
         return *this;
     }
 
@@ -117,6 +129,16 @@ public:
         _plan_node.__set_output_tuple_id(output_tuple_id);
         return *this;
     }
+    // Appends one projection expr to the plan node and flags `projections` as set.
+    // The expr is only copied, so it is taken by const reference; this also accepts
+    // const lvalues and temporaries.
+    TPlanNodeBuilder& append_projections(const TExpr& projection) {
+        _plan_node.__isset.projections = true;
+        _plan_node.projections.push_back(projection);
+        return *this;
+    }
+    // Appends one runtime-filter descriptor to the plan node and flags
+    // `runtime_filters` as set. The descriptor is only copied, so it is taken by
+    // const reference.
+    TPlanNodeBuilder& append_runtime_filters(const TRuntimeFilterDesc& runtime_filter) {
+        _plan_node.__isset.runtime_filters = true;
+        _plan_node.runtime_filters.push_back(runtime_filter);
+        return *this;
+    }
 
     TPlanNode& build() { return _plan_node; }
 
@@ -127,6 +149,55 @@ private:
     TPlanNode _plan_node;
 };
 
+// Test helper that assembles a TRuntimeFilterDesc. The constructor writes the fields
+// it receives through the generated __set_ helpers; the chained setters fill in the
+// optional bloom-filter sizing fields.
+class TRuntimeFilterDescBuilder {
+public:
+    // A single const-reference overload accepts both lvalue and temporary `src_expr`
+    // arguments; `src_expr` and the target map are copied into `_desc` either way.
+    explicit TRuntimeFilterDescBuilder(
+            int filter_id, const TExpr& src_expr, int expr_order,
+            const std::map<TPlanNodeId, TExpr>& planId_to_target_expr,
+            bool is_broadcast_join = false, bool has_local_targets = true,
+            bool has_remote_targets = false,
+            TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM)
+            : _desc() {
+        _desc.__set_filter_id(filter_id);
+        _desc.__set_src_expr(src_expr);
+        _desc.__set_expr_order(expr_order);
+        _desc.__set_planId_to_target_expr(planId_to_target_expr);
+        _desc.__set_is_broadcast_join(is_broadcast_join);
+        _desc.__set_has_local_targets(has_local_targets);
+        _desc.__set_has_remote_targets(has_remote_targets);
+        _desc.__set_type(type);
+    }
+
+    TRuntimeFilterDescBuilder& set_bloom_filter_size_bytes(int64_t bloom_filter_size_bytes) {
+        _desc.__set_bloom_filter_size_bytes(bloom_filter_size_bytes);
+        return *this;
+    }
+    TRuntimeFilterDescBuilder& set_build_bf_exactly(bool build_bf_exactly) {
+        _desc.__set_build_bf_exactly(build_bf_exactly);
+        return *this;
+    }
+    // Returns a reference to the descriptor owned by this builder; it stays valid
+    // only while the builder is alive.
+    TRuntimeFilterDesc& build() { return _desc; }
+
+    // Copying is disabled.
+    TRuntimeFilterDescBuilder(const TRuntimeFilterDescBuilder&) = delete;
+    TRuntimeFilterDescBuilder& operator=(const TRuntimeFilterDescBuilder&) = delete;
+
+private:
+    TRuntimeFilterDesc _desc;
+};
+
 class TExchangeNodeBuilder {
 public:
     explicit TExchangeNodeBuilder() : _plan_node() {}
@@ -269,7 +340,10 @@ private:
 
 class TTypeDescBuilder {
 public:
-    explicit TTypeDescBuilder() : _desc() {}
+    explicit TTypeDescBuilder() : _desc() {
+        _desc.__set_result_is_nullable(false);
+        _desc.__set_is_nullable(false);
+    }
 
     TTypeDescBuilder& set_types(TTypeNode type_node) {
         _desc.types.push_back(type_node);
@@ -423,6 +497,7 @@ public:
         _expr_node.__set_type(type);
         _expr_node.__set_num_children(num_children);
         _expr_node.__set_opcode(opcode);
+        _expr_node.__set_is_nullable(false);
     }
     explicit TExprNodeBuilder(TExprNodeType::type node_type, TTypeDesc&& type, 
int num_children,
                               TExprOpcode::type opcode = 
TExprOpcode::INVALID_OPCODE)
@@ -477,4 +552,27 @@ private:
     TEqJoinCondition _eq_conjuncts;
 };
 
+// Test helper that assembles a TRuntimeFilterParams. Every field is written through
+// its generated __set_ helper, so all five fields are flagged as set, including any
+// left at their empty defaults.
+class TRuntimeFilterParamsBuilder {
+public:
+    // Arguments are taken by const reference: the __set_ helpers copy them into
+    // `_params`, so by-value parameters would copy every map a second time.
+    explicit TRuntimeFilterParamsBuilder(
+            const TNetworkAddress& runtime_filter_merge_addr = TNetworkAddress(),
+            const std::map<int, std::vector<TRuntimeFilterTargetParams>>& rid_to_target_param = {},
+            const std::map<int, TRuntimeFilterDesc>& rid_to_runtime_filter = {},
+            const std::map<int, int>& runtime_filter_builder_num = {},
+            const std::map<int, std::vector<TRuntimeFilterTargetParamsV2>>&
+                    rid_to_target_paramv2 = {})
+            : _params() {
+        _params.__set_runtime_filter_merge_addr(runtime_filter_merge_addr);
+        _params.__set_rid_to_target_param(rid_to_target_param);
+        _params.__set_rid_to_runtime_filter(rid_to_runtime_filter);
+        _params.__set_runtime_filter_builder_num(runtime_filter_builder_num);
+        _params.__set_rid_to_target_paramv2(rid_to_target_paramv2);
+    }
+    // Returns a reference to the params owned by this builder; it stays valid only
+    // while the builder is alive.
+    TRuntimeFilterParams& build() { return _params; }
+
+    // Copying is disabled.
+    TRuntimeFilterParamsBuilder(const TRuntimeFilterParamsBuilder&) = delete;
+    TRuntimeFilterParamsBuilder& operator=(const TRuntimeFilterParamsBuilder&) = delete;
+
+private:
+    TRuntimeFilterParams _params;
+};
+
 } // namespace doris::pipeline
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 70db19bfa33..9ab20842c25 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -392,18 +392,21 @@ struct TRuntimeFilterTargetParamsV2 {
 }
 
 struct TRuntimeFilterParams {
-  // Runtime filter merge instance address
+  // Runtime filter merge instance address. Used if this filter has a remote 
target
   1: optional Types.TNetworkAddress runtime_filter_merge_addr
 
   // deprecated
   2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param
 
   // Runtime filter ID to the runtime filter desc
+  // Used if this filter has a remote target
   3: optional map<i32, PlanNodes.TRuntimeFilterDesc> rid_to_runtime_filter
 
   // Number of Runtime filter producers
+  // Used if this filter has a remote target
   4: optional map<i32, i32> runtime_filter_builder_num
 
+  // Used if this filter has a remote target
   5: optional map<i32, list<TRuntimeFilterTargetParamsV2>> 
rid_to_target_paramv2
 }
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index be920c2baf5..c7f8d247d7c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1252,7 +1252,7 @@ struct TRuntimeFilterDesc {
   // The order of Expr in join predicate
   3: required i32 expr_order
 
-  // Map of target node id to the target expr
+  // Map of target node id to the target expr. Used by consumer
   4: required map<Types.TPlanNodeId, Exprs.TExpr> planId_to_target_expr
 
   // Indicates if the source join node of this filter is a broadcast or


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to