This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ad39b9e2afb [Bug](pipeline) fix nullptr access when broadcast join 
meet wakeup early and build sh… (#48247)
ad39b9e2afb is described below

commit ad39b9e2afbfaa2e2ebfea456d00e35d89fd98ea
Author: Pxl <x...@selectdb.com>
AuthorDate: Tue Feb 25 14:24:06 2025 +0800

    [Bug](pipeline) fix nullptr access when a broadcast join meets an early
    wakeup and the instance that builds the shared hash table was not opened (#48247)
    
    ### What problem does this PR solve?
    Consider the following scenario:
    a broadcast join task is woken up early, and then the following things
    happen in order.
    1. The instance that builds the hash table was never opened; it went
    directly to close and signaled the other instances.
    2. The other instances are then opened and reach sink EOS. At this point
    the shared hash table is still a nullptr.
    3. A coredump occurs because of the nullptr access.
    
    Related to: https://github.com/apache/doris/pull/47380
    and https://github.com/apache/doris/pull/48008
    
    ```cpp
    0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, 
siginfo_t*, void*) at 
/mnt/disk3/xiaolei/doris_master/be/src/common/signal_handler.h:421
     1# 0x00007F7F7EA625B0 in /lib64/libc.so.6
     2# std::__detail::__variant::_Variant_storage<false, std::monostate, 
doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef, 
DefaultHash<doris::StringRef, void> > >, 
doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned 
char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned 
short, doris::JoinHashTable<unsigned short, HashCRC32<unsigned short> > >, 
doris::vectorized::MethodOneNumber<unsigned int, doris::JoinHashTabl [...]
     3# std::variant<std::monostate, 
doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef, 
DefaultHash<doris::StringRef, void> > >, 
doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned 
char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned 
short, doris::JoinHashTable<unsigned short, HashCRC32<unsigned short> > >, 
doris::vectorized::MethodOneNumber<unsigned int, doris::JoinHashTable<unsigned 
int, HashCRC32<unsigned in [...]
     4# 
std::invoke_result<doris::pipeline::HashJoinBuildSinkOperatorX::sink(doris::RuntimeState*,
 doris::vectorized::Block*, bool)::$_0, 
std::__conditional<is_lvalue_reference_v<std::variant<std::monostate, 
doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef, 
DefaultHash<doris::StringRef, void> > >, 
doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned 
char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned 
short, do [...]
     5# doris::pipeline::HashJoinBuildSinkOperatorX::sink(doris::RuntimeState*, 
doris::vectorized::Block*, bool) at 
/mnt/disk3/xiaolei/doris_master/be/src/pipeline/exec/hashjoin_build_sink.cpp:591
     6# doris::pipeline::PipelineTask::execute(bool*) at 
/mnt/disk3/xiaolei/doris_master/be/src/pipeline/pipeline_task.cpp:398
     7# doris::pipeline::TaskScheduler::_do_work(int) at 
/mnt/disk3/xiaolei/doris_master/be/src/pipeline/task_scheduler.cpp:144
     8# doris::pipeline::TaskScheduler::start()::$_0::operator()() const at 
/mnt/disk3/xiaolei/doris_master/be/src/pipeline/task_scheduler.cpp:63
     9# void std::__invoke_impl<void, 
doris::pipeline::TaskScheduler::start()::$_0&>(std::__invoke_other, 
doris::pipeline::TaskScheduler::start()::$_0&) at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61
    10# std::enable_if<is_invocable_r_v<void, 
doris::pipeline::TaskScheduler::start()::$_0&>, void>::type 
std::__invoke_r<void, 
doris::pipeline::TaskScheduler::start()::$_0&>(doris::pipeline::TaskScheduler::start()::$_0&)
 at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:117
    11# std::_Function_handler<void (), 
doris::pipeline::TaskScheduler::start()::$_0>::_M_invoke(std::_Any_data const&) 
at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
    12# std::function<void ()>::operator()() const at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591
    13# doris::FunctionRunnable::run() at 
/mnt/disk3/xiaolei/doris_master/be/src/util/threadpool.cpp:60
    14# doris::ThreadPool::dispatch_thread() at 
/mnt/disk3/xiaolei/doris_master/be/src/util/threadpool.cpp:608
    15# void std::__invoke_impl<void, void (doris::ThreadPool::*&)(), 
doris::ThreadPool*&>(std::__invoke_memfun_deref, void 
(doris::ThreadPool::*&)(), doris::ThreadPool*&) at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:74
    16# std::__invoke_result<void (doris::ThreadPool::*&)(), 
doris::ThreadPool*&>::type std::__invoke<void (doris::ThreadPool::*&)(), 
doris::ThreadPool*&>(void (doris::ThreadPool::*&)(), doris::ThreadPool*&) at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:96
    17# void std::_Bind<void 
(doris::ThreadPool::*(doris::ThreadPool*))()>::__call<void, , 
0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:506
    18# void std::_Bind<void 
(doris::ThreadPool::*(doris::ThreadPool*))()>::operator()<, void>() at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:591
    19# void std::__invoke_impl<void, std::_Bind<void 
(doris::ThreadPool::*(doris::ThreadPool*))()>&>(std::__invoke_other, 
std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&) at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61
    20# std::enable_if<is_invocable_r_v<void, std::_Bind<void 
(doris::ThreadPool::*(doris::ThreadPool*))()>&>, void>::type 
std::__invoke_r<void, std::_Bind<void 
(doris::ThreadPool::*(doris::ThreadPool*))()>&>(std::_Bind<void 
(doris::ThreadPool::*(doris::ThreadPool*))()>&) at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:117
    21# std::_Function_handler<void (), std::_Bind<void 
(doris::ThreadPool::*(doris::ThreadPool*))()> >::_M_invoke(std::_Any_data 
const&) at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
    22# std::function<void ()>::operator()() const at 
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591
    23# doris::Thread::supervise_thread(void*) at 
/mnt/disk3/xiaolei/doris_master/be/src/util/thread.cpp:498
    24# asan_thread_start(void*) in 
/mnt/disk3/xiaolei/doris_master/output/be/lib/doris_be
    25# start_thread in /lib64/libpthread.so.0
    26# __GI___clone in /lib64/libc.so.6
    ```
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |   3 ++-
 be/src/pipeline/pipeline.cpp                       |   3 ++-
 be/src/pipeline/pipeline_task.cpp                  |  25 +++++++++++++++++++++
 .../join/test_slow_close/test_slow_close.out       | Bin 114 -> 133 bytes
 .../join/test_slow_close/test_slow_close.groovy    |  10 +++++++++
 5 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 084b51510e9..e271dbacba0 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -674,7 +674,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         // the instance which is not build hash table, it's should wait the 
signal of hash table build finished.
         // but if it's running and signaled == false, maybe the source 
operator have closed caused by some short circuit
         // return eof will make task marked as wake_up_early
-        if (!_shared_hash_table_context->signaled) {
+        // todo: remove signaled after we can guarantee that wake up eraly is 
always set accurately
+        if (!_shared_hash_table_context->signaled || 
state->get_task()->wake_up_early()) {
             return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
         }
 
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 2dd0394d2ae..27980976d81 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -114,8 +114,9 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
 void Pipeline::make_all_runnable() {
     DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
         auto pipeline_id = 
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
-                "Pipeline::make_all_runnable", "pipeline_id", 0);
+                "Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
         if (pipeline_id == id()) {
+            LOG(WARNING) << "Pipeline::make_all_runnable.sleep sleep 10s";
             sleep(10);
         }
     });
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 27d1242cd42..d394535de07 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -331,6 +331,18 @@ Status PipelineTask::execute(bool* eos) {
 
     // The status must be runnable
     if (!_opened && !_fragment_context->is_canceled()) {
+        DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
+            auto required_pipeline_id =
+                    
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                            "PipelineTask::execute.open_sleep", "pipeline_id", 
-1);
+            auto required_task_id = 
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                    "PipelineTask::execute.open_sleep", "task_id", -1);
+            if (required_pipeline_id == pipeline_id() && required_task_id == 
task_id()) {
+                LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s";
+                sleep(5);
+            }
+        });
+
         if (_wake_up_early) {
             *eos = true;
             _eos = true;
@@ -507,6 +519,19 @@ Status PipelineTask::execute(bool* eos) {
                 RETURN_IF_ERROR(close(Status::OK(), false));
             }
 
+            DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
+                auto required_pipeline_id =
+                        
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                                "PipelineTask::execute.sink_eos_sleep", 
"pipeline_id", -1);
+                auto required_task_id =
+                        
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                                "PipelineTask::execute.sink_eos_sleep", 
"task_id", -1);
+                if (required_pipeline_id == pipeline_id() && required_task_id 
== task_id()) {
+                    LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep 
sleep 10s";
+                    sleep(10);
+                }
+            });
+
             status = _sink->sink(_state, block, *eos);
 
             if (status.is<ErrorCode::END_OF_FILE>()) {
diff --git 
a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out 
b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
index cb92be84e47..5e4d8ec9448 100644
Binary files 
a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out and 
b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ
diff --git 
a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy 
b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
index 8d1c33ff923..0b36d2da5ab 100644
--- 
a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
+++ 
b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -75,4 +75,14 @@ suite("test_slow_close") {
     } finally {
         
GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
     }
+
+    sql "set ignore_runtime_filter_ids='0';"
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.open_sleep",[pipeline_id:
 4, task_id: 7])
+        
GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep",[pipeline_id:
 4, task_id: 15])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join 
[broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] 
on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.open_sleep")
+        
GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep")
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to