This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 34cb4f05e83 [branch-2.1]fix task queue core (#50236) 34cb4f05e83 is described below commit 34cb4f05e83f2a9be84c2a5b4422d3108dcf0892 Author: wangbo <wan...@selectdb.com> AuthorDate: Tue Apr 22 14:23:23 2025 +0800 [branch-2.1]fix task queue core (#50236) TaskQueue should be released after all theads stop. ``` ================================================================= ==6632==ERROR: AddressSanitizer: heap-use-after-free on address 0x61700518d440 at pc 0x5616fa5c2a87 bp 0x7f7635b709b0 sp 0x7f7635b709a8 READ of size 8 at 0x61700518d440 thread T5188 (Pipe_test_follo) #0 0x5616fa5c2a86 in std::__atomic_base<unsigned long>::load(std::memory_order) const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:481:9 #1 0x5616fa5c2a86 in std::__atomic_base<unsigned long>::operator unsigned long() const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:341:16 #2 0x5616fa5c2a86 in doris::pipeline::PriorityTaskQueue::_try_take_unprotected(bool) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:58:9 #3 0x5616fa5c2e16 in doris::pipeline::PriorityTaskQueue::take(unsigned int) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:110:16 #4 0x5616fa5c4b06 in doris::pipeline::MultiCoreTaskQueue::take(int) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:170:51 #5 0x5616fa5d1919 in doris::pipeline::TaskScheduler::_do_work(unsigned long) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_scheduler.cpp:267:35 #6 0x5616d1e169f8 in doris::ThreadPool::dispatch_thread() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/threadpool.cpp:544:24 #7 0x5616d1df3e38 in std::function<void ()>::operator()() const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9 #8 0x5616d1df3e38 in doris::Thread::supervise_thread(void*) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thread.cpp:498:5 #9 0x7f83a4c35ac2 in start_thread nptl/pthread_create.c:442:8 #10 0x7f83a4cc784f misc/../sysdeps/unix/sysv/linux/x86_64/clone3.S:81 0x61700518d440 is located 704 bytes inside of 728-byte region [0x61700518d180,0x61700518d458) freed by thread T2166 here: #0 0x5616cea38d9d in operator delete(void*) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x16dc0d9d) (BuildId: 09b7b6856fee4f7f) #1 0x5616fa5cae77 in std::default_delete<doris::pipeline::PriorityTaskQueue>::operator()(doris::pipeline::PriorityTaskQueue*) const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:85:2 #2 0x5616fa5cae77 in std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>::~unique_ptr() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:361:4 #3 0x5616fa5cae77 in void std::destroy_at<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>>(std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:88:15 #4 0x5616fa5cae77 in void std::_Destroy<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>>(std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:138:7 #5 0x5616fa5cae77 in void std::_Destroy_aux<false>::__destroy<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*>(std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*, std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bit [...] #6 0x5616fa5cae77 in void std::_Destroy<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*>(std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*, std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:184:7 #7 0x5616fa5cae77 in void std::_Destroy<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*, std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>>(std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*, std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>*, st [...] #8 0x5616fa5cae77 in std::vector<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>, std::allocator<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>>>::~vector() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:680:2 #9 0x5616fa5c466d in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:168:6 #10 0x5616fa5c466d in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:702:11 #11 0x5616fa5c466d in std::__shared_ptr<std::vector<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>, std::allocator<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>>>, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1149:31 #12 0x5616fa5c466d in void std::atomic_store<std::vector<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>, std::allocator<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>>>>(std::shared_ptr<std::vector<std::unique_ptr<doris::pipeline::PriorityTaskQueue, std::default_delete<doris::pipeline::PriorityTaskQueue>>, std::allocator<std::unique_ptr<doris::pipeline::Prior [...] #13 0x5616fa5c466d in doris::pipeline::MultiCoreTaskQueue::close() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:151:5 #14 0x5616fa5ce9f4 in doris::pipeline::TaskScheduler::stop() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_scheduler.cpp:437:26 #15 0x5616d19b7d36 in doris::WorkloadGroup::try_stop_schedulers() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/runtime/workload_group/workload_group.cpp:654:22 #16 0x5616d19c6042 in doris::WorkloadGroupMgr::delete_workload_group_by_ids(std::set<unsigned long, std::less<unsigned long>, std::allocator<unsigned long>>) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/runtime/workload_group/workload_group_manager.cpp:107:13 #17 0x5616d1a7f6a1 in doris::WorkloadGroupListener::handle_topic_info(std::vector<doris::TopicInfo, std::allocator<doris::TopicInfo>> const&) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/agent/workload_group_listener.cpp:82:38 #18 0x5616d1a7dd7e in doris::TopicSubscriber::handle_topic_info(doris::TPublishTopicRequest const&) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/agent/topic_subscriber.cpp:45:35 #19 0x5616d2002932 in doris::BackendServiceProcessor::process_publish_topic_info(int, apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, void*) /home/zcp/repo_center/doris_branch-2.1/doris/gensrc/build/gen_cpp/BackendService.cpp:6577:13 #20 0x5616d1fd6f11 in doris::BackendServiceProcessor::dispatchCall(apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>> const&, int, void*) /home/zcp/repo_center/doris_branch-2.1/doris/gensrc/build/gen_cpp/BackendService.cpp:5416:3 #21 0x5616d2039186 in apache::thrift::TDispatchProcessor::process(std::shared_ptr<apache::thrift::protocol::TProtocol>, std::shared_ptr<apache::thrift::protocol::TProtocol>, void*) /home/zcp/repo_center/doris_branch-2.1/doris/thirdparty/installed/include/thrift/TDispatchProcessor.h:121:12 #22 0x5616faee19f7 in apache::thrift::server::TConnectedClient::run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x432699f7) (BuildId: 09b7b6856fee4f7f) #23 0x5616faee2cf6 in apache::thrift::server::TThreadedServer::TConnectedClientRunner::run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326acf6) (BuildId: 09b7b6856fee4f7f) #24 0x5616faee830d in apache::thrift::concurrency::Thread::threadMain(std::shared_ptr<apache::thrift::concurrency::Thread>) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4327030d) (BuildId: 09b7b6856fee4f7f) #25 0x5616faee81b3 in void std::__invoke_impl<void, void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>(std::__invoke_other, void (*&&)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>&&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x432701b3) (BuildId: 09b7b6856fee4f7f) #26 0x5616faee8106 in std::__invoke_result<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>::type std::__invoke<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>(void (*&&)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>&&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4 [...] #27 0x5616faee8076 in void std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x43270076) (BuildId: 09b7b6856fee4f7f) #28 0x5616faee7ffb in std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>::operator()() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326fffb) (BuildId: 09b7b6856fee4f7f) #29 0x5616faee7f9f in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>>::_M_run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326ff9f) (BuildId: 09b7b6856fee4f7f) #30 0x5616fd80568f in execute_native_thread_routine /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18 previously allocated by thread T3392 here: #0 0x5616cea3853d in operator new(unsigned long) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x16dc053d) (BuildId: 09b7b6856fee4f7f) #1 0x5616fa5c3e29 in std::_MakeUniq<doris::pipeline::PriorityTaskQueue>::__single_object std::make_unique<doris::pipeline::PriorityTaskQueue>() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:962:30 #2 0x5616fa5c3e29 in doris::pipeline::MultiCoreTaskQueue::MultiCoreTaskQueue(int) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_queue.cpp:139:39 #3 0x5616d19b276d in decltype(::new((void*)(0)) doris::pipeline::MultiCoreTaskQueue(std::declval<int&>())) std::construct_at<doris::pipeline::MultiCoreTaskQueue, int&>(doris::pipeline::MultiCoreTaskQueue*, int&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:97:39 #4 0x5616d19b276d in void std::allocator_traits<std::allocator<doris::pipeline::MultiCoreTaskQueue>>::construct<doris::pipeline::MultiCoreTaskQueue, int&>(std::allocator<doris::pipeline::MultiCoreTaskQueue>&, doris::pipeline::MultiCoreTaskQueue*, int&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:514:4 #5 0x5616d19b276d in std::_Sp_counted_ptr_inplace<doris::pipeline::MultiCoreTaskQueue, std::allocator<doris::pipeline::MultiCoreTaskQueue>, (__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<int&>(std::allocator<doris::pipeline::MultiCoreTaskQueue>, int&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:519:4 #6 0x5616d19b276d in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<doris::pipeline::MultiCoreTaskQueue, std::allocator<doris::pipeline::MultiCoreTaskQueue>, int&>(doris::pipeline::MultiCoreTaskQueue*&, std::_Sp_alloc_shared_tag<std::allocator<doris::pipeline::MultiCoreTaskQueue>>, int&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:651:6 #7 0x5616d19b276d in std::__shared_ptr<doris::pipeline::MultiCoreTaskQueue, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<doris::pipeline::MultiCoreTaskQueue>, int&>(std::_Sp_alloc_shared_tag<std::allocator<doris::pipeline::MultiCoreTaskQueue>>, int&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1337:14 #8 0x5616d19b276d in std::shared_ptr<doris::pipeline::MultiCoreTaskQueue>::shared_ptr<std::allocator<doris::pipeline::MultiCoreTaskQueue>, int&>(std::_Sp_alloc_shared_tag<std::allocator<doris::pipeline::MultiCoreTaskQueue>>, int&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:409:4 #9 0x5616d19b276d in std::shared_ptr<doris::pipeline::MultiCoreTaskQueue> std::allocate_shared<doris::pipeline::MultiCoreTaskQueue, std::allocator<doris::pipeline::MultiCoreTaskQueue>, int&>(std::allocator<doris::pipeline::MultiCoreTaskQueue> const&, int&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:860:14 #10 0x5616d19b276d in std::shared_ptr<doris::pipeline::MultiCoreTaskQueue> std::make_shared<doris::pipeline::MultiCoreTaskQueue, int&>(int&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr.h:876:14 #11 0x5616d19b276d in doris::WorkloadGroup::upsert_thread_pool_no_lock(doris::WorkloadGroupInfo*, std::shared_ptr<doris::CgroupCpuCtl>, doris::ExecEnv*) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/runtime/workload_group/workload_group.cpp:462:27 #12 0x5616d19b5991 in doris::WorkloadGroup::upsert_task_scheduler(doris::WorkloadGroupInfo*, doris::ExecEnv*) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/runtime/workload_group/workload_group.cpp:572:9 #13 0x5616d1a7ecd8 in doris::WorkloadGroupListener::handle_topic_info(std::vector<doris::TopicInfo, std::allocator<doris::TopicInfo>> const&) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/agent/workload_group_listener.cpp:60:13 #14 0x5616d1a7dd7e in doris::TopicSubscriber::handle_topic_info(doris::TPublishTopicRequest const&) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/agent/topic_subscriber.cpp:45:35 #15 0x5616d2002932 in doris::BackendServiceProcessor::process_publish_topic_info(int, apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, void*) /home/zcp/repo_center/doris_branch-2.1/doris/gensrc/build/gen_cpp/BackendService.cpp:6577:13 #16 0x5616d1fd6f11 in doris::BackendServiceProcessor::dispatchCall(apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>> const&, int, void*) /home/zcp/repo_center/doris_branch-2.1/doris/gensrc/build/gen_cpp/BackendService.cpp:5416:3 #17 0x5616d2039186 in apache::thrift::TDispatchProcessor::process(std::shared_ptr<apache::thrift::protocol::TProtocol>, std::shared_ptr<apache::thrift::protocol::TProtocol>, void*) /home/zcp/repo_center/doris_branch-2.1/doris/thirdparty/installed/include/thrift/TDispatchProcessor.h:121:12 #18 0x5616faee19f7 in apache::thrift::server::TConnectedClient::run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x432699f7) (BuildId: 09b7b6856fee4f7f) #19 0x5616faee2cf6 in apache::thrift::server::TThreadedServer::TConnectedClientRunner::run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326acf6) (BuildId: 09b7b6856fee4f7f) #20 0x5616faee830d in apache::thrift::concurrency::Thread::threadMain(std::shared_ptr<apache::thrift::concurrency::Thread>) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4327030d) (BuildId: 09b7b6856fee4f7f) #21 0x5616faee81b3 in void std::__invoke_impl<void, void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>(std::__invoke_other, void (*&&)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>&&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x432701b3) (BuildId: 09b7b6856fee4f7f) #22 0x5616faee8106 in std::__invoke_result<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>::type std::__invoke<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>(void (*&&)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>&&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4 [...] #23 0x5616faee8076 in void std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x43270076) (BuildId: 09b7b6856fee4f7f) #24 0x5616faee7ffb in std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>::operator()() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326fffb) (BuildId: 09b7b6856fee4f7f) #25 0x5616faee7f9f in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>>::_M_run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326ff9f) (BuildId: 09b7b6856fee4f7f) #26 0x5616fd80568f in execute_native_thread_routine /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18 Thread T5188 (Pipe_test_follo) created by T3392 here: #0 0x5616ce9e5caa in pthread_create (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x16d6dcaa) (BuildId: 09b7b6856fee4f7f) #1 0x5616d1df2cc9 in doris::Thread::start_thread(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>> const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>> const&, std::function<void ()> const&, unsigned long, scoped_refptr<doris::Thread>*) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thread.cpp:449:15 #2 0x5616d1e1d979 in doris::Status doris::Thread::create<void (doris::ThreadPool::*)(), doris::ThreadPool*>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>> const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>> const&, void (doris::ThreadPool::* const&)(), doris::ThreadPool* const&, scoped_refptr<doris::Thread>*) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thread.h:56:16 #3 0x5616d1e1423d in doris::ThreadPool::create_thread() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/threadpool.cpp:612:12 #4 0x5616d1e13e43 in doris::ThreadPool::init() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/threadpool.cpp:266:25 #5 0x5616cec31535 in doris::Status doris::ThreadPoolBuilder::build<doris::ThreadPool>(std::unique_ptr<doris::ThreadPool, std::default_delete<doris::ThreadPool>>*) const /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/threadpool.h:121:13 #6 0x5616fa5cef22 in doris::pipeline::TaskScheduler::start() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/pipeline/task_scheduler.cpp:208:5 #7 0x5616d19b2a9b in doris::WorkloadGroup::upsert_thread_pool_no_lock(doris::WorkloadGroupInfo*, std::shared_ptr<doris::CgroupCpuCtl>, doris::ExecEnv*) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/runtime/workload_group/workload_group.cpp:467:47 #8 0x5616d19b5991 in doris::WorkloadGroup::upsert_task_scheduler(doris::WorkloadGroupInfo*, doris::ExecEnv*) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/runtime/workload_group/workload_group.cpp:572:9 #9 0x5616d1a7ecd8 in doris::WorkloadGroupListener::handle_topic_info(std::vector<doris::TopicInfo, std::allocator<doris::TopicInfo>> const&) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/agent/workload_group_listener.cpp:60:13 #10 0x5616d1a7dd7e in doris::TopicSubscriber::handle_topic_info(doris::TPublishTopicRequest const&) /home/zcp/repo_center/doris_branch-2.1/doris/be/src/agent/topic_subscriber.cpp:45:35 #11 0x5616d2002932 in doris::BackendServiceProcessor::process_publish_topic_info(int, apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, void*) /home/zcp/repo_center/doris_branch-2.1/doris/gensrc/build/gen_cpp/BackendService.cpp:6577:13 #12 0x5616d1fd6f11 in doris::BackendServiceProcessor::dispatchCall(apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>> const&, int, void*) /home/zcp/repo_center/doris_branch-2.1/doris/gensrc/build/gen_cpp/BackendService.cpp:5416:3 #13 0x5616d2039186 in apache::thrift::TDispatchProcessor::process(std::shared_ptr<apache::thrift::protocol::TProtocol>, std::shared_ptr<apache::thrift::protocol::TProtocol>, void*) /home/zcp/repo_center/doris_branch-2.1/doris/thirdparty/installed/include/thrift/TDispatchProcessor.h:121:12 #14 0x5616faee19f7 in apache::thrift::server::TConnectedClient::run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x432699f7) (BuildId: 09b7b6856fee4f7f) #15 0x5616faee2cf6 in apache::thrift::server::TThreadedServer::TConnectedClientRunner::run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326acf6) (BuildId: 09b7b6856fee4f7f) #16 0x5616faee830d in apache::thrift::concurrency::Thread::threadMain(std::shared_ptr<apache::thrift::concurrency::Thread>) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4327030d) (BuildId: 09b7b6856fee4f7f) #17 0x5616faee81b3 in void std::__invoke_impl<void, void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>(std::__invoke_other, void (*&&)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>&&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x432701b3) (BuildId: 09b7b6856fee4f7f) #18 0x5616faee8106 in std::__invoke_result<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>::type std::__invoke<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>(void (*&&)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>&&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4 [...] #19 0x5616faee8076 in void std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x43270076) (BuildId: 09b7b6856fee4f7f) #20 0x5616faee7ffb in std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>::operator()() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326fffb) (BuildId: 09b7b6856fee4f7f) #21 0x5616faee7f9f in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>>>>::_M_run() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326ff9f) (BuildId: 09b7b6856fee4f7f) #22 0x5616fd80568f in execute_native_thread_routine /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18 Thread T3392 created by T933 here: #0 0x5616ce9e5caa in pthread_create (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x16d6dcaa) (BuildId: 09b7b6856fee4f7f) #1 0x5616fd8057b5 in __gthread_create /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/include/x86_64-pc-linux-gnu/bits/gthr-default.h:663:35 #2 0x5616fd8057b5 in std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State>>, void (*)()) /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:147:37 #3 0x5616faee667c in apache::thrift::concurrency::Thread::start() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326e67c) (BuildId: 09b7b6856fee4f7f) #4 0x5616faee2a26 in apache::thrift::server::TThreadedServer::onClientConnected(std::shared_ptr<apache::thrift::server::TConnectedClient> const&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326aa26) (BuildId: 09b7b6856fee4f7f) #5 0x5616faedebc4 in apache::thrift::server::TServerFramework::newlyConnectedClient(std::shared_ptr<apache::thrift::server::TConnectedClient> const&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x43266bc4) (BuildId: 09b7b6856fee4f7f) #6 0x5616faede25f in apache::thrift::server::TServerFramework::serve() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326625f) (BuildId: 09b7b6856fee4f7f) #7 0x5616faee2791 in apache::thrift::server::TThreadedServer::serve() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326a791) (BuildId: 09b7b6856fee4f7f) #8 0x5616d1e31419 in doris::ThriftServer::ThriftServerEventProcessor::supervise() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thrift_server.cpp:206:34 #9 0x5616fd80568f in execute_native_thread_routine /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18 Thread T933 created by T0 here: #0 0x5616ce9e5caa in pthread_create (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x16d6dcaa) (BuildId: 09b7b6856fee4f7f) #1 0x5616fd8057b5 in __gthread_create /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/include/x86_64-pc-linux-gnu/bits/gthr-default.h:663:35 #2 0x5616fd8057b5 in std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State>>, void (*)()) /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:147:37 #3 0x5616d1e37788 in doris::ThriftServer::start() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thrift_server.cpp:421:5 #4 0x5616cea3f174 in main /home/zcp/repo_center/doris_branch-2.1/doris/be/src/service/doris_main.cpp:536:25 #5 0x7f83a4bcad8f in __libc_start_call_main csu/../sysdeps/nptl/libc_start_call_main.h:58:16 Thread T2166 created by T933 here: #0 0x5616ce9e5caa in pthread_create (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x16d6dcaa) (BuildId: 09b7b6856fee4f7f) #1 0x5616fd8057b5 in __gthread_create /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/include/x86_64-pc-linux-gnu/bits/gthr-default.h:663:35 #2 0x5616fd8057b5 in std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State>>, void (*)()) /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:147:37 #3 0x5616faee667c in apache::thrift::concurrency::Thread::start() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326e67c) (BuildId: 09b7b6856fee4f7f) #4 0x5616faee2a26 in apache::thrift::server::TThreadedServer::onClientConnected(std::shared_ptr<apache::thrift::server::TConnectedClient> const&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326aa26) (BuildId: 09b7b6856fee4f7f) #5 0x5616faedebc4 in apache::thrift::server::TServerFramework::newlyConnectedClient(std::shared_ptr<apache::thrift::server::TConnectedClient> const&) (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x43266bc4) (BuildId: 09b7b6856fee4f7f) #6 0x5616faede25f in apache::thrift::server::TServerFramework::serve() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326625f) (BuildId: 09b7b6856fee4f7f) #7 0x5616faee2791 in apache::thrift::server::TThreadedServer::serve() (/mnt/hdd01/ci/doris-deploy-branch-2.1-local/be/lib/doris_be+0x4326a791) (BuildId: 09b7b6856fee4f7f) #8 0x5616d1e31419 in doris::ThriftServer::ThriftServerEventProcessor::supervise() /home/zcp/repo_center/doris_branch-2.1/doris/be/src/util/thrift_server.cpp:206:34 #9 0x5616fd80568f in execute_native_thread_routine /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18 SUMMARY: AddressSanitizer: heap-use-after-free /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:481:9 in std::__atomic_base<unsigned long>::load(std::memory_order) const Shadow bytes around the buggy address: 0x61700518d180: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd 0x61700518d200: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd 0x61700518d280: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd 0x61700518d300: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd 0x61700518d380: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd =>0x61700518d400: fd fd fd fd fd fd fd fd[fd]fd fd fa fa fa fa fa 0x61700518d480: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa 0x61700518d500: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa 0x61700518d580: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa 0x61700518d600: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa 0x61700518d680: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa Shadow byte legend (one shadow byte represents 8 application bytes): Addressable: 00 Partially addressable: 01 02 03 04 05 06 07 Heap left redzone: fa Freed heap region: fd Stack left redzone: f1 Stack mid redzone: f2 Stack right redzone: f3 Stack after return: f5 Stack use after scope: f8 Global redzone: f9 Global init order: f6 Poisoned by user: f7 Container overflow: fc Array cookie: ac Intra object redzone: bb ASan internal: fe Left alloca redzone: ca Right alloca redzone: cb ==6632==ABORTING ``` --- be/src/pipeline/task_queue.cpp | 34 +++++++++++++--------------------- be/src/pipeline/task_queue.h | 9 ++++----- be/src/runtime/query_context.cpp | 5 ++++- 3 files changed, 21 insertions(+), 27 deletions(-) diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 601db7014b1..ad89a7d56a9 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -132,42 +132,35 @@ Status PriorityTaskQueue::push(PipelineTask* task) { MultiCoreTaskQueue::~MultiCoreTaskQueue() = default; -MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) { - _prio_task_queue_list = - std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size); - for (int i = 0; i < core_size; i++) { - (*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>(); - } -} +MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) + : TaskQueue(core_size), _prio_task_queue_list(core_size), _closed(false) {} void MultiCoreTaskQueue::close() { if (_closed) { return; } _closed = true; - for (int i = 0; i < _core_size; ++i) { - (*_prio_task_queue_list)[i]->close(); - } - std::atomic_store(&_prio_task_queue_list, - std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr)); + // close all priority task queue + std::ranges::for_each(_prio_task_queue_list, + [](auto& prio_task_queue) { prio_task_queue.close(); }); } PipelineTask* MultiCoreTaskQueue::take(int core_id) { PipelineTask* task = nullptr; while (!_closed) { - DCHECK(_prio_task_queue_list->size() > core_id) - << " list size: " << _prio_task_queue_list->size() << " core_id: " << core_id + DCHECK(_prio_task_queue_list.size() > core_id) + << " list size: " << _prio_task_queue_list.size() << " core_id: " << core_id << " _core_size: " << _core_size << " _next_core: " << _next_core.load(); - task = (*_prio_task_queue_list)[core_id]->try_take(false); + task = _prio_task_queue_list[core_id].try_take(false); if (task) { task->set_core_id(core_id); break; } - task = _steal_take(core_id, *_prio_task_queue_list); + task = _steal_take(core_id); if (task) { break; } - task = (*_prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); + task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); if (task) { task->set_core_id(core_id); break; @@ -179,8 +172,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) { return task; } -PipelineTask* MultiCoreTaskQueue::_steal_take( - int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) { +PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) { DCHECK(core_id < _core_size); int next_id = core_id; for (int i = 1; i < _core_size; ++i) { @@ -189,7 +181,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take( next_id = 0; } DCHECK(next_id < _core_size); - auto task = prio_task_queue_list[next_id]->try_take(true); + auto task = _prio_task_queue_list[next_id].try_take(true); if (task) { task->set_core_id(next_id); return task; @@ -209,7 +201,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) { Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) { DCHECK(core_id < _core_size); task->put_in_runnable_queue(); - return (*_prio_task_queue_list)[core_id]->push(task); + return _prio_task_queue_list[core_id].push(task); } } // namespace pipeline diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 01282be7be0..1c6d2be4929 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -144,15 +144,14 @@ public: void update_statistics(PipelineTask* task, int64_t time_spent) override { task->inc_runtime_ns(time_spent); - (*_prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime( - task->get_queue_level(), time_spent); + _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(), + time_spent); } private: - PipelineTask* _steal_take( - int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list); + PipelineTask* _steal_take(int core_id); - std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list; + std::vector<PriorityTaskQueue> _prio_task_queue_list; std::atomic<uint32_t> _next_core = 0; std::atomic<bool> _closed; }; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 13f9c0c9974..4e050f17b4e 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -147,10 +147,12 @@ QueryContext::~QueryContext() { MemTracker::print_bytes(query_mem_tracker->peak_consumption())); } uint64_t group_id = 0; + std::string wg_name = ""; if (_workload_group) { group_id = _workload_group->id(); // before remove _workload_group->remove_mem_tracker_limiter(query_mem_tracker); _workload_group->remove_query(_query_id); + wg_name = _workload_group->name(); } _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); @@ -186,7 +188,8 @@ QueryContext::~QueryContext() { _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id); DorisMetrics::instance()->query_ctx_cnt->increment(-1); // the only one msg shows query's end. any other msg should append to it if need. - LOG(INFO) << fmt::format("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), + LOG(INFO) << fmt::format("Query {} deconstructed, use wg: {}, query type: {}, mem_tracker: {}", + print_id(this->_query_id), wg_name, _query_options.query_type, mem_tracker_msg); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org