This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch opt_perf
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/opt_perf by this push:
     new 983abe90fd [improvement](agg) speed up merge agg results (#12939)
983abe90fd is described below

commit 983abe90fd7813ea7146022351303bdb05415889
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Sat Sep 24 14:43:19 2022 +0800

    [improvement](agg) speed up merge agg results (#12939)
---
 be/src/exec/olap_common.h                          | 12 ++--
 .../vec/aggregate_functions/aggregate_function.h   | 20 ++++++
 .../aggregate_functions/aggregate_function_avg.h   | 13 ++++
 .../aggregate_functions/aggregate_function_count.h | 18 ++++++
 .../aggregate_function_min_max.h                   | 15 +++++
 .../aggregate_functions/aggregate_function_sum.h   | 10 +++
 .../aggregate_functions/aggregate_function_uniq.h  | 71 ++++++++++++++++++----
 be/src/vec/common/sort/sorter.cpp                  |  6 +-
 be/src/vec/exec/vaggregation_node.h                | 34 ++++++-----
 9 files changed, 165 insertions(+), 34 deletions(-)

diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index 87cac1e4ac..7a339187fa 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -571,27 +571,27 @@ void ColumnValueRange<primitive_type>::convert_to_fixed_value() {
 }
 
 template <>
-[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_STRING>>
+std::vector<ColumnValueRange<PrimitiveType::TYPE_STRING>>
 ColumnValueRange<PrimitiveType::TYPE_STRING>::split(size_t count);
 
 template <>
-[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_CHAR>>
+std::vector<ColumnValueRange<PrimitiveType::TYPE_CHAR>>
 ColumnValueRange<PrimitiveType::TYPE_CHAR>::split(size_t count);
 
 template <>
-[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_VARCHAR>>
+std::vector<ColumnValueRange<PrimitiveType::TYPE_VARCHAR>>
 ColumnValueRange<PrimitiveType::TYPE_VARCHAR>::split(size_t count);
 
 template <>
-[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_HLL>>
+std::vector<ColumnValueRange<PrimitiveType::TYPE_HLL>>
 ColumnValueRange<PrimitiveType::TYPE_HLL>::split(size_t count);
 
 template <>
-[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_DECIMALV2>>
+std::vector<ColumnValueRange<PrimitiveType::TYPE_DECIMALV2>>
 ColumnValueRange<PrimitiveType::TYPE_DECIMALV2>::split(size_t count);
 
 template <>
-[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_LARGEINT>>
+std::vector<ColumnValueRange<PrimitiveType::TYPE_LARGEINT>>
 ColumnValueRange<PrimitiveType::TYPE_LARGEINT>::split(size_t count);
 
 template <PrimitiveType primitive_type>
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h
index 6309b0c994..2a7eecd2f3 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -144,6 +144,11 @@ public:
     virtual void deserialize_and_merge_from_column(AggregateDataPtr __restrict 
place,
                                                    const IColumn& column, 
Arena* arena) const = 0;
 
+    virtual void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* 
places,
+                                                             const size_t 
offset,
+                                                             const IColumn& 
column, Arena* arena,
+                                                             size_t num_rows) 
const = 0;
+
     /// Returns true if a function requires Arena to handle own states (see 
add(), merge(), deserialize()).
     virtual bool allocates_memory_in_arena() const { return false; }
 
@@ -347,6 +352,21 @@ public:
         deserialize_vec(places, assert_cast<const ColumnString*>(&column), 
arena, num_rows);
     }
 
+    void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, 
const size_t offset,
+                                                     const IColumn& column, 
Arena* arena,
+                                                     size_t num_rows) const 
override {
+        const auto size_of_data = static_cast<const 
Derived*>(this)->size_of_data();
+        char tmp[size_of_data];
+        for (size_t i = 0; i != num_rows; ++i) {
+            VectorBufferReader buffer_reader(
+                    assert_cast<const ColumnString&>(column).get_data_at(i));
+            static_cast<const Derived*>(this)->create(tmp);
+            static_cast<const Derived*>(this)->deserialize(tmp, buffer_reader, 
arena);
+            static_cast<const Derived*>(this)->merge(places[i] + offset, tmp, 
arena);
+            static_cast<const Derived*>(this)->destroy(tmp);
+        }
+    }
+
     void merge_vec(const AggregateDataPtr* places, size_t offset, 
ConstAggregateDataPtr rhs,
                    Arena* arena, const size_t num_rows) const override {
         const auto size_of_data = static_cast<const 
Derived*>(this)->size_of_data();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index c80c46b8f9..04a8cb7ca3 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -151,6 +151,19 @@ public:
         memcpy(places, data, sizeof(Data) * num_rows);
     }
 
+    void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, 
const size_t offset,
+                                                     const IColumn& column, 
Arena* arena,
+                                                     size_t num_rows) const 
override {
+        auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
+        DCHECK(col.size() >= num_rows) << "source column's size should greater 
than num_rows";
+        auto* src_data = reinterpret_cast<const Data*>(col.get_data().data());
+        for (size_t i = 0; i != num_rows; ++i) {
+            auto& data = this->data(places[i] + offset);
+            data.sum += src_data[i].sum;
+            data.count += src_data[i].count;
+        }
+    }
+
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
         auto& col = assert_cast<ColumnFixedLengthObject&>(*dst);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h b/be/src/vec/aggregate_functions/aggregate_function_count.h
index 8ab6b53e56..3b8e716499 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -81,6 +81,15 @@ public:
         }
     }
 
+    void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, 
const size_t offset,
+                                                     const IColumn& column, 
Arena* arena,
+                                                     size_t num_rows) const 
override {
+        auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            reinterpret_cast<Data*>(places[i] + offset)->count += data[i];
+        }
+    }
+
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
         auto& col = assert_cast<ColumnUInt64&>(*dst);
@@ -175,6 +184,15 @@ public:
         }
     }
 
+    void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, 
const size_t offset,
+                                                     const IColumn& column, 
Arena* arena,
+                                                     size_t num_rows) const 
override {
+        auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            reinterpret_cast<Data*>(places[i] + offset)->count += data[i];
+        }
+    }
+
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
         auto& col = assert_cast<ColumnUInt64&>(*dst);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index 37e812bf38..5abdeab288 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -487,6 +487,21 @@ public:
         }
     }
 
+    void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, 
const size_t offset,
+                                                     const IColumn& column, 
Arena* arena,
+                                                     size_t num_rows) const 
override {
+        if constexpr (Data::IsFixedLength) {
+            const auto& col = static_cast<const 
ColumnFixedLengthObject&>(column);
+            auto* column_data = reinterpret_cast<const 
Data*>(col.get_data().data());
+            for (size_t i = 0; i != num_rows; ++i) {
+                *reinterpret_cast<Data*>(places[i] + offset) = column_data[i];
+            }
+        } else {
+            Base::deserialize_and_merge_with_keys_from_column(places, offset, 
column, arena,
+                                                              num_rows);
+        }
+    }
+
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
         if constexpr (Data::IsFixedLength) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 46ca85f3a9..b2f053f8a5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -110,6 +110,16 @@ public:
         }
     }
 
+    void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, 
const size_t offset,
+                                                     const IColumn& column, 
Arena* arena,
+                                                     size_t num_rows) const 
override {
+        auto data = assert_cast<const ColVecResult&>(column).get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            auto* dst_data = reinterpret_cast<Data*>(places[i] + offset);
+            dst_data->sum += data[i];
+        }
+    }
+
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
         auto& col = assert_cast<ColVecResult&>(*dst);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
index cb75117794..90caeb5d68 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
@@ -21,6 +21,7 @@
 #pragma once
 
 #include <parallel_hashmap/phmap.h>
+#include <parallel_hashmap/phmap_dump.h>
 
 #include <type_traits>
 
@@ -39,6 +40,46 @@ namespace doris::vectorized {
 // Here is an empirical value.
 static constexpr size_t HASH_MAP_PREFETCH_DIST = 16;
 
+template <typename WritableBuffer>
+class BinaryOutputInMemory {
+public:
+    BinaryOutputInMemory(WritableBuffer& buffer) : _buffer(buffer) {}
+
+    bool dump(const char* p, size_t sz) {
+        _buffer.write(p, sz);
+        return true;
+    }
+
+    template <typename V>
+    bool dump(const V& v) {
+        write_pod_binary(v, _buffer);
+        return true;
+    }
+
+private:
+    WritableBuffer& _buffer;
+};
+
+template <typename ReadableBuffer>
+class BinaryInputInMemory {
+public:
+    BinaryInputInMemory(ReadableBuffer& buffer) : _buffer(buffer) {}
+
+    bool load(char* p, size_t sz) {
+        _buffer.read(p, sz);
+        return true;
+    }
+
+    template <typename V>
+    bool load(V* v) {
+        read_pod_binary(v, _buffer);
+        return true;
+    }
+
+private:
+    ReadableBuffer& _buffer;
+};
+
 /// uniqExact
 
 template <typename T>
@@ -168,24 +209,30 @@ public:
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
         auto& set = this->data(place).set;
-        write_var_uint(set.size(), buf);
-        for (const auto& elem : set) {
-            write_pod_binary(elem, buf);
-        }
+        BinaryOutputInMemory output(buf);
+        set.dump(output);
     }
 
     void deserialize_and_merge(AggregateDataPtr __restrict place, 
BufferReadable& buf,
                                Arena* arena) const override {
         auto& set = this->data(place).set;
-        size_t size;
-        read_var_uint(size, buf);
-
-        set.rehash(size + set.size());
+        BinaryInputInMemory input(buf);
+        if (set.size() == 0) {
+            set.load(input);
+        } else {
+            Data src;
+            src.set.load(input);
+            set.merge(src.set);
+        }
+    }
 
-        for (size_t i = 0; i < size; ++i) {
-            KeyType ref;
-            read_pod_binary(ref, buf);
-            set.insert(ref);
+    void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, 
const size_t offset,
+                                                     const IColumn& column, 
Arena* arena,
+                                                     size_t num_rows) const 
override {
+        const auto& col = static_cast<const ColumnString&>(column);
+        for (size_t i = 0; i != num_rows; ++i) {
+            VectorBufferReader buffer_reader(col.get_data_at(i));
+            deserialize_and_merge(places[i] + offset, buffer_reader, arena);
         }
     }
 
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index 4df6a6abc7..c766f13f74 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -93,9 +93,11 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block) {
     for (int i = 0; i < _sort_description.size(); i++) {
         const auto& ordering_expr = 
_vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
         if (materialized) {
-            RETURN_IF_ERROR(ordering_expr->execute(&src_block, 
&_sort_description[i].column_number));
+            RETURN_IF_ERROR(
+                    ordering_expr->execute(&src_block, 
&_sort_description[i].column_number));
         } else {
-            RETURN_IF_ERROR(ordering_expr->execute(&dest_block, 
&_sort_description[i].column_number));
+            RETURN_IF_ERROR(
+                    ordering_expr->execute(&dest_block, 
&_sort_description[i].column_number));
         }
 
         _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 7b41b570c3..0952ccc15a 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -978,28 +978,34 @@ private:
                         column = 
((ColumnNullable*)column.get())->get_nested_column_ptr();
                     }
 
-                    size_t buffer_size =
-                            
_aggregate_evaluators[i]->function()->size_of_data() * rows;
-                    if (_deserialize_buffer.size() < buffer_size) {
-                        _deserialize_buffer.resize(buffer_size);
-                    }
-
                     if (_use_fixed_length_serialization_opt) {
                         SCOPED_TIMER(_deserialize_data_timer);
-                        
_aggregate_evaluators[i]->function()->deserialize_from_column(
-                                _deserialize_buffer.data(), *column, 
&_agg_arena_pool, rows);
+                        // 
_aggregate_evaluators[i]->function()->deserialize_from_column(
+                        //         _deserialize_buffer.data(), *column, 
&_agg_arena_pool, rows);
+
+                        _aggregate_evaluators[i]
+                                ->function()
+                                ->deserialize_and_merge_with_keys_from_column(
+                                        _places.data(), 
_offsets_of_aggregate_states[i], *column,
+                                        &_agg_arena_pool, rows);
                     } else {
                         SCOPED_TIMER(_deserialize_data_timer);
+
+                        size_t buffer_size =
+                                
_aggregate_evaluators[i]->function()->size_of_data() * rows;
+                        if (_deserialize_buffer.size() < buffer_size) {
+                            _deserialize_buffer.resize(buffer_size);
+                        }
                         _aggregate_evaluators[i]->function()->deserialize_vec(
                                 _deserialize_buffer.data(), 
(ColumnString*)(column.get()),
                                 &_agg_arena_pool, rows);
-                    }
-                    _aggregate_evaluators[i]->function()->merge_vec(
-                            _places.data(), _offsets_of_aggregate_states[i],
-                            _deserialize_buffer.data(), &_agg_arena_pool, 
rows);
+                        _aggregate_evaluators[i]->function()->merge_vec(
+                                _places.data(), 
_offsets_of_aggregate_states[i],
+                                _deserialize_buffer.data(), &_agg_arena_pool, 
rows);
 
-                    
_aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
-                                                                      rows);
+                        _aggregate_evaluators[i]->function()->destroy_vec(
+                                _deserialize_buffer.data(), rows);
+                    }
 
                 } else {
                     _aggregate_evaluators[i]->execute_batch_add(block,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to