This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe0c1ce178 [refactor](be) Remove scanner eos flag (#63578)
9fe0c1ce178 is described below

commit 9fe0c1ce1785b53e8de3adeed8103077cbb71848
Author: Jerry Hu <[email protected]>
AuthorDate: Wed May 27 15:43:47 2026 +0800

    [refactor](be) Remove scanner eos flag (#63578)
    
    ### What changed
    
    Remove the extra `_alreay_eos` scanner state from the scan projection
    path.
    
    When `get_block()` reports eos while both `_padding_block` and the final
    `_origin_block` contain data, the projection path now merges them
    directly and returns `eos=true` in the same call. As a result, the final
    output block (and only that block) may exceed the normal batch target;
    each of the merged source blocks is still bounded by the lower scanner.
    
    ### Why
    
    `_alreay_eos` existed only to defer the final eos to the next call, and
    its name also contained a typo. Merging the final padding/origin blocks
    at eos removes that extra state and simplifies the block lifecycle
    without changing query results.
    
    ### Validation
    
    - Formatted modified C++ files with `build-support/run_clang_format.py`
    using clang-format 16.
    - `git diff --check`
    - `ninja -C be/ut_build_ASAN -j 1 src/exec/CMakeFiles/Exec.dir/scan/scanner.cpp.o test/CMakeFiles/doris_be_test.dir/exec/scan/scanner_late_arrival_rf_test.cpp.o`
    - Attempted `./run-be-ut.sh --run -j 1 --filter=ScannerProjectionTest.merges_padding_block_when_limit_eos_without_extra_flag:ScannerLateArrivalRfTest.applied_rf_num_advances_after_late_arrival`;
    the full BE UT target did not complete locally because it triggered a
    broad rebuild on the shared host, and process-resource pressure was
    observed (`fork: Resource temporarily unavailable`).
    
    ### Release note
    
    None
---
 be/src/exec/scan/scanner.cpp                       | 53 ++++++++++------------
 be/src/exec/scan/scanner.h                         |  1 -
 be/test/exec/scan/scanner_late_arrival_rf_test.cpp | 52 ++++++++++++++++++++-
 3 files changed, 75 insertions(+), 31 deletions(-)

diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp
index ab76b884ef0..7d72d5838d1 100644
--- a/be/src/exec/scan/scanner.cpp
+++ b/be/src/exec/scan/scanner.cpp
@@ -87,39 +87,36 @@ Status Scanner::get_block_after_projects(RuntimeState* 
state, Block* block, bool
     
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block);
     auto& row_descriptor = _local_state->_parent->row_descriptor();
     if (_output_row_descriptor) {
-        if (_alreay_eos) {
-            *eos = true;
-            _padding_block.swap(_origin_block);
-        } else {
-            
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
-            const auto min_batch_size = std::max(state->batch_size() / 2, 1);
-            const auto block_max_bytes = state->preferred_block_size_bytes();
-            while (_padding_block.rows() < min_batch_size &&
-                   _padding_block.bytes() < block_max_bytes && !*eos) {
-                RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
-                if (_origin_block.rows() >= min_batch_size) {
-                    break;
-                }
+        
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+        const auto min_batch_size = std::max(state->batch_size() / 2, 1);
+        const auto block_max_bytes = state->preferred_block_size_bytes();
+        while (_padding_block.rows() < min_batch_size && 
_padding_block.bytes() < block_max_bytes &&
+               !*eos) {
+            RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+            if (*eos) {
+                // For the final block, merge any padding directly and return 
eos in this call.
+                // The merged tail can be larger than the target batch, but 
each source block is
+                // already bounded by the lower scanner.
+                RETURN_IF_ERROR(_merge_padding_block());
+                
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+                break;
+            }
+            if (_origin_block.rows() >= min_batch_size) {
+                break;
+            }
 
-                if (_origin_block.rows() + _padding_block.rows() <= 
state->batch_size() &&
-                    _origin_block.bytes() + _padding_block.bytes() <= 
block_max_bytes) {
-                    RETURN_IF_ERROR(_merge_padding_block());
-                    
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
-                } else {
-                    if (_origin_block.rows() < _padding_block.rows()) {
-                        _padding_block.swap(_origin_block);
-                    }
-                    break;
+            if (_origin_block.rows() + _padding_block.rows() <= 
state->batch_size() &&
+                _origin_block.bytes() + _padding_block.bytes() <= 
block_max_bytes) {
+                RETURN_IF_ERROR(_merge_padding_block());
+                
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+            } else {
+                if (_origin_block.rows() < _padding_block.rows()) {
+                    _padding_block.swap(_origin_block);
                 }
+                break;
             }
         }
 
-        // first output the origin block change eos = false, next time output 
padding block
-        // set the eos to true
-        if (*eos && !_padding_block.empty() && !_origin_block.empty()) {
-            _alreay_eos = true;
-            *eos = false;
-        }
         if (_origin_block.empty() && !_padding_block.empty()) {
             _padding_block.swap(_origin_block);
         }
diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h
index 4f5d511e94b..4882c004b0b 100644
--- a/be/src/exec/scan/scanner.h
+++ b/be/src/exec/scan/scanner.h
@@ -240,7 +240,6 @@ protected:
     std::vector<VExprContextSPtrs> _intermediate_projections;
     Block _origin_block;
     Block _padding_block;
-    bool _alreay_eos = false;
 
     VExprContextSPtrs _common_expr_ctxs_push_down;
 
diff --git a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp 
b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
index f1e21ebc4c3..0d31b694951 100644
--- a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
+++ b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
@@ -18,6 +18,9 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include <list>
+
+#include "common/object_pool.h"
 #include "core/data_type/data_type_factory.hpp"
 #include "core/data_type/data_type_number.h"
 #include "exec/operator/mock_scan_operator.h"
@@ -28,6 +31,10 @@
 #include "exec/scan/scanner.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "testutil/mock/mock_slot_ref.h"
 
 namespace doris {
 
@@ -40,11 +47,22 @@ public:
                 RuntimeProfile* profile)
             : Scanner(state, local_state, limit, profile) {}
 
+    void add_block(Block block) { _blocks.push_back(std::move(block)); }
+
 protected:
-    Status _get_block_impl(RuntimeState* /*state*/, Block* /*block*/, bool* 
eof) override {
-        *eof = true;
+    Status _get_block_impl(RuntimeState* /*state*/, Block* block, bool* eof) 
override {
+        if (_blocks.empty()) {
+            *eof = true;
+            return Status::OK();
+        }
+        *eof = false;
+        block->swap(_blocks.front());
+        _blocks.pop_front();
         return Status::OK();
     }
+
+private:
+    std::list<Block> _blocks;
 };
 
 class ScannerLateArrivalRfTest : public RuntimeFilterTest {
@@ -117,4 +135,34 @@ TEST_F(ScannerLateArrivalRfTest, 
applied_rf_num_advances_after_late_arrival) {
     ASSERT_TRUE(scanner->_conjuncts.empty());
 }
 
+TEST(ScannerProjectionTest, 
merges_padding_block_when_limit_eos_without_extra_flag) {
+    ObjectPool pool;
+    auto data_type = std::make_shared<DataTypeInt32>();
+    auto row_descriptor = MockRowDescriptor({data_type}, &pool);
+
+    MockRuntimeState state;
+    state._batch_size = 6;
+
+    auto op = std::make_shared<MockScanOperatorX>();
+    op->_row_descriptor = row_descriptor;
+    op->_output_row_descriptor =
+            std::make_unique<MockRowDescriptor>(std::vector<DataTypePtr> 
{data_type}, &pool);
+    op->_output_tuple_desc = 
op->_output_row_descriptor->tuple_descriptors()[0];
+
+    auto local_state = std::make_shared<MockScanLocalState>(&state, op.get());
+    local_state->_projections = MockSlotRef::create_mock_contexts(0, 
data_type);
+
+    RuntimeProfile profile("scanner");
+    TestScanner scanner(&state, local_state.get(), 7, &profile);
+    ASSERT_TRUE(scanner.init(&state, {}).ok());
+    scanner.add_block(ColumnHelper::create_block<DataTypeInt32>({0, 1}));
+    scanner.add_block(ColumnHelper::create_block<DataTypeInt32>({2, 3, 4, 5, 
6}));
+
+    Block first_output;
+    bool eos = false;
+    ASSERT_TRUE(scanner.get_block_after_projects(&state, &first_output, 
&eos).ok());
+    EXPECT_TRUE(eos);
+    EXPECT_EQ(first_output.rows(), 7);
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to