This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f8ececeefd1 [sort](ut) Test sort merger with single stream (#48415) f8ececeefd1 is described below commit f8ececeefd11aa9c79c7f8223fb31b412bc45bcf Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Thu Feb 27 17:26:12 2025 +0800 [sort](ut) Test sort merger with single stream (#48415) Sort merger should support single stream input with a non-empty block with `eos=true`. --- be/src/vec/core/sort_cursor.h | 5 ++- be/src/vec/runtime/vsorted_run_merger.cpp | 4 +-- be/test/vec/runtime/sort_merger_test.cpp | 52 +++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index f9cc66cc4dd..9331508c376 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -216,11 +216,14 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { } } MergeSortCursorImpl::reset(); + } else { + pos = 0; + rows = block->rows(); } } Block* block_ptr() override { return block.get(); } - bool eof() const override { return is_last() && _is_eof; } + bool eof() const override { return is_last(0) && _is_eof; } VExprContextSPtrs _ordering_expr; BlockSupplier _block_supplier {}; diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index 16150cab63f..25376f9216b 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -192,6 +192,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { ++merged_rows; } + current->next(); if (_need_more_data(current)) { do_insert(); return Status::OK(); @@ -210,8 +211,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } bool VSortedRunMerger::_need_more_data(MergeSortCursor& current) { - if (!current->is_last()) { - current->next(); + if (!current->is_last(0)) { _priority_queue.push(current); return false; } else if (current->eof()) { diff --git a/be/test/vec/runtime/sort_merger_test.cpp b/be/test/vec/runtime/sort_merger_test.cpp index dece7b31074..349a47beb49 100644 --- a/be/test/vec/runtime/sort_merger_test.cpp +++ b/be/test/vec/runtime/sort_merger_test.cpp @@ -433,4 +433,56 @@ TEST(SortMergerTest, TEST_SMALL_OFFSET_SINGLE_STREAM) { } } +TEST(SortMergerTest, TEST_SINGLE_STREAM) { + /** + * in: [([NULL], eos = true)] + * offset = 0, limit = -1, NULL_FIRST, ASC + * out: [NULL] + */ + const int num_children = 1; + const int batch_size = 5; + std::vector<int> round; + round.resize(num_children, 0); + const int num_round = 1; + + std::unique_ptr<VSortedRunMerger> merger; + auto profile = std::make_shared<RuntimeProfile>(""); + auto ordering_expr = MockSlotRef::create_mock_contexts( + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())); + { + std::vector<bool> is_asc_order = {true}; + std::vector<bool> nulls_first = {true}; + const int limit = -1; + const int offset = 0; + merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, + limit, offset, profile.get())); + } + { + std::vector<vectorized::BlockSupplier> child_block_suppliers; + for (int child_idx = 0; child_idx < num_children; child_idx++) { + vectorized::BlockSupplier block_supplier = + [&, round_vec = &round, num_round = num_round, id = child_idx]( + vectorized::Block* block, bool* eos) { + *block = ColumnHelper::create_nullable_block<DataTypeInt64>({0}, {1}); + *eos = ++((*round_vec)[id]) == num_round; + return Status::OK(); + }; + child_block_suppliers.push_back(block_supplier); + } + EXPECT_TRUE(merger->prepare(child_block_suppliers).ok()); + EXPECT_EQ(merger->_priority_queue.size(), 1); + EXPECT_EQ(merger->_priority_queue.top()->pos, 0); + EXPECT_EQ(merger->_priority_queue.top()->rows, 1); + EXPECT_EQ(merger->_priority_queue.top()->block_ptr()->rows(), 1); + } + { + vectorized::Block block; + bool eos = false; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + auto expect_block = ColumnHelper::create_nullable_column<DataTypeInt64>({0}, {1}); + EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, expect_block)); + EXPECT_TRUE(eos); + } +} + } // namespace doris::vectorized \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org