This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new 7548fe8f7f1 some tiny fix
7548fe8f7f1 is described below
commit 7548fe8f7f1de3e776536d3a87df01aa740297e0
Author: Hu Shenggang <[email protected]>
AuthorDate: Mon Mar 2 10:15:19 2026 +0800
some tiny fix
---
.../pipeline/exec/partitioned_aggregation_sink_operator.cpp | 2 --
.../pipeline/exec/partitioned_hash_join_probe_operator.cpp | 13 ++++++++-----
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index ef53713d7d0..18d2211c689 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -367,9 +367,7 @@ Status
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
spill_infos[i].keys_, spill_infos[i].values_,
nullptr, false);
RETURN_IF_ERROR(status);
spill_infos[i].keys_.clear();
- spill_infos[i].keys_.shrink_to_fit();
spill_infos[i].values_.clear();
- spill_infos[i].values_.shrink_to_fit();
}
}
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 2492cdc0479..bc2f42e1932 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -79,7 +79,7 @@ void PartitionedHashJoinProbeLocalState::init_counters() {
_get_child_next_timer = ADD_TIMER_WITH_LEVEL(custom_profile(),
"GetChildNextTime", 1);
_probe_blocks_bytes =
- ADD_COUNTER_WITH_LEVEL(custom_profile(), "ProbeBloksBytesInMem",
TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(custom_profile(), "ProbeBlocksBytesInMem",
TUnit::BYTES, 1);
_memory_usage_reserved =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "MemoryUsageReserved",
TUnit::BYTES, 1);
@@ -609,8 +609,9 @@ Status
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(input_block,
partition_indexes[i].data(),
partition_indexes[i].data() + count));
- if (partitioned_blocks[i]->rows() > 2 * 1024 * 1024 ||
- (eos && partitioned_blocks[i]->rows() > 0)) {
+ if (partitioned_blocks[i]->rows() > 0 &&
+ (eos || partitioned_blocks[i]->allocated_bytes() >=
+
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM)) {
local_state._probe_blocks[i].emplace_back(partitioned_blocks[i]->to_block());
partitioned_blocks[i].reset();
} else {
@@ -1108,8 +1109,10 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
local_state.update_profile_from_inner();
}
- local_state.add_num_rows_returned(block->rows());
- COUNTER_UPDATE(local_state._blocks_returned_counter, 1);
+ if (!block->empty()) {
+ local_state.add_num_rows_returned(block->rows());
+ COUNTER_UPDATE(local_state._blocks_returned_counter, 1);
+ }
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]