This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b7c9007776 [improvement][agg]Process aggregated results in the 
vectorized way (#11084)
b7c9007776 is described below

commit b7c9007776cf9b8f9db343d41b164cc6760f0975
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Jul 22 22:04:43 2022 +0800

    [improvement][agg]Process aggregated results in the vectorized way (#11084)
---
 .../vec/aggregate_functions/aggregate_function.h   | 48 +++++++++++++
 be/src/vec/columns/column.h                        | 21 ++++++
 be/src/vec/columns/column_decimal.cpp              | 22 ++++++
 be/src/vec/columns/column_decimal.h                | 12 ++++
 be/src/vec/columns/column_nullable.cpp             | 29 ++++++++
 be/src/vec/columns/column_nullable.h               | 10 +++
 be/src/vec/columns/column_string.cpp               | 21 ++++++
 be/src/vec/columns/column_string.h                 | 25 +++++++
 be/src/vec/columns/column_vector.cpp               | 22 ++++++
 be/src/vec/columns/column_vector.h                 | 13 ++++
 be/src/vec/exec/vaggregation_node.cpp              | 81 ++++++++++++----------
 be/src/vec/exec/vaggregation_node.h                | 39 +++++++++++
 be/src/vec/exprs/vectorized_agg_fn.cpp             |  5 ++
 be/src/vec/exprs/vectorized_agg_fn.h               |  3 +
 14 files changed, 316 insertions(+), 35 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function.h 
b/be/src/vec/aggregate_functions/aggregate_function.h
index eca81b3199..677c189002 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -91,19 +91,32 @@ public:
     virtual void merge(AggregateDataPtr __restrict place, 
ConstAggregateDataPtr rhs,
                        Arena* arena) const = 0;
 
+    virtual void merge_vec(const AggregateDataPtr* places, size_t offset, 
ConstAggregateDataPtr rhs,
+                           Arena* arena, const size_t num_rows) const = 0;
+
     /// Serializes state (to transmit it over the network, for example).
     virtual void serialize(ConstAggregateDataPtr __restrict place, 
BufferWritable& buf) const = 0;
 
+    virtual void serialize_vec(const std::vector<AggregateDataPtr>& places, 
size_t offset,
+                               BufferWritable& buf, const size_t num_rows) 
const = 0;
+
     /// Deserializes state. This function is called only for empty (just 
created) states.
     virtual void deserialize(AggregateDataPtr __restrict place, 
BufferReadable& buf,
                              Arena* arena) const = 0;
 
+    virtual void deserialize_vec(AggregateDataPtr places, ColumnString* 
column, Arena* arena,
+                                 size_t num_rows) const = 0;
+
     /// Returns true if a function requires Arena to handle own states (see 
add(), merge(), deserialize()).
     virtual bool allocates_memory_in_arena() const { return false; }
 
     /// Inserts results into a column.
     virtual void insert_result_into(ConstAggregateDataPtr __restrict place, 
IColumn& to) const = 0;
 
+    virtual void insert_result_into_vec(const std::vector<AggregateDataPtr>& 
places,
+                                        const size_t offset, IColumn& to,
+                                        const size_t num_rows) const = 0;
+
     /** Returns true for aggregate functions of type -State.
       * They are executed as other aggregate functions, but not finalized 
(return an aggregation state that can be combined with another).
       */
@@ -176,6 +189,41 @@ public:
             static_cast<const Derived*>(this)->add(place, columns, i, arena);
         }
     }
+
+    // Batch form of insert_result_into(): for each of the first `num_rows` entries of
+    // `places`, inserts the result of the state located at `places[i] + offset` into `to`,
+    // in index order.
+    void insert_result_into_vec(const std::vector<AggregateDataPtr>& places, 
const size_t offset,
+                                IColumn& to, const size_t num_rows) const 
override {
+        for (size_t i = 0; i != num_rows; ++i) {
+            static_cast<const Derived*>(this)->insert_result_into(places[i] + 
offset, to);
+        }
+    }
+
+    // Batch form of serialize(): writes the state at `places[i] + offset` to `buf` for each
+    // of the first `num_rows` entries, calling buf.commit() after every state.
+    void serialize_vec(const std::vector<AggregateDataPtr>& places, size_t 
offset,
+                       BufferWritable& buf, const size_t num_rows) const 
override {
+        for (size_t i = 0; i != num_rows; ++i) {
+            static_cast<const Derived*>(this)->serialize(places[i] + offset, 
buf);
+            buf.commit();
+        }
+    }
+
+    // Batch form of deserialize(): for each of the first `num_rows` rows of `column`, creates
+    // a state at `places + size_of_data() * i` and deserializes that row's bytes into it.
+    // `places` must therefore have room for `num_rows` states laid out back to back.
+    // The states created here are not destroyed by this function.
+    void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* 
arena,
+                         size_t num_rows) const override {
+        const auto size_of_data = static_cast<const 
Derived*>(this)->size_of_data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            auto place = places + size_of_data * i;
+            VectorBufferReader buffer_reader(column->get_data_at(i));
+            static_cast<const Derived*>(this)->create(place);
+            static_cast<const Derived*>(this)->deserialize(place, 
buffer_reader, arena);
+        }
+    }
+
+    // Batch form of merge(): merges the i-th source state into the destination state at
+    // `places[i] + offset`, for i in [0, num_rows). Source states are read from `rhs` at a
+    // stride of size_of_data() bytes, which is the layout deserialize_vec() writes.
+    void merge_vec(const AggregateDataPtr* places, size_t offset, 
ConstAggregateDataPtr rhs,
+                   Arena* arena, const size_t num_rows) const override {
+        const auto size_of_data = static_cast<const 
Derived*>(this)->size_of_data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            static_cast<const Derived*>(this)->merge(places[i] + offset, rhs + 
size_of_data * i,
+                                                     arena);
+        }
+    }
 };
 
 /// Implements several methods for manipulation with data. T - type of 
structure with data for aggregation.
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index b6c8f3e67c..006389ded2 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -204,6 +204,16 @@ public:
         LOG(FATAL) << "Method insert_many_binary_data is not supported for " 
<< get_name();
     }
 
+    // Bulk-appends `num` strings taken from the `strings` array. The base implementation
+    // logs a fatal error; column types that support the operation override it.
+    virtual void insert_many_strings(const StringRef* strings, size_t num) {
+        LOG(FATAL) << "Method insert_many_strings is not supported for " << get_name();
+    }
+
+    // `pos` points to `num` contiguous values whose in-memory type is the same as the
+    // element type of this column.
+    // This function is used by `insert_keys_into_columns` in AggregationNode.
+    virtual void insert_many_raw_data(const char* pos, size_t num) {
+        LOG(FATAL) << "Method insert_many_raw_data is not supported for " << 
get_name();
+    }
+
     void insert_many_data(const char* pos, size_t length, size_t data_num) {
         for (size_t i = 0; i < data_num; ++i) {
             insert_data(pos, length);
@@ -263,6 +273,17 @@ public:
         LOG(FATAL) << "serialize_vec_with_null_map not supported";
     }
 
+    // This function deserializes group-by keys into column in the vectorized 
way.
+    virtual void deserialize_vec(std::vector<StringRef>& keys, const size_t 
num_rows) {
+        LOG(FATAL) << "deserialize_vec not supported";
+    }
+
+    // Used in ColumnNullable::deserialize_vec
+    virtual void deserialize_vec_with_null_map(std::vector<StringRef>& keys, 
const size_t num_rows,
+                                               const uint8_t* null_map) {
+        LOG(FATAL) << "deserialize_vec_with_null_map not supported";
+    }
+
     /// Update state of hash function with value of n-th element.
     /// On subsequent calls of this method for sequence of column values of 
arbitrary types,
     ///  passed bytes to hash must identify sequence of values unambiguously.
diff --git a/be/src/vec/columns/column_decimal.cpp 
b/be/src/vec/columns/column_decimal.cpp
index 315c1103c0..1b3c58a778 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -85,6 +85,28 @@ void 
ColumnDecimal<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys,
     }
 }
 
+template <typename T>
+void ColumnDecimal<T>::deserialize_vec(std::vector<StringRef>& keys, const 
size_t num_rows) {
+    for (size_t i = 0; i < num_rows; ++i) {
+        keys[i].data = deserialize_and_insert_from_arena(keys[i].data);
+        keys[i].size -= sizeof(T);
+    }
+}
+
+template <typename T>
+void ColumnDecimal<T>::deserialize_vec_with_null_map(std::vector<StringRef>& 
keys,
+                                                     const size_t num_rows,
+                                                     const uint8_t* null_map) {
+    for (size_t i = 0; i < num_rows; ++i) {
+        if (null_map[i] == 0) {
+            keys[i].data = deserialize_and_insert_from_arena(keys[i].data);
+            keys[i].size -= sizeof(T);
+        } else {
+            insert_default();
+        }
+    }
+}
+
 template <typename T>
 UInt64 ColumnDecimal<T>::get64(size_t n) const {
     if constexpr (sizeof(T) > sizeof(UInt64)) {
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index d28e7ae469..e318271b13 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -114,6 +114,13 @@ public:
     }
 
     void insert_many_fix_len_data(const char* data_ptr, size_t num) override;
+
+    void insert_many_raw_data(const char* pos, size_t num) override {
+        size_t old_size = data.size();
+        data.resize(old_size + num);
+        memcpy(data.data() + old_size, pos, num * sizeof(T));
+    }
+
     void insert_data(const char* pos, size_t /*length*/) override;
     void insert_default() override { data.push_back(T()); }
     void insert(const Field& x) override {
@@ -141,6 +148,11 @@ public:
                                              const uint8_t* null_map,
                                              size_t max_row_byte_size) const 
override;
 
+    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) 
override;
+
+    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const 
size_t num_rows,
+                                       const uint8_t* null_map) override;
+
     void update_hash_with_value(size_t n, SipHash& hash) const override;
     int compare_at(size_t n, size_t m, const IColumn& rhs_, int 
nan_direction_hint) const override;
     void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index 982ddf9fdd..b8058a8d97 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -96,6 +96,20 @@ void ColumnNullable::insert_data(const char* pos, size_t 
length) {
     }
 }
 
+void ColumnNullable::insert_many_strings(const StringRef* strings, size_t num) 
{
+    auto& nested_column = get_nested_column();
+    auto& null_map_data = get_null_map_data();
+    for (size_t i = 0; i != num; ++i) {
+        if (strings[i].data == nullptr) {
+            nested_column.insert_default();
+            null_map_data.push_back(1);
+        } else {
+            nested_column.insert_data(strings[i].data, strings[i].size);
+            null_map_data.push_back(0);
+        }
+    }
+}
+
 StringRef ColumnNullable::serialize_value_into_arena(size_t n, Arena& arena,
                                                      char const*& begin) const 
{
     const auto& arr = get_null_map_data();
@@ -152,6 +166,21 @@ void ColumnNullable::serialize_vec(std::vector<StringRef>& 
keys, size_t num_rows
     get_nested_column().serialize_vec_with_null_map(keys, num_rows, 
arr.data(), max_row_byte_size);
 }
 
+// Reads the leading one-byte null flag from each serialized key, appends it to the null
+// map, advances the key past that byte, and then lets the nested column consume the
+// remaining payload (null rows get a default value in the nested column).
+void ColumnNullable::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+    auto& arr = get_null_map_data();
+    const size_t old_size = arr.size();
+    arr.resize(old_size + num_rows);
+
+    // Start of the flags for the rows appended by this call.
+    auto* null_map_data = arr.data() + old_size;
+    for (size_t i = 0; i != num_rows; ++i) {
+        UInt8 val = *reinterpret_cast<const UInt8*>(keys[i].data);
+        null_map_data[i] = val;
+        keys[i].data += sizeof(val);
+        keys[i].size -= sizeof(val);
+    }
+    // The nested column indexes the map as null_map[0 .. num_rows), so it must receive the
+    // slice for the rows appended here, not the start of the whole map; the two only
+    // coincide when the column was empty on entry.
+    get_nested_column().deserialize_vec_with_null_map(keys, num_rows, null_map_data);
+}
+
 void ColumnNullable::insert_range_from(const IColumn& src, size_t start, 
size_t length) {
     const ColumnNullable& nullable_col = assert_cast<const 
ColumnNullable&>(src);
     get_null_map_column().insert_range_from(*nullable_col.null_map, start, 
length);
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 1fa9a50e10..06e3c9fa8a 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -87,11 +87,16 @@ public:
     /// JOIN_NULL_HINT in null map means null is generated by join, only use 
in tuple is null
     void insert_join_null_data();
 
+    void insert_many_strings(const StringRef* strings, size_t num) override;
+
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& 
begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
     size_t get_max_row_byte_size() const override;
     void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
                        size_t max_row_byte_size) const override;
+
+    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) 
override;
+
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
@@ -107,6 +112,11 @@ public:
         get_nested_column().insert_many_fix_len_data(pos, num);
     }
 
+    void insert_many_raw_data(const char* pos, size_t num) override {
+        get_null_map_column().fill(0, num);
+        get_nested_column().insert_many_raw_data(pos, num);
+    }
+
     void insert_many_dict_data(const int32_t* data_array, size_t start_index, 
const StringRef* dict,
                                size_t data_num, uint32_t dict_num) override {
         get_null_map_column().fill(0, data_num);
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index e74ec544dc..3adf082ae0 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -228,6 +228,27 @@ void 
ColumnString::serialize_vec_with_null_map(std::vector<StringRef>& keys, siz
     }
 }
 
+void ColumnString::deserialize_vec(std::vector<StringRef>& keys, const size_t 
num_rows) {
+    for (size_t i = 0; i != num_rows; ++i) {
+        auto original_ptr = keys[i].data;
+        keys[i].data = deserialize_and_insert_from_arena(original_ptr);
+        keys[i].size -= keys[i].data - original_ptr;
+    }
+}
+
+void ColumnString::deserialize_vec_with_null_map(std::vector<StringRef>& keys,
+                                                 const size_t num_rows, const 
uint8_t* null_map) {
+    for (size_t i = 0; i != num_rows; ++i) {
+        if (null_map[i] == 0) {
+            auto original_ptr = keys[i].data;
+            keys[i].data = deserialize_and_insert_from_arena(original_ptr);
+            keys[i].size -= keys[i].data - original_ptr;
+        } else {
+            insert_default();
+        }
+    }
+}
+
 template <typename Type>
 ColumnPtr ColumnString::index_impl(const PaddedPODArray<Type>& indexes, size_t 
limit) const {
     if (limit == 0) return ColumnString::create();
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 6d7b55da2d..bbe6993ae2 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -180,6 +180,26 @@ public:
         }
     };
 
+    void insert_many_strings(const StringRef* strings, size_t num) override {
+        size_t new_size = 0;
+        for (size_t i = 0; i < num; i++) {
+            new_size += strings[i].size + 1;
+        }
+
+        const size_t old_size = chars.size();
+        chars.resize(old_size + new_size);
+
+        Char* data = chars.data();
+        size_t offset = old_size;
+        for (size_t i = 0; i < num; i++) {
+            uint32_t len = strings[i].size;
+            if (len) memcpy(data + offset, strings[i].data, len);
+            data[offset + len] = 0;
+            offset += len + 1;
+            offsets.push_back(offset);
+        }
+    }
+
     void insert_many_dict_data(const int32_t* data_array, size_t start_index, 
const StringRef* dict,
                                size_t num, uint32_t /*dict_num*/) override {
         for (size_t end_index = start_index + num; start_index < end_index; 
++start_index) {
@@ -208,6 +228,8 @@ public:
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
+    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) 
override;
+
     size_t get_max_row_byte_size() const override;
 
     void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
@@ -217,6 +239,9 @@ public:
                                      const uint8_t* null_map,
                                      size_t max_row_byte_size) const override;
 
+    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const 
size_t num_rows,
+                                       const uint8_t* null_map) override;
+
     void update_hash_with_value(size_t n, SipHash& hash) const override {
         size_t string_size = size_at(n);
         size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index 3a198fef20..c740a515a0 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -78,6 +78,28 @@ void 
ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys,
     }
 }
 
+template <typename T>
+void ColumnVector<T>::deserialize_vec(std::vector<StringRef>& keys, const 
size_t num_rows) {
+    for (size_t i = 0; i != num_rows; ++i) {
+        keys[i].data = deserialize_and_insert_from_arena(keys[i].data);
+        keys[i].size -= sizeof(T);
+    }
+}
+
+template <typename T>
+void ColumnVector<T>::deserialize_vec_with_null_map(std::vector<StringRef>& 
keys,
+                                                    const size_t num_rows,
+                                                    const uint8_t* null_map) {
+    for (size_t i = 0; i < num_rows; ++i) {
+        if (null_map[i] == 0) {
+            keys[i].data = deserialize_and_insert_from_arena(keys[i].data);
+            keys[i].size -= sizeof(T);
+        } else {
+            insert_default();
+        }
+    }
+}
+
 template <typename T>
 void ColumnVector<T>::update_hash_with_value(size_t n, SipHash& hash) const {
     hash.update(data[n]);
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 544cf8bc4a..c673f0b7bd 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -208,6 +208,14 @@ public:
         }
     }
 
+    void insert_many_raw_data(const char* pos, size_t num) override {
+        if constexpr (std::is_same_v<T, vectorized::Int128>) {
+            insert_many_in_copy_way(pos, num);
+        } else {
+            insert_many_default_type(pos, num);
+        }
+    }
+
     void insert_default() override { data.push_back(T()); }
 
     void insert_many_defaults(size_t length) override {
@@ -222,6 +230,11 @@ public:
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
+    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) 
override;
+
+    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const 
size_t num_rows,
+                                       const uint8_t* null_map) override;
+
     size_t get_max_row_byte_size() const override;
 
     void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index c7539996c6..c5f3d7ddf9 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -959,16 +959,27 @@ Status 
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
                 auto& data = agg_method.data;
                 auto& iter = agg_method.iterator;
                 agg_method.init_once();
-                while (iter != data.end() && key_columns[0]->size() < 
state->batch_size()) {
-                    const auto& key = iter->get_first();
-                    auto& mapped = iter->get_second();
-                    agg_method.insert_key_into_columns(key, key_columns, 
_probe_key_sz);
-                    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
-                        _aggregate_evaluators[i]->insert_result_info(
-                                mapped + _offsets_of_aggregate_states[i], 
value_columns[i].get());
-
+                const auto size = std::min(data.size(), 
size_t(state->batch_size()));
+                using KeyType = std::decay_t<decltype(iter->get_first())>;
+                std::vector<KeyType> keys(size);
+                std::vector<AggregateDataPtr> values(size);
+
+                size_t num_rows = 0;
+                while (iter != data.end() && num_rows < state->batch_size()) {
+                    keys[num_rows] = iter->get_first();
+                    values[num_rows] = iter->get_second();
                     ++iter;
+                    ++num_rows;
+                }
+
+                agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows, _probe_key_sz);
+
+                for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                    _aggregate_evaluators[i]->insert_result_info_vec(
+                            values, _offsets_of_aggregate_states[i], 
value_columns[i].get(),
+                            num_rows);
                 }
+
                 if (iter == data.end()) {
                     if (agg_method.data.has_null_key_data()) {
                         // only one key of group by support wrap null key
@@ -1043,19 +1054,26 @@ Status 
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                 agg_method.init_once();
                 auto& data = agg_method.data;
                 auto& iter = agg_method.iterator;
-                while (iter != data.end() && key_columns[0]->size() < 
state->batch_size()) {
-                    const auto& key = iter->get_first();
-                    auto& mapped = iter->get_second();
-                    // insert keys
-                    agg_method.insert_key_into_columns(key, key_columns, 
_probe_key_sz);
-
-                    // serialize values
-                    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
-                        _aggregate_evaluators[i]->function()->serialize(
-                                mapped + _offsets_of_aggregate_states[i], 
value_buffer_writers[i]);
-                        value_buffer_writers[i].commit();
-                    }
+
+                const auto size = std::min(data.size(), 
size_t(state->batch_size()));
+                using KeyType = std::decay_t<decltype(iter->get_first())>;
+                std::vector<KeyType> keys(size);
+                std::vector<AggregateDataPtr> values(size);
+
+                size_t num_rows = 0;
+                while (iter != data.end() && num_rows < state->batch_size()) {
+                    keys[num_rows] = iter->get_first();
+                    values[num_rows] = iter->get_second();
                     ++iter;
+                    ++num_rows;
+                }
+
+                agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows, _probe_key_sz);
+
+                for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                    _aggregate_evaluators[i]->function()->serialize_vec(
+                            values, _offsets_of_aggregate_states[i], 
value_buffer_writers[i],
+                            num_rows);
                 }
 
                 if (iter == data.end()) {
@@ -1166,8 +1184,6 @@ Status AggregationNode::_merge_with_serialized_key(Block* 
block) {
             },
             _agg_data._aggregated_method_variant);
 
-    std::unique_ptr<char[]> deserialize_buffer(new 
char[_total_size_of_aggregate_states]);
-
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
                
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
@@ -1179,21 +1195,16 @@ Status 
AggregationNode::_merge_with_serialized_key(Block* block) {
                 column = 
((ColumnNullable*)column.get())->get_nested_column_ptr();
             }
 
-            for (int j = 0; j < rows; ++j) {
-                VectorBufferReader 
buffer_reader(((ColumnString*)(column.get()))->get_data_at(j));
-                _create_agg_status(deserialize_buffer.get());
-
-                _aggregate_evaluators[i]->function()->deserialize(
-                        deserialize_buffer.get() + 
_offsets_of_aggregate_states[i], buffer_reader,
-                        &_agg_arena_pool);
+            // Array form of unique_ptr so the buffer is released with delete[], matching
+            // the new[] allocation (unique_ptr<char> would call plain delete).
+            // NOTE(review): the removed per-row loop called _destroy_agg_status() after each
+            // merge; nothing in the visible lines destroys the states that deserialize_vec()
+            // creates in this buffer -- confirm this is safe for states that own memory.
+            std::unique_ptr<char[]> deserialize_buffer(
+                    new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
 
-                _aggregate_evaluators[i]->function()->merge(
-                        places.data()[j] + _offsets_of_aggregate_states[i],
-                        deserialize_buffer.get() + 
_offsets_of_aggregate_states[i],
-                        &_agg_arena_pool);
+            
_aggregate_evaluators[i]->function()->deserialize_vec(deserialize_buffer.get(),
+                                                                  
(ColumnString*)(column.get()),
+                                                                  
&_agg_arena_pool, rows);
+            _aggregate_evaluators[i]->function()->merge_vec(
+                    places.data(), _offsets_of_aggregate_states[i], 
deserialize_buffer.get(),
+                    &_agg_arena_pool, rows);
 
-                _destroy_agg_status(deserialize_buffer.get());
-            }
         } else {
             _aggregate_evaluators[i]->execute_batch_add(block, 
_offsets_of_aggregate_states[i],
                                                         places.data(), 
&_agg_arena_pool);
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index 08834e4611..fd6b67f255 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -92,6 +92,11 @@ struct AggregationMethodSerialized {
         for (auto& column : key_columns) pos = 
column->deserialize_and_insert_from_arena(pos);
     }
 
+    static void insert_keys_into_columns(std::vector<StringRef>& keys, 
MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes&) {
+        for (auto& column : key_columns) column->deserialize_vec(keys, 
num_rows);
+    }
+
     void init_once() {
         if (!inited) {
             inited = true;
@@ -136,6 +141,12 @@ struct AggregationMethodStringNoCache {
         key_columns[0]->insert_data(key.data, key.size);
     }
 
+    static void insert_keys_into_columns(std::vector<StringRef>& keys, 
MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes&) {
+        key_columns[0]->reserve(num_rows);
+        key_columns[0]->insert_many_strings(keys.data(), num_rows);
+    }
+
     void init_once() {
         if (!inited) {
             inited = true;
@@ -174,6 +185,16 @@ struct AggregationMethodOneNumber {
         column->insert_raw_data<sizeof(FieldType)>(key_holder);
     }
 
+    static void insert_keys_into_columns(std::vector<Key>& keys, 
MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes&) {
+        key_columns[0]->reserve(num_rows);
+        auto* column = static_cast<ColumnVectorHelper*>(key_columns[0].get());
+        for (size_t i = 0; i != num_rows; ++i) {
+            const auto* key_holder = reinterpret_cast<const char*>(&keys[i]);
+            column->insert_raw_data<sizeof(FieldType)>(key_holder);
+        }
+    }
+
     void init_once() {
         if (!inited) {
             inited = true;
@@ -275,6 +296,13 @@ struct AggregationMethodKeysFixed {
         }
     }
 
+    static void insert_keys_into_columns(std::vector<Key>& keys, 
MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes& 
key_sizes) {
+        for (size_t i = 0; i != num_rows; ++i) {
+            insert_key_into_columns(keys[i], key_columns, key_sizes);
+        }
+    }
+
     void init_once() {
         if (!inited) {
             inited = true;
@@ -312,6 +340,17 @@ struct AggregationMethodSingleNullableColumn : public 
SingleColumnMethod {
             col->insert_data(reinterpret_cast<const char*>(&key), sizeof(key));
         }
     }
+
+    static void insert_keys_into_columns(std::vector<Key>& keys, 
MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes&) {
+        auto col = key_columns[0].get();
+        col->reserve(num_rows);
+        if constexpr (std::is_same_v<Key, StringRef>) {
+            col->insert_many_strings(keys.data(), num_rows);
+        } else {
+            col->insert_many_raw_data(reinterpret_cast<char*>(keys.data()), 
num_rows);
+        }
+    }
 };
 
 using AggregatedDataWithUInt8Key =
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp 
b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 6882788e2c..427b90ec2f 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -136,6 +136,11 @@ void AggFnEvaluator::insert_result_info(AggregateDataPtr 
place, IColumn* column)
     _function->insert_result_into(place, *column);
 }
 
+void AggFnEvaluator::insert_result_info_vec(const 
std::vector<AggregateDataPtr>& places,
+                                            size_t offset, IColumn* column, 
const size_t num_rows) {
+    _function->insert_result_into_vec(places, offset, *column, num_rows);
+}
+
 void AggFnEvaluator::reset(AggregateDataPtr place) {
     _function->reset(place);
 }
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h 
b/be/src/vec/exprs/vectorized_agg_fn.h
index 8a4f777355..a541257487 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -58,6 +58,9 @@ public:
 
     void insert_result_info(AggregateDataPtr place, IColumn* column);
 
+    void insert_result_info_vec(const std::vector<AggregateDataPtr>& place, 
size_t offset,
+                                IColumn* column, const size_t num_rows);
+
     void reset(AggregateDataPtr place);
 
     DataTypePtr& data_type() { return _data_type; }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to