This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_repartition by this push:
     new 7548fe8f7f1 some tiny fix
7548fe8f7f1 is described below

commit 7548fe8f7f1de3e776536d3a87df01aa740297e0
Author: Hu Shenggang <[email protected]>
AuthorDate: Mon Mar 2 10:15:19 2026 +0800

    some tiny fix
---
 .../pipeline/exec/partitioned_aggregation_sink_operator.cpp |  2 --
 .../pipeline/exec/partitioned_hash_join_probe_operator.cpp  | 13 ++++++++-----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index ef53713d7d0..18d2211c689 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -367,9 +367,7 @@ Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
                             spill_infos[i].keys_, spill_infos[i].values_, nullptr, false);
                     RETURN_IF_ERROR(status);
                     spill_infos[i].keys_.clear();
-                    spill_infos[i].keys_.shrink_to_fit();
                     spill_infos[i].values_.clear();
-                    spill_infos[i].values_.shrink_to_fit();
                 }
             }
         }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 2492cdc0479..bc2f42e1932 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -79,7 +79,7 @@ void PartitionedHashJoinProbeLocalState::init_counters() {
     _get_child_next_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "GetChildNextTime", 1);
 
     _probe_blocks_bytes =
-            ADD_COUNTER_WITH_LEVEL(custom_profile(), "ProbeBloksBytesInMem", TUnit::BYTES, 1);
+            ADD_COUNTER_WITH_LEVEL(custom_profile(), "ProbeBlocksBytesInMem", TUnit::BYTES, 1);
     _memory_usage_reserved =
             ADD_COUNTER_WITH_LEVEL(custom_profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
 
@@ -609,8 +609,9 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
         RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(input_block, partition_indexes[i].data(),
                                                         partition_indexes[i].data() + count));
 
-        if (partitioned_blocks[i]->rows() > 2 * 1024 * 1024 ||
-            (eos && partitioned_blocks[i]->rows() > 0)) {
+        if (partitioned_blocks[i]->rows() > 0 &&
+            (eos || partitioned_blocks[i]->allocated_bytes() >=
+                            vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM)) {
             local_state._probe_blocks[i].emplace_back(partitioned_blocks[i]->to_block());
             partitioned_blocks[i].reset();
         } else {
@@ -1108,8 +1109,10 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
             local_state.update_profile_from_inner();
         }
 
-        local_state.add_num_rows_returned(block->rows());
-        COUNTER_UPDATE(local_state._blocks_returned_counter, 1);
+        if (!block->empty()) {
+            local_state.add_num_rows_returned(block->rows());
+            COUNTER_UPDATE(local_state._blocks_returned_counter, 1);
+        }
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to