This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new a92b15028d5 remove some codes
a92b15028d5 is described below
commit a92b15028d556adcd69e4dc49301dc8798d3ca7f
Author: yiguolei <[email protected]>
AuthorDate: Fri Feb 27 15:17:52 2026 +0800
remove some codes
---
.../exec/partitioned_hash_join_sink_operator.cpp | 6 ++++--
.../pipeline/exec/spill_sort_source_operator.cpp | 22 +++-------------------
2 files changed, 7 insertions(+), 21 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 85607b2e235..ed8b77be341 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -530,8 +530,10 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
if (eos) {
local_state._child_eos = true;
- RETURN_IF_ERROR(local_state._force_flush_partitions(state));
- RETURN_IF_ERROR(local_state._finish_spilling(state));
+ if (local_state._shared_state->_is_spilled) {
+ RETURN_IF_ERROR(local_state._force_flush_partitions(state));
+ RETURN_IF_ERROR(local_state._finish_spilling(state));
+ }
local_state._dependency->set_ready_to_read();
}
return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 9734fcbbee0..097b61b0794 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -75,20 +75,10 @@ Status
SpillSortLocalState::_execute_merge_sort_spill_streams(RuntimeState* stat
SCOPED_TIMER(_spill_merge_sort_timer);
Status status;
Defer defer {[&]() {
- if (!status.ok() || state->is_cancelled()) {
- if (!status.ok()) {
- LOG(WARNING) << fmt::format(
- "Query:{}, sort source:{}, task:{}, merge spill data
error:{}",
- print_id(query_id), _parent->node_id(),
state->task_id(), status);
- }
- for (auto& stream : _current_merging_streams) {
-
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- _current_merging_streams.clear();
- } else {
- VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{},
merge spill data finish",
- print_id(query_id), _parent->node_id(),
state->task_id());
+ for (auto& stream : _current_merging_streams) {
+
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
}
+ _current_merging_streams.clear();
}};
vectorized::Block merge_sorted_block;
vectorized::SpillStreamSPtr tmp_stream;
@@ -102,7 +92,6 @@ Status
SpillSortLocalState::_execute_merge_sort_spill_streams(RuntimeState* stat
RETURN_IF_ERROR(_create_intermediate_merger(
max_stream_count,
parent._sort_source_operator->get_sort_description(_runtime_state.get())));
-
// It is a fast path, because all the remaining streams can be merged
in a run
if (_shared_state->sorted_streams.empty()) {
return Status::OK();
@@ -138,11 +127,6 @@ Status
SpillSortLocalState::_execute_merge_sort_spill_streams(RuntimeState* stat
}
RETURN_IF_ERROR(status);
}
-
- for (auto& stream : _current_merging_streams) {
-
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- _current_merging_streams.clear();
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]