BiteTheDDDDt commented on code in PR #45952:
URL: https://github.com/apache/doris/pull/45952#discussion_r1900567498

##########
be/src/vec/core/sort_cursor.h:
##########
@@ -314,4 +340,224 @@ struct MergeSortBlockCursor {
     }
 };
 
+enum class SortingQueueStrategy : uint8_t { Default, Batch };
+
+/// Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams).
+template <typename Cursor, SortingQueueStrategy strategy>
+class SortingQueueImpl {
+public:
+    SortingQueueImpl() = default;
+
+    template <typename Cursors>
+    explicit SortingQueueImpl(Cursors& cursors) {
+        size_t size = cursors.size();
+        queue.reserve(size);
+
+        for (size_t i = 0; i < size; ++i) {
+            queue.emplace_back(cursors[i]);
+        }
+
+        std::make_heap(queue.begin(), queue.end());
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            if (!queue.empty()) {
+                update_batch_size();
+            }
+        }
+    }
+
+    bool is_valid() const { return !queue.empty(); }
+
+    Cursor& current()
+        requires(strategy == SortingQueueStrategy::Default)
+    {
+        return queue.front();
+    }
+
+    std::pair<Cursor*, size_t> current()
+        requires(strategy == SortingQueueStrategy::Batch)
+    {
+        return {&queue.front(), batch_size};
+    }
+
+    size_t size() { return queue.size(); }
+
+    Cursor& next_child() { return queue[next_child_index()]; }
+
+    void ALWAYS_INLINE next()
+        requires(strategy == SortingQueueStrategy::Default)
+    {
+        assert(is_valid());
+
+        if (!queue.front()->is_last()) {
+            queue.front()->next();
+            update_top(true);
+        } else {
+            remove_top();
+        }
+    }
+
+    void ALWAYS_INLINE next(size_t batch_size_value)
+        requires(strategy == SortingQueueStrategy::Batch)
+    {
+        assert(is_valid());
+        assert(batch_size_value <= batch_size);
+        assert(batch_size_value > 0);
+
+        batch_size -= batch_size_value;
+        if (batch_size > 0) {
+            queue.front()->next(batch_size_value);
+            return;
+        }
+
+        if (!queue.front()->is_last(batch_size_value)) {
+            queue.front()->next(batch_size_value);
+            update_top(false);
+        } else {
+            remove_top();
+        }
+    }
+
+    void replace_top(Cursor new_top) {
+        queue.front() = new_top;
+        update_top(true);
+    }
+
+    void remove_top() {
+        std::pop_heap(queue.begin(), queue.end());
+        queue.pop_back();
+        next_child_idx = 0;
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            if (queue.empty()) {
+                batch_size = 0;
+            } else {
+                update_batch_size();
+            }
+        }
+    }
+
+    void push(MergeSortCursorImpl& cursor) {
+        queue.emplace_back(&cursor);
+        std::push_heap(queue.begin(), queue.end());
+        next_child_idx = 0;
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            update_batch_size();
+        }
+    }
+
+private:
+    using Container = std::vector<Cursor>;
+    Container queue;
+
+    /// Cache comparison between first and second child if the order in queue has not been changed.
+    size_t next_child_idx = 0;
+    size_t batch_size = 0;
+
+    size_t ALWAYS_INLINE next_child_index() {
+        if (next_child_idx == 0) {
+            next_child_idx = 1;
+
+            if (queue.size() > 2 && queue[1].greater(queue[2])) {
+                ++next_child_idx;
+            }
+        }
+
+        return next_child_idx;
+    }
+
+    /// This is an adapted version of the function __sift_down from libc++.
+    /// Why can't we simply use std::priority_queue?
+    /// - because it doesn't support updating the top element and requires pop and push instead.
+    /// Also look at the "Boost.Heap" library.
+    void ALWAYS_INLINE update_top(bool check_in_order) {
+        size_t size = queue.size();
+        if (size < 2) {
+            return;
+        }
+
+        auto begin = queue.begin();
+
+        size_t child_idx = next_child_index();
+        auto child_it = begin + child_idx;
+
+        /// Check if we are in order.
+        if (check_in_order && (*child_it).greater(*begin)) {
+            if constexpr (strategy == SortingQueueStrategy::Batch) {
+                update_batch_size();
+            }
+            return;
+        }
+
+        next_child_idx = 0;
+
+        auto curr_it = begin;
+        auto top(std::move(*begin));
+        do {
+            /// We are not in heap-order, swap the parent with its largest child.
+            *curr_it = std::move(*child_it);
+            curr_it = child_it;
+
+            // Recompute the child index based on the updated parent.
+            child_idx = 2 * child_idx + 1;
+
+            if (child_idx >= size) {
+                break;
+            }
+
+            child_it = begin + child_idx;
+
+            if ((child_idx + 1) < size && (*child_it).greater(*(child_it + 1))) {
+                /// Right child exists and is greater than left child.
+                ++child_it;
+                ++child_idx;
+            }
+
+            /// Check if we are in order.
+        } while (!((*child_it).greater(top)));
+        *curr_it = std::move(top);
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            update_batch_size();
+        }
+    }
+
+    /// Update the batch size of elements that the client can extract from the current cursor.
+    void update_batch_size() {
+        assert(!queue.empty());
+
+        auto& begin_cursor = *queue.begin();
+        size_t min_cursor_size = begin_cursor->get_size();
+        size_t min_cursor_pos = begin_cursor->pos;
+
+        if (queue.size() == 1) {
+            batch_size = min_cursor_size - min_cursor_pos;
+            return;
+        }
+
+        batch_size = 1;
+        size_t child_idx = next_child_index();
+        auto& next_child_cursor = *(queue.begin() + child_idx);
+        if (min_cursor_pos + batch_size < min_cursor_size &&
+            next_child_cursor.greater_with_offset(begin_cursor, 0, batch_size)) {
+            ++batch_size;
+        } else {
+            return;
+        }
+        if (begin_cursor.totally_less_or_equals(next_child_cursor)) {
+            batch_size = min_cursor_size - min_cursor_pos;
+            return;
+        }
+
+        while (min_cursor_pos + batch_size < min_cursor_size &&

Review Comment:
   Binary search will introduce at least log(batch_size) comparisons.
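
For illustration only (not code from the PR): a minimal, self-contained sketch of the kind of binary search being discussed, finding the largest prefix of the top cursor's remaining rows that still compares less than or equal to the second-best cursor's current row. The function name and the `row_fits` predicate are hypothetical stand-ins for the role played by `next_child_cursor.greater_with_offset(begin_cursor, 0, i)` in the hunk above.

```cpp
#include <cstddef>

// Hypothetical sketch, not part of the PR. rows_left (>= 1) is the number of rows
// remaining in the top cursor, and row_fits(i) reports whether row `pos + i` still
// compares <= the second-best cursor's current row. row_fits must be monotone:
// true on some prefix [0, k), false from k onwards, and row 0 always fits.
//
// Example: with rows_left = 8 and rows 0..4 fitting, the search probes
// i = 4, 6, 5 (three comparisons) and returns 5.
template <typename RowFits>
std::size_t batch_size_by_binary_search(std::size_t rows_left, RowFits row_fits) {
    std::size_t lo = 1;         // rows [0, lo) are known to fit
    std::size_t hi = rows_left; // rows [hi, rows_left) are known not to fit
    while (lo < hi) {
        std::size_t mid = lo + (hi - lo) / 2; // one cursor comparison per iteration
        if (row_fits(mid)) {
            lo = mid + 1; // rows [0, mid] fit, continue in the upper half
        } else {
            hi = mid;     // row `mid` does not fit, shrink the range
        }
    }
    return lo; // number of rows that can be emitted from the top cursor in one batch
}
```

Each loop iteration costs exactly one cursor comparison, so the search performs roughly log2(rows_left) comparisons and never meaningfully fewer than log2 of the resulting batch size, no matter how small that batch is; this appears to be the comparison count the comment refers to, whereas the `totally_less_or_equals` fast path in the hunk can settle the common whole-block case with a single comparison.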