This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e23551dba78 [fix](spill) Finish dependency of join sink operator was released early (#49701)
e23551dba78 is described below

commit e23551dba787428a60447d45394bd2317805fa9a
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Wed Apr 2 20:54:47 2025 +0800

    [fix](spill) Finish dependency of join sink operator was released early (#49701)
    
    ### What problem does this PR solve?
    
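    In spilling cases, the `inner_runtime_state` is replaced during the probe
    phase. The finish dependency of the join sink operator was obtained through
    that state, so it was released early, while it could still be in use (see
    the stack trace below). The sink local state now holds its own shared
    reference to the finish dependency.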
    
    ```
    *** SIGABRT unknown detail explain (@0x21c4) received by PID 8644 (TID 
10986 OR 0x7f68dc440640) from PID 8644; stack trace: ***
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, 
siginfo_t*, void*) at /root/doris/be/src/common/signal_handler.h:421
     1# 0x00007F6CBA242520 in /lib/x86_64-linux-gnu/libc.so.6
     2# pthread_kill at ./nptl/pthread_kill.c:89
     3# raise at ../sysdeps/posix/raise.c:27
     4# abort at ./stdlib/abort.c:81
     5# _nl_load_domain at ./intl/loadmsgcat.c:1177
     6# 0x00007F6CBA239E96 in /lib/x86_64-linux-gnu/libc.so.6
     7# pthread_mutex_lock at ./nptl/pthread_mutex_lock.c:139
     8# doris::pipeline::PipelineTask::terminate() at 
/root/doris/be/src/pipeline/pipeline_task.cpp:316
     9# doris::pipeline::Pipeline::make_all_runnable() in 
/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    10# 
doris::pipeline::PipelineFragmentContext::decrement_running_task(unsigned int) 
at /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:1781
    11# doris::Defer::~Defer() at /root/doris/be/src/util/defer_op.h:37
    12# doris::pipeline::TaskScheduler::_do_work(int) at 
/root/doris/be/src/pipeline/task_scheduler.cpp:166
    13# doris::ThreadPool::dispatch_thread() at 
/root/doris/be/src/util/threadpool.cpp:623
    14# doris::Thread::supervise_thread(void*) at 
/root/doris/be/src/util/thread.cpp:499
    15# start_thread at ./nptl/pthread_create.c:442
    16# 0x00007F6CBA326850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:
    ```
---
 .../pipeline/exec/partitioned_hash_join_probe_operator.cpp  | 13 +------------
 be/src/pipeline/exec/partitioned_hash_join_probe_operator.h |  2 --
 .../pipeline/exec/partitioned_hash_join_sink_operator.cpp   | 10 ++++------
 be/src/pipeline/exec/partitioned_hash_join_sink_operator.h  |  2 ++
 .../operator/partitioned_hash_join_probe_operator_test.cpp  |  6 +-----
 .../operator/partitioned_hash_join_sink_operator_test.cpp   |  4 ++++
 6 files changed, 12 insertions(+), 25 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 12f2850cf05..90ea463ca1b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -577,14 +577,6 @@ Status 
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
     return Status::OK();
 }
 
-Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill(
-        PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) {
-    DCHECK(local_state._shared_state->inner_runtime_state);
-    local_state._in_mem_shared_state_sptr =
-            std::move(local_state._shared_state->inner_shared_state);
-    return Status::OK();
-}
-
 Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
         PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) 
const {
     local_state._shared_state->inner_runtime_state = 
RuntimeState::create_unique(
@@ -876,10 +868,7 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
                 return _revoke_memory(state);
             }
         } else {
-            if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
-                
RETURN_IF_ERROR(_setup_internal_operator_for_non_spill(local_state, state));
-            }
-
+            DCHECK(local_state._shared_state->inner_runtime_state);
             RETURN_IF_ERROR(_inner_probe_operator->push(
                     local_state._shared_state->inner_runtime_state.get(),
                     local_state._child_block.get(), local_state._child_eos));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 2e8efef4201..8dad0b8469b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -167,8 +167,6 @@ private:
 
     [[nodiscard]] Status 
_setup_internal_operators(PartitionedHashJoinProbeLocalState& local_state,
                                                    RuntimeState* state) const;
-    [[nodiscard]] Status _setup_internal_operator_for_non_spill(
-            PartitionedHashJoinProbeLocalState& local_state, RuntimeState* 
state);
 
     bool _should_revoke_memory(RuntimeState* state) const;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index e27b215a911..1c4de355aaa 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -164,12 +164,7 @@ size_t 
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
 }
 
 Dependency* PartitionedHashJoinSinkLocalState::finishdependency() {
-    if (auto* tmp_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state()) {
-        auto* inner_sink_state = 
assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state);
-        return inner_sink_state->finishdependency();
-    }
-    DCHECK(false) << "Should not reach here!";
-    return nullptr;
+    return _finish_dependency.get();
 }
 
 Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
@@ -197,6 +192,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         // therefore, all runtime filters are temporarily disabled.
         
RETURN_IF_ERROR(inner_sink_state->_runtime_filter_producer_helper->skip_process(
                 _shared_state->inner_runtime_state.get()));
+        _finish_dependency->set_ready();
     }
 
     if (build_block.rows() <= 1) {
@@ -548,6 +544,8 @@ Status 
PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState*
     RETURN_IF_ERROR(probe_local_state->open(state));
     RETURN_IF_ERROR(sink_local_state->open(state));
 
+    _finish_dependency = 
sink_local_state->finishdependency()->shared_from_this();
+
     /// Set these two values after all the work is ready.
     _shared_state->inner_shared_state = std::move(inner_shared_state);
     _shared_state->inner_runtime_state = std::move(inner_runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 7bca4da8ce2..cae48462014 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -24,6 +24,7 @@
 #include "common/be_mock_util.h"
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/dependency.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/join_build_sink_operator.h"
@@ -80,6 +81,7 @@ protected:
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
+    std::shared_ptr<Dependency> _finish_dependency;
 
     RuntimeProfile::Counter* _partition_timer = nullptr;
     RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
diff --git 
a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index b9b1a1329df..dbcd41b40ee 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -1129,14 +1129,10 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, Other) {
     auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
                                                         probe_operator.get(), 
shared_state);
 
-    auto st = 
probe_operator->_setup_internal_operator_for_non_spill(*local_state,
-                                                                     
_helper.runtime_state.get());
-    ASSERT_TRUE(st.ok()) << "Setup internal operator failed: " << 
st.to_string();
-
     local_state->_shared_state->need_to_spill = true;
     
ASSERT_FALSE(probe_operator->_should_revoke_memory(_helper.runtime_state.get()));
 
-    st = probe_operator->_revoke_memory(_helper.runtime_state.get());
+    auto st = probe_operator->_revoke_memory(_helper.runtime_state.get());
     ASSERT_TRUE(st.ok()) << "Revoke memory failed: " << st.to_string();
 }
 
diff --git 
a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
index a9dae776a2d..010cd6ea223 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -348,6 +348,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) {
     sink_state->_shared_state->inner_runtime_state->emplace_sink_local_state(
             0, std::move(inner_sink_local_state));
 
+    sink_state->_finish_dependency =
+            Dependency::create_shared(sink_operator->operator_id(), 
sink_operator->node_id(),
+                                      "HashJoinBuildFinishDependency", true);
+
     // Expect revoke memory to trigger spilling
     status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr);
     ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to