This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new 05670c557c4 fix probe revokeable memory size bug
05670c557c4 is described below
commit 05670c557c4f05caffe542554e413e7ba87c4625
Author: yiguolei <[email protected]>
AuthorDate: Wed Mar 4 11:52:09 2026 +0800
fix probe revokeable memory size bug
---
.../exec/partitioned_hash_join_probe_operator.cpp | 34 ++++++++++++----------
.../java/org/apache/doris/qe/SessionVariable.java | 4 +--
2 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f5949ab6436..49156b83c9e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -822,27 +822,31 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
if (!local_state._shared_state->_is_spilled) {
return 0;
}
+
+ size_t mem_size = 0;
+ if (!local_state._child_eos) {
+ auto& probe_blocks = local_state._probe_blocks;
+ for (uint32_t i = 0; i < _partition_count; ++i) {
+ for (auto& block : probe_blocks[i]) {
+ mem_size += block.allocated_bytes();
+ }
+
+ auto& partitioned_block = local_state._partitioned_blocks[i];
+ if (partitioned_block) {
+ auto block_bytes = partitioned_block->allocated_bytes();
+ if (block_bytes >= state->spill_buffer_size_bytes()) {
+ mem_size += block_bytes;
+ }
+ }
+ }
+ return mem_size > state->spill_min_revocable_mem() ? mem_size : 0;
+ }
if (!local_state._current_partition.is_valid() ||
local_state._current_partition.build_finished) {
// No active partition — no revocable memory.
// Or if current partition has finished build hash table.
return 0;
}
- size_t mem_size = 0;
- auto& probe_blocks = local_state._probe_blocks;
- for (uint32_t i = 0; i < _partition_count; ++i) {
- for (auto& block : probe_blocks[i]) {
- mem_size += block.allocated_bytes();
- }
-
- auto& partitioned_block = local_state._partitioned_blocks[i];
- if (partitioned_block) {
- auto block_bytes = partitioned_block->allocated_bytes();
- if (block_bytes >= vectorized::SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
- mem_size += block_bytes;
- }
- }
- }
// Include build-side memory that has been recovered but not yet consumed by the hash table.
// This data is revocable because we can repartition instead of building the hash table.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d0e4c333d42..aa5d0b3e13a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3132,8 +3132,8 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = SPILL_AGGREGATION_SINK_MEM_LIMIT_BYTES, fuzzy = true, needForward = true,
description = {"一旦触发 spill 后,aggregation sink 的 revocable memory 超过该阈值就主动落盘(字节)。默认 64MB。",
"After spill is triggered, aggregation sink will proactively spill when revocable memory "
- + "exceeds this threshold (in bytes). Default is 64MB."})
- public long spillAggregationSinkMemLimitBytes = 64L * 1024L * 1024L;
+ + "exceeds this threshold (in bytes). Default is 64GB."})
+ public long spillAggregationSinkMemLimitBytes = 64L * 1024L * 1024L * 1024L;
@VariableMgr.VarAttr(name = SPILL_SORT_SINK_MEM_LIMIT_BYTES, fuzzy = true, needForward = true,
description = {"一旦触发 spill 后,sort sink 的 revocable memory 超过该阈值就主动落盘(字节)。默认 64MB。",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]