This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fb501b09ef [refactor](spill) unify the entry point of spill tasks (#37020)
8fb501b09ef is described below

commit 8fb501b09efd77a299ab3405ff25f7a652f14b7a
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Jul 1 09:52:32 2024 +0800

    [refactor](spill) unify the entry point of spill tasks (#37020)
---
 .../exec/partitioned_aggregation_sink_operator.cpp |  30 ++----
 .../partitioned_aggregation_source_operator.cpp    |  29 ++----
 .../exec/partitioned_aggregation_source_operator.h |   1 -
 .../exec/partitioned_hash_join_probe_operator.cpp  |  76 +++-----------
 .../exec/partitioned_hash_join_probe_operator.h    |   4 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 112 +++++++--------------
 .../exec/partitioned_hash_join_sink_operator.h     |   3 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  30 +-----
 .../pipeline/exec/spill_sort_source_operator.cpp   |  27 +----
 be/src/pipeline/exec/spill_sort_source_operator.h  |   1 -
 be/src/pipeline/exec/spill_utils.h                 |  76 ++++++++++++++
 11 files changed, 152 insertions(+), 237 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index b833289e0e0..4399f3c7045 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -22,6 +22,7 @@
 
 #include "aggregation_sink_operator.h"
 #include "common/status.h"
+#include "pipeline/exec/spill_utils.h"
 #include "runtime/fragment_mgr.h"
 #include "vec/spill/spill_stream_manager.h"
 
@@ -253,14 +254,7 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
         }
     }};
 
-    auto execution_context = state->get_task_execution_context();
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
-            _shared_state->shared_from_this();
     auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
@@ -269,20 +263,10 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                 "fault_inject partitioned_agg_sink revoke_memory submit_func 
failed");
         return status;
     });
-    status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-            [this, &parent, state, query_id, mem_tracker, shared_state_holder, 
execution_context,
-             submit_timer] {
-                SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-                std::shared_ptr<TaskExecutionContext> execution_context_lock;
-                auto shared_state_sptr = shared_state_holder.lock();
-                if (shared_state_sptr) {
-                    execution_context_lock = execution_context.lock();
-                }
-                if (!shared_state_sptr || !execution_context_lock) {
-                    LOG(INFO) << "query " << print_id(query_id)
-                              << " execution_context released, maybe query was 
cancelled.";
-                    return Status::Cancelled("Cancelled");
-                }
+
+    auto spill_runnable = std::make_shared<SpillRunnable>(
+            state, _shared_state->shared_from_this(),
+            [this, &parent, state, query_id, submit_timer] {
                 
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
                     auto st = Status::InternalError(
                             "fault_inject partitioned_agg_sink "
@@ -332,7 +316,9 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                         
parent._agg_sink_operator->reset_hash_table(runtime_state);
                 return Base::_shared_state->sink_status;
             });
-    return status;
+
+    return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
+            std::move(spill_runnable));
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index fd609d95eef..a8c4e7b0bcc 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -23,6 +23,7 @@
 #include "common/exception.h"
 #include "common/status.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
 #include "runtime/fragment_mgr.h"
 #include "util/runtime_profile.h"
 #include "vec/spill/spill_stream_manager.h"
@@ -204,18 +205,11 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     
RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
     _dependency->Dependency::block();
 
-    auto execution_context = state->get_task_execution_context();
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
-            _shared_state->shared_from_this();
     auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
-    auto spill_func = [this, state, query_id, execution_context, submit_timer] 
{
+    auto spill_func = [this, state, query_id, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
@@ -276,19 +270,7 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
         return _status;
     };
 
-    auto exception_catch_func = [spill_func, query_id, mem_tracker, 
shared_state_holder,
-                                 execution_context, this]() {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return;
-        }
+    auto exception_catch_func = [spill_func, query_id, this]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
 {
             auto st = Status::InternalError(
                     "fault_inject partitioned_agg_source "
@@ -308,7 +290,8 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_agg_source submit_func failed");
     });
-    return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-            exception_catch_func);
+    return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
+            std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+                                            exception_catch_func));
 }
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index a847b7fcf88..994290a15bb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -94,7 +94,6 @@ public:
 
 private:
     friend class PartitionedAggLocalState;
-    Status _initiate_merge_spill_partition_agg_data(RuntimeState* state);
 
     std::unique_ptr<AggSourceOperatorX> _agg_source_operator;
 };
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 261979e3261..1ff927bcc6d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -158,15 +158,7 @@ Status 
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
 
 Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state) {
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
-    auto execution_context = state->get_task_execution_context();
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
-            _shared_state->shared_from_this();
-
     auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
@@ -216,19 +208,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, mem_tracker, shared_state_holder, 
execution_context,
-                                 spill_func, this]() {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query: " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return;
-        }
+    auto exception_catch_func = [query_id, spill_func, this]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
@@ -250,7 +230,10 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_hash_join_probe spill_probe_blocks 
submit_func failed");
     });
-    return spill_io_pool->submit_func(exception_catch_func);
+
+    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+                                                          
exception_catch_func);
+    return spill_io_pool->submit(std::move(spill_runnable));
 }
 
 Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t 
partition_index) {
@@ -288,15 +271,10 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
         return Status::OK();
     }
 
-    auto execution_context = state->get_task_execution_context();
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
     std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
             _shared_state->shared_from_this();
 
     auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
@@ -362,19 +340,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
         _dependency->set_ready();
     };
 
-    auto exception_catch_func = [read_func, query_id, mem_tracker, 
shared_state_holder,
-                                 execution_context, state, this]() {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
-            LOG(INFO) << "query: " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return;
-        }
+    auto exception_catch_func = [read_func, query_id, this]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
 {
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
@@ -403,7 +369,9 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                                 "fault_inject partitioned_hash_join_probe "
                                 "recovery_build_blocks submit_func failed");
                     });
-    return spill_io_pool->submit_func(exception_catch_func);
+    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+                                                          
exception_catch_func);
+    return spill_io_pool->submit(std::move(spill_runnable));
 }
 
 std::string PartitionedHashJoinProbeLocalState::debug_string(int 
indentation_level) const {
@@ -426,14 +394,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
 
     auto& blocks = _probe_blocks[partition_index];
 
-    /// TODO: maybe recovery more blocks each time.
-    auto execution_context = state->get_task_execution_context();
-    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
-            _shared_state->shared_from_this();
-
     auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
@@ -470,19 +431,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
         _dependency->set_ready();
     };
 
-    auto exception_catch_func = [read_func, mem_tracker, shared_state_holder, 
execution_context,
-                                 query_id, this]() {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query: " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return;
-        }
+    auto exception_catch_func = [read_func, query_id, this]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel",
 {
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
@@ -511,7 +460,8 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
                                 "fault_inject partitioned_hash_join_probe "
                                 "recovery_probe_blocks submit_func failed");
                     });
-    return spill_io_pool->submit_func(exception_catch_func);
+    return spill_io_pool->submit(std::make_shared<SpillRunnable>(
+            state, _shared_state->shared_from_this(), exception_catch_func));
 }
 
 
PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool*
 pool,
@@ -538,7 +488,7 @@ Status PartitionedHashJoinProbeOperatorX::init(const 
TPlanNode& tnode, RuntimeSt
     for (auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) {
         _probe_exprs.emplace_back(conjunct.left);
     }
-    _partitioner = std::make_unique<PartitionerType>(_partition_count);
+    _partitioner = std::make_unique<SpillPartitionerType>(_partition_count);
     RETURN_IF_ERROR(_partitioner->init(_probe_exprs));
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 1fbed85a123..6ee718a3354 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -24,15 +24,13 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/join_build_sink_operator.h"
-#include "vec/runtime/partitioner.h"
+#include "pipeline/exec/spill_utils.h"
 
 namespace doris {
 class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
-
 class PartitionedHashJoinProbeOperatorX;
 
 class PartitionedHashJoinProbeLocalState final
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index cb104cfc7cd..4aa0bd42a84 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -118,16 +118,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         return Status::OK();
     }
 
-    auto execution_context = state->get_task_execution_context();
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
-            _shared_state->shared_from_this();
-    auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-    auto spill_func = [state, num_slots,
-                       this](std::vector<vectorized::Block>& build_blocks) 
mutable {
+    auto spill_func = [build_blocks = std::move(build_blocks), state, 
num_slots, this]() mutable {
         auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
         auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
         std::vector<std::vector<uint32_t>> 
partitions_indexes(p._partition_count);
@@ -211,28 +202,9 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         _dependency->set_ready();
     };
 
-    auto exception_catch_func = [build_blocks = std::move(build_blocks), 
spill_func,
-                                 shared_state_holder, execution_context, 
state, query_id,
-                                 mem_tracker, this]() mutable {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        Defer defer {[&]() {
-            // need to reset build_block here, or else build_block will be 
destructed
-            // after SCOPED_ATTACH_TASK_WITH_ID and will trigger 
memory_orphan_check failure
-            build_blocks.clear();
-        }};
-
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
-            LOG(INFO) << "execution_context released, maybe query was 
canceled.";
-            return;
-        }
-
+    auto exception_catch_func = [spill_func, this]() mutable {
         auto status = [&]() {
-            RETURN_IF_CATCH_EXCEPTION(spill_func(build_blocks));
+            RETURN_IF_CATCH_EXCEPTION(spill_func());
             return Status::OK();
         }();
 
@@ -243,6 +215,10 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
             _dependency->set_ready();
         }
     };
+
+    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+                                                          
exception_catch_func);
+
     auto* thread_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
 
     _dependency->block();
@@ -252,7 +228,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                         "fault_inject partitioned_hash_join_sink "
                         "revoke_unpartitioned_block submit_func failed");
             });
-    return thread_pool->submit_func(exception_catch_func);
+    return thread_pool->submit(std::move(spill_runnable));
 }
 
 Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
@@ -268,15 +244,7 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
 
     _spilling_streams_count = _shared_state->partitioned_build_blocks.size();
 
-    auto execution_context = state->get_task_execution_context();
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
-            _shared_state->shared_from_this();
-
     auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); 
++i) {
         vectorized::SpillStreamSPtr& spilling_stream = 
_shared_state->spilled_streams[i];
@@ -302,41 +270,35 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
             st = Status::Error<INTERNAL_ERROR>(
                     "fault_inject partitioned_hash_join_sink revoke_memory 
submit_func failed");
         });
-        if (st.ok()) {
-            st = spill_io_pool->submit_func([this, query_id, mem_tracker, 
shared_state_holder,
-                                             execution_context, 
spilling_stream, i, submit_timer] {
-                SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-                std::shared_ptr<TaskExecutionContext> execution_context_lock;
-                auto shared_state_sptr = shared_state_holder.lock();
-                if (shared_state_sptr) {
-                    execution_context_lock = execution_context.lock();
-                }
-                if (!shared_state_sptr || !execution_context_lock) {
-                    LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
-                    return;
-                }
-                
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
 {
-                    ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                            query_id,
-                            Status::InternalError("fault_inject 
partitioned_hash_join_sink "
-                                                  "revoke_memory canceled"));
-                    return;
+
+        auto spill_runnable = std::make_shared<SpillRunnable>(
+                state, _shared_state->shared_from_this(),
+                [this, query_id, spilling_stream, i, submit_timer] {
+                    DBUG_EXECUTE_IF(
+                            
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
+                                
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+                                        query_id, Status::InternalError(
+                                                          "fault_inject 
partitioned_hash_join_sink "
+                                                          "revoke_memory 
canceled"));
+                                return;
+                            });
+                    
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+                    SCOPED_TIMER(_spill_build_timer);
+
+                    auto status = [&]() {
+                        RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, 
spilling_stream));
+                        return Status::OK();
+                    }();
+
+                    if (!status.OK()) {
+                        std::unique_lock<std::mutex> lock(_spill_lock);
+                        _dependency->set_ready();
+                        _spill_status_ok = false;
+                        _spill_status = std::move(status);
+                    }
                 });
-                
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                SCOPED_TIMER(_spill_build_timer);
-
-                auto status = [&]() {
-                    RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, 
spilling_stream));
-                    return Status::OK();
-                }();
-
-                if (!status.OK()) {
-                    std::unique_lock<std::mutex> lock(_spill_lock);
-                    _dependency->set_ready();
-                    _spill_status_ok = false;
-                    _spill_status = std::move(status);
-                }
-            });
+        if (st.ok()) {
+            st = spill_io_pool->submit(std::move(spill_runnable));
         }
 
         if (!st.ok()) {
@@ -452,7 +414,7 @@ Status PartitionedHashJoinSinkOperatorX::init(const 
TPlanNode& tnode, RuntimeSta
         _build_exprs.emplace_back(eq_join_conjunct.right);
         partition_exprs.emplace_back(eq_join_conjunct.right);
     }
-    _partitioner = std::make_unique<PartitionerType>(_partition_count);
+    _partitioner = std::make_unique<SpillPartitionerType>(_partition_count);
     RETURN_IF_ERROR(_partitioner->init(_build_exprs));
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 5cd3ff21187..1592c29cdb0 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -24,6 +24,7 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/join_build_sink_operator.h"
+#include "pipeline/exec/spill_utils.h"
 #include "vec/runtime/partitioner.h"
 
 namespace doris {
@@ -31,8 +32,6 @@ class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
-
 class PartitionedHashJoinSinkOperatorX;
 
 class PartitionedHashJoinSinkLocalState
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index d15b936b4c6..b7fae82ca54 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "spill_sort_sink_operator.h"
 
 #include "pipeline/exec/sort_sink_operator.h"
+#include "pipeline/exec/spill_utils.h"
 #include "runtime/fragment_mgr.h"
 #include "vec/spill/spill_stream_manager.h"
 
@@ -229,16 +230,7 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
     if (!_eos) {
         Base::_dependency->Dependency::block();
     }
-
-    auto execution_context = state->get_task_execution_context();
-
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::weak_ptr<SpillSortSharedState> shared_state_holder = 
_shared_state->shared_from_this();
-
     auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
@@ -300,19 +292,7 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
         return Status::OK();
     };
 
-    auto exception_catch_func = [this, query_id, mem_tracker, 
shared_state_holder,
-                                 execution_context, spill_func]() {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return;
-        }
+    auto exception_catch_func = [this, query_id, spill_func]() {
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", 
{
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
spill_sort_sink "
@@ -331,9 +311,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state) {
                 "revoke_memory submit_func failed");
     });
     if (status.ok()) {
-        status =
-                
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-                        exception_catch_func);
+        status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
+                std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+                                                exception_catch_func));
     }
     if (!status.ok()) {
         if (!_eos) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 7d62a18461b..b322f33caa2 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -18,6 +18,7 @@
 #include "spill_sort_source_operator.h"
 
 #include "common/status.h"
+#include "pipeline/exec/spill_utils.h"
 #include "runtime/fragment_mgr.h"
 #include "sort_source_operator.h"
 #include "util/runtime_profile.h"
@@ -80,13 +81,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                << " merge spill data";
     _dependency->Dependency::block();
 
-    auto execution_context = state->get_task_execution_context();
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::weak_ptr<SpillSortSharedState> shared_state_holder = 
_shared_state->shared_from_this();
     auto query_id = state->query_id();
-    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
@@ -180,20 +175,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         return Status::OK();
     };
 
-    auto exception_catch_func = [this, query_id, mem_tracker, 
shared_state_holder,
-                                 execution_context, spill_func]() {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return;
-        }
-
+    auto exception_catch_func = [this, spill_func]() {
         _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); 
}();
     };
 
@@ -202,8 +184,9 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 "fault_inject spill_sort_source "
                 "merge_sort_spill_data submit_func failed");
     });
-    return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-            exception_catch_func);
+    return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
+            std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+                                            exception_catch_func));
 }
 
 Status SpillSortLocalState::_create_intermediate_merger(
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h 
b/be/src/pipeline/exec/spill_sort_source_operator.h
index ffe8e8a6898..09367415d91 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -90,7 +90,6 @@ public:
 
 private:
     friend class SpillSortLocalState;
-    Status _initiate_merge_spill_partition_agg_data(RuntimeState* state);
 
     std::unique_ptr<SortSourceOperatorX> _sort_source_operator;
 };
diff --git a/be/src/pipeline/exec/spill_utils.h 
b/be/src/pipeline/exec/spill_utils.h
new file mode 100644
index 00000000000..f2f19512cbd
--- /dev/null
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/query_context.h"
+#include "runtime/runtime_state.h"
+#include "runtime/task_execution_context.h"
+#include "runtime/thread_context.h"
+#include "util/threadpool.h"
+#include "vec/runtime/partitioner.h"
+
+namespace doris::pipeline {
+// CRC32 hash partitioner parameterized with SpillPartitionChannelIds.
+// NOTE(review): presumably shared by the partitioned agg / hash-join spill
+// operators that now include this header — confirm against callers.
+using SpillPartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
+
+// Runnable wrapper for a spill task submitted to the spill IO thread pool via
+// `spill_stream_mgr()->get_spill_io_thread_pool()->submit(...)`.
+//
+// It centralizes the guard logic each operator previously wrote around its own
+// `exception_catch_func`. Before invoking `func`, it does the following:
+//  - attaches the query's mem tracker and query id;
+//  - checks that the task execution context is still alive;
+//  - checks that the operator's shared state is still alive;
+//  - checks that the query has not been cancelled.
+class SpillRunnable : public Runnable {
+public:
+    // @param state        Runtime state of the submitting task. The query mem
+    //                     tracker, query id and task execution context are read
+    //                     from it here. `state` itself is kept as a raw
+    //                     (non-owning) pointer for the cancellation check in run().
+    // @param shared_state Operator shared state. Only a weak reference is kept.
+    //                     Shared-state resources are released when the operator
+    //                     is closed, and an asynchronous spill task may still be
+    //                     pending at that point.
+    // @param func         The spill work to execute on the IO thread.
+    SpillRunnable(RuntimeState* state, const 
std::shared_ptr<BasicSharedState>& shared_state,
+                  std::function<void()> func)
+            : _state(state),
+              _mem_tracker(state->get_query_ctx()->query_mem_tracker),
+              _task_id(state->query_id()),
+              _task_context_holder(state->get_task_execution_context()),
+              _shared_state_holder(shared_state),
+              _func(std::move(func)) {}
+
+    ~SpillRunnable() override = default;
+
+    // Executes `_func` unless one of the following holds:
+    //  - the task execution context has already been released;
+    //  - the shared state has already been released;
+    //  - the query is cancelled.
+    // Every early return is silent.
+    // NOTE(review): the per-operator code this replaced logged
+    // "execution_context released, maybe query was cancelled." when skipping;
+    // consider restoring a log line here for diagnosability.
+    void run() override {
+        SCOPED_ATTACH_TASK_WITH_ID(_mem_tracker, _task_id);
+        // Drop the callable (and everything it captured) on every exit path,
+        // including the early returns below.
+        // `defer` is declared after the scoped attach above. Locals are
+        // destroyed in reverse order, so this runs while the attach is still
+        // in effect.
+        Defer defer([&] {
+            std::function<void()> tmp;
+            std::swap(tmp, _func);
+        });
+
+        // Locking the weak references keeps the task execution context and the
+        // shared state alive until run() returns, i.e. across `_func()`.
+        auto task_context_holder = _task_context_holder.lock();
+        if (!task_context_holder) {
+            return;
+        }
+
+        auto shared_state_holder = _shared_state_holder.lock();
+        if (!shared_state_holder) {
+            return;
+        }
+
+        // NOTE(review): `_state` is a raw pointer. This assumes the RuntimeState
+        // stays valid while the task execution context above is held — TODO confirm.
+        if (_state->is_cancelled()) {
+            return;
+        }
+        _func();
+    }
+
+private:
+    // Non-owning; see the note in run().
+    RuntimeState* _state;
+    // The query-level mem tracker (`query_ctx->query_mem_tracker`).
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+    // Despite the name, this holds `state->query_id()`.
+    TUniqueId _task_id;
+    std::weak_ptr<TaskExecutionContext> _task_context_holder;
+    std::weak_ptr<BasicSharedState> _shared_state_holder;
+    // Cleared by the Defer in run() once the task finishes or is skipped.
+    std::function<void()> _func;
+};
+
+} // namespace doris::pipeline
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to