This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new 4df2277eb35 Make spill stream RAII
4df2277eb35 is described below
commit 4df2277eb3553e51f83137febf815c49c0b25a16
Author: Hu Shenggang <[email protected]>
AuthorDate: Sat Feb 28 14:46:32 2026 +0800
Make spill stream RAII
---
be/src/pipeline/dependency.cpp | 13 +------
.../partitioned_aggregation_source_operator.cpp | 22 -----------
.../exec/partitioned_hash_join_probe_operator.cpp | 43 +---------------------
.../pipeline/exec/spill_sort_source_operator.cpp | 10 +----
be/src/vec/spill/spill_stream.cpp | 1 +
be/src/vec/spill/spill_stream_manager.cpp | 5 ---
be/src/vec/spill/spill_stream_manager.h | 3 --
.../partitioned_hash_join_probe_operator_test.cpp | 3 --
8 files changed, 5 insertions(+), 95 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 993b907294a..9330fe54e4e 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -340,15 +340,7 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
return Status::OK();
}
void AggSpillPartition::close() {
- if (spilling_stream_) {
- (void)spilling_stream_->close();
- (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilling_stream_);
- spilling_stream_.reset();
- }
- for (auto& stream : spill_streams_) {
- (void)stream->close();
- (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
+ spilling_stream_.reset();
spill_streams_.clear();
}
@@ -367,9 +359,6 @@ void SpillSortSharedState::close() {
return;
}
DCHECK(!false_close && is_closed);
- for (auto& stream : sorted_streams) {
- (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
sorted_streams.clear();
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 02a55bd27c4..8de7d8a4411 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -98,20 +98,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
return Status::OK();
}
- // Clean up partition queue resources.
- for (auto& partition : _partition_queue) {
- for (auto& stream : partition.streams) {
- if (stream) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- }
- }
_partition_queue.clear();
- for (auto& stream : _current_partition.streams) {
- if (stream) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- }
_current_partition.streams.clear();
return Base::close(state);
@@ -365,7 +352,6 @@ Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st
}
if (eos) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
partition.streams.pop_front();
}
}
@@ -427,7 +413,6 @@ Status PartitionedAggLocalState::_repartition_partition(RuntimeState* state,
continue;
}
if (stream->get_written_bytes() == 0) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
stream.reset();
continue;
}
@@ -436,7 +421,6 @@ Status PartitionedAggLocalState::_repartition_partition(RuntimeState* state,
while (!done && !state->is_cancelled()) {
RETURN_IF_ERROR(_repartitioner.repartition(state, stream,
output_streams, &done));
}
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
stream.reset();
}
partition.streams.clear();
@@ -454,8 +438,6 @@ Status PartitionedAggLocalState::_repartition_partition(RuntimeState* state,
_max_partition_level_seen = new_level;
COUNTER_SET(_max_partition_level,
int64_t(_max_partition_level_seen));
}
- } else if (output_streams[i]) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(output_streams[i]);
}
}
@@ -568,7 +550,6 @@ Status PartitionedAggLocalState::flush_and_repartition(
continue;
}
if (stream->get_written_bytes() == 0) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
stream.reset();
continue;
}
@@ -577,7 +558,6 @@ Status PartitionedAggLocalState::flush_and_repartition(
while (!done && !state->is_cancelled()) {
RETURN_IF_ERROR(_repartitioner.repartition(state, stream,
output_streams, &done));
}
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
stream.reset();
}
remaining_streams.clear();
@@ -596,8 +576,6 @@ Status PartitionedAggLocalState::flush_and_repartition(
_max_partition_level_seen = new_level;
COUNTER_SET(_max_partition_level,
int64_t(_max_partition_level_seen));
}
- } else if (output_streams[i]) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(output_streams[i]);
}
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 6a49fdb6f85..ec27cf75336 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -180,24 +180,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
- // Clean up any remaining spill partition queue entries
- for (auto& entry : _spill_partition_queue) {
- if (entry.build_stream) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(entry.build_stream);
- }
- if (entry.probe_stream) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(entry.probe_stream);
- }
- }
_spill_partition_queue.clear();
- if (_current_partition.build_stream) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
- _current_partition.build_stream);
- }
- if (_current_partition.probe_stream) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
- _current_partition.probe_stream);
- }
_current_partition = SpillPartitionInfo {};
_queue_probe_blocks.clear();
@@ -366,7 +349,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
}
}
if (eos) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(build_stream);
build_stream.reset();
}
return status;
@@ -423,7 +405,6 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
}
}
if (eos) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(probe_stream);
probe_stream.reset();
}
return st;
@@ -487,11 +468,9 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
build_output_streams,
&done));
}
// Input build stream fully consumed, clean up
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(partition.build_stream);
partition.build_stream.reset();
} else if (partition.build_stream) {
// Stream exists but not ready for reading (empty or not finalized).
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(partition.build_stream);
partition.build_stream.reset();
}
RETURN_IF_ERROR(SpillRepartitioner::finalize(build_output_streams));
@@ -516,11 +495,9 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
probe_output_streams,
&done));
}
// Input probe stream fully consumed, clean up
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(partition.probe_stream);
partition.probe_stream.reset();
} else if (partition.probe_stream) {
// Stream exists but not ready for reading (empty or not finalized).
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(partition.probe_stream);
partition.probe_stream.reset();
}
RETURN_IF_ERROR(SpillRepartitioner::finalize(probe_output_streams));
@@ -536,16 +513,6 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
_max_partition_level_seen = new_level;
COUNTER_SET(_max_partition_level,
int64_t(_max_partition_level_seen));
}
- } else {
- // Clean up empty streams
- if (build_output_streams[i]) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
- build_output_streams[i]);
- }
- if (probe_output_streams[i]) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
- probe_output_streams[i]);
- }
}
}
@@ -767,14 +734,8 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
int64_t(local_state._max_partition_level_seen));
} else {
// No build data for this partition — discard streams.
- if (build_stream) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(build_stream);
- build_stream.reset();
- }
- if (probe_stream) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(probe_stream);
- probe_stream.reset();
- }
+ build_stream.reset();
+ probe_stream.reset();
}
}
local_state._spill_queue_initialized = true;
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 097b61b0794..74b47fc8bb5 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -74,12 +74,7 @@ Status SpillSortLocalState::_execute_merge_sort_spill_streams(RuntimeState* stat
auto& parent = Base::_parent->template cast<Parent>();
SCOPED_TIMER(_spill_merge_sort_timer);
Status status;
- Defer defer {[&]() {
- for (auto& stream : _current_merging_streams) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- _current_merging_streams.clear();
- }};
+ Defer defer {[&]() { _current_merging_streams.clear(); }};
vectorized::Block merge_sorted_block;
vectorized::SpillStreamSPtr tmp_stream;
while (!state->is_cancelled()) {
@@ -236,9 +231,6 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) {
// close shared state. Centralize cleanup so resources are released when
// the pipeline task finishes.
auto& local_state = get_local_state(state);
- for (auto& stream : local_state._current_merging_streams) {
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
local_state._current_merging_streams.clear();
local_state._merger.reset();
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index e84a34b2558..f21ad80d011 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -53,6 +53,7 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d
}
SpillStream::~SpillStream() {
+ (void)close();
gc();
}
diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp
index 7dea14e4043..ef6b8f3853a 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -184,11 +184,6 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
return Status::OK();
}
-void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
- (void)stream->close();
- stream->gc();
-}
-
void SpillStreamManager::gc(int32_t max_work_time_ms) {
bool exists = true;
bool has_work = false;
diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h
index 7f76ce1e488..29aed0d19fa 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -133,9 +133,6 @@ public:
int32_t node_id, size_t batch_bytes,
RuntimeProfile* operator_profile);
- // 标记SpillStream需要被删除,在GC线程中异步删除落盘文件
- void delete_spill_stream(SpillStreamSPtr spill_stream);
-
void gc(int32_t max_work_time_ms);
void update_spill_write_bytes(int64_t bytes) {
_spill_write_bytes_counter->increment(bytes); }
diff --git a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index f9eac858e48..893f23c4ead 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -215,8 +215,6 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks) {
if (!local_state->_probe_spilling_streams[i]) {
continue;
}
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
- local_state->_probe_spilling_streams[i]);
local_state->_probe_spilling_streams[i].reset();
}
@@ -411,7 +409,6 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskError) {
auto status =
local_state->recover_probe_blocks_from_disk(_helper.runtime_state.get(),
test_partition,
has_data);
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilling_stream);
spilling_stream.reset();
ASSERT_FALSE(status.ok());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]