This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e962a7309b3 [Chore](runtime-filter) adjust some check and error msg on runtime filter (#35018) (#35251)
e962a7309b3 is described below

commit e962a7309b38c0c8cd2bcb8f6d3337d2a3ab3847
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu May 23 11:20:02 2024 +0800

    [Chore](runtime-filter) adjust some check and error msg on runtime filter (#35018) (#35251)
    
    adjust some check and error msg on runtime filter
---
 be/src/exprs/bloom_filter_func.h            |  7 -------
 be/src/exprs/runtime_filter.cpp             | 24 ++++++++++++++++--------
 be/src/exprs/runtime_filter.h               |  9 ++++-----
 be/src/exprs/runtime_filter_slots.h         |  4 ++++
 be/src/runtime/runtime_filter_mgr.cpp       |  8 +++++++-
 be/src/vec/exec/runtime_filter_consumer.cpp |  2 +-
 6 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index bc56c7b505a..a831395a5ea 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -133,11 +133,6 @@ public:
     }
 
     Status init_with_fixed_length(int64_t bloom_filter_length) {
-        if (_inited) {
-            return Status::OK();
-        }
-        // TODO: really need the lock?
-        std::lock_guard<std::mutex> l(_lock);
         if (_inited) {
             return Status::OK();
         }
@@ -154,7 +149,6 @@ public:
         // If `_inited` is false, there is no memory allocated in bloom filter 
and this is the first
         // call for `merge` function. So we just reuse this bloom filter, and 
we don't need to
         // allocate memory again.
-        std::lock_guard<std::mutex> l(_lock);
         if (!_inited) {
             auto* other_func = 
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
             DCHECK(_bloom_filter == nullptr);
@@ -228,7 +222,6 @@ protected:
     int32_t _bloom_filter_alloced;
     std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
     bool _inited {};
-    std::mutex _lock;
     int64_t _bloom_filter_length;
     bool _build_bf_exactly = false;
     bool _bloom_filter_size_calculated_by_ndv = false;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index e51b3c739f6..3e07943c45e 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -364,9 +364,11 @@ public:
     }
 
     bool get_build_bf_cardinality() const {
-        DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER ||
-               _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);
-        return _context->bloom_filter_func->get_build_bf_cardinality();
+        if (_filter_type == RuntimeFilterType::BLOOM_FILTER ||
+            _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+            return _context->bloom_filter_func->get_build_bf_cardinality();
+        }
+        return false;
     }
 
     void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
@@ -1522,15 +1524,21 @@ void 
IRuntimeFilter::update_runtime_filter_type_to_profile() {
     _profile->add_info_string("RealRuntimeFilterType", 
to_string(_wrapper->get_real_type()));
 }
 
+std::string IRuntimeFilter::debug_string() const {
+    return fmt::format(
+            "RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, 
is_broadcast: {}, "
+            "build_bf_cardinality: {}",
+            _filter_id, to_string(_runtime_filter_type), _need_local_merge, 
_is_broadcast_join,
+            _wrapper->get_build_bf_cardinality());
+}
+
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
     auto status = _wrapper->merge(wrapper);
     if (!status) {
-        LOG(WARNING) << "runtime filter merge failed: " << _name
-                     << " ,need_local_merge: " << _need_local_merge
-                     << " ,is_broadcast: " << _is_broadcast_join;
-        DCHECK(false); // rpc response is often ignored, so let it crash 
directly here
+        return Status::InternalError("runtime filter merge failed: {}, 
error_msg: {}",
+                                     debug_string(), status.msg());
     }
-    return status;
+    return Status::OK();
 }
 
 template <typename T>
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 4733d39e298..ee6897be322 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -209,9 +209,9 @@ public:
               _rf_wait_time_ms(_state->runtime_filter_wait_time_ms),
               _enable_pipeline_exec(_state->enable_pipeline_exec),
               _runtime_filter_type(get_runtime_filter_type(desc)),
-              _name(fmt::format("RuntimeFilter: (id = {}, type = {})", 
_filter_id,
-                                to_string(_runtime_filter_type))),
-              _profile(new RuntimeProfile(_name)),
+              _profile(
+                      new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, 
type = {})",
+                                                     _filter_id, 
to_string(_runtime_filter_type)))),
               _need_local_merge(need_local_merge) {}
 
     ~IRuntimeFilter() = default;
@@ -311,7 +311,7 @@ public:
 
     void init_profile(RuntimeProfile* parent_profile);
 
-    std::string& get_name() { return _name; }
+    std::string debug_string() const;
 
     void update_runtime_filter_type_to_profile();
 
@@ -442,7 +442,6 @@ protected:
     std::atomic<bool> _profile_init = false;
     // runtime filter type
     RuntimeFilterType _runtime_filter_type;
-    std::string _name;
     // parent profile
     // only effect on consumer
     std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index fbc9ae6ceb3..b5b04a1ebac 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -101,6 +101,10 @@ public:
             }
 
             if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
+                if (filter->need_sync_filter_size() != 
filter->isset_synced_size()) {
+                    return Status::InternalError("sync filter size meet error, 
filter: {}",
+                                                 filter->debug_string());
+                }
                 RETURN_IF_ERROR(
                         filter->init_bloom_filter(get_real_size(filter, 
local_hash_table_size)));
             }
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 9f3d26d6a16..010cb5a60e5 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -416,7 +416,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
         RuntimeFilterWrapperHolder holder;
         RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, pool, 
holder.getHandle()));
 
-        
RETURN_IF_ERROR(cnt_val->filter->merge_from(holder.getHandle()->get()));
+        auto st = cnt_val->filter->merge_from(holder.getHandle()->get());
+        if (!st) {
+            // prevent error ignored
+            DCHECK(false) << st.msg();
+            return st;
+        }
+
         cnt_val->arrive_id.insert(UniqueId(request->fragment_instance_id()));
         merged_size = cnt_val->arrive_id.size();
         // TODO: avoid log when we had acquired a lock
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 66fd0297c98..30c2cc14917 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -42,7 +42,7 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile* 
profile) {
     fmt::memory_buffer buffer;
     for (auto& rf_ctx : _runtime_filter_ctxs) {
         rf_ctx.runtime_filter->init_profile(profile);
-        fmt::format_to(buffer, "{}, ", rf_ctx.runtime_filter->get_name());
+        fmt::format_to(buffer, "{}, ", rf_ctx.runtime_filter->debug_string());
     }
     profile->add_info_string("RuntimeFilters: ", to_string(buffer));
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to