This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ad39b9e2afb [Bug](pipeline) fix nullptr access when broadcast join meet wakeup early and build sh… (#48247) ad39b9e2afb is described below commit ad39b9e2afbfaa2e2ebfea456d00e35d89fd98ea Author: Pxl <x...@selectdb.com> AuthorDate: Tue Feb 25 14:24:06 2025 +0800 [Bug](pipeline) fix nullptr access when broadcast join meet wakeup early and build sh… (#48247) …ared hash table's instance not opened ### What problem does this PR solve? Consider a scenario like this: a broadcast join task is woken up early, and then the following things happen in order. 1. The instance that builds the shared hash table is never opened; it goes directly to close and signals the other instances. 2. The other instances are opened and enter sink with eos. At this point, the shared hash table is still a nullptr. 3. A coredump occurs due to the nullptr access. Related to: https://github.com/apache/doris/pull/47380 https://github.com/apache/doris/pull/48008 ```cpp 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /mnt/disk3/xiaolei/doris_master/be/src/common/signal_handler.h:421 1# 0x00007F7F7EA625B0 in /lib64/libc.so.6 2# std::__detail::__variant::_Variant_storage<false, std::monostate, doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef, DefaultHash<doris::StringRef, void> > >, doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned short, doris::JoinHashTable<unsigned short, HashCRC32<unsigned short> > >, doris::vectorized::MethodOneNumber<unsigned int, doris::JoinHashTabl [...] 
3# std::variant<std::monostate, doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef, DefaultHash<doris::StringRef, void> > >, doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned short, doris::JoinHashTable<unsigned short, HashCRC32<unsigned short> > >, doris::vectorized::MethodOneNumber<unsigned int, doris::JoinHashTable<unsigned int, HashCRC32<unsigned in [...] 4# std::invoke_result<doris::pipeline::HashJoinBuildSinkOperatorX::sink(doris::RuntimeState*, doris::vectorized::Block*, bool)::$_0, std::__conditional<is_lvalue_reference_v<std::variant<std::monostate, doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef, DefaultHash<doris::StringRef, void> > >, doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned short, do [...] 5# doris::pipeline::HashJoinBuildSinkOperatorX::sink(doris::RuntimeState*, doris::vectorized::Block*, bool) at /mnt/disk3/xiaolei/doris_master/be/src/pipeline/exec/hashjoin_build_sink.cpp:591 6# doris::pipeline::PipelineTask::execute(bool*) at /mnt/disk3/xiaolei/doris_master/be/src/pipeline/pipeline_task.cpp:398 7# doris::pipeline::TaskScheduler::_do_work(int) at /mnt/disk3/xiaolei/doris_master/be/src/pipeline/task_scheduler.cpp:144 8# doris::pipeline::TaskScheduler::start()::$_0::operator()() const at /mnt/disk3/xiaolei/doris_master/be/src/pipeline/task_scheduler.cpp:63 9# void std::__invoke_impl<void, doris::pipeline::TaskScheduler::start()::$_0&>(std::__invoke_other, doris::pipeline::TaskScheduler::start()::$_0&) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61 10# std::enable_if<is_invocable_r_v<void, doris::pipeline::TaskScheduler::start()::$_0&>, void>::type std::__invoke_r<void, 
doris::pipeline::TaskScheduler::start()::$_0&>(doris::pipeline::TaskScheduler::start()::$_0&) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:117 11# std::_Function_handler<void (), doris::pipeline::TaskScheduler::start()::$_0>::_M_invoke(std::_Any_data const&) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290 12# std::function<void ()>::operator()() const at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591 13# doris::FunctionRunnable::run() at /mnt/disk3/xiaolei/doris_master/be/src/util/threadpool.cpp:60 14# doris::ThreadPool::dispatch_thread() at /mnt/disk3/xiaolei/doris_master/be/src/util/threadpool.cpp:608 15# void std::__invoke_impl<void, void (doris::ThreadPool::*&)(), doris::ThreadPool*&>(std::__invoke_memfun_deref, void (doris::ThreadPool::*&)(), doris::ThreadPool*&) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:74 16# std::__invoke_result<void (doris::ThreadPool::*&)(), doris::ThreadPool*&>::type std::__invoke<void (doris::ThreadPool::*&)(), doris::ThreadPool*&>(void (doris::ThreadPool::*&)(), doris::ThreadPool*&) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:96 17# void std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:506 18# void std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>::operator()<, void>() at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:591 19# void std::__invoke_impl<void, std::_Bind<void 
(doris::ThreadPool::*(doris::ThreadPool*))()>&>(std::__invoke_other, std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61 20# std::enable_if<is_invocable_r_v<void, std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&>, void>::type std::__invoke_r<void, std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&>(std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:117 21# std::_Function_handler<void (), std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()> >::_M_invoke(std::_Any_data const&) at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290 22# std::function<void ()>::operator()() const at /mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591 23# doris::Thread::supervise_thread(void*) at /mnt/disk3/xiaolei/doris_master/be/src/util/thread.cpp:498 24# asan_thread_start(void*) in /mnt/disk3/xiaolei/doris_master/output/be/lib/doris_be 25# start_thread in /lib64/libpthread.so.0 26# __GI___clone in /lib64/libc.so.6 ``` ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. 
eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 3 ++- be/src/pipeline/pipeline.cpp | 3 ++- be/src/pipeline/pipeline_task.cpp | 25 +++++++++++++++++++++ .../join/test_slow_close/test_slow_close.out | Bin 114 -> 133 bytes .../join/test_slow_close/test_slow_close.groovy | 10 +++++++++ 5 files changed, 39 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 084b51510e9..e271dbacba0 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -674,7 +674,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // the instance which is not build hash table, it's should wait the signal of hash table build finished. 
// but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit // return eof will make task marked as wake_up_early - if (!_shared_hash_table_context->signaled) { + // todo: remove signaled after we can guarantee that wake up eraly is always set accurately + if (!_shared_hash_table_context->signaled || state->get_task()->wake_up_early()) { return Status::Error<ErrorCode::END_OF_FILE>("source have closed"); } diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 2dd0394d2ae..27980976d81 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -114,8 +114,9 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) { void Pipeline::make_all_runnable() { DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", { auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( - "Pipeline::make_all_runnable", "pipeline_id", 0); + "Pipeline::make_all_runnable.sleep", "pipeline_id", -1); if (pipeline_id == id()) { + LOG(WARNING) << "Pipeline::make_all_runnable.sleep sleep 10s"; sleep(10); } }); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 27d1242cd42..d394535de07 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -331,6 +331,18 @@ Status PipelineTask::execute(bool* eos) { // The status must be runnable if (!_opened && !_fragment_context->is_canceled()) { + DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", { + auto required_pipeline_id = + DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "PipelineTask::execute.open_sleep", "pipeline_id", -1); + auto required_task_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "PipelineTask::execute.open_sleep", "task_id", -1); + if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) { + LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s"; + sleep(5); + } + }); + if (_wake_up_early) { 
*eos = true; _eos = true; @@ -507,6 +519,19 @@ Status PipelineTask::execute(bool* eos) { RETURN_IF_ERROR(close(Status::OK(), false)); } + DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", { + auto required_pipeline_id = + DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "PipelineTask::execute.sink_eos_sleep", "pipeline_id", -1); + auto required_task_id = + DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "PipelineTask::execute.sink_eos_sleep", "task_id", -1); + if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) { + LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep sleep 10s"; + sleep(10); + } + }); + status = _sink->sink(_state, block, *eos); if (status.is<ErrorCode::END_OF_FILE>()) { diff --git a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out index cb92be84e47..5e4d8ec9448 100644 Binary files a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out and b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ diff --git a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy index 8d1c33ff923..0b36d2da5ab 100644 --- a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy +++ b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy @@ -75,4 +75,14 @@ suite("test_slow_close") { } finally { GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep") } + + sql "set ignore_runtime_filter_ids='0';" + try { + GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.open_sleep",[pipeline_id: 4, task_id: 7]) + GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep",[pipeline_id: 4, task_id: 15]) + qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on 
t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.open_sleep") + GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep") + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org