This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new e23551dba78 [fix](spill) Finish dependency of join sink operator was released early (#49701)
e23551dba78 is described below

commit e23551dba787428a60447d45394bd2317805fa9a
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Wed Apr 2 20:54:47 2025 +0800

    [fix](spill) Finish dependency of join sink operator was released early (#49701)

    ### What problem does this PR solve?

    In spilling cases, the `inner_runtime_state` is replaced during the probe phase. The join sink operator looked up its finish dependency through that state, so the finish dependency was released together with the old state while it could still be used, which led to the crash below:

    ```
    *** SIGABRT unknown detail explain (@0x21c4) received by PID 8644 (TID 10986 OR 0x7f68dc440640) from PID 8644; stack trace: ***
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /root/doris/be/src/common/signal_handler.h:421
     1# 0x00007F6CBA242520 in /lib/x86_64-linux-gnu/libc.so.6
     2# pthread_kill at ./nptl/pthread_kill.c:89
     3# raise at ../sysdeps/posix/raise.c:27
     4# abort at ./stdlib/abort.c:81
     5# _nl_load_domain at ./intl/loadmsgcat.c:1177
     6# 0x00007F6CBA239E96 in /lib/x86_64-linux-gnu/libc.so.6
     7# pthread_mutex_lock at ./nptl/pthread_mutex_lock.c:139
     8# doris::pipeline::PipelineTask::terminate() at /root/doris/be/src/pipeline/pipeline_task.cpp:316
     9# doris::pipeline::Pipeline::make_all_runnable() in /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    10# doris::pipeline::PipelineFragmentContext::decrement_running_task(unsigned int) at /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:1781
    11# doris::Defer::~Defer() at /root/doris/be/src/util/defer_op.h:37
    12# doris::pipeline::TaskScheduler::_do_work(int) at /root/doris/be/src/pipeline/task_scheduler.cpp:166
    13# doris::ThreadPool::dispatch_thread() at /root/doris/be/src/util/threadpool.cpp:623
    14# doris::Thread::supervise_thread(void*) at /root/doris/be/src/util/thread.cpp:499
    15# start_thread at ./nptl/pthread_create.c:442
    16# 0x00007F6CBA326850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:
    ```
--- .../pipeline/exec/partitioned_hash_join_probe_operator.cpp | 13 +------------ be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 2 -- .../pipeline/exec/partitioned_hash_join_sink_operator.cpp | 10 ++++------ be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 2 ++ .../operator/partitioned_hash_join_probe_operator_test.cpp | 6 +----- .../operator/partitioned_hash_join_sink_operator_test.cpp | 4 ++++ 6 files changed, 12 insertions(+), 25 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 12f2850cf05..90ea463ca1b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -577,14 +577,6 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: return Status::OK(); } -Status PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill( - PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) { - DCHECK(local_state._shared_state->inner_runtime_state); - local_state._in_mem_shared_state_sptr = - std::move(local_state._shared_state->inner_shared_state); - return Status::OK(); -} - Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) const { local_state._shared_state->inner_runtime_state = RuntimeState::create_unique( @@ -876,10 +868,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori return _revoke_memory(state); } } else { - if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { - RETURN_IF_ERROR(_setup_internal_operator_for_non_spill(local_state, state)); - } - + DCHECK(local_state._shared_state->inner_runtime_state); RETURN_IF_ERROR(_inner_probe_operator->push( local_state._shared_state->inner_runtime_state.get(), local_state._child_block.get(), local_state._child_eos)); diff 
--git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 2e8efef4201..8dad0b8469b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -167,8 +167,6 @@ private: [[nodiscard]] Status _setup_internal_operators(PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) const; - [[nodiscard]] Status _setup_internal_operator_for_non_spill( - PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state); bool _should_revoke_memory(RuntimeState* state) const; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index e27b215a911..1c4de355aaa 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -164,12 +164,7 @@ size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta } Dependency* PartitionedHashJoinSinkLocalState::finishdependency() { - if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) { - auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state); - return inner_sink_state->finishdependency(); - } - DCHECK(false) << "Should not reach here!"; - return nullptr; + return _finish_dependency.get(); } Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( @@ -197,6 +192,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( // therefore, all runtime filters are temporarily disabled. 
RETURN_IF_ERROR(inner_sink_state->_runtime_filter_producer_helper->skip_process( _shared_state->inner_runtime_state.get())); + _finish_dependency->set_ready(); } if (build_block.rows() <= 1) { @@ -548,6 +544,8 @@ Status PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState* RETURN_IF_ERROR(probe_local_state->open(state)); RETURN_IF_ERROR(sink_local_state->open(state)); + _finish_dependency = sink_local_state->finishdependency()->shared_from_this(); + /// Set these two values after all the work is ready. _shared_state->inner_shared_state = std::move(inner_shared_state); _shared_state->inner_runtime_state = std::move(inner_runtime_state); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 7bca4da8ce2..cae48462014 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -24,6 +24,7 @@ #include "common/be_mock_util.h" #include "common/status.h" #include "operator.h" +#include "pipeline/dependency.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/join_build_sink_operator.h" @@ -80,6 +81,7 @@ protected: std::unique_ptr<vectorized::PartitionerBase> _partitioner; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; + std::shared_ptr<Dependency> _finish_dependency; RuntimeProfile::Counter* _partition_timer = nullptr; RuntimeProfile::Counter* _partition_shuffle_timer = nullptr; diff --git a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp index b9b1a1329df..dbcd41b40ee 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp @@ -1129,14 +1129,10 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, Other) { auto 
local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), probe_operator.get(), shared_state); - auto st = probe_operator->_setup_internal_operator_for_non_spill(*local_state, - _helper.runtime_state.get()); - ASSERT_TRUE(st.ok()) << "Setup internal operator failed: " << st.to_string(); - local_state->_shared_state->need_to_spill = true; ASSERT_FALSE(probe_operator->_should_revoke_memory(_helper.runtime_state.get())); - st = probe_operator->_revoke_memory(_helper.runtime_state.get()); + auto st = probe_operator->_revoke_memory(_helper.runtime_state.get()); ASSERT_TRUE(st.ok()) << "Revoke memory failed: " << st.to_string(); } diff --git a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp index a9dae776a2d..010cd6ea223 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp @@ -348,6 +348,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) { sink_state->_shared_state->inner_runtime_state->emplace_sink_local_state( 0, std::move(inner_sink_local_state)); + sink_state->_finish_dependency = + Dependency::create_shared(sink_operator->operator_id(), sink_operator->node_id(), + "HashJoinBuildFinishDependency", true); + // Expect revoke memory to trigger spilling status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr); ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org