This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 7e8415fe5bb [bugfix](asan core) should use weak ptr in rpc context to release rf context during query context deconstructed (#38653)
7e8415fe5bb is described below

commit 7e8415fe5bb0521fa3511d80a6a20cd75a1ecda6
Author: yiguolei <676222...@qq.com>
AuthorDate: Thu Aug 1 15:45:36 2024 +0800

    [bugfix](asan core) should use weak ptr in rpc context to release rf context during query context deconstructed (#38653)

    ## Proposed changes

    Issue Number: close #xxx

    F20240731 10:38:53.012670 20850 mem_tracker_limiter.cpp:135] mem tracker label: Query#Id=7539c7d0087442b7-a1cda6392053548a, consumption: 52, peak consumption: 1155332, mem tracker not equal to 0 when mem tracker destruct, this usually means that memory tracking is inaccurate and SCOPED_ATTACH_TASK and SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly.
    1. For query and load, memory leaks may have occurred, it is expected that the query mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free.
    2. If a memory alloc is recorded by this tracker, it is expected that be recorded in this tracker when memory is freed.
    3. Merge the remaining memory tracking value by this tracker into Orphan, if you observe that Orphan is not equal to 0 in the mem tracker web or log, this indicates that there may be a memory leak.
    4. If you need to transfer memory tracking value between two trackers, can use transfer_to..[Address Sanitizer]: 0x606002867d80, size 52, strack trace:
     0# Allocator::alloc_impl(unsigned long, unsigned long)
     1# void* phmap::priv::Allocate<4ul, doris::vectorized::Allocator_ >(doris::vectorized::Allocator_*, unsigned long)
     2# phmap::priv::raw_hash_set, phmap::Hash, phmap::EqualTo, doris::vectorized::Allocator_ >::initialize_slots(unsigned long)
     3# phmap::priv::raw_hash_set, phmap::Hash, phmap::EqualTo, doris::vectorized::Allocator_ >::resize(unsigned long)
     4# phmap::priv::raw_hash_set, phmap::Hash, phmap::EqualTo, doris::vectorized::Allocator_ >::prepare_insert(unsigned long)
     5# std::pair phmap::priv::raw_hash_set, phmap::Hash, phmap::EqualTo, doris::vectorized::Allocator_ >::find_or_prepare_insert(int const&, unsigned long)
     6# std::pair, phmap::Hash, phmap::EqualTo, doris::vectorized::Allocator_ >::iterator, bool> phmap::priv::raw_hash_set, phmap::Hash, phmap::EqualTo, doris::vectorized::Allocator_ >::emplace_decomposable(int const&, unsigned long, int const&)
     7# std::pair, phmap::Hash, phmap::EqualTo, doris::vectorized::Allocator_ >::iterator, bool> phmap::priv::raw_hash_set, phmap::Hash, phmap::EqualTo, doris::vectorized::Allocator_ >::EmplaceDecomposable::operator()(int const&, int const&) const
     8# doris::HybridSet<(doris::PrimitiveType)5, doris::DynamicContainer, doris::vectorized::ColumnVector >::insert(void const*)
     9# doris::HybridSetBase::insert(doris::HybridSetBase*)
    10# doris::RuntimePredicateWrapper::merge(doris::RuntimePredicateWrapper const*)
    11# doris::IRuntimeFilter::merge_from(doris::RuntimePredicateWrapper const*)
    12# doris::RuntimeFilterMergeControllerEntity::merge(doris::PMergeFilterRequest const*, butil::IOBufAsZeroCopyInputStream*)
    13# doris::FragmentMgr::merge_filter(doris::PMergeFilterRequest const*, butil::IOBufAsZeroCopyInputStream*)
    14# std::_Function_handler::_M_invoke(std::_Any_data const&)
    15# doris::WorkThreadPool::work_thread(int)
    16# execute_native_thread_routine
    17# ?
    18# ?

    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/exprs/runtime_filter.cpp | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 901af0472f0..d7fc2ff7490 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1005,7 +1005,10 @@ Status IRuntimeFilter::publish(bool publish_local) {
 class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
                                                   DummyBrpcCallback<PSendFilterSizeResponse>> {
     std::shared_ptr<pipeline::Dependency> _dependency;
-    RuntimeFilterContextSPtr _rf_context;
+    // Should use weak ptr here, because when query context deconstructs, should also delete runtime filter
+    // context, it not the memory is not released. And rpc is in another thread, it will hold rf context
+    // after query context because the rpc is not returned.
+    std::weak_ptr<RuntimeFilterContext> _rf_context;
     std::string _rf_debug_info;
     using Base =
             AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
@@ -1021,7 +1024,13 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
         ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
         if (status.is<ErrorCode::END_OF_FILE>()) {
             // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case.
-            _rf_context->ignored = true;
+            auto ctx = _rf_context.lock();
+            if (ctx) {
+                ctx->ignored = true;
+            } else {
+                LOG(WARNING) << "sync filter size returned but context is released, filter="
+                             << _rf_debug_info;
+            }
         } else {
             LOG(WARNING) << "sync filter size meet error status, filter=" << _rf_debug_info;
             Base::_process_if_meet_error_status(status);
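
For readers who want the lifetime pattern from this change in isolation, here is a minimal standalone sketch. It is not Doris code: `FilterContext`, `SizeCallback`, `on_end_of_file`, and `demo` are hypothetical names invented for illustration, and no brpc or Doris types are used. It only shows the general idea that a callback holding a `std::weak_ptr` does not keep the context alive, and has to call `lock()` and check the result before touching it.

```cpp
#include <cassert>
#include <iostream>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the runtime filter context; only the flag is modeled.
struct FilterContext {
    bool ignored = false;
};

// Hypothetical stand-in for an RPC completion closure. It observes the context
// through a weak_ptr, so a pending callback does not extend the context's lifetime.
class SizeCallback {
public:
    SizeCallback(std::weak_ptr<FilterContext> ctx, std::string debug_info)
            : _ctx(std::move(ctx)), _debug_info(std::move(debug_info)) {}

    // Returns true if the context was still alive and was marked as ignored.
    bool on_end_of_file() const {
        if (auto ctx = _ctx.lock()) {
            ctx->ignored = true;
            return true;
        }
        std::cerr << "callback ran after context release, filter=" << _debug_info << '\n';
        return false;
    }

private:
    std::weak_ptr<FilterContext> _ctx;
    std::string _debug_info;
};

// Usage sketch: the callback works while the owner holds the context and
// falls back to a warning once the owner has released it.
inline void demo() {
    auto ctx = std::make_shared<FilterContext>();
    SizeCallback cb(ctx, "rf-1");
    assert(ctx.use_count() == 1);  // the callback holds no strong reference
    assert(cb.on_end_of_file());
    assert(ctx->ignored);
    ctx.reset();                   // the owner releases the context
    assert(!cb.on_end_of_file()); // the callback notices and skips the write
}
```

With a `std::shared_ptr` member instead, the `ctx.reset()` call in `demo` would not free the context while the callback object exists, which is the kind of delayed release the commit message describes.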