This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f4a33613d69 [performance][Exec] scanner merge block before projection 
(#59492)
f4a33613d69 is described below

commit f4a33613d692f731e5119ff38a83a11f4465e3ce
Author: HappenLee <[email protected]>
AuthorDate: Sun Jan 4 12:16:20 2026 +0800

    [performance][Exec] scanner merge block before projection (#59492)
    
    ### What problem does this PR solve?
    
    Description
    Add batch size based block fetching strategy with min_batch_size (max of
    1/2 of state batch size and 1) to reduce frequent small block processing
    Implement block merging logic between _origin_block and _padding_block
    when total rows do not exceed state batch size
    Optimize EOS handling logic so that the origin block is output first,
    followed by the padding block
    Add block swap mechanism between origin and padding block for efficient
    data transfer
    Ensure correct column data clearing based on row descriptor's
    materialized slots count
---
 be/src/vec/exec/scan/scanner.cpp | 35 +++++++++++++++++++++++++++++++++--
 be/src/vec/exec/scan/scanner.h   | 12 ++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index 2857738297f..a1d7ec74994 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -79,8 +79,39 @@ Status Scanner::init(RuntimeState* state, const 
VExprContextSPtrs& conjuncts) {
 Status Scanner::get_block_after_projects(RuntimeState* state, 
vectorized::Block* block, bool* eos) {
     auto& row_descriptor = _local_state->_parent->row_descriptor();
     if (_output_row_descriptor) {
-        
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
-        RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+        if (_alreay_eos) {
+            *eos = true;
+            _padding_block.swap(_origin_block);
+        } else {
+            
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+            const auto min_batch_size = std::max(state->batch_size() / 2, 1);
+            while (_padding_block.rows() < min_batch_size && !*eos) {
+                RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+                if (_origin_block.rows() >= min_batch_size) {
+                    break;
+                }
+
+                if (_origin_block.rows() + _padding_block.rows() <= 
state->batch_size()) {
+                    RETURN_IF_ERROR(_merge_padding_block());
+                    
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+                } else {
+                    if (_origin_block.rows() < _padding_block.rows()) {
+                        _padding_block.swap(_origin_block);
+                    }
+                    break;
+                }
+            }
+        }
+
+        // first output the origin block change eos = false, next time output 
padding block
+        // set the eos to true
+        if (*eos && !_padding_block.empty() && !_origin_block.empty()) {
+            _alreay_eos = true;
+            *eos = false;
+        }
+        if (_origin_block.empty() && !_padding_block.empty()) {
+            _padding_block.swap(_origin_block);
+        }
         return _do_projections(&_origin_block, block);
     } else {
         return get_block(state, block, eos);
diff --git a/be/src/vec/exec/scan/scanner.h b/be/src/vec/exec/scan/scanner.h
index 26023c58b3b..9aad3716946 100644
--- a/be/src/vec/exec/scan/scanner.h
+++ b/be/src/vec/exec/scan/scanner.h
@@ -103,6 +103,16 @@ protected:
     // Subclass should implement this to return data.
     virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* 
eof) = 0;
 
+    Status _merge_padding_block() {
+        if (_padding_block.empty()) {
+            _padding_block.swap(_origin_block);
+        } else if (_origin_block.rows()) {
+            RETURN_IF_ERROR(
+                    
MutableBlock::build_mutable_block(&_padding_block).merge(_origin_block));
+        }
+        return Status::OK();
+    }
+
     // Update the counters before closing this scanner
     virtual void _collect_profile_before_close();
 
@@ -217,6 +227,8 @@ protected:
     // Used in common subexpression elimination to compute intermediate 
results.
     std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
     vectorized::Block _origin_block;
+    vectorized::Block _padding_block;
+    bool _alreay_eos = false;
 
     VExprContextSPtrs _common_expr_ctxs_push_down;
     // Late arriving runtime filters will update _conjuncts.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to