HappenLee commented on code in PR #45952:
URL: https://github.com/apache/doris/pull/45952#discussion_r1900390831


##########
be/src/vec/core/sort_cursor.h:
##########
@@ -314,4 +340,224 @@ struct MergeSortBlockCursor {
     }
 };
 
+enum class SortingQueueStrategy : uint8_t { Default, Batch };
+
+/// Allows to fetch data from multiple sort cursors in sorted order (merging 
sorted data streams).
+template <typename Cursor, SortingQueueStrategy strategy>
+class SortingQueueImpl {
+public:
+    SortingQueueImpl() = default;
+
+    template <typename Cursors>
+    explicit SortingQueueImpl(Cursors& cursors) {
+        size_t size = cursors.size();
+        queue.reserve(size);
+
+        for (size_t i = 0; i < size; ++i) {
+            queue.emplace_back(cursors[i]);
+        }
+
+        std::make_heap(queue.begin(), queue.end());
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            if (!queue.empty()) {
+                update_batch_size();
+            }
+        }
+    }
+
+    bool is_valid() const { return !queue.empty(); }
+
+    Cursor& current()
+        requires(strategy == SortingQueueStrategy::Default)
+    {
+        return &queue.front();
+    }
+
+    std::pair<Cursor*, size_t> current()
+        requires(strategy == SortingQueueStrategy::Batch)
+    {
+        return {&queue.front(), batch_size};
+    }
+
+    size_t size() { return queue.size(); }
+
+    Cursor& next_child() { return queue[next_child_index()]; }
+
+    void ALWAYS_INLINE next()
+        requires(strategy == SortingQueueStrategy::Default)
+    {
+        assert(is_valid());
+
+        if (!queue.front()->is_last()) {
+            queue.front()->next();
+            update_top(true);
+        } else {
+            remove_top();
+        }
+    }
+
+    void ALWAYS_INLINE next(size_t batch_size_value)
+        requires(strategy == SortingQueueStrategy::Batch)
+    {
+        assert(is_valid());
+        assert(batch_size_value <= batch_size);
+        assert(batch_size_value > 0);
+
+        batch_size -= batch_size_value;
+        if (batch_size > 0) {
+            queue.front()->next(batch_size_value);
+            return;
+        }
+
+        if (!queue.front()->is_last(batch_size_value)) {
+            queue.front()->next(batch_size_value);
+            update_top(false);
+        } else {
+            remove_top();
+        }
+    }
+
+    void replace_top(Cursor new_top) {

Review Comment:
   seems unless api



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to