This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 76124f5814f [Opt](agg) kway merge opt for percentile func (#34382) 76124f5814f is described below commit 76124f5814f892df706439a56e11b33b77641e88 Author: HappenLee <happen...@hotmail.com> AuthorDate: Sun May 5 00:00:23 2024 +0800 [Opt](agg) kway merge opt for percentile func (#34382) --- be/src/util/counts.h | 176 ++++++++++++++++++++++++++++++++++++------- be/test/util/counts_test.cpp | 14 +++- 2 files changed, 161 insertions(+), 29 deletions(-) diff --git a/be/src/util/counts.h b/be/src/util/counts.h index 9b697e796be..70469d6fa72 100644 --- a/be/src/util/counts.h +++ b/be/src/util/counts.h @@ -21,6 +21,7 @@ #include <algorithm> #include <cmath> +#include <queue> #include "udf/udf.h" #include "vec/common/pod_array.h" @@ -144,17 +145,8 @@ public: Counts() = default; void merge(Counts* other) { - if (other == nullptr || other->_nums.empty()) { - return; - } - - if (_nums.empty()) { - _nums = std::move(other->_nums); - } else { - decltype(_nums) res(_nums.size() + other->_nums.size()); - std::merge(_nums.begin(), _nums.end(), other->_nums.begin(), other->_nums.end(), - res.begin()); - _nums = std::move(res); + if (other != nullptr && !other->_nums.empty()) { + _sorted_nums_vec.emplace_back(std::move(other->_nums)); } } @@ -167,10 +159,16 @@ public: } void serialize(vectorized::BufferWritable& buf) { - pdqsort(_nums.begin(), _nums.end()); - size_t size = _nums.size(); - write_binary(size, buf); - buf.write(reinterpret_cast<const char*>(_nums.data()), sizeof(int64_t) * size); + if (!_nums.empty()) { + pdqsort(_nums.begin(), _nums.end()); + size_t size = _nums.size(); + write_binary(size, buf); + buf.write(reinterpret_cast<const char*>(_nums.data()), sizeof(int64_t) * size); + } else { + // convert _sorted_nums_vec to _nums and do serialize again + _convert_sorted_num_vec_to_nums(); + serialize(buf); + } } void unserialize(vectorized::BufferReadable& buf) { @@ -182,25 +180,149 @@ public: } double 
terminate(double quantile) { - if (_nums.empty()) { - // Although set null here, but the value is 0.0 and the call method just - // get val in aggregate_function_percentile_approx.h - return 0.0; + if (_sorted_nums_vec.size() <= 1) { + if (_sorted_nums_vec.size() == 1) { + _nums = std::move(_sorted_nums_vec[0]); + } + + if (_nums.empty()) { + // Although set null here, but the value is 0.0 and the call method just + // get val in aggregate_function_percentile_approx.h + return 0.0; + } + if (quantile == 1 || _nums.size() == 1) { + return _nums.back(); + } + if (UNLIKELY(!std::is_sorted(_nums.begin(), _nums.end()))) { + pdqsort(_nums.begin(), _nums.end()); + } + + double u = (_nums.size() - 1) * quantile; + auto index = static_cast<uint32_t>(u); + return _nums[index] + + (u - static_cast<double>(index)) * (_nums[index + 1] - _nums[index]); + } else { + DCHECK(_nums.empty()); + size_t rows = 0; + for (const auto& i : _sorted_nums_vec) { + rows += i.size(); + } + const bool reverse = quantile > 0.5 && rows > 2; + double u = (rows - 1) * quantile; + auto index = static_cast<uint32_t>(u); + // if reverse, the step of target should start 0 like not reverse + // so here rows need to minus index + 2 + // eg: rows = 10, index = 5 + // if not reverse, so the first number loc is 5, the second number loc is 6 + // if reverse, so the second number is 3, the first number is 4 + // 5 + 4 = 3 + 6 = 9 = rows - 1. + // the rows must GE 2 because `_sorted_nums_vec` size GE 2 + size_t target = reverse ? 
rows - index - 2 : index; + if (quantile == 1) { + target = 0; + } + auto [first_number, second_number] = _merge_sort_and_get_numbers(target, reverse); + if (quantile == 1) { + return second_number; + } + return first_number + (u - static_cast<double>(index)) * (second_number - first_number); } - if (quantile == 1 || _nums.size() == 1) { - return _nums.back(); + } + +private: + struct Node { + int64_t value; + int array_index; + int64_t element_index; + + std::strong_ordering operator<=>(const Node& other) const { return value <=> other.value; } + }; + + void _convert_sorted_num_vec_to_nums() { + size_t rows = 0; + for (const auto& i : _sorted_nums_vec) { + rows += i.size(); } - if (UNLIKELY(!std::is_sorted(_nums.begin(), _nums.end()))) { - pdqsort(_nums.begin(), _nums.end()); + _nums.resize(rows); + size_t count = 0; + + std::priority_queue<Node, std::vector<Node>, std::greater<Node>> min_heap; + for (int i = 0; i < _sorted_nums_vec.size(); ++i) { + if (!_sorted_nums_vec[i].empty()) { + min_heap.emplace(_sorted_nums_vec[i][0], i, 0); + } } - double u = (_nums.size() - 1) * quantile; - auto index = static_cast<uint32_t>(u); - return _nums[index] + (u - static_cast<double>(index)) * (_nums[index + 1] - _nums[index]); + while (!min_heap.empty()) { + Node node = min_heap.top(); + min_heap.pop(); + _nums[count++] = node.value; + if (++node.element_index < _sorted_nums_vec[node.array_index].size()) { + node.value = _sorted_nums_vec[node.array_index][node.element_index]; + min_heap.push(node); + } + } + _sorted_nums_vec.clear(); + } + + std::pair<int64_t, int64_t> _merge_sort_and_get_numbers(int64_t target, bool reverse) { + int64_t first_number = 0, second_number = 0; + size_t count = 0; + if (reverse) { + std::priority_queue<Node> max_heap; + for (int i = 0; i < _sorted_nums_vec.size(); ++i) { + if (!_sorted_nums_vec[i].empty()) { + max_heap.emplace(_sorted_nums_vec[i][_sorted_nums_vec[i].size() - 1], i, + _sorted_nums_vec[i].size() - 1); + } + } + + while 
(!max_heap.empty()) { + Node node = max_heap.top(); + max_heap.pop(); + if (count == target) { + second_number = node.value; + } else if (count == target + 1) { + first_number = node.value; + break; + } + ++count; + if (--node.element_index >= 0) { + node.value = _sorted_nums_vec[node.array_index][node.element_index]; + max_heap.push(node); + } + } + + } else { + std::priority_queue<Node, std::vector<Node>, std::greater<Node>> min_heap; + for (int i = 0; i < _sorted_nums_vec.size(); ++i) { + if (!_sorted_nums_vec[i].empty()) { + min_heap.emplace(_sorted_nums_vec[i][0], i, 0); + } + } + + while (!min_heap.empty()) { + Node node = min_heap.top(); + min_heap.pop(); + if (count == target) { + first_number = node.value; + } else if (count == target + 1) { + second_number = node.value; + break; + } + ++count; + if (++node.element_index < _sorted_nums_vec[node.array_index].size()) { + node.value = _sorted_nums_vec[node.array_index][node.element_index]; + min_heap.push(node); + } + } + } + + return {first_number, second_number}; } -private: vectorized::PODArray<int64_t> _nums; + std::vector<vectorized::PODArray<int64_t>> _sorted_nums_vec; }; } // namespace doris diff --git a/be/test/util/counts_test.cpp b/be/test/util/counts_test.cpp index 6fba51e0561..20d9ea54c97 100644 --- a/be/test/util/counts_test.cpp +++ b/be/test/util/counts_test.cpp @@ -62,9 +62,19 @@ TEST_F(TCountsTest, TotalTest) { other1.increment(10, 1); other1.increment(99, 2); - counts.merge(&other1); + // deserialize other1 + cs->clear(); + other1.serialize(bw); + bw.commit(); + Counts other1_deserialized; + vectorized::BufferReadable br1(res); + other1_deserialized.unserialize(br1); + + Counts merge_res; + merge_res.merge(&other); + merge_res.merge(&other1_deserialized); // 1 1 1 1 2 5 7 7 9 9 10 19 50 50 50 99 99 100 100 100 - EXPECT_EQ(counts.terminate(0.3), 6.4); + EXPECT_EQ(merge_res.terminate(0.3), 6.4); } } // namespace doris --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org