This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e40eebfa79d4fa4c433cc2e1574aa243b443a03d
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Apr 25 17:30:22 2024 +0800

    [Bug](runtime-filter) fix bloom filter size error on rf merge (#34082)
    
    Fix the bloom filter size mismatch error raised when merging runtime filters (rf merge):
    
    W20240424 11:28:56.826277 3494287 ref_count_closure.h:80] RPC meet error status: [INVALID_ARGUMENT]PStatus: (172.21.0.15)[INVALID_ARGUMENT]bloom filter size not the same: already allocated bytes 65536, expected allocated bytes 32768
---
 be/src/exprs/bloom_filter_func.h             |  2 +-
 be/src/exprs/runtime_filter.cpp              | 88 +++++++++++++++-------------
 be/src/pipeline/exec/hashjoin_build_sink.cpp |  4 +-
 3 files changed, 51 insertions(+), 43 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 10d30212ff8..af9337f61ba 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -167,7 +167,7 @@ public:
         DCHECK(bloomfilter_func != nullptr);
         auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
         if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
-            return Status::InvalidArgument(
+            return Status::InternalError(
                     "bloom filter size not the same: already allocated bytes 
{}, expected "
                     "allocated bytes {}",
                     _bloom_filter_alloced, other_func->_bloom_filter_alloced);
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 50105d56068..1cbf86c95c4 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -246,7 +246,7 @@ Status create_vbin_predicate(const TypeDescriptor& type, 
TExprOpcode::type opcod
         fn_name.__set_function_name("ge");
         break;
     default:
-        return Status::InvalidArgument(
+        return Status::InternalError(
                 strings::Substitute("Invalid opcode for max_min_runtimefilter: 
'$0'", opcode));
     }
     fn.__set_name(fn_name);
@@ -282,17 +282,16 @@ class RuntimePredicateWrapper {
 public:
     RuntimePredicateWrapper(ObjectPool* pool, const RuntimeFilterParams* 
params)
             : RuntimePredicateWrapper(pool, params->column_return_type, 
params->filter_type,
-                                      params->filter_id, 
params->build_bf_exactly) {};
+                                      params->filter_id) {};
     // for a 'tmp' runtime predicate wrapper
     // only could called assign method or as a param for merge
     RuntimePredicateWrapper(ObjectPool* pool, PrimitiveType column_type, 
RuntimeFilterType type,
-                            uint32_t filter_id, bool build_bf_exactly = false)
+                            uint32_t filter_id)
             : _pool(pool),
               _column_return_type(column_type),
               _filter_type(type),
               _context(new RuntimeFilterContext()),
-              _filter_id(filter_id),
-              _build_bf_exactly(build_bf_exactly) {}
+              _filter_id(filter_id) {}
 
     // init runtime filter wrapper
     // alloc memory to init runtime filter function
@@ -333,24 +332,26 @@ public:
             return Status::OK();
         }
         default:
-            return Status::InvalidArgument("Unknown Filter type");
+            return Status::InternalError("Unknown Filter type");
         }
         return Status::OK();
     }
 
-    Status change_to_bloom_filter(bool need_init_bf = false) {
-        CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER)
-                << "Can not change to bloom filter because of runtime filter 
type is "
-                << IRuntimeFilter::to_string(_filter_type);
+    Status change_to_bloom_filter() {
+        if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+            return Status::InternalError(
+                    "Can not change to bloom filter because of runtime filter 
type is {}",
+                    IRuntimeFilter::to_string(_filter_type));
+        }
         BloomFilterFuncBase* bf = _context->bloom_filter_func.get();
-        if (need_init_bf) {
-            // BloomFilter may be not init
-            RETURN_IF_ERROR(bf->init_with_fixed_length());
+
+        if (bf != nullptr) {
             insert_to_bloom_filter(bf);
-        } else {
-            DCHECK(_context->hybrid_set == nullptr || 
_context->hybrid_set->size() == 0)
-                    << "set size: " << (_context->hybrid_set ? 
_context->hybrid_set->size() : 0);
+        } else if (_context->hybrid_set != nullptr && 
_context->hybrid_set->size() != 0) {
+            return Status::InternalError("change to bloom filter need empty 
set ",
+                                         
IRuntimeFilter::to_string(_filter_type));
         }
+
         // release in filter
         _context->hybrid_set.reset();
         return Status::OK();
@@ -514,26 +515,27 @@ public:
             }
 
             if (real_filter_type == RuntimeFilterType::IN_FILTER) {
-                if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in 
merge in
+                // when we meet base rf is in-filter, threre only have two 
case:
+                // case1: all input-filter's build_bf_exactly is true, inited 
by synced global size
+                // case2: all input-filter's build_bf_exactly is false, inited 
by default size
+                if (other_filter_type == RuntimeFilterType::IN_FILTER) {
                     
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
                     if (_max_in_num >= 0 && _context->hybrid_set->size() >= 
_max_in_num) {
-                        VLOG_DEBUG << " change runtime filter to bloom 
filter(id=" << _filter_id
-                                   << ") because: in_num(" << 
_context->hybrid_set->size()
-                                   << ") >= max_in_num(" << _max_in_num << ")";
-                        RETURN_IF_ERROR(change_to_bloom_filter(true));
+                        // case2: use default size to init bf
+                        
RETURN_IF_ERROR(_context->bloom_filter_func->init_with_fixed_length());
+                        RETURN_IF_ERROR(change_to_bloom_filter());
                     }
                 } else {
-                    VLOG_DEBUG << " change runtime filter to bloom filter(id=" 
<< _filter_id
-                               << ") because: already exist a bloom filter";
-                    
RETURN_IF_ERROR(change_to_bloom_filter(!_build_bf_exactly));
-                    RETURN_IF_ERROR(_context->bloom_filter_func->merge(
-                            wrapper->_context->bloom_filter_func.get()));
+                    // case1&case2: use input bf directly and insert hybrid 
set data into bf
+                    _context->bloom_filter_func = 
wrapper->_context->bloom_filter_func;
+                    RETURN_IF_ERROR(change_to_bloom_filter());
                 }
             } else {
-                if (other_filter_type == RuntimeFilterType::IN_FILTER) { // 
bloom filter merge in
+                if (other_filter_type == RuntimeFilterType::IN_FILTER) {
+                    // case2: insert data to global filter
                     
wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get());
-                    // bloom filter merge bloom filter
                 } else {
+                    // case1&case2: all input bf must has same size
                     RETURN_IF_ERROR(_context->bloom_filter_func->merge(
                             wrapper->_context->bloom_filter_func.get()));
                 }
@@ -716,9 +718,8 @@ public:
             break;
         }
         default: {
-            DCHECK(false) << "unknown type: " << type_to_string(type);
-            return Status::InvalidArgument("not support assign to in filter, 
type: " +
-                                           type_to_string(type));
+            return Status::InternalError("not support assign to in filter, 
type: " +
+                                         type_to_string(type));
         }
         }
         return Status::OK();
@@ -870,10 +871,9 @@ public:
             return _context->minmax_func->assign(&min_val, &max_val);
         }
         default:
-            DCHECK(false) << "unknown type";
             break;
         }
-        return Status::InvalidArgument("not support!");
+        return Status::InternalError("not support!");
     }
 
     HybridSetBase::IteratorBase* get_in_filter_iterator() { return 
_context->hybrid_set->begin(); }
@@ -950,7 +950,6 @@ private:
 
     SharedRuntimeFilterContext _context;
     uint32_t _filter_id;
-    bool _build_bf_exactly;
 };
 
 Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* 
pool,
@@ -1374,11 +1373,11 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
     }
     if (_runtime_filter_type == RuntimeFilterType::BITMAP_FILTER) {
         if (!build_ctx->root()->type().is_bitmap_type()) {
-            return Status::InvalidArgument("Unexpected src expr type:{} for 
bitmap filter.",
-                                           
build_ctx->root()->type().debug_string());
+            return Status::InternalError("Unexpected src expr type:{} for 
bitmap filter.",
+                                         
build_ctx->root()->type().debug_string());
         }
         if (!desc->__isset.bitmap_target_expr) {
-            return Status::InvalidArgument("Unknown bitmap filter target 
expr.");
+            return Status::InternalError("Unknown bitmap filter target expr.");
         }
         vectorized::VExprContextSPtr bitmap_target_ctx;
         RETURN_IF_ERROR(
@@ -1454,7 +1453,7 @@ Status IRuntimeFilter::create_wrapper(const 
UpdateRuntimeFilterParamsV2* param,
         return (*wrapper)->assign(&param->request->minmax_filter(), 
param->request->contain_null());
     }
     default:
-        return Status::InvalidArgument("unknown filter type");
+        return Status::InternalError("unknown filter type");
     }
 }
 
@@ -1503,7 +1502,7 @@ Status IRuntimeFilter::_create_wrapper(const T* param, 
ObjectPool* pool,
         return (*wrapper)->assign(&param->request->minmax_filter(), 
param->request->contain_null());
     }
     default:
-        return Status::InvalidArgument("unknown filter type");
+        return Status::InternalError("unknown filter type");
     }
 }
 
@@ -1522,7 +1521,14 @@ void 
IRuntimeFilter::update_runtime_filter_type_to_profile() {
 }
 
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
-    return _wrapper->merge(wrapper);
+    auto status = _wrapper->merge(wrapper);
+    if (!status) {
+        LOG(WARNING) << "runtime filter merge failed: " << _name
+                     << " ,need_local_merge: " << _need_local_merge
+                     << " ,is_broadcast: " << _is_broadcast_join;
+        DCHECK(false); // rpc response is often ignored, so let it crash 
directly here
+    }
+    return status;
 }
 
 template <typename T>
@@ -1557,7 +1563,7 @@ Status IRuntimeFilter::serialize_impl(T* request, void** 
data, int* len) {
         auto minmax_filter = request->mutable_minmax_filter();
         to_protobuf(minmax_filter);
     } else {
-        return Status::InvalidArgument("not implemented !");
+        return Status::InternalError("not implemented !");
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 9a9af079ab3..e331a7d6751 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -136,7 +136,9 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
     uint64_t hash_table_size = block ? block->rows() : 0;
     {
         SCOPED_TIMER(_runtime_filter_init_timer);
-        RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, 
hash_table_size));
+        if (_should_build_hash_table) {
+            RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, 
hash_table_size));
+        }
         RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
     }
     if (_should_build_hash_table && hash_table_size > 1) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to