This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_repartition by this push:
     new 4df2277eb35 Make spill stream RAII
4df2277eb35 is described below

commit 4df2277eb3553e51f83137febf815c49c0b25a16
Author: Hu Shenggang <[email protected]>
AuthorDate: Sat Feb 28 14:46:32 2026 +0800

    Make spill stream RAII
---
 be/src/pipeline/dependency.cpp                     | 13 +------
 .../partitioned_aggregation_source_operator.cpp    | 22 -----------
 .../exec/partitioned_hash_join_probe_operator.cpp  | 43 +---------------------
 .../pipeline/exec/spill_sort_source_operator.cpp   | 10 +----
 be/src/vec/spill/spill_stream.cpp                  |  1 +
 be/src/vec/spill/spill_stream_manager.cpp          |  5 ---
 be/src/vec/spill/spill_stream_manager.h            |  3 --
 .../partitioned_hash_join_probe_operator_test.cpp  |  3 --
 8 files changed, 5 insertions(+), 95 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 993b907294a..9330fe54e4e 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -340,15 +340,7 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
     return Status::OK();
 }
 void AggSpillPartition::close() {
-    if (spilling_stream_) {
-        (void)spilling_stream_->close();
-        (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilling_stream_);
-        spilling_stream_.reset();
-    }
-    for (auto& stream : spill_streams_) {
-        (void)stream->close();
-        (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-    }
+    spilling_stream_.reset();
     spill_streams_.clear();
 }
 
@@ -367,9 +359,6 @@ void SpillSortSharedState::close() {
         return;
     }
     DCHECK(!false_close && is_closed);
-    for (auto& stream : sorted_streams) {
-        (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-    }
     sorted_streams.clear();
 }
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 02a55bd27c4..8de7d8a4411 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -98,20 +98,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
         return Status::OK();
     }
 
-    // Clean up partition queue resources.
-    for (auto& partition : _partition_queue) {
-        for (auto& stream : partition.streams) {
-            if (stream) {
-                ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-            }
-        }
-    }
     _partition_queue.clear();
-    for (auto& stream : _current_partition.streams) {
-        if (stream) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-        }
-    }
     _current_partition.streams.clear();
 
     return Base::close(state);
@@ -365,7 +352,6 @@ Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st
             }
 
             if (eos) {
-                ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
                 partition.streams.pop_front();
             }
         }
@@ -427,7 +413,6 @@ Status PartitionedAggLocalState::_repartition_partition(RuntimeState* state,
             continue;
         }
         if (stream->get_written_bytes() == 0) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
             stream.reset();
             continue;
         }
@@ -436,7 +421,6 @@ Status PartitionedAggLocalState::_repartition_partition(RuntimeState* state,
         while (!done && !state->is_cancelled()) {
             RETURN_IF_ERROR(_repartitioner.repartition(state, stream, output_streams, &done));
         }
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
         stream.reset();
     }
     partition.streams.clear();
@@ -454,8 +438,6 @@ Status PartitionedAggLocalState::_repartition_partition(RuntimeState* state,
                 _max_partition_level_seen = new_level;
                 COUNTER_SET(_max_partition_level, int64_t(_max_partition_level_seen));
             }
-        } else if (output_streams[i]) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(output_streams[i]);
         }
     }
 
@@ -568,7 +550,6 @@ Status PartitionedAggLocalState::flush_and_repartition(
             continue;
         }
         if (stream->get_written_bytes() == 0) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
             stream.reset();
             continue;
         }
@@ -577,7 +558,6 @@ Status PartitionedAggLocalState::flush_and_repartition(
         while (!done && !state->is_cancelled()) {
             RETURN_IF_ERROR(_repartitioner.repartition(state, stream, output_streams, &done));
         }
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
         stream.reset();
     }
     remaining_streams.clear();
@@ -596,8 +576,6 @@ Status PartitionedAggLocalState::flush_and_repartition(
                 _max_partition_level_seen = new_level;
                 COUNTER_SET(_max_partition_level, int64_t(_max_partition_level_seen));
             }
-        } else if (output_streams[i]) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(output_streams[i]);
         }
     }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 6a49fdb6f85..ec27cf75336 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -180,24 +180,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    // Clean up any remaining spill partition queue entries
-    for (auto& entry : _spill_partition_queue) {
-        if (entry.build_stream) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(entry.build_stream);
-        }
-        if (entry.probe_stream) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(entry.probe_stream);
-        }
-    }
     _spill_partition_queue.clear();
-    if (_current_partition.build_stream) {
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
-                _current_partition.build_stream);
-    }
-    if (_current_partition.probe_stream) {
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
-                _current_partition.probe_stream);
-    }
     _current_partition = SpillPartitionInfo {};
     _queue_probe_blocks.clear();
 
@@ -366,7 +349,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
             }
         }
         if (eos) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(build_stream);
             build_stream.reset();
         }
         return status;
@@ -423,7 +405,6 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
             }
         }
         if (eos) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(probe_stream);
             probe_stream.reset();
         }
         return st;
@@ -487,11 +468,9 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
                                                        build_output_streams, &done));
         }
         // Input build stream fully consumed, clean up
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(partition.build_stream);
         partition.build_stream.reset();
     } else if (partition.build_stream) {
         // Stream exists but not ready for reading (empty or not finalized).
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(partition.build_stream);
         partition.build_stream.reset();
     }
     RETURN_IF_ERROR(SpillRepartitioner::finalize(build_output_streams));
@@ -516,11 +495,9 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
                                                        probe_output_streams, &done));
         }
         // Input probe stream fully consumed, clean up
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(partition.probe_stream);
         partition.probe_stream.reset();
     } else if (partition.probe_stream) {
         // Stream exists but not ready for reading (empty or not finalized).
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(partition.probe_stream);
         partition.probe_stream.reset();
     }
     RETURN_IF_ERROR(SpillRepartitioner::finalize(probe_output_streams));
@@ -536,16 +513,6 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
                 _max_partition_level_seen = new_level;
                 COUNTER_SET(_max_partition_level, int64_t(_max_partition_level_seen));
             }
-        } else {
-            // Clean up empty streams
-            if (build_output_streams[i]) {
-                ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
-                        build_output_streams[i]);
-            }
-            if (probe_output_streams[i]) {
-                ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
-                        probe_output_streams[i]);
-            }
         }
     }
 
@@ -767,14 +734,8 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
                             int64_t(local_state._max_partition_level_seen));
             } else {
                 // No build data for this partition — discard streams.
-                if (build_stream) {
-                    ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(build_stream);
-                    build_stream.reset();
-                }
-                if (probe_stream) {
-                    ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(probe_stream);
-                    probe_stream.reset();
-                }
+                build_stream.reset();
+                probe_stream.reset();
             }
         }
         local_state._spill_queue_initialized = true;
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 097b61b0794..74b47fc8bb5 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -74,12 +74,7 @@ Status SpillSortLocalState::_execute_merge_sort_spill_streams(RuntimeState* stat
     auto& parent = Base::_parent->template cast<Parent>();
     SCOPED_TIMER(_spill_merge_sort_timer);
     Status status;
-    Defer defer {[&]() {
-        for (auto& stream : _current_merging_streams) {
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-        }
-        _current_merging_streams.clear();
-    }};
+    Defer defer {[&]() { _current_merging_streams.clear(); }};
     vectorized::Block merge_sorted_block;
     vectorized::SpillStreamSPtr tmp_stream;
     while (!state->is_cancelled()) {
@@ -236,9 +231,6 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) {
     // close shared state. Centralize cleanup so resources are released when
     // the pipeline task finishes.
     auto& local_state = get_local_state(state);
-    for (auto& stream : local_state._current_merging_streams) {
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-    }
     local_state._current_merging_streams.clear();
     local_state._merger.reset();
 
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index e84a34b2558..f21ad80d011 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -53,6 +53,7 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d
 }
 
 SpillStream::~SpillStream() {
+    (void)close();
     gc();
 }
 
diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp
index 7dea14e4043..ef6b8f3853a 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -184,11 +184,6 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
     return Status::OK();
 }
 
-void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
-    (void)stream->close();
-    stream->gc();
-}
-
 void SpillStreamManager::gc(int32_t max_work_time_ms) {
     bool exists = true;
     bool has_work = false;
diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h
index 7f76ce1e488..29aed0d19fa 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -133,9 +133,6 @@ public:
                                  int32_t node_id, size_t batch_bytes,
                                  RuntimeProfile* operator_profile);
 
-    // 标记SpillStream需要被删除,在GC线程中异步删除落盘文件
-    void delete_spill_stream(SpillStreamSPtr spill_stream);
-
     void gc(int32_t max_work_time_ms);
 
     void update_spill_write_bytes(int64_t bytes) { _spill_write_bytes_counter->increment(bytes); }
diff --git a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index f9eac858e48..893f23c4ead 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -215,8 +215,6 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks) {
         if (!local_state->_probe_spilling_streams[i]) {
             continue;
         }
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
-                local_state->_probe_spilling_streams[i]);
         local_state->_probe_spilling_streams[i].reset();
     }
 
@@ -411,7 +409,6 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskError) {
     auto status = local_state->recover_probe_blocks_from_disk(_helper.runtime_state.get(),
                                                               test_partition, has_data);
 
-    ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilling_stream);
     spilling_stream.reset();
 
     ASSERT_FALSE(status.ok());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to