This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e962a7309b3 [Chore](runtime-filter) adjust some check and error msg on runtime filter (#35018) (#35251) e962a7309b3 is described below commit e962a7309b38c0c8cd2bcb8f6d3337d2a3ab3847 Author: Pxl <pxl...@qq.com> AuthorDate: Thu May 23 11:20:02 2024 +0800 [Chore](runtime-filter) adjust some check and error msg on runtime filter (#35018) (#35251) adjust some check and error msg on runtime filter --- be/src/exprs/bloom_filter_func.h | 7 ------- be/src/exprs/runtime_filter.cpp | 24 ++++++++++++++++-------- be/src/exprs/runtime_filter.h | 9 ++++----- be/src/exprs/runtime_filter_slots.h | 4 ++++ be/src/runtime/runtime_filter_mgr.cpp | 8 +++++++- be/src/vec/exec/runtime_filter_consumer.cpp | 2 +- 6 files changed, 32 insertions(+), 22 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index bc56c7b505a..a831395a5ea 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -133,11 +133,6 @@ public: } Status init_with_fixed_length(int64_t bloom_filter_length) { - if (_inited) { - return Status::OK(); - } - // TODO: really need the lock? - std::lock_guard<std::mutex> l(_lock); if (_inited) { return Status::OK(); } @@ -154,7 +149,6 @@ public: // If `_inited` is false, there is no memory allocated in bloom filter and this is the first // call for `merge` function. So we just reuse this bloom filter, and we don't need to // allocate memory again. 
- std::lock_guard<std::mutex> l(_lock); if (!_inited) { auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); DCHECK(_bloom_filter == nullptr); @@ -228,7 +222,6 @@ protected: int32_t _bloom_filter_alloced; std::shared_ptr<BloomFilterAdaptor> _bloom_filter; bool _inited {}; - std::mutex _lock; int64_t _bloom_filter_length; bool _build_bf_exactly = false; bool _bloom_filter_size_calculated_by_ndv = false; diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index e51b3c739f6..3e07943c45e 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -364,9 +364,11 @@ public: } bool get_build_bf_cardinality() const { - DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER || - _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); - return _context->bloom_filter_func->get_build_bf_cardinality(); + if (_filter_type == RuntimeFilterType::BLOOM_FILTER || + _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { + return _context->bloom_filter_func->get_build_bf_cardinality(); + } + return false; } void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { @@ -1522,15 +1524,21 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { _profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type())); } +std::string IRuntimeFilter::debug_string() const { + return fmt::format( + "RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, " + "build_bf_cardinality: {}", + _filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join, + _wrapper->get_build_bf_cardinality()); +} + Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { auto status = _wrapper->merge(wrapper); if (!status) { - LOG(WARNING) << "runtime filter merge failed: " << _name - << " ,need_local_merge: " << _need_local_merge - << " ,is_broadcast: " << _is_broadcast_join; - DCHECK(false); // rpc response is often ignored, so let it crash 
directly here + return Status::InternalError("runtime filter merge failed: {}, error_msg: {}", + debug_string(), status.msg()); } - return status; + return Status::OK(); } template <typename T> diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 4733d39e298..ee6897be322 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -209,9 +209,9 @@ public: _rf_wait_time_ms(_state->runtime_filter_wait_time_ms), _enable_pipeline_exec(_state->enable_pipeline_exec), _runtime_filter_type(get_runtime_filter_type(desc)), - _name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, - to_string(_runtime_filter_type))), - _profile(new RuntimeProfile(_name)), + _profile( + new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", + _filter_id, to_string(_runtime_filter_type)))), _need_local_merge(need_local_merge) {} ~IRuntimeFilter() = default; @@ -311,7 +311,7 @@ public: void init_profile(RuntimeProfile* parent_profile); - std::string& get_name() { return _name; } + std::string debug_string() const; void update_runtime_filter_type_to_profile(); @@ -442,7 +442,6 @@ protected: std::atomic<bool> _profile_init = false; // runtime filter type RuntimeFilterType _runtime_filter_type; - std::string _name; // parent profile // only effect on consumer std::unique_ptr<RuntimeProfile> _profile; diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index fbc9ae6ceb3..b5b04a1ebac 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -101,6 +101,10 @@ public: } if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) { + if (filter->need_sync_filter_size() != filter->isset_synced_size()) { + return Status::InternalError("sync filter size meet error, filter: {}", + filter->debug_string()); + } RETURN_IF_ERROR( filter->init_bloom_filter(get_real_size(filter, local_hash_table_size))); } diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp index 9f3d26d6a16..010cb5a60e5 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -416,7 +416,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ RuntimeFilterWrapperHolder holder; RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, pool, holder.getHandle())); - RETURN_IF_ERROR(cnt_val->filter->merge_from(holder.getHandle()->get())); + auto st = cnt_val->filter->merge_from(holder.getHandle()->get()); + if (!st) { + // prevent error ignored + DCHECK(false) << st.msg(); + return st; + } + cnt_val->arrive_id.insert(UniqueId(request->fragment_instance_id())); merged_size = cnt_val->arrive_id.size(); // TODO: avoid log when we had acquired a lock diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 66fd0297c98..30c2cc14917 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -42,7 +42,7 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) { fmt::memory_buffer buffer; for (auto& rf_ctx : _runtime_filter_ctxs) { rf_ctx.runtime_filter->init_profile(profile); - fmt::format_to(buffer, "{}, ", rf_ctx.runtime_filter->get_name()); + fmt::format_to(buffer, "{}, ", rf_ctx.runtime_filter->debug_string()); } profile->add_info_string("RuntimeFilters: ", to_string(buffer)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org