This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9fe0c1ce178 [refactor](be) Remove scanner eos flag (#63578)
9fe0c1ce178 is described below
commit 9fe0c1ce1785b53e8de3adeed8103077cbb71848
Author: Jerry Hu <[email protected]>
AuthorDate: Wed May 27 15:43:47 2026 +0800
[refactor](be) Remove scanner eos flag (#63578)
### What changed
Remove the extra `_alreay_eos` scanner state from the scan projection
path.
When `get_block()` reports eos while both `_padding_block` and the final
`_origin_block` contain data, the projection path now merges them
directly and returns `eos=true` in the same call. As a result, the final
output block (and only that block) may exceed the normal batch target,
but each source block is already bounded by the lower-level scanner.
### Why
`_alreay_eos` existed only to carry the final eos handoff across two
calls, and its member name was misspelled. Merging the final
padding/origin blocks at eos removes that extra state and simplifies the
block lifecycle without changing query results.
### Validation
- Formatted modified C++ files with `build-support/run_clang_format.py`
using clang-format 16.
- `git diff --check`
- `ninja -C be/ut_build_ASAN -j 1
src/exec/CMakeFiles/Exec.dir/scan/scanner.cpp.o
test/CMakeFiles/doris_be_test.dir/exec/scan/scanner_late_arrival_rf_test.cpp.o`
- Attempted `./run-be-ut.sh --run -j 1
--filter=ScannerProjectionTest.merges_padding_block_when_limit_eos_without_extra_flag:ScannerLateArrivalRfTest.applied_rf_num_advances_after_late_arrival`;
the full BE UT target did not complete locally: it triggered a broad
rebuild on the shared host, which then hit process-resource limits
(`fork: Resource temporarily unavailable`).
### Release note
None
---
be/src/exec/scan/scanner.cpp | 53 ++++++++++------------
be/src/exec/scan/scanner.h | 1 -
be/test/exec/scan/scanner_late_arrival_rf_test.cpp | 52 ++++++++++++++++++++-
3 files changed, 75 insertions(+), 31 deletions(-)
diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp
index ab76b884ef0..7d72d5838d1 100644
--- a/be/src/exec/scan/scanner.cpp
+++ b/be/src/exec/scan/scanner.cpp
@@ -87,39 +87,36 @@ Status Scanner::get_block_after_projects(RuntimeState*
state, Block* block, bool
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block);
auto& row_descriptor = _local_state->_parent->row_descriptor();
if (_output_row_descriptor) {
- if (_alreay_eos) {
- *eos = true;
- _padding_block.swap(_origin_block);
- } else {
-
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
- const auto min_batch_size = std::max(state->batch_size() / 2, 1);
- const auto block_max_bytes = state->preferred_block_size_bytes();
- while (_padding_block.rows() < min_batch_size &&
- _padding_block.bytes() < block_max_bytes && !*eos) {
- RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
- if (_origin_block.rows() >= min_batch_size) {
- break;
- }
+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+ const auto min_batch_size = std::max(state->batch_size() / 2, 1);
+ const auto block_max_bytes = state->preferred_block_size_bytes();
+ while (_padding_block.rows() < min_batch_size &&
_padding_block.bytes() < block_max_bytes &&
+ !*eos) {
+ RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+ if (*eos) {
+ // For the final block, merge any padding directly and return
eos in this call.
+ // The merged tail can be larger than the target batch, but
each source block is
+ // already bounded by the lower scanner.
+ RETURN_IF_ERROR(_merge_padding_block());
+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+ break;
+ }
+ if (_origin_block.rows() >= min_batch_size) {
+ break;
+ }
- if (_origin_block.rows() + _padding_block.rows() <=
state->batch_size() &&
- _origin_block.bytes() + _padding_block.bytes() <=
block_max_bytes) {
- RETURN_IF_ERROR(_merge_padding_block());
-
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
- } else {
- if (_origin_block.rows() < _padding_block.rows()) {
- _padding_block.swap(_origin_block);
- }
- break;
+ if (_origin_block.rows() + _padding_block.rows() <=
state->batch_size() &&
+ _origin_block.bytes() + _padding_block.bytes() <=
block_max_bytes) {
+ RETURN_IF_ERROR(_merge_padding_block());
+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+ } else {
+ if (_origin_block.rows() < _padding_block.rows()) {
+ _padding_block.swap(_origin_block);
}
+ break;
}
}
- // first output the origin block change eos = false, next time output
padding block
- // set the eos to true
- if (*eos && !_padding_block.empty() && !_origin_block.empty()) {
- _alreay_eos = true;
- *eos = false;
- }
if (_origin_block.empty() && !_padding_block.empty()) {
_padding_block.swap(_origin_block);
}
diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h
index 4f5d511e94b..4882c004b0b 100644
--- a/be/src/exec/scan/scanner.h
+++ b/be/src/exec/scan/scanner.h
@@ -240,7 +240,6 @@ protected:
std::vector<VExprContextSPtrs> _intermediate_projections;
Block _origin_block;
Block _padding_block;
- bool _alreay_eos = false;
VExprContextSPtrs _common_expr_ctxs_push_down;
diff --git a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
index f1e21ebc4c3..0d31b694951 100644
--- a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
+++ b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
@@ -18,6 +18,9 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include <list>
+
+#include "common/object_pool.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_number.h"
#include "exec/operator/mock_scan_operator.h"
@@ -28,6 +31,10 @@
#include "exec/scan/scanner.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "testutil/mock/mock_slot_ref.h"
namespace doris {
@@ -40,11 +47,22 @@ public:
RuntimeProfile* profile)
: Scanner(state, local_state, limit, profile) {}
+ void add_block(Block block) { _blocks.push_back(std::move(block)); }
+
protected:
- Status _get_block_impl(RuntimeState* /*state*/, Block* /*block*/, bool*
eof) override {
- *eof = true;
+ Status _get_block_impl(RuntimeState* /*state*/, Block* block, bool* eof)
override {
+ if (_blocks.empty()) {
+ *eof = true;
+ return Status::OK();
+ }
+ *eof = false;
+ block->swap(_blocks.front());
+ _blocks.pop_front();
return Status::OK();
}
+
+private:
+ std::list<Block> _blocks;
};
class ScannerLateArrivalRfTest : public RuntimeFilterTest {
@@ -117,4 +135,34 @@ TEST_F(ScannerLateArrivalRfTest,
applied_rf_num_advances_after_late_arrival) {
ASSERT_TRUE(scanner->_conjuncts.empty());
}
+TEST(ScannerProjectionTest,
merges_padding_block_when_limit_eos_without_extra_flag) {
+ ObjectPool pool;
+ auto data_type = std::make_shared<DataTypeInt32>();
+ auto row_descriptor = MockRowDescriptor({data_type}, &pool);
+
+ MockRuntimeState state;
+ state._batch_size = 6;
+
+ auto op = std::make_shared<MockScanOperatorX>();
+ op->_row_descriptor = row_descriptor;
+ op->_output_row_descriptor =
+ std::make_unique<MockRowDescriptor>(std::vector<DataTypePtr>
{data_type}, &pool);
+ op->_output_tuple_desc =
op->_output_row_descriptor->tuple_descriptors()[0];
+
+ auto local_state = std::make_shared<MockScanLocalState>(&state, op.get());
+ local_state->_projections = MockSlotRef::create_mock_contexts(0,
data_type);
+
+ RuntimeProfile profile("scanner");
+ TestScanner scanner(&state, local_state.get(), 7, &profile);
+ ASSERT_TRUE(scanner.init(&state, {}).ok());
+ scanner.add_block(ColumnHelper::create_block<DataTypeInt32>({0, 1}));
+ scanner.add_block(ColumnHelper::create_block<DataTypeInt32>({2, 3, 4, 5,
6}));
+
+ Block first_output;
+ bool eos = false;
+ ASSERT_TRUE(scanner.get_block_after_projects(&state, &first_output,
&eos).ok());
+ EXPECT_TRUE(eos);
+ EXPECT_EQ(first_output.rows(), 7);
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]