This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_repartition by this push:
     new 5c2735066ff Using spill_buffer_size_bytes as read limit when recovering data
5c2735066ff is described below

commit 5c2735066ffd72f5fc30332fe5e6814f9e7dfabb
Author: Hu Shenggang <[email protected]>
AuthorDate: Sat Feb 28 16:38:25 2026 +0800

    Using spill_buffer_size_bytes as read limit when recovering data
---
 be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index ec27cf75336..2492cdc0479 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -322,6 +322,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
         SCOPED_TIMER(_recovery_build_timer);
         bool eos = false;
         Status status;
+        size_t read_size = 0;
         while (!eos) {
             vectorized::Block block;
             status = build_stream->read_next_block_sync(&block, &eos);
@@ -336,6 +337,8 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
             if (UNLIKELY(state->is_cancelled())) {
                 break;
             }
+
+            read_size += block.allocated_bytes();
             if (!_recovered_build_block) {
                 _recovered_build_block = vectorized::MutableBlock::create_unique(std::move(block));
             } else {
@@ -344,7 +347,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
                     break;
                 }
             }
-            if (_recovered_build_block->allocated_bytes() >= state->spill_buffer_size_bytes()) {
+            if (read_size >= state->spill_buffer_size_bytes()) {
                 break;
             }
         }
@@ -400,7 +403,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
                 read_size += block.allocated_bytes();
                 blocks.emplace_back(std::move(block));
             }
-            if (read_size >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+            if (read_size >= state->spill_buffer_size_bytes()) {
                 break;
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to