This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e998ce8293d6bf856e862a6b39871e7e33cbd841
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Apr 8 17:01:11 2024 +0800

    [Improvement](runtime-filter) support sync join node build side's size to init bloom runtime filter (#32180)
    
    Support syncing the join node's build-side size so that it can be used to initialize the bloom runtime filter.
---
 be/src/exprs/bloom_filter_func.h                   |   4 +-
 be/src/exprs/runtime_filter.cpp                    | 457 ++++++++++++---------
 be/src/exprs/runtime_filter.h                      |  25 +-
 be/src/exprs/runtime_filter_slots.h                | 167 +++-----
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  86 ++--
 be/src/pipeline/exec/hashjoin_build_sink.h         |   9 +-
 be/src/pipeline/exec/join_build_sink_operator.cpp  |   3 +-
 be/src/pipeline/exec/join_build_sink_operator.h    |   1 +
 be/src/pipeline/exec/olap_scan_operator.cpp        |   1 -
 be/src/pipeline/pipeline_x/dependency.cpp          |  10 +
 be/src/pipeline/pipeline_x/dependency.h            |  31 +-
 be/src/runtime/fragment_mgr.cpp                    | 159 ++++---
 be/src/runtime/fragment_mgr.h                      |   4 +
 be/src/runtime/runtime_filter_mgr.cpp              |  95 ++++-
 be/src/runtime/runtime_filter_mgr.h                |  32 +-
 be/src/service/backend_options.cpp                 |   1 +
 be/src/service/internal_service.cpp                |  33 +-
 be/src/service/internal_service.h                  |  10 +
 be/src/util/brpc_client_cache.h                    |   4 +
 be/src/vec/exec/join/vhash_join_node.cpp           |   6 +-
 be/src/vec/exec/join/vhash_join_node.h             |  12 +-
 be/src/vec/exec/join/vjoin_node_base.cpp           |   1 +
 be/src/vec/exec/join/vjoin_node_base.h             |   1 +
 .../vec/runtime/shared_hash_table_controller.cpp   |  11 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |  20 +-
 .../org/apache/doris/planner/RuntimeFilter.java    |   4 +
 .../java/org/apache/doris/qe/SessionVariable.java  |   9 +
 gensrc/proto/internal_service.proto                |  26 ++
 gensrc/proto/types.proto                           |   5 +
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 30 files changed, 782 insertions(+), 447 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 1473d4a4288..10d30212ff8 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -108,7 +108,9 @@ public:
 
     Status init_with_fixed_length() { return 
init_with_fixed_length(_bloom_filter_length); }
 
-    Status init_with_cardinality(const size_t build_bf_cardinality, int id = 
0) {
+    bool get_build_bf_cardinality() const { return _build_bf_exactly; }
+
+    Status init_with_cardinality(const size_t build_bf_cardinality) {
         if (_build_bf_exactly) {
             // Use the same algorithm as 
org.apache.doris.planner.RuntimeFilter#calculateFilterSize
             constexpr double fpp = 0.05;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1c5c3f7d4a2..67349008dac 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -290,6 +290,7 @@ public:
             : _pool(pool),
               _column_return_type(column_type),
               _filter_type(type),
+              _context(new RuntimeFilterContext()),
               _filter_id(filter_id),
               _build_bf_exactly(build_bf_exactly) {}
 
@@ -299,37 +300,36 @@ public:
         _max_in_num = params->max_in_num;
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
-            _context.hybrid_set.reset(create_set(_column_return_type));
-            _context.hybrid_set->set_null_aware(params->null_aware);
+            _context->hybrid_set.reset(create_set(_column_return_type));
+            _context->hybrid_set->set_null_aware(params->null_aware);
             break;
         }
         // Only use in nested loop join not need set null aware
         case RuntimeFilterType::MIN_FILTER:
         case RuntimeFilterType::MAX_FILTER: {
-            
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
+            
_context->minmax_func.reset(create_minmax_filter(_column_return_type));
             break;
         }
         case RuntimeFilterType::MINMAX_FILTER: {
-            
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
-            _context.minmax_func->set_null_aware(params->null_aware);
+            
_context->minmax_func.reset(create_minmax_filter(_column_return_type));
+            _context->minmax_func->set_null_aware(params->null_aware);
             break;
         }
         case RuntimeFilterType::BLOOM_FILTER: {
-            _is_bloomfilter = true;
-            
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
-            _context.bloom_filter_func->init_params(params);
+            
_context->bloom_filter_func.reset(create_bloom_filter(_column_return_type));
+            _context->bloom_filter_func->init_params(params);
             return Status::OK();
         }
         case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
-            _context.hybrid_set.reset(create_set(_column_return_type));
-            _context.hybrid_set->set_null_aware(params->null_aware);
-            
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
-            _context.bloom_filter_func->init_params(params);
+            _context->hybrid_set.reset(create_set(_column_return_type));
+            _context->hybrid_set->set_null_aware(params->null_aware);
+            
_context->bloom_filter_func.reset(create_bloom_filter(_column_return_type));
+            _context->bloom_filter_func->init_params(params);
             return Status::OK();
         }
         case RuntimeFilterType::BITMAP_FILTER: {
-            
_context.bitmap_filter_func.reset(create_bitmap_filter(_column_return_type));
-            
_context.bitmap_filter_func->set_not_in(params->bitmap_filter_not_in);
+            
_context->bitmap_filter_func.reset(create_bitmap_filter(_column_return_type));
+            
_context->bitmap_filter_func->set_not_in(params->bitmap_filter_not_in);
             return Status::OK();
         }
         default:
@@ -342,61 +342,66 @@ public:
         CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER)
                 << "Can not change to bloom filter because of runtime filter 
type is "
                 << IRuntimeFilter::to_string(_filter_type);
-        _is_bloomfilter = true;
-        BloomFilterFuncBase* bf = _context.bloom_filter_func.get();
+        BloomFilterFuncBase* bf = _context->bloom_filter_func.get();
         if (need_init_bf) {
             // BloomFilter may be not init
             RETURN_IF_ERROR(bf->init_with_fixed_length());
             insert_to_bloom_filter(bf);
         }
         // release in filter
-        _context.hybrid_set.reset();
+        _context->hybrid_set.reset();
         return Status::OK();
     }
 
     Status init_bloom_filter(const size_t build_bf_cardinality) {
         DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER ||
                _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);
-        return 
_context.bloom_filter_func->init_with_cardinality(build_bf_cardinality);
+        return 
_context->bloom_filter_func->init_with_cardinality(build_bf_cardinality);
+    }
+
+    bool get_build_bf_cardinality() const {
+        DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER ||
+               _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);
+        return _context->bloom_filter_func->get_build_bf_cardinality();
     }
 
     void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
-        if (_context.hybrid_set->size() > 0) {
-            auto* it = _context.hybrid_set->begin();
+        if (_context->hybrid_set->size() > 0) {
+            auto* it = _context->hybrid_set->begin();
             while (it->has_next()) {
                 bloom_filter->insert(it->get_value());
                 it->next();
             }
         }
-        if (_context.hybrid_set->contain_null()) {
+        if (_context->hybrid_set->contain_null()) {
             bloom_filter->set_contain_null_and_null_aware();
         }
     }
 
-    BloomFilterFuncBase* get_bloomfilter() const { return 
_context.bloom_filter_func.get(); }
+    BloomFilterFuncBase* get_bloomfilter() const { return 
_context->bloom_filter_func.get(); }
 
     void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) {
         DCHECK(!is_ignored());
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
-            _context.hybrid_set->insert_fixed_len(column, start);
+            _context->hybrid_set->insert_fixed_len(column, start);
             break;
         }
         case RuntimeFilterType::MIN_FILTER:
         case RuntimeFilterType::MAX_FILTER:
         case RuntimeFilterType::MINMAX_FILTER: {
-            _context.minmax_func->insert_fixed_len(column, start);
+            _context->minmax_func->insert_fixed_len(column, start);
             break;
         }
         case RuntimeFilterType::BLOOM_FILTER: {
-            _context.bloom_filter_func->insert_fixed_len(column, start);
+            _context->bloom_filter_func->insert_fixed_len(column, start);
             break;
         }
         case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
-            if (_is_bloomfilter) {
-                _context.bloom_filter_func->insert_fixed_len(column, start);
+            if (is_bloomfilter()) {
+                _context->bloom_filter_func->insert_fixed_len(column, start);
             } else {
-                _context.hybrid_set->insert_fixed_len(column, start);
+                _context->hybrid_set->insert_fixed_len(column, start);
             }
             break;
         }
@@ -434,23 +439,21 @@ public:
                 bitmaps.push_back(&(col->get_data()[i]));
             }
         }
-        _context.bitmap_filter_func->insert_many(bitmaps);
+        _context->bitmap_filter_func->insert_many(bitmaps);
     }
 
     RuntimeFilterType get_real_type() const {
-        auto real_filter_type = _filter_type;
-        if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-            real_filter_type = _is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
-                                               : RuntimeFilterType::IN_FILTER;
+        if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+            if (_context->hybrid_set) {
+                return RuntimeFilterType::IN_FILTER;
+            }
+            return RuntimeFilterType::BLOOM_FILTER;
         }
-        return real_filter_type;
+        return _filter_type;
     }
 
     size_t get_bloom_filter_size() const {
-        if (_is_bloomfilter) {
-            return _context.bloom_filter_func->get_size();
-        }
-        return 0;
+        return _context->bloom_filter_func ? 
_context->bloom_filter_func->get_size() : 0;
     }
 
     Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
@@ -458,6 +461,11 @@ public:
                           const TExpr& probe_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
+        if (is_ignored() || wrapper->is_ignored()) {
+            _context->ignored = true;
+            return Status::OK();
+        }
+
         bool can_not_merge_in_or_bloom =
                 _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                 (wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
@@ -475,70 +483,56 @@ public:
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
             // try insert set
-            _context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
-            if (_max_in_num >= 0 && _context.hybrid_set->size() >= 
_max_in_num) {
-                _ignored_msg = fmt::format(
-                        " ignore merge runtime filter(in filter id {})"
-                        "because: in_num({}) >= max_in_num({})",
-                        _filter_id, _context.hybrid_set->size(), _max_in_num);
-                _ignored = true;
+            _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
+            if (_max_in_num >= 0 && _context->hybrid_set->size() >= 
_max_in_num) {
+                _context->ignored = true;
                 // release in filter
-                _context.hybrid_set.reset();
+                _context->hybrid_set.reset();
             }
             break;
         }
         case RuntimeFilterType::MIN_FILTER:
         case RuntimeFilterType::MAX_FILTER:
         case RuntimeFilterType::MINMAX_FILTER: {
-            
RETURN_IF_ERROR(_context.minmax_func->merge(wrapper->_context.minmax_func.get()));
+            
RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get()));
             break;
         }
         case RuntimeFilterType::BLOOM_FILTER: {
             RETURN_IF_ERROR(
-                    
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get()));
+                    
_context->bloom_filter_func->merge(wrapper->_context->bloom_filter_func.get()));
             break;
         }
         case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
-            auto real_filter_type = _is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
-                                                    : 
RuntimeFilterType::IN_FILTER;
+            auto real_filter_type = get_real_type();
 
             auto other_filter_type = wrapper->_filter_type;
             if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                other_filter_type = wrapper->_is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
-                                                             : 
RuntimeFilterType::IN_FILTER;
+                other_filter_type = wrapper->get_real_type();
             }
 
             if (real_filter_type == RuntimeFilterType::IN_FILTER) {
                 if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in 
merge in
-                    CHECK(!wrapper->_ignored)
-                            << " can not ignore merge runtime filter(in filter 
id "
-                            << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
-                            << wrapper->ignored_msg();
-                    
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
-                    if (_max_in_num >= 0 && _context.hybrid_set->size() >= 
_max_in_num) {
+                    
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
+                    if (_max_in_num >= 0 && _context->hybrid_set->size() >= 
_max_in_num) {
                         VLOG_DEBUG << " change runtime filter to bloom 
filter(id=" << _filter_id
-                                   << ") because: in_num(" << 
_context.hybrid_set->size()
+                                   << ") because: in_num(" << 
_context->hybrid_set->size()
                                    << ") >= max_in_num(" << _max_in_num << ")";
                         RETURN_IF_ERROR(change_to_bloom_filter(true));
                     }
                 } else {
                     VLOG_DEBUG << " change runtime filter to bloom filter(id=" 
<< _filter_id
                                << ") because: already exist a bloom filter";
-                    
RETURN_IF_ERROR(change_to_bloom_filter(!_build_bf_exactly));
-                    RETURN_IF_ERROR(_context.bloom_filter_func->merge(
-                            wrapper->_context.bloom_filter_func.get()));
+                    RETURN_IF_ERROR(change_to_bloom_filter(false));
+                    RETURN_IF_ERROR(_context->bloom_filter_func->merge(
+                            wrapper->_context->bloom_filter_func.get()));
                 }
             } else {
                 if (other_filter_type == RuntimeFilterType::IN_FILTER) { // 
bloom filter merge in
-                    CHECK(!wrapper->_ignored)
-                            << " can not ignore merge runtime filter(in filter 
id "
-                            << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
-                            << wrapper->ignored_msg();
-                    
wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get());
+                    
wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get());
                     // bloom filter merge bloom filter
                 } else {
-                    RETURN_IF_ERROR(_context.bloom_filter_func->merge(
-                            wrapper->_context.bloom_filter_func.get()));
+                    RETURN_IF_ERROR(_context->bloom_filter_func->merge(
+                            wrapper->_context->bloom_filter_func.get()));
                 }
             }
             break;
@@ -550,19 +544,11 @@ public:
     }
 
     Status assign(const PInFilter* in_filter, bool contain_null) {
-        if (in_filter->has_ignored_msg()) {
-            VLOG_DEBUG << "Ignore in filter(id=" << _filter_id
-                       << ") because: " << in_filter->ignored_msg();
-            _ignored = true;
-            _ignored_msg = in_filter->ignored_msg();
-            return Status::OK();
-        }
-
         PrimitiveType type = to_primitive_type(in_filter->column_type());
-        _context.hybrid_set.reset(create_set(type));
+        _context->hybrid_set.reset(create_set(type));
         if (contain_null) {
-            _context.hybrid_set->set_null_aware(true);
-            _context.hybrid_set->insert((const void*)nullptr);
+            _context->hybrid_set->set_null_aware(true);
+            _context->hybrid_set->insert((const void*)nullptr);
         }
 
         switch (type) {
@@ -735,14 +721,13 @@ public:
     // assign this filter by protobuf
     Status assign(const PBloomFilter* bloom_filter, 
butil::IOBufAsZeroCopyInputStream* data,
                   bool contain_null) {
-        _is_bloomfilter = true;
         // we won't use this class to insert or find any data
         // so any type is ok
-        
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type == 
INVALID_TYPE
-                                                                     ? 
PrimitiveType::TYPE_INT
-                                                                     : 
_column_return_type));
-        RETURN_IF_ERROR(_context.bloom_filter_func->assign(data, 
bloom_filter->filter_length(),
-                                                           contain_null));
+        
_context->bloom_filter_func.reset(create_bloom_filter(_column_return_type == 
INVALID_TYPE
+                                                                      ? 
PrimitiveType::TYPE_INT
+                                                                      : 
_column_return_type));
+        RETURN_IF_ERROR(_context->bloom_filter_func->assign(data, 
bloom_filter->filter_length(),
+                                                            contain_null));
         return Status::OK();
     }
 
@@ -750,38 +735,38 @@ public:
     // assign this filter by protobuf
     Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) {
         PrimitiveType type = to_primitive_type(minmax_filter->column_type());
-        _context.minmax_func.reset(create_minmax_filter(type));
+        _context->minmax_func.reset(create_minmax_filter(type));
 
         if (contain_null) {
-            _context.minmax_func->set_null_aware(true);
-            _context.minmax_func->set_contain_null();
+            _context->minmax_func->set_null_aware(true);
+            _context->minmax_func->set_contain_null();
         }
 
         switch (type) {
         case TYPE_BOOLEAN: {
             bool min_val = minmax_filter->min_val().boolval();
             bool max_val = minmax_filter->max_val().boolval();
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_TINYINT: {
             int8_t min_val = 
static_cast<int8_t>(minmax_filter->min_val().intval());
             int8_t max_val = 
static_cast<int8_t>(minmax_filter->max_val().intval());
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_SMALLINT: {
             int16_t min_val = 
static_cast<int16_t>(minmax_filter->min_val().intval());
             int16_t max_val = 
static_cast<int16_t>(minmax_filter->max_val().intval());
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_INT: {
             int32_t min_val = minmax_filter->min_val().intval();
             int32_t max_val = minmax_filter->max_val().intval();
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_BIGINT: {
             int64_t min_val = minmax_filter->min_val().longval();
             int64_t max_val = minmax_filter->max_val().longval();
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_LARGEINT: {
             auto min_string_val = minmax_filter->min_val().stringval();
@@ -793,27 +778,27 @@ public:
             int128_t max_val = StringParser::string_to_int<int128_t>(
                     max_string_val.c_str(), max_string_val.length(), &result);
             DCHECK(result == StringParser::PARSE_SUCCESS);
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_FLOAT: {
             float min_val = 
static_cast<float>(minmax_filter->min_val().doubleval());
             float max_val = 
static_cast<float>(minmax_filter->max_val().doubleval());
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DOUBLE: {
             double min_val = 
static_cast<double>(minmax_filter->min_val().doubleval());
             double max_val = 
static_cast<double>(minmax_filter->max_val().doubleval());
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DATEV2: {
             int32_t min_val = minmax_filter->min_val().intval();
             int32_t max_val = minmax_filter->max_val().intval();
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DATETIMEV2: {
             int64_t min_val = minmax_filter->min_val().longval();
             int64_t max_val = minmax_filter->max_val().longval();
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DATETIME:
         case TYPE_DATE: {
@@ -823,24 +808,24 @@ public:
             VecDateTimeValue max_val;
             min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
             max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DECIMALV2: {
             auto& min_val_ref = minmax_filter->min_val().stringval();
             auto& max_val_ref = minmax_filter->max_val().stringval();
             DecimalV2Value min_val(min_val_ref);
             DecimalV2Value max_val(max_val_ref);
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DECIMAL32: {
             int32_t min_val = minmax_filter->min_val().intval();
             int32_t max_val = minmax_filter->max_val().intval();
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DECIMAL64: {
             int64_t min_val = minmax_filter->min_val().longval();
             int64_t max_val = minmax_filter->max_val().longval();
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DECIMAL128I: {
             auto min_string_val = minmax_filter->min_val().stringval();
@@ -852,7 +837,7 @@ public:
             int128_t max_val = StringParser::string_to_int<int128_t>(
                     max_string_val.c_str(), max_string_val.length(), &result);
             DCHECK(result == StringParser::PARSE_SUCCESS);
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_DECIMAL256: {
             auto min_string_val = minmax_filter->min_val().stringval();
@@ -864,7 +849,7 @@ public:
             auto max_val = StringParser::string_to_int<wide::Int256>(
                     max_string_val.c_str(), max_string_val.length(), &result);
             DCHECK(result == StringParser::PARSE_SUCCESS);
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         case TYPE_VARCHAR:
         case TYPE_CHAR:
@@ -875,7 +860,7 @@ public:
             auto max_val_ptr = _pool->add(new std::string(max_val_ref));
             StringRef min_val(min_val_ptr->c_str(), min_val_ptr->length());
             StringRef max_val(max_val_ptr->c_str(), max_val_ptr->length());
-            return _context.minmax_func->assign(&min_val, &max_val);
+            return _context->minmax_func->assign(&min_val, &max_val);
         }
         default:
             DCHECK(false) << "unknown type";
@@ -884,65 +869,67 @@ public:
         return Status::InvalidArgument("not support!");
     }
 
-    HybridSetBase::IteratorBase* get_in_filter_iterator() { return 
_context.hybrid_set->begin(); }
+    HybridSetBase::IteratorBase* get_in_filter_iterator() { return 
_context->hybrid_set->begin(); }
 
     void get_bloom_filter_desc(char** data, int* filter_length) {
-        _context.bloom_filter_func->get_data(data, filter_length);
+        _context->bloom_filter_func->get_data(data, filter_length);
     }
 
     void get_minmax_filter_desc(void** min_data, void** max_data) {
-        *min_data = _context.minmax_func->get_min();
-        *max_data = _context.minmax_func->get_max();
+        *min_data = _context->minmax_func->get_min();
+        *max_data = _context->minmax_func->get_max();
     }
 
     PrimitiveType column_type() { return _column_return_type; }
 
-    bool is_bloomfilter() const { return _is_bloomfilter; }
+    bool is_bloomfilter() const { return get_real_type() == 
RuntimeFilterType::BLOOM_FILTER; }
 
     bool contain_null() const {
-        if (_is_bloomfilter) {
-            return _context.bloom_filter_func->contain_null();
+        if (is_bloomfilter()) {
+            return _context->bloom_filter_func->contain_null();
         }
-        if (_context.hybrid_set) {
+        if (_context->hybrid_set) {
             DCHECK(get_real_type() == RuntimeFilterType::IN_FILTER);
-            return _context.hybrid_set->contain_null();
+            return _context->hybrid_set->contain_null();
         }
-        if (_context.minmax_func) {
-            return _context.minmax_func->contain_null();
+        if (_context->minmax_func) {
+            return _context->minmax_func->contain_null();
         }
         return false;
     }
 
-    bool is_ignored() const { return _ignored; }
+    bool is_ignored() const { return _context->ignored; }
 
-    const std::string& ignored_msg() const { return _ignored_msg; }
+    void set_ignored() { _context->ignored = true; }
 
     void batch_assign(const PInFilter* filter,
                       void (*assign_func)(std::shared_ptr<HybridSetBase>& 
_hybrid_set,
                                           PColumnValue&, ObjectPool*)) {
         for (int i = 0; i < filter->values_size(); ++i) {
             PColumnValue column = filter->values(i);
-            assign_func(_context.hybrid_set, column, _pool);
+            assign_func(_context->hybrid_set, column, _pool);
         }
     }
 
-    size_t get_in_filter_size() const { return _context.hybrid_set->size(); }
+    size_t get_in_filter_size() const {
+        return _context->hybrid_set ? _context->hybrid_set->size() : 0;
+    }
 
     std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const {
-        return _context.bitmap_filter_func;
+        return _context->bitmap_filter_func;
     }
 
     friend class IRuntimeFilter;
 
     void set_filter_id(int id) {
-        if (_context.bloom_filter_func) {
-            _context.bloom_filter_func->set_filter_id(id);
+        if (_context->bloom_filter_func) {
+            _context->bloom_filter_func->set_filter_id(id);
         }
-        if (_context.bitmap_filter_func) {
-            _context.bitmap_filter_func->set_filter_id(id);
+        if (_context->bitmap_filter_func) {
+            _context->bitmap_filter_func->set_filter_id(id);
         }
-        if (_context.hybrid_set) {
-            _context.hybrid_set->set_filter_id(id);
+        if (_context->hybrid_set) {
+            _context->hybrid_set->set_filter_id(id);
         }
     }
 
@@ -954,10 +941,7 @@ private:
     RuntimeFilterType _filter_type;
     int32_t _max_in_num = -1;
 
-    vectorized::SharedRuntimeFilterContext _context;
-    bool _is_bloomfilter = false;
-    bool _ignored = false;
-    std::string _ignored_msg;
+    SharedRuntimeFilterContext _context;
     uint32_t _filter_id;
     bool _build_bf_exactly;
 };
@@ -968,11 +952,10 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* 
state, ObjectPool* poo
                               bool build_bf_exactly, bool need_local_merge) {
     *res = pool->add(new IRuntimeFilter(state, pool, desc, need_local_merge));
     (*res)->set_role(role);
-    return (*res)->init_with_desc(desc, query_options, node_id,
-                                  need_local_merge ? false : build_bf_exactly);
+    return (*res)->init_with_desc(desc, query_options, node_id, 
build_bf_exactly);
 }
 
-vectorized::SharedRuntimeFilterContext& 
IRuntimeFilter::get_shared_context_ref() {
+SharedRuntimeFilterContext& IRuntimeFilter::get_shared_context_ref() {
     return _wrapper->_context;
 }
 
@@ -983,6 +966,7 @@ void IRuntimeFilter::insert_batch(const 
vectorized::ColumnPtr column, size_t sta
 
 Status IRuntimeFilter::publish(bool publish_local) {
     DCHECK(is_producer());
+
     auto send_to_remote = [&](IRuntimeFilter* filter) {
         TNetworkAddress addr;
         DCHECK(_state != nullptr);
@@ -994,7 +978,7 @@ Status IRuntimeFilter::publish(bool publish_local) {
         
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, 
filters));
         DCHECK(!filters.empty());
         // push down
-        for (auto filter : filters) {
+        for (auto* filter : filters) {
             filter->_wrapper = wrapper;
             filter->update_runtime_filter_type_to_profile();
             filter->signal();
@@ -1036,16 +1020,77 @@ Status IRuntimeFilter::publish(bool publish_local) {
     return Status::OK();
 }
 
-Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool 
opt_remote_rf) {
+Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
     DCHECK(is_producer());
+
+    if (_need_local_merge) {
+        LocalMergeFilters* local_merge_filters = nullptr;
+        
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
+                _filter_id, &local_merge_filters));
+        std::lock_guard l(*local_merge_filters->lock);
+        local_merge_filters->merge_size_times--;
+        local_merge_filters->local_merged_size += local_filter_size;
+        if (local_merge_filters->merge_size_times) {
+            return Status::OK();
+        } else {
+            if (_has_local_target) {
+                for (auto* filter : local_merge_filters->filters) {
+                    
filter->set_synced_size(local_merge_filters->local_merged_size);
+                }
+                return Status::OK();
+            } else {
+                local_filter_size = local_merge_filters->local_merged_size;
+            }
+        }
+    } else if (_has_local_target) {
+        set_synced_size(local_filter_size);
+        return Status::OK();
+    }
+
+    TNetworkAddress addr;
+    DCHECK(_state != nullptr);
+    RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
     std::shared_ptr<PBackendService_Stub> stub(
-            _state->exec_env->brpc_internal_client_cache()->get_client(*addr));
+            _state->exec_env->brpc_internal_client_cache()->get_client(addr));
     if (!stub) {
         std::string msg =
-                fmt::format("Get rpc stub failed, host={},  port=", 
addr->hostname, addr->port);
+                fmt::format("Get rpc stub failed, host={},  port=", 
addr.hostname, addr.port);
         return Status::InternalError(msg);
     }
 
+    auto request = std::make_shared<PSendFilterSizeRequest>();
+    auto callback = 
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
+    auto closure =
+            AutoReleaseClosure<PSendFilterSizeRequest,
+                               
DummyBrpcCallback<PSendFilterSizeResponse>>::create_unique(request,
+                                                                               
           callback);
+    auto* pquery_id = request->mutable_query_id();
+    pquery_id->set_hi(_state->query_id.hi());
+    pquery_id->set_lo(_state->query_id.lo());
+
+    auto* source_addr = request->mutable_source_addr();
+    source_addr->set_hostname(BackendOptions::get_local_backend().host);
+    source_addr->set_port(BackendOptions::get_local_backend().brpc_port);
+
+    request->set_filter_size(local_filter_size);
+    request->set_filter_id(_filter_id);
+    callback->cntl_->set_timeout_ms(wait_time_ms());
+
+    stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
+                           closure.get());
+    closure.release();
+    return Status::OK();
+}
+
+Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool 
opt_remote_rf) {
+    DCHECK(is_producer());
+    std::shared_ptr<PBackendService_Stub> stub(
+            _state->exec_env->brpc_internal_client_cache()->get_client(*addr));
+    if (!stub) {
+        return Status::InternalError(
+                fmt::format("Get rpc stub failed, host={},  port=", 
addr->hostname, addr->port));
+    }
+
     auto merge_filter_request = std::make_shared<PMergeFilterRequest>();
     auto merge_filter_callback = 
DummyBrpcCallback<PMergeFilterResponse>::create_shared();
     auto merge_filter_closure =
@@ -1054,11 +1099,11 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr, bool opt_remo
     void* data = nullptr;
     int len = 0;
 
-    auto pquery_id = merge_filter_request->mutable_query_id();
+    auto* pquery_id = merge_filter_request->mutable_query_id();
     pquery_id->set_hi(_state->query_id.hi());
     pquery_id->set_lo(_state->query_id.lo());
 
-    auto pfragment_instance_id = 
merge_filter_request->mutable_fragment_instance_id();
+    auto* pfragment_instance_id = 
merge_filter_request->mutable_fragment_instance_id();
     pfragment_instance_id->set_hi(BackendOptions::get_local_backend().id);
     pfragment_instance_id->set_lo((int64_t)this);
 
@@ -1069,21 +1114,23 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr, bool opt_remo
     merge_filter_request->set_column_type(to_proto(column_type));
     merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
 
-    Status serialize_status = serialize(merge_filter_request.get(), &data, 
&len);
-    if (serialize_status.ok()) {
-        VLOG_NOTICE << "Producer:" << merge_filter_request->ShortDebugString() 
<< addr->hostname
-                    << ":" << addr->port;
-        if (len > 0) {
-            DCHECK(data != nullptr);
-            merge_filter_callback->cntl_->request_attachment().append(data, 
len);
-        }
+    if (get_ignored()) {
+        merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
+        merge_filter_request->set_ignored(true);
+    } else {
+        RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));
+    }
 
-        stub->merge_filter(merge_filter_closure->cntl_.get(), 
merge_filter_closure->request_.get(),
-                           merge_filter_closure->response_.get(), 
merge_filter_closure.get());
-        // the closure will be released by brpc during closure->Run.
-        merge_filter_closure.release();
+    if (len > 0) {
+        DCHECK(data != nullptr);
+        merge_filter_callback->cntl_->request_attachment().append(data, len);
     }
-    return serialize_status;
+
+    stub->merge_filter(merge_filter_closure->cntl_.get(), 
merge_filter_closure->request_.get(),
+                       merge_filter_closure->response_.get(), 
merge_filter_closure.get());
+    // the closure will be released by brpc during closure->Run.
+    merge_filter_closure.release();
+    return Status::OK();
 }
 
 Status 
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& 
probe_ctxs,
@@ -1248,17 +1295,31 @@ void 
IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim
     _filter_timer.push_back(timer);
 }
 
-void IRuntimeFilter::set_ignored(const std::string& msg) {
-    _wrapper->_ignored = true;
-    _wrapper->_ignored_msg = msg;
+void IRuntimeFilter::set_dependency(pipeline::CountedFinishDependency* 
dependency) {
+    _dependency = dependency;
+    _dependency->add();
+    CHECK(_dependency);
+}
+
+void IRuntimeFilter::set_synced_size(uint64_t global_size) {
+    CHECK(_dependency);
+    _synced_size = global_size;
+    _dependency->sub();
+}
+
+void IRuntimeFilter::set_ignored() {
+    _wrapper->_context->ignored = true;
+}
+
+bool IRuntimeFilter::get_ignored() {
+    return _wrapper->is_ignored();
 }
 
 std::string IRuntimeFilter::formatted_state() const {
     return fmt::format(
-            "[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, 
HasRemoteTarget = {}, "
+            "[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
             "HasLocalTarget = {}]",
-            _is_push_down, _get_explain_state_string(), 
_wrapper->ignored_msg(), _has_remote_target,
-            _has_local_target);
+            _is_push_down, _get_explain_state_string(), _has_remote_target, 
_has_local_target);
 }
 
 BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
@@ -1288,11 +1349,16 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
     // 1. Only 1 join key
     // 2. Do not have remote target (e.g. do not need to merge), or broadcast 
join
     // 3. Bloom filter
-    params.build_bf_exactly = build_bf_exactly && (!_has_remote_target || 
_is_broadcast_join) &&
-                              (_runtime_filter_type == 
RuntimeFilterType::BLOOM_FILTER ||
-                               _runtime_filter_type == 
RuntimeFilterType::IN_OR_BLOOM_FILTER);
+    params.build_bf_exactly =
+            build_bf_exactly && (_runtime_filter_type == 
RuntimeFilterType::BLOOM_FILTER ||
+                                 _runtime_filter_type == 
RuntimeFilterType::IN_OR_BLOOM_FILTER);
+
     params.bloom_filter_size_calculated_by_ndv = 
desc->bloom_filter_size_calculated_by_ndv;
 
+    if (!desc->__isset.sync_filter_size || !desc->sync_filter_size) {
+        params.build_bf_exactly &= (!_has_remote_target || _is_broadcast_join);
+    }
+
     if (desc->__isset.bloom_filter_size_bytes) {
         params.bloom_filter_size = desc->bloom_filter_size_bytes;
     }
@@ -1358,6 +1424,12 @@ Status IRuntimeFilter::create_wrapper(const 
UpdateRuntimeFilterParamsV2* param,
     PrimitiveType column_type = param->column_type;
     *wrapper = param->pool->add(new RuntimePredicateWrapper(
             param->pool, column_type, get_type(filter_type), 
param->request->filter_id()));
+
+    if (param->request->has_ignored() && param->request->ignored()) {
+        (*wrapper)->set_ignored();
+        return Status::OK();
+    }
+
     switch (filter_type) {
     case PFilterType::IN_FILTER: {
         DCHECK(param->request->has_in_filter());
@@ -1399,8 +1471,14 @@ Status IRuntimeFilter::_create_wrapper(const T* param, 
ObjectPool* pool,
     if (param->request->has_column_type()) {
         column_type = to_primitive_type(param->request->column_type());
     }
-    wrapper->reset(new RuntimePredicateWrapper(pool, column_type, 
get_type(filter_type),
-                                               param->request->filter_id()));
+    *wrapper = std::make_unique<RuntimePredicateWrapper>(pool, column_type, 
get_type(filter_type),
+                                                         
param->request->filter_id());
+
+    if (param->request->has_ignored() && param->request->ignored()) {
+        (*wrapper)->set_ignored();
+        return Status::OK();
+    }
+
     switch (filter_type) {
     case PFilterType::IN_FILTER: {
         DCHECK(param->request->has_in_filter());
@@ -1437,12 +1515,7 @@ void 
IRuntimeFilter::update_runtime_filter_type_to_profile() {
 }
 
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
-    if (wrapper->is_ignored()) {
-        set_ignored(wrapper->ignored_msg());
-    } else if (!_wrapper->is_ignored()) {
-        return _wrapper->merge(wrapper);
-    }
-    return Status::OK();
+    return _wrapper->merge(wrapper);
 }
 
 template <typename T>
@@ -1458,11 +1531,7 @@ void batch_copy(PInFilter* filter, 
HybridSetBase::IteratorBase* it,
 
 template <class T>
 Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) {
-    auto real_runtime_filter_type = _runtime_filter_type;
-    if (real_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-        real_runtime_filter_type = _wrapper->is_bloomfilter() ? 
RuntimeFilterType::BLOOM_FILTER
-                                                              : 
RuntimeFilterType::IN_FILTER;
-    }
+    auto real_runtime_filter_type = _wrapper->get_real_type();
 
     request->set_filter_type(get_type(real_runtime_filter_type));
     request->set_contain_null(_wrapper->contain_null());
@@ -1490,11 +1559,6 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
     auto column_type = _wrapper->column_type();
     filter->set_column_type(to_proto(column_type));
 
-    if (_wrapper->is_ignored()) {
-        filter->set_ignored_msg(_wrapper->ignored_msg());
-        return;
-    }
-
     auto it = _wrapper->get_in_filter_iterator();
     DCHECK(it != nullptr);
 
@@ -1736,16 +1800,21 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) 
{
     }
 }
 
-bool IRuntimeFilter::is_bloomfilter() {
-    return _wrapper->is_bloomfilter();
+RuntimeFilterType IRuntimeFilter::get_real_type() {
+    return _wrapper->get_real_type();
+}
+
+bool IRuntimeFilter::need_sync_filter_size() {
+    return (type() == RuntimeFilterType::IN_OR_BLOOM_FILTER ||
+            type() == RuntimeFilterType::BLOOM_FILTER) &&
+           _wrapper->get_build_bf_cardinality() && !_is_broadcast_join;
 }
 
 Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
     _profile->add_info_string("MergeTime", 
std::to_string(param->request->merge_time()) + " ms");
 
-    if (param->request->has_in_filter() && 
param->request->in_filter().has_ignored_msg()) {
-        const PInFilter in_filter = param->request->in_filter();
-        set_ignored(in_filter.ignored_msg());
+    if (param->request->has_ignored() && param->request->ignored()) {
+        set_ignored();
     } else {
         std::unique_ptr<RuntimePredicateWrapper> wrapper;
         RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, 
&wrapper));
@@ -1769,7 +1838,7 @@ void 
IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t mer
     }
     _wrapper = wrapper;
     update_runtime_filter_type_to_profile();
-    this->signal();
+    signal();
 }
 
 Status RuntimePredicateWrapper::get_push_exprs(
@@ -1800,7 +1869,7 @@ Status RuntimePredicateWrapper::get_push_exprs(
         node.in_predicate.__set_is_not_in(false);
         node.__set_opcode(TExprOpcode::FILTER_IN);
         node.__set_is_nullable(false);
-        auto in_pred = vectorized::VDirectInPredicate::create_shared(node, 
_context.hybrid_set);
+        auto in_pred = vectorized::VDirectInPredicate::create_shared(node, 
_context->hybrid_set);
         in_pred->add_child(probe_ctx->root());
         auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
in_pred, null_aware);
         container.push_back(wrapper);
@@ -1813,7 +1882,7 @@ Status RuntimePredicateWrapper::get_push_exprs(
         RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::GE, min_pred,
                                               &min_pred_node));
         vectorized::VExprSPtr min_literal;
-        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context.minmax_func->get_min(),
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context->minmax_func->get_min(),
                                        min_literal));
         min_pred->add_child(probe_ctx->root());
         min_pred->add_child(min_literal);
@@ -1828,7 +1897,7 @@ Status RuntimePredicateWrapper::get_push_exprs(
         RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::LE, max_pred,
                                               &max_pred_node));
         vectorized::VExprSPtr max_literal;
-        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context.minmax_func->get_max(),
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context->minmax_func->get_max(),
                                        max_literal));
         max_pred->add_child(probe_ctx->root());
         max_pred->add_child(max_literal);
@@ -1843,7 +1912,7 @@ Status RuntimePredicateWrapper::get_push_exprs(
         RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::LE, max_pred,
                                               &max_pred_node, null_aware));
         vectorized::VExprSPtr max_literal;
-        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context.minmax_func->get_max(),
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context->minmax_func->get_max(),
                                        max_literal));
         max_pred->add_child(probe_ctx->root());
         max_pred->add_child(max_literal);
@@ -1861,7 +1930,7 @@ Status RuntimePredicateWrapper::get_push_exprs(
                                               min_pred, &min_pred_node, 
null_aware));
         vectorized::VExprSPtr min_literal;
         RETURN_IF_ERROR(create_literal(new_probe_ctx->root()->type(),
-                                       _context.minmax_func->get_min(), 
min_literal));
+                                       _context->minmax_func->get_min(), 
min_literal));
         min_pred->add_child(new_probe_ctx->root());
         min_pred->add_child(min_literal);
         
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node,
@@ -1878,7 +1947,7 @@ Status RuntimePredicateWrapper::get_push_exprs(
         node.__set_opcode(TExprOpcode::RT_FILTER);
         node.__set_is_nullable(false);
         auto bloom_pred = vectorized::VBloomPredicate::create_shared(node);
-        bloom_pred->set_filter(_context.bloom_filter_func);
+        bloom_pred->set_filter(_context->bloom_filter_func);
         bloom_pred->add_child(probe_ctx->root());
         auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
bloom_pred);
         container.push_back(wrapper);
@@ -1894,7 +1963,7 @@ Status RuntimePredicateWrapper::get_push_exprs(
         node.__set_opcode(TExprOpcode::RT_FILTER);
         node.__set_is_nullable(false);
         auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node);
-        bitmap_pred->set_filter(_context.bitmap_filter_func);
+        bitmap_pred->set_filter(_context->bitmap_filter_func);
         bitmap_pred->add_child(probe_ctx->root());
         auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
bitmap_pred);
         container.push_back(wrapper);
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index ff825523ae1..a65561d7cb8 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -70,6 +70,7 @@ struct SharedRuntimeFilterContext;
 
 namespace pipeline {
 class RuntimeFilterTimer;
+struct CountedFinishDependency;
 } // namespace pipeline
 
 enum class RuntimeFilterType {
@@ -221,7 +222,7 @@ public:
                          const RuntimeFilterRole role, int node_id, 
IRuntimeFilter** res,
                          bool build_bf_exactly = false, bool need_local_merge 
= false);
 
-    vectorized::SharedRuntimeFilterContext& get_shared_context_ref();
+    SharedRuntimeFilterContext& get_shared_context_ref();
 
     // insert data to build filter
     void insert_batch(vectorized::ColumnPtr column, size_t start);
@@ -230,6 +231,8 @@ public:
     // push filter to remote node or push down it to scan_node
     Status publish(bool publish_local = false);
 
+    Status send_filter_size(uint64_t local_filter_size);
+
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
     PrimitiveType column_type() const;
@@ -294,10 +297,13 @@ public:
     void update_filter(RuntimePredicateWrapper* filter_wrapper, int64_t 
merge_time,
                        int64_t start_apply);
 
-    void set_ignored(const std::string& msg);
+    void set_ignored();
+
+    bool get_ignored();
 
-    // for ut
-    bool is_bloomfilter();
+    RuntimeFilterType get_real_type();
+
+    bool need_sync_filter_size();
 
     // async push runtimefilter to remote node
     Status push_to_remote(const TNetworkAddress* addr, bool opt_remote_rf);
@@ -358,6 +364,14 @@ public:
     void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
     std::string formatted_state() const;
 
+    void set_synced_size(uint64_t global_size);
+
+    void set_dependency(pipeline::CountedFinishDependency* dependency);
+
+    int64_t get_synced_size() const { return _synced_size; }
+
+    bool isset_synced_size() const { return _synced_size != -1; }
+
 protected:
     // serialize _wrapper to protobuf
     void to_protobuf(PInFilter* filter);
@@ -437,6 +451,9 @@ protected:
     bool _need_local_merge = false;
 
     std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
+
+    int64_t _synced_size = -1;
+    pipeline::CountedFinishDependency* _dependency = nullptr;
 };
 
 // avoid expose RuntimePredicateWrapper
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 1fadf81e809..2a44c6a3745 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -34,130 +34,77 @@ class VRuntimeFilterSlots {
 public:
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
-            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
-            : _build_expr_context(build_expr_ctxs),
-              _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
-
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
-
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
-
-            std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
-            if (filters.empty()) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
-            }
-            for (auto* filter : filters) {
-                filter->set_ignored("");
-                filter->signal();
-            }
-            return Status::OK();
-        };
+            const std::vector<IRuntimeFilter*>& runtime_filters)
+            : _build_expr_context(build_expr_ctxs), 
_runtime_filters(runtime_filters) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
             return Status::OK();
-        };
-
-        // ordered vector: IN, IN_OR_BLOOM, others.
-        // so we can ignore other filter if IN Predicate exists.
-        auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
-            if (d1->type() == d2->type()) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_FILTER) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return false;
-            } else {
-                return d1->type() < d2->type();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
             }
-        };
-        std::sort(_runtime_filters.begin(), _runtime_filters.end(), 
compare_desc);
-
-        // do not create 'in filter' when hash_table size over limit
-        const auto max_in_num = state->runtime_filter_max_in_num();
-        const bool over_max_in_num = (hash_table_size >= max_in_num);
+        }
 
+        // send_filter_size may call dependency->sub(), so we call 
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
         for (auto* runtime_filter : _runtime_filters) {
-            if (runtime_filter->expr_order() < 0 ||
-                runtime_filter->expr_order() >= _build_expr_context.size()) {
-                return Status::InternalError(
-                        "runtime_filter meet invalid expr_order, 
expr_order={}, "
-                        "_build_expr_context.size={}",
-                        runtime_filter->expr_order(), 
_build_expr_context.size());
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
             }
+        }
+        return Status::OK();
+    }
 
-            bool is_in_filter = (runtime_filter->type() == 
RuntimeFilterType::IN_FILTER);
+    // use synced size when this rf has global merged
+    static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t 
hash_table_size) {
+        return runtime_filter->isset_synced_size() ? 
runtime_filter->get_synced_size()
+                                                   : hash_table_size;
+    }
 
-            if (over_max_in_num &&
-                runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+    Status ignore_filters(RuntimeState* state) {
+        // process ignore duplicate IN_FILTER
+        std::unordered_set<int> has_in_filter;
+        for (auto* filter : _runtime_filters) {
+            if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
+                continue;
             }
-
-            if (runtime_filter->is_bloomfilter()) {
-                
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(hash_table_size));
+            if (has_in_filter.contains(filter->expr_order())) {
+                filter->set_ignored();
+                continue;
             }
+            has_in_filter.insert(filter->expr_order());
+        }
 
-            // Note:
-            // In the case that exist *remote target* and in filter and other 
filter,
-            // we must merge other filter whatever in filter is over the max 
num in current node,
-            // because:
-            // case 1: (in filter >= max num) in current node, so in filter 
will be ignored,
-            //         and then other filter can be used
-            // case 2: (in filter < max num) in current node, we don't know 
whether the in filter
-            //         will be ignored in merge node, so we must transfer 
other filter to merge node
-            if (!runtime_filter->has_remote_target()) {
-                bool exists_in_filter = 
has_in_filter[runtime_filter->expr_order()];
-                if (is_in_filter && over_max_in_num) {
-                    VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
-                               << " ignore runtime filter(in filter id "
-                               << runtime_filter->filter_id() << ") because: 
in_num("
-                               << hash_table_size << ") >= max_in_num(" << 
max_in_num << ")";
-                    
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
-                    continue;
-                } else if (!is_in_filter && exists_in_filter) {
-                    // do not create 'bloom filter' and 'minmax filter' when 
'in filter' has created
-                    // because in filter is exactly filter, so it is enough to 
filter data
-                    VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
-                               << " ignore runtime filter("
-                               << 
IRuntimeFilter::to_string(runtime_filter->type()) << " id "
-                               << runtime_filter->filter_id()
-                               << ") because: already exists in filter";
-                    
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
-                    continue;
-                }
-            } else if (is_in_filter && over_max_in_num) {
-                std::string msg = fmt::format(
-                        "fragment instance {} ignore runtime filter(in filter 
id {}) because: "
-                        "in_num({}) >= max_in_num({})",
-                        print_id(state->fragment_instance_id()), 
runtime_filter->filter_id(),
-                        hash_table_size, max_in_num);
-                RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
+        // process ignore filter when it has IN_FILTER on same expr, and init 
bloom filter size
+        for (auto* filter : _runtime_filters) {
+            if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
+                !has_in_filter.contains(filter->expr_order())) {
                 continue;
             }
+            filter->set_ignored();
+        }
+        return Status::OK();
+    }
 
-            if ((runtime_filter->type() == RuntimeFilterType::IN_FILTER) ||
-                (runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
-                 !over_max_in_num)) {
-                has_in_filter[runtime_filter->expr_order()] = true;
+    Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
+        // process IN_OR_BLOOM_FILTER's real type
+        for (auto* filter : _runtime_filters) {
+            if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+                get_real_size(filter, local_hash_table_size) > 
state->runtime_filter_max_in_num()) {
+                RETURN_IF_ERROR(filter->change_to_bloom_filter());
             }
-            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
-        }
 
+            if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
+                RETURN_IF_ERROR(
+                        filter->init_bloom_filter(get_real_size(filter, 
local_hash_table_size)));
+            }
+        }
         return Status::OK();
     }
 
@@ -171,6 +118,9 @@ public:
             int result_column_id = 
_build_expr_context[i]->get_last_result_column_id();
             const auto& column = 
block->get_by_position(result_column_id).column;
             for (auto* filter : iter->second) {
+                if (filter->get_ignored()) {
+                    continue;
+                }
                 filter->insert_batch(column, 1);
             }
         }
@@ -213,7 +163,6 @@ public:
 private:
     const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
_build_expr_context;
     std::vector<IRuntimeFilter*> _runtime_filters;
-    const bool _need_local_merge = false;
     // prob_contition index -> [IRuntimeFilter]
     std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map;
 };
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index dab127c2c50..d4dc1956400 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -39,7 +39,11 @@ Overload(Callables&&... callables) -> Overload<Callables...>;
 
 
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* 
parent,
                                                          RuntimeState* state)
-        : JoinBuildSinkLocalState(parent, state) {}
+        : JoinBuildSinkLocalState(parent, state) {
+    _finish_dependency = std::make_shared<CountedFinishDependency>(
+            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
+            state->get_query_ctx());
+}
 
 Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
@@ -72,8 +76,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     }
     if (!_should_build_hash_table) {
         _dependency->block();
+        _finish_dependency->block();
         p._shared_hashtable_controller->append_dependency(p.node_id(),
-                                                          
_dependency->shared_from_this());
+                                                          
_dependency->shared_from_this(),
+                                                          
_finish_dependency->shared_from_this());
     }
 
     _build_blocks_memory_usage =
@@ -102,6 +108,9 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
                 _build_expr_ctxs.size() == 1));
     }
 
+    _runtime_filter_slots =
+            std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs, 
runtime_filters());
+
     return Status::OK();
 }
 
@@ -112,6 +121,34 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
+    auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
+    Defer defer {[&]() {
+        if (_should_build_hash_table && p._shared_hashtable_controller) {
+            p._shared_hashtable_controller->signal_finish(p.node_id());
+        }
+    }};
+
+    if (!_runtime_filter_slots || _runtime_filters.empty()) {
+        return Status::OK();
+    }
+    auto* block = _shared_state->build_block.get();
+    uint64_t hash_table_size = block ? block->rows() : 0;
+    {
+        SCOPED_TIMER(_runtime_filter_init_timer);
+        RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, 
hash_table_size));
+        RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+    }
+    if (_should_build_hash_table && hash_table_size > 1) {
+        SCOPED_TIMER(_runtime_filter_compute_timer);
+        _runtime_filter_slots->insert(block);
+    }
+
+    SCOPED_TIMER(_publish_runtime_filter_timer);
+    RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
+    return Status::OK();
+}
+
 bool HashJoinBuildSinkLocalState::build_unique() const {
     return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
 }
@@ -444,6 +481,7 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
                 is_null_safe_equal ||
                 (_build_expr_ctxs.back()->root()->is_nullable() && 
build_stores_null));
     }
+
     return Status::OK();
 }
 
@@ -501,9 +539,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
         const bool need_local_merge =
                 
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge;
-        RETURN_IF_ERROR(vectorized::process_runtime_filter_build(
-                state, local_state._shared_state->build_block.get(), 
&local_state,
-                need_local_merge));
+        RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size(
+                state, local_state._shared_state->build_block->rows(), 
need_local_merge,
+                
(CountedFinishDependency*)(local_state._finish_dependency.get())));
         RETURN_IF_ERROR(
                 local_state.process_build_block(state, 
(*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
@@ -514,13 +552,10 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                     local_state._shared_state->hash_table_variants;
             _shared_hash_table_context->short_circuit_for_null_in_probe_side =
                     local_state._shared_state->_has_null_in_build_side;
-            if (local_state._runtime_filter_slots) {
-                local_state._runtime_filter_slots->copy_to_shared_context(
-                        _shared_hash_table_context);
-            }
             _shared_hash_table_context->block = 
local_state._shared_state->build_block;
             _shared_hash_table_context->build_indexes_null =
                     local_state._shared_state->build_indexes_null;
+            
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
             _shared_hashtable_controller->signal(node_id());
         }
     } else if (!local_state._should_build_hash_table) {
@@ -532,6 +567,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
             return _shared_hash_table_context->status;
         }
 
+        
RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context(
+                _shared_hash_table_context));
+
         local_state.profile()->add_info_string(
                 "SharedHashTableFrom",
                 print_id(
@@ -553,36 +591,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         local_state._shared_state->build_block = 
_shared_hash_table_context->block;
         local_state._shared_state->build_indexes_null =
                 _shared_hash_table_context->build_indexes_null;
-        const bool need_local_merge =
-                
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge;
-
-        if (!_shared_hash_table_context->runtime_filters.empty()) {
-            auto ret = std::visit(
-                    Overload {
-                            [&](std::monostate&) -> Status {
-                                LOG(FATAL) << "FATAL: uninited hash table";
-                                __builtin_unreachable();
-                            },
-                            [&](auto&& arg) -> Status {
-                                if (local_state._runtime_filters.empty()) {
-                                    return Status::OK();
-                                }
-                                local_state._runtime_filter_slots =
-                                        std::make_shared<VRuntimeFilterSlots>(
-                                                _build_expr_ctxs, 
local_state._runtime_filters,
-                                                need_local_merge);
-
-                                
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
-                                        state, arg.hash_table->size()));
-                                RETURN_IF_ERROR(
-                                        
local_state._runtime_filter_slots->copy_from_shared_context(
-                                                _shared_hash_table_context));
-                                
RETURN_IF_ERROR(local_state._runtime_filter_slots->publish(true));
-                                return Status::OK();
-                            }},
-                    *local_state._shared_state->hash_table_variants);
-            RETURN_IF_ERROR(ret);
-        }
     }
 
     if (eos) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 2712edc838d..0998884c99b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -70,6 +70,10 @@ public:
         _profile->add_info_string("HashTableFilledBuckets", info);
     }
 
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
+
+    Status close(RuntimeState* state, Status exec_status) override;
+
 protected:
     void _hash_table_init(RuntimeState* state);
     void _set_build_ignore_flag(vectorized::Block& block, const 
std::vector<int>& res_col_ids);
@@ -84,10 +88,6 @@ protected:
     friend class PartitionedHashJoinSinkLocalState;
     template <class HashTableContext, typename Parent>
     friend struct vectorized::ProcessHashTableBuild;
-    template <typename Parent>
-    friend Status vectorized::process_runtime_filter_build(RuntimeState* state,
-                                                           vectorized::Block* 
block, Parent* parent,
-                                                           bool is_global);
 
     // build expr
     vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -108,6 +108,7 @@ protected:
      */
     bool _build_side_ignore_null = false;
     std::vector<int> _build_col_ids;
+    std::shared_ptr<Dependency> _finish_dependency;
 
     RuntimeProfile::Counter* _build_table_timer = nullptr;
     RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp 
b/be/src/pipeline/exec/join_build_sink_operator.cpp
index 44c34a3c057..60b0970781d 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -40,7 +40,8 @@ Status JoinBuildSinkLocalState<SharedStateArg, 
Derived>::init(RuntimeState* stat
                                               "PublishRuntimeFilterTime");
     _runtime_filter_compute_timer = 
ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(),
                                               "RuntimeFilterComputeTime");
-
+    _runtime_filter_init_timer =
+            ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(), 
"RuntimeFilterInitTime");
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h 
b/be/src/pipeline/exec/join_build_sink_operator.h
index 9dbea12e021..2a204a75a5a 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -44,6 +44,7 @@ protected:
     RuntimeProfile::Counter* _build_rows_counter = nullptr;
     RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
     RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
+    RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr;
     std::vector<IRuntimeFilter*> _runtime_filters;
 };
 
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 6ec6734243c..5fbc657c78a 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -317,7 +317,6 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
                 RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
                 olap_scanner->set_compound_filters(_compound_filters);
             }
-            LOG(INFO) << "parallel scanners count: " << scanners->size();
             return Status::OK();
         }
     }
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 3b415892c93..6dccf15dbd8 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -98,6 +98,16 @@ std::string Dependency::debug_string(int indentation_level) {
     return fmt::to_string(debug_string_buffer);
 }
 
+std::string CountedFinishDependency::debug_string(int indentation_level) {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(
+            debug_string_buffer,
+            "{}{}: id={}, block task = {}, ready={}, _always_ready={}, is 
cancelled={}, count={}",
+            std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(), _ready,
+            _always_ready, _is_cancelled(), _counter);
+    return fmt::to_string(debug_string_buffer);
+}
+
 std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index a2c858e4c5c..ebd44bec4d2 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -199,7 +199,7 @@ public:
     [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { 
return nullptr; }
 };
 
-struct FinishDependency final : public Dependency {
+struct FinishDependency : public Dependency {
 public:
     using SharedState = FakeSharedState;
     FinishDependency(int id, int node_id, std::string name, QueryContext* 
query_ctx)
@@ -208,6 +208,35 @@ public:
     [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override;
 };
 
+struct CountedFinishDependency final : public FinishDependency {
+public:
+    using SharedState = FakeSharedState;
+    CountedFinishDependency(int id, int node_id, std::string name, 
QueryContext* query_ctx)
+            : FinishDependency(id, node_id, name, query_ctx) {}
+
+    void add() {
+        std::unique_lock<std::mutex> l(_mtx);
+        if (!_counter) {
+            block();
+        }
+        _counter++;
+    }
+
+    void sub() {
+        std::unique_lock<std::mutex> l(_mtx);
+        _counter--;
+        if (!_counter) {
+            set_ready();
+        }
+    }
+
+    std::string debug_string(int indentation_level = 0) override;
+
+private:
+    std::mutex _mtx;
+    uint32_t _counter = 0;
+};
+
 class RuntimeFilterDependency;
 struct RuntimeFilterTimerQueue;
 class RuntimeFilterTimer {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7e15477a641..9fff093b3bf 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1348,78 +1348,112 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     int64_t start_apply = MonotonicMillis();
 
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+    QueryThreadContext query_thread_context;
+
+    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+    ObjectPool* pool = nullptr;
+
     const auto& fragment_instance_ids = request->fragment_instance_ids();
-    if (!fragment_instance_ids.empty()) {
-        UniqueId fragment_instance_id = fragment_instance_ids[0];
-        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
-        std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-        QueryThreadContext query_thread_context;
-
-        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
-        ObjectPool* pool = nullptr;
-        if (is_pipeline) {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _pipeline_map.find(tfragment_instance_id);
-            if (iter == _pipeline_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
-                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
-            }
-            pip_context = iter->second;
+    {
+        std::unique_lock<std::mutex> lock(_lock);
+        for (UniqueId fragment_instance_id : fragment_instance_ids) {
+            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-            DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
-            pool = &pip_context->get_query_ctx()->obj_pool;
-            query_thread_context = {pip_context->get_query_ctx()->query_id(),
-                                    
pip_context->get_query_ctx()->query_mem_tracker};
-        } else {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _fragment_instance_map.find(tfragment_instance_id);
-            if (iter == _fragment_instance_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment instance id:"
-                              << print_id(tfragment_instance_id);
-                return Status::InvalidArgument("fragment instance id: {}",
-                                               
print_id(tfragment_instance_id));
-            }
-            fragment_executor = iter->second;
+            if (is_pipeline) {
+                auto iter = _pipeline_map.find(tfragment_instance_id);
+                if (iter == _pipeline_map.end()) {
+                    continue;
+                }
+                pip_context = iter->second;
 
-            DCHECK(fragment_executor != nullptr);
-            runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
-            pool = &fragment_executor->get_query_ctx()->obj_pool;
-            query_thread_context = 
{fragment_executor->get_query_ctx()->query_id(),
-                                    
fragment_executor->get_query_ctx()->query_mem_tracker};
-        }
+                DCHECK(pip_context != nullptr);
+                runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
+                pool = &pip_context->get_query_ctx()->obj_pool;
+                query_thread_context = 
{pip_context->get_query_ctx()->query_id(),
+                                        
pip_context->get_query_ctx()->query_mem_tracker};
+            } else {
+                auto iter = _fragment_instance_map.find(tfragment_instance_id);
+                if (iter == _fragment_instance_map.end()) {
+                    continue;
+                }
+                fragment_executor = iter->second;
 
-        SCOPED_ATTACH_TASK(query_thread_context);
-
-        // 1. get the target filters
-        std::vector<IRuntimeFilter*> filters;
-        
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
-
-        // 2. create the filter wrapper to replace or ignore the target filters
-        if (request->has_in_filter() && 
request->in_filter().has_ignored_msg()) {
-            const auto& in_filter = request->in_filter();
-
-            std::ranges::for_each(filters, [&in_filter](auto& filter) {
-                filter->set_ignored(in_filter.ignored_msg());
-                filter->signal();
-            });
-        } else if (!filters.empty()) {
-            UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
-                                                filters[0]->column_type()};
-            RuntimePredicateWrapper* filter_wrapper = nullptr;
-            RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
-
-            std::ranges::for_each(filters, [&](auto& filter) {
-                filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
-            });
+                DCHECK(fragment_executor != nullptr);
+                runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
+                pool = &fragment_executor->get_query_ctx()->obj_pool;
+                query_thread_context = 
{fragment_executor->get_query_ctx()->query_id(),
+                                        
fragment_executor->get_query_ctx()->query_mem_tracker};
+            }
+            break;
         }
     }
 
+    if (runtime_filter_mgr == nullptr) {
+        // all instance finished
+        return Status::OK();
+    }
+
+    SCOPED_ATTACH_TASK(query_thread_context);
+    // 1. get the target filters
+    std::vector<IRuntimeFilter*> filters;
+    
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
+
+    // 2. create the filter wrapper to replace or ignore the target filters
+    if (!filters.empty()) {
+        UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, 
filters[0]->column_type()};
+        RuntimePredicateWrapper* filter_wrapper = nullptr;
+        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
+
+        std::ranges::for_each(filters, [&](auto& filter) {
+            filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
+        });
+    }
+
     return Status::OK();
 }
 
+Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
+    UniqueId queryid = request->query_id();
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
+
+    std::shared_ptr<QueryContext> query_ctx;
+    {
+        TUniqueId query_id;
+        query_id.__set_hi(queryid.hi);
+        query_id.__set_lo(queryid.lo);
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _query_ctx_map.find(query_id);
+        if (iter == _query_ctx_map.end()) {
+            return Status::InvalidArgument("query-id: {}", 
queryid.to_string());
+        }
+
+        query_ctx = iter->second;
+    }
+    auto merge_status = filter_controller->send_filter_size(request);
+    return merge_status;
+}
+
+Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
+    UniqueId queryid = request->query_id();
+    std::shared_ptr<QueryContext> query_ctx;
+    {
+        TUniqueId query_id;
+        query_id.__set_hi(queryid.hi);
+        query_id.__set_lo(queryid.lo);
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _query_ctx_map.find(query_id);
+        if (iter == _query_ctx_map.end()) {
+            return Status::InvalidArgument("query-id: {}", 
queryid.to_string());
+        }
+
+        query_ctx = iter->second;
+    }
+    return query_ctx->runtime_filter_mgr()->sync_filter_size(request);
+}
+
 Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
                                  butil::IOBufAsZeroCopyInputStream* 
attach_data) {
     UniqueId queryid = request->query_id();
@@ -1444,7 +1478,6 @@ Status FragmentMgr::merge_filter(const 
PMergeFilterRequest* request,
     }
     SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, 
query_ctx->query_id());
     auto merge_status = filter_controller->merge(request, attach_data, 
opt_remote_rf);
-    DCHECK(merge_status.ok());
     return merge_status;
 }
 
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 76e9a50eb58..16da4826165 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -128,6 +128,10 @@ public:
     Status merge_filter(const PMergeFilterRequest* request,
                         butil::IOBufAsZeroCopyInputStream* attach_data);
 
+    Status send_filter_size(const PSendFilterSizeRequest* request);
+
+    Status sync_filter_size(const PSyncFilterSizeRequest* request);
+
     std::string to_http_path(const std::string& file_name);
 
     void coordinator_callback(const ReportStatusRequest& req);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 42fbfb418fc..f0407e42ce2 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -40,6 +40,7 @@
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
 
 namespace doris {
 
@@ -141,6 +142,7 @@ Status 
RuntimeFilterMgr::register_local_merge_producer_filter(
             iter->second.filters.emplace_back(merge_filter);
         }
         iter->second.merge_time++;
+        iter->second.merge_size_times++;
         iter->second.filters.emplace_back(*producer_filter);
     }
     return Status::OK();
@@ -151,7 +153,8 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(
     std::lock_guard<std::mutex> l(_lock);
     auto iter = _local_merge_producer_map.find(filter_id);
     if (iter == _local_merge_producer_map.end()) {
-        return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", 
filter_id);
+        return Status::InvalidArgument("unknown filter: {}, role: 
LOCAL_MERGE_PRODUCER.",
+                                       filter_id);
     }
     *local_merge_filters = &iter->second;
     DCHECK(!iter->second.filters.empty());
@@ -241,7 +244,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     auto filter_id = runtime_filter_desc->filter_id;
     
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options,
                                                     -1, false));
-    _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, 
std::make_unique<std::mutex>()});
+    _filter_map.emplace(filter_id, cnt_val);
     return Status::OK();
 }
 
@@ -262,7 +265,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options));
 
     std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
-    _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, 
std::make_unique<std::mutex>()});
+    _filter_map.emplace(filter_id, cnt_val);
     return Status::OK();
 }
 
@@ -311,6 +314,67 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId 
query_id,
     return Status::OK();
 }
 
+Status RuntimeFilterMergeControllerEntity::send_filter_size(const 
PSendFilterSizeRequest* request) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    std::shared_ptr<RuntimeFilterCntlVal> cnt_val;
+
+    auto filter_id = request->filter_id();
+    std::map<int, CntlValwithLock>::iterator iter;
+    {
+        std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+        iter = _filter_map.find(filter_id);
+        if (iter == _filter_map.end()) {
+            return Status::InvalidArgument("unknown filter id {}",
+                                           
std::to_string(request->filter_id()));
+        }
+    }
+    cnt_val = iter->second.cnt_val;
+    std::unique_lock<std::mutex> l(*iter->second.mutex);
+    cnt_val->global_size += request->filter_size();
+    cnt_val->source_addrs.push_back(request->source_addr());
+
+    if (cnt_val->source_addrs.size() == cnt_val->producer_size) {
+        for (auto addr : cnt_val->source_addrs) {
+            std::shared_ptr<PBackendService_Stub> stub(
+                    
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
+            AsyncRPCContext<PSyncFilterSizeRequest, PSyncFilterSizeResponse> 
ctx;
+            auto* pquery_id = ctx.request.mutable_query_id();
+            pquery_id->set_hi(_state->query_id.hi());
+            pquery_id->set_lo(_state->query_id.lo());
+
+            ctx.request.set_filter_id(filter_id);
+            ctx.request.set_filter_size(cnt_val->global_size);
+
+            stub->sync_filter_size(&ctx.cntl, &ctx.request, &ctx.response, 
brpc::DoNothing());
+            brpc::Join(ctx.cntl.call_id());
+            if (auto status = Status::create(ctx.response.status()); !status) {
+                return status;
+            }
+            if (ctx.cntl.Failed()) {
+                
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(ctx.cntl.remote_side());
+                return Status::InternalError(ctx.cntl.ErrorText());
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* 
request) {
+    auto* filter = try_get_product_filter(request->filter_id());
+    if (filter) {
+        filter->set_synced_size(request->filter_size());
+        return Status::OK();
+    }
+
+    LocalMergeFilters* local_merge_filters = nullptr;
+    RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), 
&local_merge_filters));
+    // first filter size merged filter
+    for (size_t i = 1; i < local_merge_filters->filters.size(); i++) {
+        
local_merge_filters->filters[i]->set_synced_size(request->filter_size());
+    }
+    return Status::OK();
+}
+
 // merge data
 Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* 
request,
                                                  
butil::IOBufAsZeroCopyInputStream* attach_data,
@@ -331,9 +395,9 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
                                            
std::to_string(request->filter_id()));
         }
     }
-    cnt_val = iter->second.first;
+    cnt_val = iter->second.cnt_val;
     {
-        std::lock_guard<std::mutex> l(*iter->second.second);
+        std::lock_guard<std::mutex> l(*iter->second.mutex);
         // Skip the other broadcast join runtime filter
         if (cnt_val->arrive_id.size() == 1 && 
cnt_val->runtime_filter_desc.is_broadcast_join) {
             return Status::OK();
@@ -372,7 +436,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             void* data = nullptr;
             int len = 0;
             bool has_attachment = false;
-            RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, 
&len));
+            if (!cnt_val->filter->get_ignored()) {
+                RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, 
&data, &len));
+            } else {
+                apply_request.set_ignored(true);
+                apply_request.set_filter_type(PFilterType::UNKNOW_FILTER);
+            }
+
             if (data != nullptr && len > 0) {
                 request_attachment.append(data, len);
                 has_attachment = true;
@@ -392,7 +462,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
                     
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
                 }
                 rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
-
                 // set fragment-id
                 for (size_t fid = 0; fid < 
targets[cur].target_fragment_instance_ids.size();
                      fid++) {
@@ -417,6 +486,9 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             }
             for (auto& rpc_context : rpc_contexts) {
                 brpc::Join(rpc_context->cid);
+                if (auto status = 
Status::create(rpc_context->response.status()); !status) {
+                    return status;
+                }
                 if (rpc_context->cntl.Failed()) {
                     LOG(WARNING) << "runtimefilter rpc err:" << 
rpc_context->cntl.ErrorText();
                     
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
@@ -437,7 +509,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             void* data = nullptr;
             int len = 0;
             bool has_attachment = false;
-            RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, 
&len));
+            if (!cnt_val->filter->get_ignored()) {
+                RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, 
&data, &len));
+            } else {
+                apply_request.set_ignored(true);
+                apply_request.set_filter_type(PFilterType::UNKNOW_FILTER);
+            }
+
             if (data != nullptr && len > 0) {
                 request_attachment.append(data, len);
                 has_attachment = true;
@@ -457,7 +535,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
                     
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
                 }
                 rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
-
                 // set fragment_instance_id
                 auto request_fragment_instance_id =
                         
rpc_contexts[cur]->request.mutable_fragment_instance_id();
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index bd543e782a2..dbf68460606 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -20,8 +20,10 @@
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
 #include <stdint.h>
 
+#include <condition_variable>
 #include <functional>
 #include <map>
 #include <memory>
@@ -30,6 +32,7 @@
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include "common/object_pool.h"
@@ -57,6 +60,8 @@ class ExecEnv;
 struct LocalMergeFilters {
     std::unique_ptr<std::mutex> lock = std::make_unique<std::mutex>();
     int merge_time = 0;
+    int merge_size_times = 0;
+    uint64_t local_merged_size = 0;
     std::vector<IRuntimeFilter*> filters;
 };
 
@@ -81,6 +86,15 @@ public:
 
     Status get_consume_filters(const int filter_id, 
std::vector<IRuntimeFilter*>& consumer_filters);
 
+    IRuntimeFilter* try_get_product_filter(const int filter_id) {
+        std::lock_guard<std::mutex> l(_lock);
+        auto iter = _producer_map.find(filter_id);
+        if (iter == _producer_map.end()) {
+            return nullptr;
+        }
+        return iter->second;
+    }
+
     // register filter
     Status register_consumer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
                                     int node_id, IRuntimeFilter** 
consumer_filter,
@@ -105,6 +119,8 @@ public:
 
     Status get_merge_addr(TNetworkAddress* addr);
 
+    Status sync_filter_size(const PSyncFilterSizeRequest* request);
+
 private:
     struct ConsumerFilterHolder {
         int node_id;
@@ -146,16 +162,20 @@ public:
     Status merge(const PMergeFilterRequest* request, 
butil::IOBufAsZeroCopyInputStream* attach_data,
                  bool opt_remote_rf);
 
+    Status send_filter_size(const PSendFilterSizeRequest* request);
+
     UniqueId query_id() const { return _query_id; }
 
     struct RuntimeFilterCntlVal {
         int64_t merge_time;
         int producer_size;
+        uint64_t global_size;
         TRuntimeFilterDesc runtime_filter_desc;
         std::vector<doris::TRuntimeFilterTargetParams> target_info;
         std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
         IRuntimeFilter* filter = nullptr;
-        std::unordered_set<UniqueId> arrive_id; // fragment_instance_id ?
+        std::unordered_set<UniqueId> arrive_id;
+        std::vector<PNetworkAddress> source_addrs;
         std::shared_ptr<ObjectPool> pool;
     };
 
@@ -174,8 +194,14 @@ private:
     // protect _filter_map
     std::shared_mutex _filter_map_mutex;
     std::shared_ptr<MemTracker> _mem_tracker;
-    using CntlValwithLock =
-            std::pair<std::shared_ptr<RuntimeFilterCntlVal>, 
std::unique_ptr<std::mutex>>;
+
+    struct CntlValwithLock {
+        std::shared_ptr<RuntimeFilterCntlVal> cnt_val;
+        std::unique_ptr<std::mutex> mutex;
+        CntlValwithLock(std::shared_ptr<RuntimeFilterCntlVal> input_cnt_val)
+                : cnt_val(std::move(input_cnt_val)), 
mutex(std::make_unique<std::mutex>()) {}
+    };
+
     std::map<int, CntlValwithLock> _filter_map;
     RuntimeFilterParamsContext* _state = nullptr;
 };
diff --git a/be/src/service/backend_options.cpp 
b/be/src/service/backend_options.cpp
index a8c48fd710e..19167d7d9b1 100644
--- a/be/src/service/backend_options.cpp
+++ b/be/src/service/backend_options.cpp
@@ -71,6 +71,7 @@ TBackend BackendOptions::get_local_backend() {
     _backend.__set_host(_s_localhost);
     _backend.__set_be_port(config::be_port);
     _backend.__set_http_port(config::webserver_port);
+    _backend.__set_brpc_port(config::brpc_port);
     return _backend;
 }
 
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 20ab10022d1..ef0521b5219 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1195,9 +1195,36 @@ void 
PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr
         auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
         Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-        if (!st.ok()) {
-            LOG(WARNING) << "merge meet error" << st.to_string();
-        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+        return;
+    }
+}
+
+void PInternalService::send_filter_size(::google::protobuf::RpcController* 
controller,
+                                        const ::doris::PSendFilterSizeRequest* 
request,
+                                        ::doris::PSendFilterSizeResponse* 
response,
+                                        ::google::protobuf::Closure* done) {
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = _exec_env->fragment_mgr()->send_filter_size(request);
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+        return;
+    }
+}
+
+void PInternalService::sync_filter_size(::google::protobuf::RpcController* 
controller,
+                                        const ::doris::PSyncFilterSizeRequest* 
request,
+                                        ::doris::PSyncFilterSizeResponse* 
response,
+                                        ::google::protobuf::Closure* done) {
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = _exec_env->fragment_mgr()->sync_filter_size(request);
         st.to_protobuf(response->mutable_status());
     });
     if (!ret) {
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 4fffdc33875..b8b12ecbcb8 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -130,6 +130,16 @@ public:
                       ::doris::PMergeFilterResponse* response,
                       ::google::protobuf::Closure* done) override;
 
+    void send_filter_size(::google::protobuf::RpcController* controller,
+                          const ::doris::PSendFilterSizeRequest* request,
+                          ::doris::PSendFilterSizeResponse* response,
+                          ::google::protobuf::Closure* done) override;
+
+    void sync_filter_size(::google::protobuf::RpcController* controller,
+                          const ::doris::PSyncFilterSizeRequest* request,
+                          ::doris::PSyncFilterSizeResponse* response,
+                          ::google::protobuf::Closure* done) override;
+
     void apply_filter(::google::protobuf::RpcController* controller,
                       const ::doris::PPublishFilterRequest* request,
                       ::doris::PPublishFilterResponse* response,
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index 7b313d6ae02..2e1d0508ba3 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -77,6 +77,10 @@ public:
     }
 #endif
 
+    std::shared_ptr<T> get_client(const PNetworkAddress& paddr) {
+        return get_client(paddr.hostname(), paddr.port());
+    }
+
     std::shared_ptr<T> get_client(const std::string& host, int port) {
         std::string realhost;
         realhost = host;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 7f820eafea5..a95236b3664 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -758,8 +758,10 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
         DCHECK(!_build_side_mutable_block.empty());
         _build_block = 
std::make_shared<Block>(_build_side_mutable_block.to_block());
         COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
-        RETURN_IF_ERROR(process_runtime_filter_build(state, 
_build_block.get(), this));
+        _runtime_filter_slots =
+                std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs, 
runtime_filters());
         RETURN_IF_ERROR(_process_build_block(state, *_build_block));
+        RETURN_IF_ERROR(process_runtime_filter_build(state, 
_build_block.get(), this));
         if (_shared_hashtable_controller) {
             _shared_hash_table_context->status = Status::OK();
             // arena will be shared with other instances.
@@ -811,8 +813,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
                                   _runtime_filter_slots = 
std::make_shared<VRuntimeFilterSlots>(
                                           _build_expr_ctxs, _runtime_filters);
 
-                                  RETURN_IF_ERROR(_runtime_filter_slots->init(
-                                          state, arg.hash_table->size()));
                                   
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
                                           _shared_hash_table_context));
                                   
RETURN_IF_ERROR(_runtime_filter_slots->publish(true));
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index 1559a5dfa5d..20245be06c2 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -78,12 +78,14 @@ Status process_runtime_filter_build(RuntimeState* state, 
Block* block, Parent* p
     if (parent->runtime_filters().empty()) {
         return Status::OK();
     }
-    parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
-            parent->_build_expr_ctxs, parent->runtime_filters(), is_global);
-
-    RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows()));
+    uint64_t rows = block->rows();
+    {
+        SCOPED_TIMER(parent->_runtime_filter_init_timer);
+        RETURN_IF_ERROR(parent->_runtime_filter_slots->init_filters(state, 
rows));
+        RETURN_IF_ERROR(parent->_runtime_filter_slots->ignore_filters(state));
+    }
 
-    if (!parent->_runtime_filter_slots->empty() && block->rows() > 1) {
+    if (!parent->_runtime_filter_slots->empty() && rows > 1) {
         SCOPED_TIMER(parent->_runtime_filter_compute_timer);
         parent->_runtime_filter_slots->insert(block);
     }
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp 
b/be/src/vec/exec/join/vjoin_node_base.cpp
index a9e25e7626b..e7b7a6b96b9 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -144,6 +144,7 @@ Status VJoinNodeBase::prepare(RuntimeState* state) {
 
     _publish_runtime_filter_timer = ADD_TIMER(runtime_profile(), 
"PublishRuntimeFilterTime");
     _runtime_filter_compute_timer = ADD_TIMER(runtime_profile(), 
"RunmtimeFilterComputeTime");
+    _runtime_filter_init_timer = ADD_TIMER(runtime_profile(), 
"RunmtimeFilterInitTime");
 
     return Status::OK();
 }
diff --git a/be/src/vec/exec/join/vjoin_node_base.h 
b/be/src/vec/exec/join/vjoin_node_base.h
index 63a8bb20ba6..099b4d37e56 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -152,6 +152,7 @@ protected:
     RuntimeProfile::Counter* _probe_rows_counter = nullptr;
     RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
     RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
+    RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr;
     RuntimeProfile::Counter* _join_filter_timer = nullptr;
     RuntimeProfile::Counter* _build_output_block_timer = nullptr;
 
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp 
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 53e24df183a..4b1203d4822 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -35,6 +35,7 @@ void 
SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int
     DCHECK(_builder_fragment_ids.find(node_id) == 
_builder_fragment_ids.cend());
     _builder_fragment_ids.insert({node_id, builder});
     _dependencies.insert({node_id, {}});
+    _finish_dependencies.insert({node_id, {}});
 }
 
 bool SharedHashTableController::should_build_hash_table(const TUniqueId& 
fragment_instance_id,
@@ -57,7 +58,7 @@ bool SharedHashTableController::should_build_hash_table(const 
TUniqueId& fragmen
 
 SharedHashTableContextPtr SharedHashTableController::get_context(int 
my_node_id) {
     std::lock_guard<std::mutex> lock(_mutex);
-    if (!_shared_contexts.count(my_node_id)) {
+    if (!_shared_contexts.contains(my_node_id)) {
         _shared_contexts.insert({my_node_id, 
std::make_shared<SharedHashTableContext>()});
     }
     return _shared_contexts[my_node_id];
@@ -90,6 +91,14 @@ void SharedHashTableController::signal(int my_node_id) {
     _cv.notify_all();
 }
 
+void SharedHashTableController::signal_finish(int my_node_id) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    for (auto& dep : _finish_dependencies[my_node_id]) {
+        dep->set_ready();
+    }
+    _cv.notify_all();
+}
+
 TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int 
my_node_id) {
     std::lock_guard<std::mutex> lock(_mutex);
     auto it = _builder_fragment_ids.find(my_node_id);
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index 1f48ffa3439..8fe46b97b85 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -40,17 +40,20 @@ namespace pipeline {
 class Dependency;
 }
 
-namespace vectorized {
-
-class Arena;
-
-struct SharedRuntimeFilterContext {
+struct RuntimeFilterContext {
     std::shared_ptr<MinMaxFuncBase> minmax_func;
     std::shared_ptr<HybridSetBase> hybrid_set;
     std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
     std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
+    bool ignored = false;
 };
 
+using SharedRuntimeFilterContext = std::shared_ptr<RuntimeFilterContext>;
+
+namespace vectorized {
+
+class Arena;
+
 struct SharedHashTableContext {
     SharedHashTableContext()
             : hash_table_variants(nullptr), 
block(std::make_shared<vectorized::Block>()) {}
@@ -74,13 +77,16 @@ public:
     TUniqueId get_builder_fragment_instance_id(int my_node_id);
     SharedHashTableContextPtr get_context(int my_node_id);
     void signal(int my_node_id);
+    void signal_finish(int my_node_id);
     void signal(int my_node_id, Status status);
     Status wait_for_signal(RuntimeState* state, const 
SharedHashTableContextPtr& context);
     bool should_build_hash_table(const TUniqueId& fragment_instance_id, int 
my_node_id);
     void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled 
= enabled; }
-    void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> 
dep) {
+    void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> 
dep,
+                           std::shared_ptr<pipeline::Dependency> finish_dep) {
         std::lock_guard<std::mutex> lock(_mutex);
         _dependencies[node_id].push_back(dep);
+        _finish_dependencies[node_id].push_back(finish_dep);
     }
 
 private:
@@ -88,6 +94,8 @@ private:
     std::mutex _mutex;
     // For pipelineX, we update all dependencies once hash table is built;
     std::map<int /*node id*/, 
std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies;
+    std::map<int /*node id*/, 
std::vector<std::shared_ptr<pipeline::Dependency>>>
+            _finish_dependencies;
     std::condition_variable _cv;
     std::map<int /*node id*/, TUniqueId /*fragment instance id*/> 
_builder_fragment_ids;
     std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index a049079ccd4..7decc5c2baa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -239,6 +239,10 @@ public final class RuntimeFilter {
                 tFilter.setNullAware(false);
             }
         }
+        tFilter.setSyncFilterSize(
+                ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().getEnablePipelineXEngine()
+                        && 
ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
+                        && 
ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize());
         return tFilter;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 72df816bf62..d1ff6550b39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -185,6 +185,8 @@ public class SessionVariable implements Serializable, 
Writable {
     // if the right table is greater than this value in the hash join,  we 
will ignore IN filter
     public static final String RUNTIME_FILTER_MAX_IN_NUM = 
"runtime_filter_max_in_num";
 
+    public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = 
"enable_sync_runtime_filter_size";
+
     public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";
 
     // max ms to wait transaction publish finish when exec insert stmt.
@@ -984,6 +986,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM, needForward = true)
     private int runtimeFilterMaxInNum = 1024;
 
+    @VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = 
true)
+    private boolean enableSyncRuntimeFilterSize = true;
+
     @VariableMgr.VarAttr(name = USE_RF_DEFAULT)
     public boolean useRuntimeFilterDefaultSize = false;
 
@@ -3433,6 +3438,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return enablePipelineXEngine;
     }
 
+    public boolean enableSyncRuntimeFilterSize() {
+        return enableSyncRuntimeFilterSize;
+    }
+
     public static boolean enablePipelineEngine() {
         ConnectContext connectContext = ConnectContext.get();
         if (connectContext == null) {
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 0230180796a..bd15c8d2017 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -509,6 +509,27 @@ enum PFilterType {
     MAX_FILTER = 6;
 };
 
+message PSendFilterSizeRequest {
+    required int32 filter_id = 1;
+    required PUniqueId query_id = 2;
+    required PNetworkAddress source_addr = 3;
+    required uint64 filter_size = 4;
+};
+
+message PSendFilterSizeResponse {
+    required PStatus status = 1;
+};
+
+message PSyncFilterSizeRequest {
+    required int32 filter_id = 1;
+    required PUniqueId query_id = 2;
+    required uint64 filter_size = 3;
+};
+
+message PSyncFilterSizeResponse {
+    required PStatus status = 1;
+};
+
 message PMergeFilterRequest {
     required int32 filter_id = 1;
     required PUniqueId query_id = 2;
@@ -521,6 +542,7 @@ message PMergeFilterRequest {
     optional bool opt_remote_rf = 9;
     optional PColumnType column_type = 10;
     optional bool contain_null = 11;
+    optional bool ignored = 12;
 };
 
 message PMergeFilterResponse {
@@ -540,6 +562,7 @@ message PPublishFilterRequest {
     optional int64 merge_time = 9;
     optional PColumnType column_type = 10;
     optional bool contain_null = 11;
+    optional bool ignored = 12;
 };
 
 message PPublishFilterRequestV2 {
@@ -553,6 +576,7 @@ message PPublishFilterRequestV2 {
     optional bool is_pipeline = 8;
     optional int64 merge_time = 9;
     optional bool contain_null = 10;
+    optional bool ignored = 11;
 };
 
 message PPublishFilterResponse {
@@ -874,6 +898,8 @@ service PBackendService {
     rpc commit(PCommitRequest) returns (PCommitResult);
     rpc rollback(PRollbackRequest) returns (PRollbackResult);
     rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
+    rpc send_filter_size(PSendFilterSizeRequest) returns 
(PSendFilterSizeResponse);
+    rpc sync_filter_size(PSyncFilterSizeRequest) returns 
(PSyncFilterSizeResponse);
     rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
     rpc apply_filterv2(PPublishFilterRequestV2) returns 
(PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto
index b7a4b6ee53a..41bb3373f23 100644
--- a/gensrc/proto/types.proto
+++ b/gensrc/proto/types.proto
@@ -234,3 +234,8 @@ enum PPlanFragmentCancelReason {
     CALL_RPC_ERROR = 5;
     MEMORY_LIMIT_EXCEED = 6;
 }
+
+message PNetworkAddress {
+    required string hostname = 1;
+    required int32 port = 2;
+}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 80042fb0c57..5f6457682e5 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1197,6 +1197,8 @@ struct TRuntimeFilterDesc {
  
   // true, if join type is null aware like <=>. rf should dispose the case
   15: optional bool null_aware;
+
+  16: optional bool sync_filter_size;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to