This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f8ececeefd1 [sort](ut) Test sort merger with single stream (#48415)
f8ececeefd1 is described below

commit f8ececeefd11aa9c79c7f8223fb31b412bc45bcf
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Feb 27 17:26:12 2025 +0800

    [sort](ut) Test sort merger with single stream (#48415)
    
    Sort merger should support single-stream input whose first block is non-empty and already carries `eos=true`.
---
 be/src/vec/core/sort_cursor.h             |  5 ++-
 be/src/vec/runtime/vsorted_run_merger.cpp |  4 +--
 be/test/vec/runtime/sort_merger_test.cpp  | 52 +++++++++++++++++++++++++++++++
 3 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index f9cc66cc4dd..9331508c376 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -216,11 +216,14 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
                 }
             }
             MergeSortCursorImpl::reset();
+        } else {
+            pos = 0;
+            rows = block->rows();
         }
     }
 
     Block* block_ptr() override { return block.get(); }
-    bool eof() const override { return is_last() && _is_eof; }
+    bool eof() const override { return is_last(0) && _is_eof; }
 
     VExprContextSPtrs _ordering_expr;
     BlockSupplier _block_supplier {};
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp
index 16150cab63f..25376f9216b 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -192,6 +192,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
                 ++merged_rows;
             }
 
+            current->next();
             if (_need_more_data(current)) {
                 do_insert();
                 return Status::OK();
@@ -210,8 +211,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
 }
 
 bool VSortedRunMerger::_need_more_data(MergeSortCursor& current) {
-    if (!current->is_last()) {
-        current->next();
+    if (!current->is_last(0)) {
         _priority_queue.push(current);
         return false;
     } else if (current->eof()) {
diff --git a/be/test/vec/runtime/sort_merger_test.cpp b/be/test/vec/runtime/sort_merger_test.cpp
index dece7b31074..349a47beb49 100644
--- a/be/test/vec/runtime/sort_merger_test.cpp
+++ b/be/test/vec/runtime/sort_merger_test.cpp
@@ -433,4 +433,56 @@ TEST(SortMergerTest, TEST_SMALL_OFFSET_SINGLE_STREAM) {
     }
 }
 
+TEST(SortMergerTest, TEST_SINGLE_STREAM) {
+    /**
+     * in: [([NULL], eos = true)] -- one child whose first block is non-empty and already eos
+     *     offset = 0, limit = -1, NULL_FIRST, ASC
+     * out: [NULL], from a single get_next() call that also sets eos = true
+     */
+    const int num_children = 1;
+    const int batch_size = 5;
+    std::vector<int> round;
+    round.resize(num_children, 0);
+    const int num_round = 1;
+
+    std::unique_ptr<VSortedRunMerger> merger;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto ordering_expr = MockSlotRef::create_mock_contexts(
+            std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
+    {
+        std::vector<bool> is_asc_order = {true};
+        std::vector<bool> nulls_first = {true};
+        const int limit = -1;
+        const int offset = 0;
+        merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size,
+                                          limit, offset, profile.get()));
+    }
+    {
+        std::vector<vectorized::BlockSupplier> child_block_suppliers;
+        for (int child_idx = 0; child_idx < num_children; child_idx++) {
+            vectorized::BlockSupplier block_supplier =
+                    [&, round_vec = &round, num_round = num_round, id = child_idx](
+                            vectorized::Block* block, bool* eos) {
+                        *block = ColumnHelper::create_nullable_block<DataTypeInt64>({0}, {1});
+                        *eos = ++((*round_vec)[id]) == num_round;
+                        return Status::OK();
+                    };
+            child_block_suppliers.push_back(block_supplier);
+        }
+        EXPECT_TRUE(merger->prepare(child_block_suppliers).ok());
+        EXPECT_EQ(merger->_priority_queue.size(), 1); // block is non-empty, so kept despite eos
+        EXPECT_EQ(merger->_priority_queue.top()->pos, 0);
+        EXPECT_EQ(merger->_priority_queue.top()->rows, 1);
+        EXPECT_EQ(merger->_priority_queue.top()->block_ptr()->rows(), 1);
+    }
+    {
+        vectorized::Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        auto expect_block = ColumnHelper::create_nullable_column<DataTypeInt64>({0}, {1});
+        EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, expect_block));
+        EXPECT_TRUE(eos);
+    }
+}
+
 } // namespace doris::vectorized
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to