This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f368c9d023e1b952d38a13e4c31c9506f39015da
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Thu Oct 17 14:08:23 2024 +0800

    refactor SpillRunnable (#41991)
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/dependency.cpp                     |   1 -
 be/src/pipeline/dependency.h                       |   8 +-
 be/src/pipeline/exec/operator.h                    |  16 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  62 ++++---
 .../partitioned_aggregation_source_operator.cpp    |  70 ++++----
 .../exec/partitioned_aggregation_source_operator.h |   1 -
 .../exec/partitioned_hash_join_probe_operator.cpp  | 168 +++++++------------
 .../exec/partitioned_hash_join_probe_operator.h    |   6 -
 .../exec/partitioned_hash_join_sink_operator.cpp   | 179 +++++++--------------
 .../exec/partitioned_hash_join_sink_operator.h     |  13 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  78 ++++-----
 .../pipeline/exec/spill_sort_source_operator.cpp   |  81 +++++-----
 be/src/pipeline/exec/spill_sort_source_operator.h  |   1 -
 be/src/pipeline/exec/spill_utils.h                 | 129 ++++++++++++---
 be/src/vec/spill/spill_stream.cpp                  |   1 +
 be/src/vec/spill/spill_stream.h                    |   4 +-
 be/src/vec/spill/spill_writer.cpp                  |  11 +-
 be/src/vec/spill/spill_writer.h                    |   6 +-
 18 files changed, 384 insertions(+), 451 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index d95c74615c1..9ad0ff1b57f 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -343,7 +343,6 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
             std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), profile));
     spill_streams_.emplace_back(spilling_stream_);
     spill_stream = spilling_stream_;
-    spill_stream->set_write_counters(profile);
     return Status::OK();
 }
 void AggSpillPartition::close() {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index f0e14329698..b9cea1cc678 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -426,16 +426,18 @@ private:
 struct BasicSpillSharedState {
     virtual ~BasicSpillSharedState() = default;
 
+    AtomicStatus _spill_status;
+
     // These two counters are shared to spill source operators as the initial value
     // of 'SpillWriteFileCurrentSize' and 'SpillWriteFileCurrentCount'.
     // Total bytes of spill data written to disk file(after serialized)
-    RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
+    RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
     RuntimeProfile::Counter* _spill_file_total_count = nullptr;
 
     void setup_shared_profile(RuntimeProfile* sink_profile) {
         _spill_file_total_count =
                 ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalCount", TUnit::UNIT, 1);
-        _spill_write_file_data_size =
+        _spill_write_file_total_size =
                 ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalSize", TUnit::BYTES, 1);
     }
 
@@ -463,7 +465,6 @@ struct PartitionedAggSharedState : public BasicSharedState,
     size_t partition_count_bits;
     size_t partition_count;
     size_t max_partition_index;
-    Status sink_status;
     bool is_spilled = false;
     std::atomic_bool is_closed = false;
     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
@@ -544,7 +545,6 @@ struct SpillSortSharedState : public BasicSharedState,
     bool enable_spill = false;
     bool is_spilled = false;
     std::atomic_bool is_closed = false;
-    Status sink_status;
     std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
 
     std::deque<vectorized::SpillStreamSPtr> sorted_streams;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index f70051f091c..f1d1716aeb6 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -319,7 +319,7 @@ public:
                 ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount", TUnit::UNIT, 1);
         _spill_write_block_data_size =
                 ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockDataSize", TUnit::BYTES, 1);
-        _spill_write_file_data_size =
+        _spill_write_file_total_size =
                 ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteFileTotalSize", TUnit::BYTES, 1);
         _spill_write_rows_count =
                 ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", 
TUnit::UNIT, 1);
@@ -366,14 +366,16 @@ public:
         if (_copy_shared_spill_profile) {
             _copy_shared_spill_profile = false;
             const auto* spill_shared_state = (const 
BasicSpillSharedState*)Base::_shared_state;
-            COUNTER_SET(_spill_file_current_size,
-                        
spill_shared_state->_spill_write_file_data_size->value());
-            COUNTER_SET(_spill_file_current_count,
-                        spill_shared_state->_spill_file_total_count->value());
+            COUNTER_UPDATE(_spill_file_current_size,
+                           
spill_shared_state->_spill_write_file_total_size->value());
+            COUNTER_UPDATE(_spill_file_current_count,
+                           
spill_shared_state->_spill_file_total_count->value());
             Base::_shared_state->update_spill_stream_profiles(Base::profile());
         }
     }
 
+    std::atomic_int _spilling_task_count {0};
+
     // Total time of spill, including spill task scheduling time,
     // serialize block time, write disk file time,
     // and read disk file time, deserialize block time etc.
@@ -397,7 +399,7 @@ public:
     // Total bytes of spill data in Block format(in memory format)
     RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
     // Total bytes of spill data written to disk file(after serialized)
-    RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
+    RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
     RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
     RuntimeProfile::Counter* _spill_file_total_count = nullptr;
     RuntimeProfile::Counter* _spill_file_current_count = nullptr;
@@ -755,6 +757,8 @@ public:
         COUNTER_SET(_spill_min_rows_of_partition, min_rows);
     }
 
+    std::atomic_int _spilling_task_count {0};
+
     std::vector<int64_t> _rows_in_partitions;
 
     // Total time of spill, including spill task scheduling time,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index f5c09459f85..cd4eba2abfe 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -178,7 +178,9 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
-    RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
+    if (!local_state._shared_state->_spill_status.ok()) {
+        return local_state._shared_state->_spill_status.status();
+    }
     local_state._eos = eos;
     auto* runtime_state = local_state._runtime_state.get();
     DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::sink", {
@@ -219,7 +221,7 @@ Status PartitionedAggSinkOperatorX::revoke_memory(
 
 size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) 
const {
     auto& local_state = get_local_state(state);
-    if (!local_state.Base::_shared_state->sink_status.ok()) {
+    if (!local_state._shared_state->_spill_status.ok()) {
         return UINT64_MAX;
     }
     auto* runtime_state = local_state._runtime_state.get();
@@ -270,7 +272,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                << Base::_parent->node_id()
                << " revoke_memory, size: " << 
_parent->revocable_mem_size(state)
                << ", eos: " << _eos;
-    RETURN_IF_ERROR(Base::_shared_state->sink_status);
+    if (!_shared_state->_spill_status.ok()) {
+        return _shared_state->_spill_status.status();
+    }
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
         profile()->add_info_string("Spilled", "true");
@@ -292,8 +296,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
 
     auto query_id = state->query_id();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
     
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func",
 {
         status = Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_agg_sink revoke_memory submit_func 
failed");
@@ -301,32 +303,28 @@ Status PartitionedAggSinkLocalState::revoke_memory(
     });
 
     state->get_query_ctx()->increase_revoking_tasks_count();
-    auto spill_runnable = std::make_shared<SpillRunnable>(
-            state, _profile, true, _shared_state->shared_from_this(),
-            [this, &parent, state, query_id, size_to_revoke, spill_context, 
submit_timer] {
-                auto submit_elapsed_time = submit_timer.elapsed_time();
-                _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
-                exec_time_counter()->update(submit_elapsed_time);
-                _spill_total_timer->update(submit_elapsed_time);
-
-                SCOPED_TIMER(exec_time_counter());
-                SCOPED_TIMER(_spill_total_timer);
-                SCOPED_TIMER(_spill_write_timer);
 
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+    _spilling_task_count = 1;
+    auto spill_runnable = std::make_shared<SpillRunnable>(
+            state, spill_context, _spilling_task_count, _profile, submit_timer,
+            _shared_state->shared_from_this(), Base::_spill_dependency, true, 
true,
+            [this, &parent, state, query_id, size_to_revoke, spill_context] {
+                Status status;
                 
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
-                    auto st = Status::InternalError(
+                    status = Status::InternalError(
                             "fault_inject partitioned_agg_sink "
                             "revoke_memory canceled");
-                    
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
-                    return st;
+                    
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
+                    return status;
                 });
                 Defer defer {[&]() {
-                    if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
-                        if (!_shared_state->sink_status.ok()) {
-                            LOG(WARNING)
-                                    << "query " << print_id(query_id) << " agg 
node "
-                                    << Base::_parent->node_id()
-                                    << " revoke_memory error: " << 
Base::_shared_state->sink_status;
+                    if (!status.ok() || state->is_cancelled()) {
+                        if (!status.ok()) {
+                            LOG(WARNING) << "query " << print_id(query_id) << 
" agg node "
+                                         << Base::_parent->node_id()
+                                         << " revoke_memory error: " << status;
                         }
                         _shared_state->close();
                     } else {
@@ -340,15 +338,10 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                         _finish_dependency->set_ready();
                     }
                     state->get_query_ctx()->decrease_revoking_tasks_count();
-                    Base::_spill_dependency->Dependency::set_ready();
-
-                    if (spill_context) {
-                        spill_context->on_task_finished();
-                    }
                 }};
                 auto* runtime_state = _runtime_state.get();
                 auto* agg_data = 
parent._agg_sink_operator->get_agg_data(runtime_state);
-                Base::_shared_state->sink_status = std::visit(
+                status = std::visit(
                         vectorized::Overload {
                                 [&](std::monostate& arg) -> Status {
                                     return Status::InternalError("Unit hash 
table");
@@ -359,10 +352,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                                             state, agg_method, hash_table, 
size_to_revoke, _eos));
                                 }},
                         agg_data->method_variant);
-                RETURN_IF_ERROR(Base::_shared_state->sink_status);
-                Base::_shared_state->sink_status =
-                        
parent._agg_sink_operator->reset_hash_table(runtime_state);
-                return Base::_shared_state->sink_status;
+                RETURN_IF_ERROR(status);
+                status = 
parent._agg_sink_operator->reset_hash_table(runtime_state);
+                return status;
             });
 
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index f80110e5326..04042c5841a 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -142,21 +142,24 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
                                                 bool* eos) {
     auto& local_state = get_local_state(state);
     local_state.copy_shared_spill_profile();
+    Status status;
     Defer defer {[&]() {
-        if (!local_state._status.ok() || *eos) {
+        if (!status.ok() || *eos) {
             local_state._shared_state->close();
         }
     }};
 
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    RETURN_IF_ERROR(local_state._status);
+    status = local_state._shared_state->_spill_status.status();
+    RETURN_IF_ERROR(status);
 
     if (local_state._shared_state->is_spilled &&
         local_state._need_to_merge_data_for_current_partition) {
         if (local_state._blocks.empty() && 
!local_state._current_partition_eos) {
             bool has_recovering_data = false;
-            RETURN_IF_ERROR(local_state.recover_blocks_from_disk(state, 
has_recovering_data));
+            status = local_state.recover_blocks_from_disk(state, 
has_recovering_data);
+            RETURN_IF_ERROR(status);
             *eos = !has_recovering_data;
             return Status::OK();
         } else if (!local_state._blocks.empty()) {
@@ -165,8 +168,9 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
                 auto block = std::move(local_state._blocks.front());
                 merged_rows += block.rows();
                 local_state._blocks.erase(local_state._blocks.begin());
-                
RETURN_IF_ERROR(_agg_source_operator->merge_with_serialized_key_helper<false>(
-                        local_state._runtime_state.get(), &block));
+                status = 
_agg_source_operator->merge_with_serialized_key_helper<false>(
+                        local_state._runtime_state.get(), &block);
+                RETURN_IF_ERROR(status);
             }
             local_state._estimate_memory_usage +=
                     
_agg_source_operator->get_estimated_memory_size_for_merging(
@@ -183,8 +187,8 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     // not spilled in sink or current partition still has data
     auto* runtime_state = local_state._runtime_state.get();
     
local_state._shared_state->in_mem_shared_state->aggregate_data_container->init_once();
-    local_state._status = _agg_source_operator->get_block(runtime_state, 
block, eos);
-    RETURN_IF_ERROR(local_state._status);
+    status = _agg_source_operator->get_block(runtime_state, block, eos);
+    RETURN_IF_ERROR(status);
     if (local_state._runtime_state) {
         auto* source_local_state =
                 
local_state._runtime_state->get_local_state(_agg_source_operator->operator_id());
@@ -195,7 +199,8 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
             !local_state._shared_state->spill_partitions.empty()) {
             local_state._current_partition_eos = false;
             local_state._need_to_merge_data_for_current_partition = true;
-            
RETURN_IF_ERROR(local_state._shared_state->in_mem_shared_state->reset_hash_table());
+            status = 
local_state._shared_state->in_mem_shared_state->reset_hash_table();
+            RETURN_IF_ERROR(status);
             *eos = false;
         }
     }
@@ -240,15 +245,15 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
 
     has_data = true;
     auto spill_func = [this, state, query_id] {
+        Status status;
         Defer defer {[&]() {
-            if (!_status.ok() || state->is_cancelled()) {
-                if (!_status.ok()) {
+            if (!status.ok() || state->is_cancelled()) {
+                if (!status.ok()) {
                     LOG(WARNING) << "query " << print_id(query_id) << " agg 
node "
-                                 << _parent->node_id() << " recover agg data 
error: " << _status;
+                                 << _parent->node_id() << " recover agg data 
error: " << status;
                 }
                 _shared_state->close();
             }
-            _spill_dependency->Dependency::set_ready();
         }};
         bool has_agg_data = false;
         size_t accumulated_blocks_size = 0;
@@ -262,15 +267,15 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
                     {
                         
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data",
                                         {
-                                            _status = 
Status::Error<INTERNAL_ERROR>(
+                                            status = 
Status::Error<INTERNAL_ERROR>(
                                                     "fault_inject 
partitioned_agg_source "
                                                     "recover_spill_data 
failed");
                                         });
-                        if (_status.ok()) {
-                            _status = stream->read_next_block_sync(&block, 
&eos);
+                        if (status.ok()) {
+                            status = stream->read_next_block_sync(&block, 
&eos);
                         }
                     }
-                    RETURN_IF_ERROR(_status);
+                    RETURN_IF_ERROR(status);
 
                     if (!block.empty()) {
                         has_agg_data = true;
@@ -292,34 +297,20 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
                 }
             }
         }
-        return _status;
+        return status;
     };
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    auto exception_catch_func = [spill_func, query_id, submit_timer, this]() {
-        auto submit_elapsed_time = submit_timer.elapsed_time();
-        _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
-        exec_time_counter()->update(submit_elapsed_time);
-        _spill_total_timer->update(submit_elapsed_time);
-
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_spill_total_timer);
-        SCOPED_TIMER(_spill_recover_time);
-
+    auto exception_catch_func = [spill_func, query_id]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
 {
             auto st = Status::InternalError(
                     "fault_inject partitioned_agg_source "
                     "merge spill data canceled");
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
-            return;
+            return st;
         });
 
         auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
-
-        if (!status.ok()) {
-            _status = status;
-        }
+        return status;
     };
 
     DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", {
@@ -327,9 +318,14 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
                 "fault_inject partitioned_agg_source submit_func failed");
     });
     _spill_dependency->block();
+
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+    _spilling_task_count = 1;
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-            std::make_shared<SpillRunnable>(state, _runtime_profile.get(), 
false,
-                                            _shared_state->shared_from_this(),
-                                            exception_catch_func));
+            std::make_shared<SpillRunnable>(state, nullptr, 
_spilling_task_count,
+                                            _runtime_profile.get(), 
submit_timer,
+                                            _shared_state->shared_from_this(), 
_spill_dependency,
+                                            false, false, 
exception_catch_func));
 }
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index c2a8c71e672..f51695b982b 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -53,7 +53,6 @@ protected:
     std::unique_ptr<RuntimeState> _runtime_state;
 
     bool _opened = false;
-    Status _status;
     std::unique_ptr<std::promise<Status>> _spill_merge_promise;
     std::future<Status> _spill_merge_future;
     bool _current_partition_eos = true;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 6df2a7ca4ab..f1ca6f8d7de 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -23,6 +23,7 @@
 #include <algorithm>
 #include <utility>
 
+#include "common/exception.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "pipeline/pipeline_task.h"
@@ -205,7 +206,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                         state, spilling_stream, print_id(state->query_id()), 
"hash_probe",
                         _parent->node_id(), 
std::numeric_limits<int32_t>::max(),
                         std::numeric_limits<size_t>::max(), 
_runtime_profile.get()));
-                spilling_stream->set_write_counters(_runtime_profile.get());
             }
 
             auto merged_block = 
vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
@@ -243,36 +243,17 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
         return Status::OK();
     };
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto exception_catch_func = [query_id, spill_func, spill_context, 
submit_timer, this]() {
-        auto submit_elapsed_time = submit_timer.elapsed_time();
-        _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
-        exec_time_counter()->update(submit_elapsed_time);
-        _spill_total_timer->update(submit_elapsed_time);
-
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_spill_total_timer);
-        SCOPED_TIMER(_spill_write_timer);
-
+    auto exception_catch_func = [query_id, spill_func, spill_context]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                    query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
-                                                    "spill_probe_blocks 
canceled"));
-            return;
+            auto status = Status::InternalError(
+                    "fault_inject partitioned_hash_join_probe "
+                    "spill_probe_blocks canceled");
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            return status;
         });
 
         auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
-
-        if (!status.ok()) {
-            _spill_status_ok = false;
-            _spill_status = std::move(status);
-        }
-        _spill_dependency->set_ready();
-        if (spill_context) {
-            spill_context->on_non_sink_task_finished();
-        }
+        return status;
     };
 
     if (spill_context) {
@@ -284,9 +265,14 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                 "fault_inject partitioned_hash_join_probe spill_probe_blocks 
submit_func failed");
     });
 
-    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_runtime_profile.get(), true,
-                                                          
_shared_state->shared_from_this(),
-                                                          
exception_catch_func);
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    _spilling_task_count = 1;
+    auto spill_runnable = std::make_shared<SpillRunnable>(
+            state, spill_context, _spilling_task_count, 
_runtime_profile.get(), submit_timer,
+            _shared_state->shared_from_this(), _spill_dependency, false, true,
+            exception_catch_func);
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
@@ -314,39 +300,26 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
     }
     spilled_stream->set_read_counters(profile());
 
-    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
-            _shared_state->shared_from_this();
-
     auto query_id = state->query_id();
 
-    auto read_func = [this, query_id, state, spilled_stream = spilled_stream, 
shared_state_holder,
-                      partition_index] {
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (!shared_state_sptr || state->is_cancelled()) {
-            LOG(INFO) << "query: " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return;
-        }
-
+    auto read_func = [this, query_id, state, spilled_stream = spilled_stream, 
partition_index] {
         SCOPED_TIMER(_recovery_build_timer);
 
         bool eos = false;
         VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " 
<< _parent->node_id()
                    << ", task id: " << state->task_id() << ", partition: " << 
partition_index
                    << ", recoverying build data";
+        Status status;
         while (!eos) {
             vectorized::Block block;
-            Status st;
             
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks",
 {
-                st = Status::Error<INTERNAL_ERROR>(
+                status = Status::Error<INTERNAL_ERROR>(
                         "fault_inject partitioned_hash_join_probe 
recover_build_blocks failed");
             });
-            if (st.ok()) {
-                st = spilled_stream->read_next_block_sync(&block, &eos);
+            if (status.ok()) {
+                status = spilled_stream->read_next_block_sync(&block, &eos);
             }
-            if (!st.ok()) {
-                _spill_status_ok = false;
-                _spill_status = std::move(st);
+            if (!status.ok()) {
                 break;
             }
             COUNTER_UPDATE(_recovery_build_rows, block.rows());
@@ -365,10 +338,8 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
                 _recovered_build_block = 
vectorized::MutableBlock::create_unique(std::move(block));
             } else {
                 DCHECK_EQ(_recovered_build_block->columns(), block.columns());
-                st = _recovered_build_block->merge(std::move(block));
-                if (!st.ok()) {
-                    _spill_status_ok = false;
-                    _spill_status = std::move(st);
+                status = _recovered_build_block->merge(std::move(block));
+                if (!status.ok()) {
                     break;
                 }
             }
@@ -381,43 +352,29 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
 
         if (eos) {
             
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
-            shared_state_sptr->spilled_streams[partition_index].reset();
+            _shared_state->spilled_streams[partition_index].reset();
             VLOG_DEBUG << "query: " << print_id(state->query_id())
                        << ", node: " << _parent->node_id() << ", task id: " << 
state->task_id()
                        << ", partition: " << partition_index;
         }
+        return status;
     };
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto exception_catch_func = [read_func, query_id, submit_timer, this]() {
-        auto submit_elapsed_time = submit_timer.elapsed_time();
-        _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
-        exec_time_counter()->update(submit_elapsed_time);
-        _spill_total_timer->update(submit_elapsed_time);
-
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_spill_total_timer);
-        SCOPED_TIMER(_spill_recover_time);
-
+    auto exception_catch_func = [read_func, query_id]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
 {
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                    query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
-                                                    "recover_build_blocks 
canceled"));
-            return;
+            auto status = Status::InternalError(
+                    "fault_inject partitioned_hash_join_probe "
+                    "recover_build_blocks canceled");
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            return status;
         });
 
         auto status = [&]() {
-            RETURN_IF_CATCH_EXCEPTION(read_func());
+            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func());
             return Status::OK();
         }();
 
-        if (!status.ok()) {
-            _spill_status_ok = false;
-            _spill_status = std::move(status);
-        }
-        _spill_dependency->set_ready();
+        return status;
     };
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -440,9 +397,15 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
                                 "fault_inject partitioned_hash_join_probe "
                                 "recovery_build_blocks submit_func failed");
                     });
-    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_runtime_profile.get(), false,
-                                                          
_shared_state->shared_from_this(),
-                                                          
exception_catch_func);
+
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    _spilling_task_count = 1;
+    auto spill_runnable = std::make_shared<SpillRunnable>(
+            state, nullptr, _spilling_task_count, _runtime_profile.get(), 
submit_timer,
+            _shared_state->shared_from_this(), _spill_dependency, false, false,
+            exception_catch_func);
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
                << ", task id: " << state->task_id() << ", partition: " << 
partition_index
                << " recover_build_blocks_from_disk submit func";
@@ -500,8 +463,7 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
         while (!eos && !_state->is_cancelled() && st.ok()) {
             st = spilled_stream->read_next_block_sync(&block, &eos);
             if (!st.ok()) {
-                _spill_status_ok = false;
-                _spill_status = std::move(st);
+                break;
             } else {
                 COUNTER_UPDATE(_recovery_probe_rows, block.rows());
                 COUNTER_UPDATE(_recovery_probe_blocks, 1);
@@ -519,37 +481,24 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
             
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
             spilled_stream.reset();
         }
+        return st;
     };
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    auto exception_catch_func = [read_func, query_id, submit_timer, this]() {
-        auto submit_elapsed_time = submit_timer.elapsed_time();
-        _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
-        exec_time_counter()->update(submit_elapsed_time);
-        _spill_total_timer->update(submit_elapsed_time);
-
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_spill_total_timer);
-        SCOPED_TIMER(_spill_recover_time);
-
+    auto exception_catch_func = [read_func, query_id]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel",
 {
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                    query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
-                                                    "recover_probe_blocks 
canceled"));
-            return;
+            auto status = Status::InternalError(
+                    "fault_inject partitioned_hash_join_probe "
+                    "recover_probe_blocks canceled");
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            return status;
         });
 
         auto status = [&]() {
-            RETURN_IF_CATCH_EXCEPTION(read_func());
+            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func());
             return Status::OK();
         }();
 
-        if (!status.ok()) {
-            _spill_status_ok = false;
-            _spill_status = std::move(status);
-        }
-        _spill_dependency->set_ready();
+        return status;
     };
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -562,8 +511,12 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
                                 "fault_inject partitioned_hash_join_probe "
                                 "recovery_probe_blocks submit_func failed");
                     });
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+    _spilling_task_count = 1;
     return spill_io_pool->submit(std::make_shared<SpillRunnable>(
-            state, _runtime_profile.get(), false, 
_shared_state->shared_from_this(),
+            state, nullptr, _spilling_task_count, _runtime_profile.get(), 
submit_timer,
+            _shared_state->shared_from_this(), _spill_dependency, false, false,
             exception_catch_func));
 }
 
@@ -759,9 +712,8 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
 Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
                                                vectorized::Block* 
output_block, bool* eos) const {
     auto& local_state = get_local_state(state);
-    if (!local_state._spill_status_ok) {
-        DCHECK_NE(local_state._spill_status.code(), 0);
-        return local_state._spill_status;
+    if (!local_state._shared_state->_spill_status.ok()) {
+        return local_state._shared_state->_spill_status.status();
     }
 
     const auto partition_index = local_state._partition_cursor;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index e76cfdeb2a7..f020c0a832a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -74,12 +74,6 @@ private:
     std::unique_ptr<vectorized::Block> _child_block;
     bool _child_eos {false};
 
-    std::mutex _spill_lock;
-    Status _spill_status;
-
-    std::atomic<int> _spilling_task_count {0};
-    std::atomic<bool> _spill_status_ok {true};
-
     std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks;
     std::unique_ptr<vectorized::MutableBlock> _recovered_build_block;
     std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 474f03363f2..d3d010e0d7c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -73,7 +73,6 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* 
state) {
                 state, spilling_stream, print_id(state->query_id()),
                 fmt::format("hash_build_sink_{}", i), _parent->node_id(),
                 std::numeric_limits<int32_t>::max(), 
std::numeric_limits<size_t>::max(), _profile));
-        spilling_stream->set_write_counters(_profile);
     }
     return p._partitioner->clone(state, _partitioner);
 }
@@ -203,21 +202,6 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         std::for_each(partitions_indexes.begin(), partitions_indexes.end(),
                       [](std::vector<uint32_t>& indices) { 
indices.reserve(reserved_size); });
 
-        auto flush_rows = [&state, 
this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
-                                         vectorized::SpillStreamSPtr& 
spilling_stream) {
-            auto block = partition_block->to_block();
-            auto status = spilling_stream->spill_block(state, block, false);
-
-            if (!status.ok()) {
-                std::unique_lock<std::mutex> lock(_spill_lock);
-                _spill_status = status;
-                _spill_status_ok = false;
-                _spill_dependency->set_ready();
-                return false;
-            }
-            return true;
-        };
-
         size_t total_rows = build_block.rows();
         size_t offset = 1;
         while (offset < total_rows) {
@@ -261,22 +245,14 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                 int64_t old_mem = partition_block->allocated_bytes();
                 {
                     SCOPED_TIMER(_partition_shuffle_timer);
-                    Status st = partition_block->add_rows(&sub_block, begin, 
end);
-                    if (!st.ok()) {
-                        std::unique_lock<std::mutex> lock(_spill_lock);
-                        _spill_status = st;
-                        _spill_status_ok = false;
-                        _spill_dependency->set_ready();
-                        return;
-                    }
+                    RETURN_IF_ERROR(partition_block->add_rows(&sub_block, 
begin, end));
                     partitions_indexes[partition_idx].clear();
                 }
                 int64_t new_mem = partition_block->allocated_bytes();
 
                 if (partition_block->rows() >= reserved_size || is_last_block) 
{
-                    if (!flush_rows(partition_block, spilling_stream)) {
-                        return;
-                    }
+                    auto block = partition_block->to_block();
+                    RETURN_IF_ERROR(spilling_stream->spill_block(state, block, 
false));
                     partition_block =
                             
vectorized::MutableBlock::create_unique(build_block.clone_empty());
                     COUNTER_UPDATE(_memory_used_counter, -new_mem);
@@ -287,40 +263,20 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
             }
         }
 
-        _spill_dependency->set_ready();
+        return Status::OK();
     };
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    auto exception_catch_func = [spill_func, spill_context, submit_timer, 
this]() mutable {
-        auto submit_elapsed_time = submit_timer.elapsed_time();
-        _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
-        exec_time_counter()->update(submit_elapsed_time);
-        _spill_total_timer->update(submit_elapsed_time);
-
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_spill_total_timer);
-        SCOPED_TIMER(_spill_write_timer);
-
-        auto status = [&]() {
-            RETURN_IF_CATCH_EXCEPTION(spill_func());
-            return Status::OK();
-        }();
-
-        if (!status.ok()) {
-            std::unique_lock<std::mutex> lock(_spill_lock);
-            _spill_status = status;
-            _spill_status_ok = false;
-            _spill_dependency->set_ready();
-        }
-
-        if (spill_context) {
-            spill_context->on_task_finished();
-        }
+    auto exception_catch_func = [spill_func]() mutable {
+        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); 
}();
+        return status;
     };
 
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+    _spilling_task_count = 1;
     auto spill_runnable = std::make_shared<SpillRunnable>(
-            state, _profile, true, _shared_state->shared_from_this(), 
exception_catch_func);
+            state, spill_context, _spilling_task_count, _profile, submit_timer,
+            _shared_state->shared_from_this(), _spill_dependency, true, true, 
exception_catch_func);
 
     auto* thread_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
 
@@ -340,7 +296,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " << 
state->task_id()
                << " hash join sink " << _parent->node_id() << " revoke_memory"
                << ", eos: " << _child_eos;
-    DCHECK_EQ(_spilling_streams_count, 0);
+    DCHECK_EQ(_spilling_task_count, 0);
     CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
 
     if (!_shared_state->need_to_spill) {
@@ -349,20 +305,40 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         return _revoke_unpartitioned_block(state, spill_context);
     }
 
-    _spilling_streams_count = _shared_state->partitioned_build_blocks.size();
+    _spilling_task_count = _shared_state->partitioned_build_blocks.size();
 
     auto query_id = state->query_id();
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     DCHECK(spill_io_pool != nullptr);
 
+    auto spill_fin_cb = [this, state, query_id, spill_context]() {
+        Status status;
+        if (_child_eos) {
+            VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << 
", hash join sink "
+                       << _parent->node_id() << " set_ready_to_read"
+                       << ", task id: " << state->task_id();
+            std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                          _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
+                              if (block) {
+                                  COUNTER_UPDATE(_in_mem_rows_counter, 
block->rows());
+                              }
+                          });
+            auto st = _finish_spilling();
+            if (status.ok()) {
+                status = st;
+            }
+            _dependency->set_ready_to_read();
+        }
+        return status;
+    };
     for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); 
++i) {
         vectorized::SpillStreamSPtr& spilling_stream = 
_shared_state->spilled_streams[i];
         auto& mutable_block = _shared_state->partitioned_build_blocks[i];
 
         if (!mutable_block ||
             mutable_block->allocated_bytes() < 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-            --_spilling_streams_count;
+            --_spilling_task_count;
             continue;
         }
 
@@ -380,54 +356,40 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         // so that when a stream finished, it should desc -1
         state->get_query_ctx()->increase_revoking_tasks_count();
         auto spill_runnable = std::make_shared<SpillRunnable>(
-                state, _profile, true, _shared_state->shared_from_this(),
-                [this, query_id, spilling_stream, i, submit_timer, 
spill_context] {
-                    auto submit_elapsed_time = submit_timer.elapsed_time();
-                    
_spill_write_wait_in_queue_timer->update(submit_elapsed_time);
-                    exec_time_counter()->update(submit_elapsed_time);
-                    _spill_total_timer->update(submit_elapsed_time);
-
-                    SCOPED_TIMER(exec_time_counter());
-                    SCOPED_TIMER(_spill_total_timer);
-                    SCOPED_TIMER(_spill_write_timer);
-
+                state, spill_context, _spilling_task_count, _profile, 
submit_timer,
+                _shared_state->shared_from_this(), _spill_dependency, true, 
true,
+                [this, query_id, spilling_stream, i, spill_context] {
                     DBUG_EXECUTE_IF(
                             
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
-                                
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                                        query_id, Status::InternalError(
-                                                          "fault_inject 
partitioned_hash_join_sink "
-                                                          "revoke_memory 
canceled"));
-                                return;
+                                auto status = Status::InternalError(
+                                        "fault_inject 
partitioned_hash_join_sink "
+                                        "revoke_memory canceled");
+                                
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
+                                                                               
      status);
+                                return status;
                             });
                     SCOPED_TIMER(_spill_build_timer);
 
                     auto status = [&]() {
                         RETURN_IF_CATCH_EXCEPTION(
-                                _spill_to_disk(i, spilling_stream, 
spill_context));
-                        return Status::OK();
+                                return _spill_to_disk(i, spilling_stream, 
spill_context));
                     }();
 
                     _state->get_query_ctx()->decrease_revoking_tasks_count();
-
-                    if (!status.ok()) {
-                        std::unique_lock<std::mutex> lock(_spill_lock);
-                        _spill_dependency->set_ready();
-                        _spill_status_ok = false;
-                        _spill_status = std::move(status);
-                    }
-                });
+                    return status;
+                },
+                spill_fin_cb);
         if (st.ok()) {
             st = spill_io_pool->submit(std::move(spill_runnable));
         }
 
         if (!st.ok()) {
-            --_spilling_streams_count;
+            --_spilling_task_count;
             return st;
         }
     }
 
-    std::unique_lock<std::mutex> lock(_spill_lock);
-    if (_spilling_streams_count > 0) {
+    if (_spilling_task_count > 0) {
         _spill_dependency->block();
     } else if (_child_eos) {
         VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join 
sink "
@@ -502,50 +464,24 @@ Status 
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
     return Status::OK();
 }
 
-void PartitionedHashJoinSinkLocalState::_spill_to_disk(
+Status PartitionedHashJoinSinkLocalState::_spill_to_disk(
         uint32_t partition_index, const vectorized::SpillStreamSPtr& 
spilling_stream,
         const std::shared_ptr<SpillContext>& spill_context) {
     auto& partitioned_block = 
_shared_state->partitioned_build_blocks[partition_index];
 
-    if (_spill_status_ok) {
+    Status status = _shared_state->_spill_status.status();
+    if (status.ok()) {
         auto block = partitioned_block->to_block();
         int64_t block_mem_usage = block.allocated_bytes();
         Defer defer {[&]() { COUNTER_UPDATE(memory_used_counter(), 
-block_mem_usage); }};
         partitioned_block = 
vectorized::MutableBlock::create_unique(block.clone_empty());
-        auto st = spilling_stream->spill_block(state(), block, false);
-        if (!st.ok()) {
-            _spill_status_ok = false;
-            std::lock_guard<std::mutex> l(_spill_status_lock);
-            _spill_status = st;
-        }
+        status = spilling_stream->spill_block(state(), block, false);
     }
 
     VLOG_DEBUG << "query: " << print_id(_state->query_id()) << ", task: " << 
_state->task_id()
                << ", join sink " << _parent->node_id() << " revoke done";
-    auto num = _spilling_streams_count.fetch_sub(1);
-    DCHECK_GE(_spilling_streams_count, 0);
-
-    if (num == 1) {
-        std::unique_lock<std::mutex> lock(_spill_lock);
-        _spill_dependency->set_ready();
-        if (_child_eos) {
-            VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << 
", hash join sink "
-                       << _parent->node_id() << " set_ready_to_read"
-                       << ", task id: " << state()->task_id();
-            std::for_each(_shared_state->partitioned_build_blocks.begin(),
-                          _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
-                              if (block) {
-                                  COUNTER_UPDATE(_in_mem_rows_counter, 
block->rows());
-                              }
-                          });
-            _spill_status = _finish_spilling();
-            _dependency->set_ready_to_read();
-        }
 
-        if (spill_context) {
-            spill_context->on_task_finished();
-        }
-    }
+    return status;
 }
 
 PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(
@@ -635,9 +571,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
     CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr);
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    if (!local_state._spill_status_ok) {
-        DCHECK_NE(local_state._spill_status.code(), 0);
-        return local_state._spill_status;
+    if (!local_state._shared_state->_spill_status.ok()) {
+        return local_state._shared_state->_spill_status.status();
     }
 
     local_state._child_eos = eos;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index f96f54dc4db..e6993faa462 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -53,9 +53,9 @@ protected:
     PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
             : 
PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
 
-    void _spill_to_disk(uint32_t partition_index,
-                        const vectorized::SpillStreamSPtr& spilling_stream,
-                        const std::shared_ptr<SpillContext>& spill_context);
+    Status _spill_to_disk(uint32_t partition_index,
+                          const vectorized::SpillStreamSPtr& spilling_stream,
+                          const std::shared_ptr<SpillContext>& spill_context);
 
     Status _partition_block(RuntimeState* state, vectorized::Block* in_block, 
size_t begin,
                             size_t end);
@@ -67,17 +67,10 @@ protected:
 
     friend class PartitionedHashJoinSinkOperatorX;
 
-    std::atomic_int _spilling_streams_count {0};
-    std::atomic<bool> _spill_status_ok {true};
-    std::mutex _spill_lock;
-
     vectorized::Block _pending_block;
 
     bool _child_eos {false};
 
-    Status _spill_status;
-    std::mutex _spill_status_lock;
-
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 5d8355b865d..48a1b2b6ec3 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -144,7 +144,7 @@ Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* 
state,
 
 size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
     auto& local_state = get_local_state(state);
-    if (!local_state.Base::_shared_state->sink_status.ok()) {
+    if (!local_state._shared_state->_spill_status.ok()) {
         return UINT64_MAX;
     }
     return 
_sort_sink_operator->get_revocable_mem_size(local_state._runtime_state.get());
@@ -155,7 +155,9 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     auto& local_state = get_local_state(state);
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
+    if (!local_state._shared_state->_spill_status.ok()) {
+        return local_state._shared_state->_spill_status.status();
+    }
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     if (in_block->rows() > 0) {
         
local_state._shared_state->update_spill_block_batch_row_count(in_block);
@@ -198,14 +200,15 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node "
                << Base::_parent->node_id() << " revoke_memory"
                << ", eos: " << _eos;
-    RETURN_IF_ERROR(Base::_shared_state->sink_status);
+    if (!_shared_state->_spill_status.ok()) {
+        return _shared_state->_spill_status.status();
+    }
 
     auto status = 
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
             state, _spilling_stream, print_id(state->query_id()), "sort", 
_parent->node_id(),
             _shared_state->spill_block_batch_row_count,
             SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
     RETURN_IF_ERROR(status);
-    _spilling_stream->set_write_counters(_profile);
 
     _shared_state->sorted_streams.emplace_back(_spilling_stream);
 
@@ -218,12 +221,12 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     auto query_id = state->query_id();
 
     auto spill_func = [this, state, query_id, &parent] {
+        Status status;
         Defer defer {[&]() {
-            if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
-                if (!_shared_state->sink_status.ok()) {
+            if (!status.ok() || state->is_cancelled()) {
+                if (!status.ok()) {
                     LOG(WARNING) << "query " << print_id(query_id) << " sort 
node "
-                                 << _parent->node_id()
-                                 << " revoke memory error: " << 
_shared_state->sink_status;
+                                 << _parent->node_id() << " revoke memory 
error: " << status;
                 }
                 _shared_state->close();
             } else {
@@ -231,7 +234,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state,
                            << " revoke memory finish";
             }
 
-            if (!_shared_state->sink_status.ok()) {
+            if (!status.ok()) {
                 _shared_state->close();
             }
 
@@ -240,14 +243,11 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
             if (_eos) {
                 _dependency->set_ready_to_read();
                 _finish_dependency->set_ready();
-            } else {
-                _spill_dependency->Dependency::set_ready();
             }
         }};
 
-        _shared_state->sink_status =
-                
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
-        RETURN_IF_ERROR(_shared_state->sink_status);
+        status = 
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
+        RETURN_IF_ERROR(status);
 
         auto* sink_local_state = _runtime_state->get_sink_local_state();
         update_profile(sink_local_state->profile());
@@ -257,13 +257,13 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
         while (!eos && !state->is_cancelled()) {
             {
                 SCOPED_TIMER(_spill_merge_sort_timer);
-                _shared_state->sink_status = 
parent._sort_sink_operator->merge_sort_read_for_spill(
+                status = parent._sort_sink_operator->merge_sort_read_for_spill(
                         _runtime_state.get(), &block, 
_shared_state->spill_block_batch_row_count,
                         &eos);
             }
-            RETURN_IF_ERROR(_shared_state->sink_status);
-            _shared_state->sink_status = _spilling_stream->spill_block(state, 
block, eos);
-            RETURN_IF_ERROR(_shared_state->sink_status);
+            RETURN_IF_ERROR(status);
+            status = _spilling_stream->spill_block(state, block, eos);
+            RETURN_IF_ERROR(status);
             block.clear_column_data();
         }
         parent._sort_sink_operator->reset(_runtime_state.get());
@@ -271,33 +271,18 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
         return Status::OK();
     };
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto exception_catch_func = [this, query_id, spill_context, submit_timer, 
spill_func]() {
-        auto submit_elapsed_time = submit_timer.elapsed_time();
-        _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
-        exec_time_counter()->update(submit_elapsed_time);
-        _spill_total_timer->update(submit_elapsed_time);
-
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_spill_total_timer);
-        SCOPED_TIMER(_spill_write_timer);
-
+    auto exception_catch_func = [query_id, spill_context, spill_func]() {
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", 
{
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                    query_id, Status::InternalError("fault_inject 
spill_sort_sink "
-                                                    "revoke_memory canceled"));
-            return;
+            auto status = Status::InternalError(
+                    "fault_inject spill_sort_sink "
+                    "revoke_memory canceled");
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            return status;
         });
 
-        _shared_state->sink_status = [&]() {
-            RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
-        }();
+        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
 
-        if (spill_context) {
-            spill_context->on_task_finished();
-        }
+        return status;
     };
 
     
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
@@ -307,10 +292,15 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     });
     if (status.ok()) {
         state->get_query_ctx()->increase_revoking_tasks_count();
+
+        MonotonicStopWatch submit_timer;
+        submit_timer.start();
+        _spilling_task_count = 1;
         status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-                std::make_shared<SpillRunnable>(state, _profile, true,
-                                                
_shared_state->shared_from_this(),
-                                                exception_catch_func));
+                std::make_shared<SpillRunnable>(
+                        state, spill_context, _spilling_task_count, _profile, 
submit_timer,
+                        _shared_state->shared_from_this(), _spill_dependency, 
true, true,
+                        exception_catch_func));
     }
     if (!status.ok()) {
         if (!_eos) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 5cc124caaea..2308c49d893 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -81,11 +81,12 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
 
     auto spill_func = [this, state, query_id, &parent] {
         SCOPED_TIMER(_spill_merge_sort_timer);
+        Status status;
         Defer defer {[&]() {
-            if (!_status.ok() || state->is_cancelled()) {
-                if (!_status.ok()) {
+            if (!status.ok() || state->is_cancelled()) {
+                if (!status.ok()) {
                     LOG(WARNING) << "query " << print_id(query_id) << " sort 
node "
-                                 << _parent->node_id() << " merge spill data 
error: " << _status;
+                                 << _parent->node_id() << " merge spill data 
error: " << status;
                 }
                 _shared_state->close();
                 for (auto& stream : _current_merging_streams) {
@@ -96,7 +97,6 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 VLOG_DEBUG << "query " << print_id(query_id) << " sort node " 
<< _parent->node_id()
                            << " merge spill data finish";
             }
-            _spill_dependency->Dependency::set_ready();
         }};
         vectorized::Block merge_sorted_block;
         vectorized::SpillStreamSPtr tmp_stream;
@@ -108,11 +108,11 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                        << ", curren merge max stream count: " << 
max_stream_count;
             {
                 SCOPED_TIMER(Base::_spill_recover_time);
-                _status = _create_intermediate_merger(
+                status = _create_intermediate_merger(
                         max_stream_count,
                         
parent._sort_source_operator->get_sort_description(_runtime_state.get()));
             }
-            RETURN_IF_ERROR(_status);
+            RETURN_IF_ERROR(status);
 
             // all the remaining streams can be merged in a run
             if (_shared_state->sorted_streams.empty()) {
@@ -120,12 +120,11 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
             }
 
             {
-                _status = 
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                status = 
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
                         state, tmp_stream, print_id(state->query_id()), 
"sort", _parent->node_id(),
                         _shared_state->spill_block_batch_row_count,
                         SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, 
profile());
-                RETURN_IF_ERROR(_status);
-                tmp_stream->set_write_counters(profile());
+                RETURN_IF_ERROR(status);
 
                 _shared_state->sorted_streams.emplace_back(tmp_stream);
 
@@ -135,24 +134,24 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                     {
                         SCOPED_TIMER(Base::_spill_recover_time);
                         
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
-                            _status = Status::Error<INTERNAL_ERROR>(
+                            status = Status::Error<INTERNAL_ERROR>(
                                     "fault_inject spill_sort_source "
                                     "recover_spill_data failed");
                         });
-                        if (_status.ok()) {
-                            _status = _merger->get_next(&merge_sorted_block, 
&eos);
+                        if (status.ok()) {
+                            status = _merger->get_next(&merge_sorted_block, 
&eos);
                         }
                     }
-                    RETURN_IF_ERROR(_status);
-                    _status = tmp_stream->spill_block(state, 
merge_sorted_block, eos);
-                    if (_status.ok()) {
+                    RETURN_IF_ERROR(status);
+                    status = tmp_stream->spill_block(state, 
merge_sorted_block, eos);
+                    if (status.ok()) {
                         
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
-                            _status = Status::Error<INTERNAL_ERROR>(
+                            status = Status::Error<INTERNAL_ERROR>(
                                     "fault_inject spill_sort_source "
                                     "spill_merged_data failed");
                         });
                     }
-                    RETURN_IF_ERROR(_status);
+                    RETURN_IF_ERROR(status);
                 }
             }
             for (auto& stream : _current_merging_streams) {
@@ -163,20 +162,9 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         return Status::OK();
     };
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto exception_catch_func = [this, spill_func, submit_timer]() {
-        auto submit_elapsed_time = submit_timer.elapsed_time();
-        _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
-        exec_time_counter()->update(submit_elapsed_time);
-        _spill_total_timer->update(submit_elapsed_time);
-
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_spill_total_timer);
-        SCOPED_TIMER(_spill_recover_time);
-
-        _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); 
}();
+    auto exception_catch_func = [spill_func]() {
+        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
+        return status;
     };
 
     
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func",
 {
@@ -184,10 +172,15 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 "fault_inject spill_sort_source "
                 "merge_sort_spill_data submit_func failed");
     });
+
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+    _spilling_task_count = 1;
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-            std::make_shared<SpillRunnable>(state, _runtime_profile.get(), 
false,
-                                            _shared_state->shared_from_this(),
-                                            exception_catch_func));
+            std::make_shared<SpillRunnable>(state, nullptr, 
_spilling_task_count,
+                                            _runtime_profile.get(), 
submit_timer,
+                                            _shared_state->shared_from_this(), 
_spill_dependency,
+                                            false, false, 
exception_catch_func));
 }
 
 Status SpillSortLocalState::_create_intermediate_merger(
@@ -261,8 +254,9 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Bloc
                                            bool* eos) {
     auto& local_state = get_local_state(state);
     local_state.copy_shared_spill_profile();
+    Status status;
     Defer defer {[&]() {
-        if (!local_state._status.ok() || *eos) {
+        if (!status.ok() || *eos) {
             local_state._shared_state->close();
             for (auto& stream : local_state._current_merging_streams) {
                 
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
@@ -272,20 +266,21 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Bloc
     }};
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    RETURN_IF_ERROR(local_state._status);
+    status = local_state._shared_state->_spill_status.status();
+    RETURN_IF_ERROR(status);
 
     if (local_state._shared_state->is_spilled) {
         if (!local_state._merger) {
-            local_state._status = 
local_state.initiate_merge_sort_spill_streams(state);
-            return local_state._status;
+            status = local_state.initiate_merge_sort_spill_streams(state);
+            return status;
         } else {
-            local_state._status = local_state._merger->get_next(block, eos);
-            RETURN_IF_ERROR(local_state._status);
+            SCOPED_TIMER(local_state._spill_total_timer);
+            status = local_state._merger->get_next(block, eos);
+            RETURN_IF_ERROR(status);
         }
     } else {
-        local_state._status =
-                
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos);
-        RETURN_IF_ERROR(local_state._status);
+        status = 
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos);
+        RETURN_IF_ERROR(status);
     }
     local_state.reached_limit(block, eos);
     return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h 
b/be/src/pipeline/exec/spill_sort_source_operator.h
index 7536dd15e92..a7b8e8efde8 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -54,7 +54,6 @@ protected:
     std::unique_ptr<RuntimeState> _runtime_state;
 
     bool _opened = false;
-    Status _status;
 
     int64_t _external_sort_bytes_threshold = 134217728; // 128M
     std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
diff --git a/be/src/pipeline/exec/spill_utils.h 
b/be/src/pipeline/exec/spill_utils.h
index 2ea5cedcdb0..687779badbb 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -79,21 +79,40 @@ private:
 
 class SpillRunnable : public Runnable {
 public:
-    SpillRunnable(RuntimeState* state, RuntimeProfile* profile, bool is_write,
-                  const std::shared_ptr<BasicSharedState>& shared_state, 
std::function<void()> func)
-            : _state(state),
+    SpillRunnable(RuntimeState* state, std::shared_ptr<SpillContext> 
spill_context,
+                  std::atomic_int& spilling_task_count, RuntimeProfile* 
profile,
+                  MonotonicStopWatch submit_timer,
+                  const std::shared_ptr<BasicSpillSharedState>& shared_state,
+                  std::shared_ptr<Dependency> spill_dependency, bool is_sink, 
bool is_write,
+                  std::function<Status()> spill_exec_func,
+                  std::function<Status()> spill_fin_cb = {})
+            : _is_sink(is_sink),
               _is_write(is_write),
+              _state(state),
+              _spill_context(std::move(spill_context)),
+              _spilling_task_count(spilling_task_count),
+              _spill_dependency(std::move(spill_dependency)),
+              _submit_timer(submit_timer),
               _task_context_holder(state->get_task_execution_context()),
               _shared_state_holder(shared_state),
-              _func(std::move(func)) {
-        write_wait_in_queue_task_count = 
profile->get_counter("SpillWriteTaskWaitInQueueCount");
-        writing_task_count = profile->get_counter("SpillWriteTaskCount");
-        read_wait_in_queue_task_count = 
profile->get_counter("SpillReadTaskWaitInQueueCount");
-        reading_task_count = profile->get_counter("SpillReadTaskCount");
+              _spill_exec_func(std::move(spill_exec_func)),
+              _spill_fin_cb(std::move(spill_fin_cb)) {
+        _exec_timer = profile->get_counter("ExecTime");
+        _spill_total_timer = profile->get_counter("SpillTotalTime");
+
+        _spill_write_timer = profile->get_counter("SpillWriteTime");
+        _spill_write_wait_in_queue_timer = 
profile->get_counter("SpillWriteTaskWaitInQueueTime");
+        _write_wait_in_queue_task_count = 
profile->get_counter("SpillWriteTaskWaitInQueueCount");
+        _writing_task_count = profile->get_counter("SpillWriteTaskCount");
+
+        _spill_revover_timer = profile->get_counter("SpillRecoverTime");
+        _spill_read_wait_in_queue_timer = 
profile->get_counter("SpillReadTaskWaitInQueueTime");
+        _read_wait_in_queue_task_count = 
profile->get_counter("SpillReadTaskWaitInQueueCount");
+        _reading_task_count = profile->get_counter("SpillReadTaskCount");
         if (is_write) {
-            COUNTER_UPDATE(write_wait_in_queue_task_count, 1);
+            COUNTER_UPDATE(_write_wait_in_queue_task_count, 1);
         } else {
-            COUNTER_UPDATE(read_wait_in_queue_task_count, 1);
+            COUNTER_UPDATE(_read_wait_in_queue_task_count, 1);
         }
     }
 
@@ -106,22 +125,46 @@ public:
         if (!task_context_holder) {
             return;
         }
+
+        auto submit_elapsed_time = _submit_timer.elapsed_time();
+        if (_is_write) {
+            _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+        } else {
+            _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+        }
+        _exec_timer->update(submit_elapsed_time);
+        _spill_total_timer->update(submit_elapsed_time);
+
+        SCOPED_TIMER(_exec_timer);
+        SCOPED_TIMER(_spill_total_timer);
+
+        std::shared_ptr<ScopedTimer<MonotonicStopWatch>> write_or_read_timer;
         if (_is_write) {
-            COUNTER_UPDATE(write_wait_in_queue_task_count, -1);
-            COUNTER_UPDATE(writing_task_count, 1);
+            write_or_read_timer =
+                    
std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_write_timer);
+            COUNTER_UPDATE(_write_wait_in_queue_task_count, -1);
+            COUNTER_UPDATE(_writing_task_count, 1);
         } else {
-            COUNTER_UPDATE(read_wait_in_queue_task_count, -1);
-            COUNTER_UPDATE(reading_task_count, 1);
+            write_or_read_timer =
+                    
std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_revover_timer);
+            COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
+            COUNTER_UPDATE(_reading_task_count, 1);
         }
         SCOPED_ATTACH_TASK(_state);
         Defer defer([&] {
             if (_is_write) {
-                COUNTER_UPDATE(writing_task_count, -1);
+                COUNTER_UPDATE(_writing_task_count, -1);
             } else {
-                COUNTER_UPDATE(reading_task_count, -1);
+                COUNTER_UPDATE(_reading_task_count, -1);
+            }
+            {
+                std::function<Status()> tmp;
+                std::swap(tmp, _spill_exec_func);
+            }
+            {
+                std::function<Status()> tmp;
+                std::swap(tmp, _spill_fin_cb);
             }
-            std::function<void()> tmp;
-            std::swap(tmp, _func);
         });
 
         auto shared_state_holder = _shared_state_holder.lock();
@@ -132,19 +175,53 @@ public:
         if (_state->is_cancelled()) {
             return;
         }
-        _func();
+        shared_state_holder->_spill_status.update(_spill_exec_func());
+
+        auto num = _spilling_task_count.fetch_sub(1);
+        DCHECK_GE(_spilling_task_count, 0);
+
+        if (num == 1) {
+            if (_spill_fin_cb) {
+                shared_state_holder->_spill_status.update(_spill_fin_cb());
+            }
+            if (_spill_context) {
+                if (_is_sink) {
+                    _spill_context->on_task_finished();
+                } else {
+                    _spill_context->on_non_sink_task_finished();
+                }
+            }
+            _spill_dependency->set_ready();
+        }
     }
 
 private:
-    RuntimeState* _state;
+    bool _is_sink;
     bool _is_write;
-    RuntimeProfile::Counter* write_wait_in_queue_task_count = nullptr;
-    RuntimeProfile::Counter* writing_task_count = nullptr;
-    RuntimeProfile::Counter* read_wait_in_queue_task_count = nullptr;
-    RuntimeProfile::Counter* reading_task_count = nullptr;
+    RuntimeState* _state;
+    std::shared_ptr<SpillContext> _spill_context;
+    std::atomic_int& _spilling_task_count;
+    std::shared_ptr<Dependency> _spill_dependency;
+
+    MonotonicStopWatch _submit_timer;
+
+    RuntimeProfile::Counter* _exec_timer = nullptr;
+    RuntimeProfile::Counter* _spill_total_timer;
+
+    RuntimeProfile::Counter* _spill_write_timer;
+    RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;
+    RuntimeProfile::Counter* _write_wait_in_queue_task_count = nullptr;
+    RuntimeProfile::Counter* _writing_task_count = nullptr;
+
+    RuntimeProfile::Counter* _spill_revover_timer;
+    RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
+    RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr;
+    RuntimeProfile::Counter* _reading_task_count = nullptr;
+
     std::weak_ptr<TaskExecutionContext> _task_context_holder;
-    std::weak_ptr<BasicSharedState> _shared_state_holder;
-    std::function<void()> _func;
+    std::weak_ptr<BasicSpillSharedState> _shared_state_holder;
+    std::function<Status()> _spill_exec_func;
+    std::function<Status()> _spill_fin_cb;
 };
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/vec/spill/spill_stream.cpp 
b/be/src/vec/spill/spill_stream.cpp
index ce07b4d331c..000617067f4 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -97,6 +97,7 @@ void SpillStream::gc() {
 Status SpillStream::prepare() {
     writer_ =
             std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_, 
data_dir_, spill_dir_);
+    _set_write_counters(profile_);
 
     reader_ = std::make_unique<SpillReader>(stream_id_, 
writer_->get_file_path());
 
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 6779234279c..9682130aad0 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -63,8 +63,6 @@ public:
 
     Status read_next_block_sync(Block* block, bool* eos);
 
-    void set_write_counters(RuntimeProfile* profile) { 
writer_->set_counters(profile); }
-
     void set_read_counters(RuntimeProfile* profile) { 
reader_->set_counters(profile); }
 
     void update_shared_profiles(RuntimeProfile* source_op_profile);
@@ -76,6 +74,8 @@ private:
 
     Status prepare();
 
+    void _set_write_counters(RuntimeProfile* profile) { 
writer_->set_counters(profile); }
+
     RuntimeState* state_ = nullptr;
     int64_t stream_id_;
     SpillDataDir* data_dir_ = nullptr;
diff --git a/be/src/vec/spill/spill_writer.cpp 
b/be/src/vec/spill/spill_writer.cpp
index 26d8f06a623..f6e16518043 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -51,8 +51,10 @@ Status SpillWriter::close() {
     }
 
     total_written_bytes_ += meta_.size();
-    COUNTER_UPDATE(_write_file_data_bytes_counter, meta_.size());
-
+    COUNTER_UPDATE(_write_file_total_size, meta_.size());
+    if (_write_file_current_size) {
+        COUNTER_UPDATE(_write_file_current_size, meta_.size());
+    }
     data_dir_->update_spill_data_usage(meta_.size());
 
     RETURN_IF_ERROR(file_writer_->close());
@@ -146,7 +148,10 @@ Status SpillWriter::_write_internal(const Block& block, 
size_t& written_bytes) {
                     max_sub_block_size_ = std::max(max_sub_block_size_, 
(size_t)buff_size);
 
                     meta_.append((const char*)&total_written_bytes_, 
sizeof(size_t));
-                    COUNTER_UPDATE(_write_file_data_bytes_counter, buff_size);
+                    COUNTER_UPDATE(_write_file_total_size, buff_size);
+                    if (_write_file_current_size) {
+                        COUNTER_UPDATE(_write_file_current_size, buff_size);
+                    }
                     COUNTER_UPDATE(_write_block_counter, 1);
                     total_written_bytes_ += buff_size;
                     ++written_blocks_;
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index 1212d7652a8..23a8e3030e2 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -61,7 +61,8 @@ public:
         _serialize_timer = 
profile->get_counter("SpillWriteSerializeBlockTime");
         _write_block_counter = profile->get_counter("SpillWriteBlockCount");
         _write_block_bytes_counter = 
profile->get_counter("SpillWriteBlockDataSize");
-        _write_file_data_bytes_counter = 
profile->get_counter("SpillWriteFileTotalSize");
+        _write_file_total_size = 
profile->get_counter("SpillWriteFileTotalSize");
+        _write_file_current_size = 
profile->get_counter("SpillWriteFileCurrentSize");
         _write_rows_counter = profile->get_counter("SpillWriteRows");
     }
 
@@ -86,7 +87,8 @@ private:
     RuntimeProfile::Counter* _serialize_timer = nullptr;
     RuntimeProfile::Counter* _write_block_counter = nullptr;
     RuntimeProfile::Counter* _write_block_bytes_counter = nullptr;
-    RuntimeProfile::Counter* _write_file_data_bytes_counter = nullptr;
+    RuntimeProfile::Counter* _write_file_total_size = nullptr;
+    RuntimeProfile::Counter* _write_file_current_size = nullptr;
     RuntimeProfile::Counter* _write_rows_counter = nullptr;
     RuntimeProfile::Counter* _memory_used_counter = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to