This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e293fbd277 [improvement]pre-serialize aggregation keys (#10700)
e293fbd277 is described below

commit e293fbd277bb84813b713e4980a375de0f80e0ac
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Sat Jul 9 06:21:56 2022 +0800

    [improvement]pre-serialize aggregation keys (#10700)
---
 be/src/vec/columns/column.h            | 17 ++++++++++++
 be/src/vec/columns/column_const.h      | 13 +++++++++
 be/src/vec/columns/column_nullable.cpp | 18 +++++++++++++
 be/src/vec/columns/column_nullable.h   |  3 +++
 be/src/vec/columns/column_string.cpp   | 39 +++++++++++++++++++++++++++
 be/src/vec/columns/column_string.h     |  9 +++++++
 be/src/vec/columns/column_vector.cpp   | 26 ++++++++++++++++++
 be/src/vec/columns/column_vector.h     |  9 +++++++
 be/src/vec/common/columns_hashing.h    | 33 ++++++++++++++++++-----
 be/src/vec/exec/vaggregation_node.cpp  | 11 ++++++++
 be/src/vec/exec/vaggregation_node.h    | 48 ++++++++++++++++++++++++++++++++--
 11 files changed, 217 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index b0c5bee895..a2a015721a 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -246,6 +246,23 @@ public:
     /// Returns pointer to the position after the read data.
     virtual const char* deserialize_and_insert_from_arena(const char* pos) = 0;
 
+    /// Return the size in bytes of the largest serialized row of this column.
+    /// This is for calculating the memory size for vectorized serialization of aggregation keys.
+    virtual size_t get_max_row_byte_size() const {
+        LOG(FATAL) << "get_max_row_byte_size not supported";
+        // LOG(FATAL) terminates the process; say so explicitly so this value-returning
+        // function never appears to fall off its end (-Wreturn-type / UB).
+        __builtin_unreachable();
+    }
+
+    /// Append the serialized form of rows [0, num_rows) to keys[0..num_rows), advancing
+    /// each keys[i].size by the number of bytes written. Callers are expected to point each
+    /// keys[i].data at a buffer with room for max_row_byte_size bytes.
+    virtual void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                               size_t max_row_byte_size) const {
+        LOG(FATAL) << "serialize_vec not supported";
+    }
+
+    /// Like serialize_vec, but driven by a null map (one entry per row); the column
+    /// implementations in this change append nothing for rows whose entry is non-zero.
+    virtual void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                             const uint8_t* null_map,
+                                             size_t max_row_byte_size) const {
+        LOG(FATAL) << "serialize_vec_with_null_map not supported";
+    }
+
     /// Update state of hash function with value of n-th element.
     /// On subsequent calls of this method for sequence of column values of 
arbitrary types,
     ///  passed bytes to hash must identify sequence of values unambiguously.
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index b557b68f86..b29d91ddf3 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -112,6 +112,19 @@ public:
         return res;
     }
 
+    size_t get_max_row_byte_size() const override { return data->get_max_row_byte_size(); }
+
+    /// `data` holds exactly one row, so it must not be asked to serialize `num_rows` rows
+    /// (that would index past its end). Serialize the value once into keys[0] and copy the
+    /// produced bytes into every other key.
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override {
+        if (num_rows == 0) {
+            return;
+        }
+        const size_t prefix_size = keys[0].size;
+        data->serialize_vec(keys, 1, max_row_byte_size);
+        const char* value = keys[0].data + prefix_size;
+        const size_t value_size = keys[0].size - prefix_size;
+        for (size_t i = 1; i < num_rows; ++i) {
+            memcpy(const_cast<char*>(keys[i].data + keys[i].size), value, value_size);
+            keys[i].size += value_size;
+        }
+    }
+
+    /// Same single-row concern as serialize_vec: serialize the value once into the first
+    /// non-null row, then copy it into every other row whose null_map entry is zero.
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override {
+        size_t first = 0;
+        while (first < num_rows && null_map[first] != 0) {
+            ++first;
+        }
+        if (first == num_rows) {
+            return; // every row is null: nothing to append
+        }
+
+        // Serialize through a one-element view that shares keys[first]'s buffer.
+        const uint8_t not_null = 0;
+        std::vector<StringRef> one_key {keys[first]};
+        data->serialize_vec_with_null_map(one_key, 1, &not_null, max_row_byte_size);
+        const char* value = keys[first].data + keys[first].size;
+        const size_t value_size = one_key[0].size - keys[first].size;
+        keys[first].size = one_key[0].size;
+
+        for (size_t i = first + 1; i < num_rows; ++i) {
+            if (null_map[i] == 0) {
+                memcpy(const_cast<char*>(keys[i].data + keys[i].size), value, value_size);
+                keys[i].size += value_size;
+            }
+        }
+    }
+
     void update_hash_with_value(size_t, SipHash& hash) const override {
         data->update_hash_with_value(0, hash);
     }
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index 215ced2383..e7a972a37c 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -134,6 +134,24 @@ const char* 
ColumnNullable::deserialize_and_insert_from_arena(const char* pos) {
     return pos;
 }
 
+size_t ColumnNullable::get_max_row_byte_size() const {
+    // Each serialized row starts with a one-element null flag, followed by the nested value.
+    return sizeof(NullMap::value_type) + get_nested_column().get_max_row_byte_size();
+}
+
+void ColumnNullable::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                   size_t max_row_byte_size) const {
+    const auto& null_map = get_null_map_data();
+    constexpr size_t flag_size = sizeof(null_map[0]);
+
+    // First pass: write the null flag (1 = null, 0 = present) for every row.
+    for (size_t row = 0; row < num_rows; ++row) {
+        StringRef& key = keys[row];
+        char* dst = const_cast<char*>(key.data) + key.size;
+        *dst = null_map[row] ? 1 : 0;
+        key.size += flag_size;
+    }
+
+    // Second pass: the nested column appends values, guided by the same null map.
+    get_nested_column().serialize_vec_with_null_map(keys, num_rows, null_map.data(),
+                                                    max_row_byte_size);
+}
+
 void ColumnNullable::insert_range_from(const IColumn& src, size_t start, 
size_t length) {
     const ColumnNullable& nullable_col = assert_cast<const 
ColumnNullable&>(src);
     get_null_map_column().insert_range_from(*nullable_col.null_map, start, 
length);
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 1361711d8c..34d87bd0b7 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -89,6 +89,9 @@ public:
 
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& 
begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
+    /// Null-flag size plus the nested column's widest serialized row.
+    size_t get_max_row_byte_size() const override;
+    /// Appends a null flag per row, then the nested value for non-null rows.
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override;
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index ad5873d4f1..296924aea2 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -189,6 +189,45 @@ const char* 
ColumnString::deserialize_and_insert_from_arena(const char* pos) {
     return pos + string_size;
 }
 
+size_t ColumnString::get_max_row_byte_size() const {
+    // Widest payload across every row, plus the size_t length prefix written before it.
+    size_t widest = 0;
+    const size_t row_count = offsets.size();
+    for (size_t row = 0; row < row_count; ++row) {
+        const size_t len = size_at(row);
+        if (len > widest) {
+            widest = len;
+        }
+    }
+    return widest + sizeof(size_t);
+}
+
+void ColumnString::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                 size_t max_row_byte_size) const {
+    for (size_t row = 0; row < num_rows; ++row) {
+        const size_t len = size_at(row);
+        StringRef& key = keys[row];
+        char* dst = const_cast<char*>(key.data) + key.size;
+        // Layout: <size_t length><raw bytes>.
+        memcpy(dst, &len, sizeof(len));
+        memcpy(dst + sizeof(len), &chars[offset_at(row)], len);
+        key.size += sizeof(len) + len;
+    }
+}
+
+void ColumnString::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                               const uint8_t* null_map,
+                                               size_t max_row_byte_size) const {
+    for (size_t row = 0; row < num_rows; ++row) {
+        if (null_map[row] != 0) {
+            continue; // null rows contribute no payload
+        }
+        const size_t len = size_at(row);
+        StringRef& key = keys[row];
+        char* dst = const_cast<char*>(key.data) + key.size;
+        memcpy(dst, &len, sizeof(len));
+        memcpy(dst + sizeof(len), &chars[offset_at(row)], len);
+        key.size += sizeof(len) + len;
+    }
+}
+
 template <typename Type>
 ColumnPtr ColumnString::index_impl(const PaddedPODArray<Type>& indexes, size_t 
limit) const {
     if (limit == 0) return ColumnString::create();
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 913ccc2312..6d7b55da2d 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -208,6 +208,15 @@ public:
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
+    /// Longest string in the column plus its size_t length prefix.
+    size_t get_max_row_byte_size() const override;
+
+    /// Appends `<size_t length><bytes>` of each row to the matching key.
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override;
+
+    /// As serialize_vec, but rows whose null_map entry is non-zero are skipped.
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override;
+
     void update_hash_with_value(size_t n, SipHash& hash) const override {
         size_t string_size = size_at(n);
         size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index dde2f033a7..3a198fef20 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -52,6 +52,32 @@ const char* 
ColumnVector<T>::deserialize_and_insert_from_arena(const char* pos)
     return pos + sizeof(T);
 }
 
+template <typename T>
+size_t ColumnVector<T>::get_max_row_byte_size() const {
+    // Every row is a fixed-width value of type T.
+    return sizeof(T);
+}
+
+template <typename T>
+void ColumnVector<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                    size_t max_row_byte_size) const {
+    constexpr size_t width = sizeof(T);
+    for (size_t row = 0; row != num_rows; ++row) {
+        StringRef& key = keys[row];
+        char* dst = const_cast<char*>(key.data) + key.size;
+        memcpy(dst, &data[row], width);
+        key.size += width;
+    }
+}
+
+template <typename T>
+void ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                                  const uint8_t* null_map,
+                                                  size_t max_row_byte_size) const {
+    constexpr size_t width = sizeof(T);
+    for (size_t row = 0; row != num_rows; ++row) {
+        if (null_map[row] != 0) {
+            continue; // null rows contribute no payload
+        }
+        StringRef& key = keys[row];
+        char* dst = const_cast<char*>(key.data) + key.size;
+        memcpy(dst, &data[row], width);
+        key.size += width;
+    }
+}
+
 template <typename T>
 void ColumnVector<T>::update_hash_with_value(size_t n, SipHash& hash) const {
     hash.update(data[n]);
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 1d42455c76..544cf8bc4a 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -222,6 +222,15 @@ public:
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
+    /// Always sizeof(T): every row serializes to a fixed-width value.
+    size_t get_max_row_byte_size() const override;
+
+    /// Appends the raw bytes of each row's value to the matching key.
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override;
+
+    /// As serialize_vec, but rows whose null_map entry is non-zero are skipped.
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override;
+
     void update_hash_with_value(size_t n, SipHash& hash) const override;
 
     size_t byte_size() const override { return data.size() * sizeof(data[0]); }
diff --git a/be/src/vec/common/columns_hashing.h 
b/be/src/vec/common/columns_hashing.h
index afb6a685b4..300f7d70e0 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -111,29 +111,48 @@ protected:
   * That is, for example, for strings, it contains first the serialized length 
of the string, and then the bytes.
   * Therefore, when aggregating by several strings, there is no ambiguity.
   */
-template <typename Value, typename Mapped>
+template <typename Value, typename Mapped, bool keys_pre_serialized = false>
 struct HashMethodSerialized
-        : public columns_hashing_impl::HashMethodBase<HashMethodSerialized<Value, Mapped>, Value,
-                                                      Mapped, false> {
-    using Self = HashMethodSerialized<Value, Mapped>;
+        : public columns_hashing_impl::HashMethodBase<
+                  HashMethodSerialized<Value, Mapped, keys_pre_serialized>, Value, Mapped, false> {
+    using Self = HashMethodSerialized<Value, Mapped, keys_pre_serialized>;
     using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
+    /// With pre-serialized keys the holder wraps the caller-provided StringRef for the row;
+    /// otherwise the row is serialized into `pool` via serialize_keys_to_pool_contiguous.
+    using KeyHolderType =
+            std::conditional_t<keys_pre_serialized, ArenaKeyHolder, SerializedKeyHolder>;
 
     ColumnRawPtrs key_columns;
     size_t keys_size;
+    /// Non-owning, row-indexed pre-serialized keys. Only read when keys_pre_serialized is
+    /// true, and set_serialized_keys() must be called before get_key_holder(). Defaulted to
+    /// nullptr so a missing call fails on a null pointer instead of an indeterminate one.
+    const StringRef* keys = nullptr;
 
     HashMethodSerialized(const ColumnRawPtrs& key_columns_, const Sizes& /*key_sizes*/,
                          const HashMethodContextPtr&)
             : key_columns(key_columns_), keys_size(key_columns_.size()) {}
 
+    /// Provide the pre-serialized keys of the current block; the array is not copied and must
+    /// outlive the lookups for that block.
+    void set_serialized_keys(const StringRef* keys_) { keys = keys_; }
+
 protected:
     friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
 
-    ALWAYS_INLINE SerializedKeyHolder get_key_holder(size_t row, Arena& pool) const {
-        return SerializedKeyHolder {
-                serialize_keys_to_pool_contiguous(row, keys_size, key_columns, pool), pool};
+    ALWAYS_INLINE KeyHolderType get_key_holder(size_t row, Arena& pool) const {
+        if constexpr (keys_pre_serialized) {
+            return KeyHolderType {keys[row], pool};
+        } else {
+            return KeyHolderType {
+                    serialize_keys_to_pool_contiguous(row, keys_size, key_columns, pool), pool};
+        }
     }
 };
 
+/// Compile-time predicate: true only for HashMethodSerialized instantiated with
+/// keys_pre_serialized = true, i.e. hash methods that expect keys serialized up front.
+template <typename HashMethod>
+struct IsPreSerializedKeysHashMethodTraits : std::false_type {};
+
+template <typename Value, typename Mapped>
+struct IsPreSerializedKeysHashMethodTraits<HashMethodSerialized<Value, Mapped, true>>
+        : std::true_type {};
+
 /// For the case when there is one string key.
 template <typename Value, typename Mapped, bool use_cache = true>
 struct HashMethodHashed
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index bbd94134fb..8725297e75 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -84,6 +84,7 @@ AggregationNode::AggregationNode(ObjectPool* pool, const 
TPlanNode& tnode,
           _is_merge(false),
           _agg_data(),
           _build_timer(nullptr),
+          _serialize_key_timer(nullptr),
           _exec_timer(nullptr),
           _merge_timer(nullptr) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
@@ -206,6 +207,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTimer");
     _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
     _merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
     _expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
@@ -754,6 +756,9 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
                     using HashMethodType = std::decay_t<decltype(agg_method)>;
                     using AggState = typename HashMethodType::State;
                     AggState state(key_columns, _probe_key_sz, nullptr);
+
+                    _pre_serialize_key_if_need(state, agg_method, key_columns, 
rows);
+
                     /// For all rows.
                     for (size_t i = 0; i < rows; ++i) {
                         AggregateDataPtr aggregate_data = nullptr;
@@ -815,6 +820,9 @@ Status AggregationNode::_execute_with_serialized_key(Block* 
block) {
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
                 AggState state(key_columns, _probe_key_sz, nullptr);
+
+                _pre_serialize_key_if_need(state, agg_method, key_columns, 
rows);
+
                 /// For all rows.
                 for (size_t i = 0; i < rows; ++i) {
                     AggregateDataPtr aggregate_data = nullptr;
@@ -1034,6 +1042,9 @@ Status AggregationNode::_merge_with_serialized_key(Block* 
block) {
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
                 AggState state(key_columns, _probe_key_sz, nullptr);
+
+                _pre_serialize_key_if_need(state, agg_method, key_columns, 
rows);
+
                 /// For all rows.
                 for (size_t i = 0; i < rows; ++i) {
                     AggregateDataPtr aggregate_data = nullptr;
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index 31b925bbfe..0da44ec5dd 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -50,13 +50,41 @@ struct AggregationMethodSerialized {
     Data data;
     Iterator iterator;
     bool inited = false;
+    /// Pre-serialized keys of the current block, one per row (filled by serialize_keys()).
+    std::vector<StringRef> keys;
+    AggregationMethodSerialized()
+            : _serialized_key_buffer_size(0),
+              _serialized_key_buffer(nullptr),
+              _mem_pool(std::make_unique<MemPool>()) {}
 
-    AggregationMethodSerialized() = default;
+    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>;
 
     template <typename Other>
     explicit AggregationMethodSerialized(const Other& other) : data(other.data) {}
 
-    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>;
+    /// Serialize rows [0, num_rows) of all key columns into `keys`. Each row gets a
+    /// fixed-size slot in a reusable buffer, and the columns append their bytes in order.
+    /// NOTE(review): slots are sized by the widest row, so a single very long string key
+    /// inflates the buffer to num_rows * that width.
+    void serialize_keys(const ColumnRawPtrs& key_columns, const size_t num_rows) {
+        size_t max_one_row_byte_size = 0;
+        for (const auto& column : key_columns) {
+            max_one_row_byte_size += column->get_max_row_byte_size();
+        }
+
+        // Grow (never shrink) the buffer; it is reused across blocks.
+        const size_t required_size = max_one_row_byte_size * num_rows;
+        if (required_size > _serialized_key_buffer_size) {
+            _serialized_key_buffer_size = required_size;
+            _mem_pool->clear();
+            _serialized_key_buffer = _mem_pool->allocate(_serialized_key_buffer_size);
+        }
+
+        if (keys.size() < num_rows) {
+            keys.resize(num_rows);
+        }
+
+        // Point every key at the start of its own empty slot.
+        for (size_t i = 0; i < num_rows; ++i) {
+            keys[i].data =
+                    reinterpret_cast<char*>(_serialized_key_buffer + i * max_one_row_byte_size);
+            keys[i].size = 0;
+        }
+
+        for (const auto& column : key_columns) {
+            column->serialize_vec(keys, num_rows, max_one_row_byte_size);
+        }
+    }
 
     static void insert_key_into_columns(const StringRef& key, MutableColumns& 
key_columns,
                                         const Sizes&) {
@@ -70,6 +98,11 @@ struct AggregationMethodSerialized {
             iterator = data.begin();
         }
     }
+
+private:
+    // Default member initializers keep these valid for every constructor, including the
+    // converting constructor that only copies `data`; without them serialize_keys() would
+    // dereference a null _mem_pool and read indeterminate buffer fields.
+    size_t _serialized_key_buffer_size = 0;
+    uint8_t* _serialized_key_buffer = nullptr;
+    std::unique_ptr<MemPool> _mem_pool = std::make_unique<MemPool>();
 };
 
 using AggregatedDataWithoutKey = AggregateDataPtr;
@@ -448,6 +481,7 @@ private:
     Arena _agg_arena_pool;
 
     RuntimeProfile::Counter* _build_timer;
+    RuntimeProfile::Counter* _serialize_key_timer;
     RuntimeProfile::Counter* _exec_timer;
     RuntimeProfile::Counter* _merge_timer;
     RuntimeProfile::Counter* _expr_timer;
@@ -484,6 +518,16 @@ private:
     void _close_with_serialized_key();
     void _init_hash_method(std::vector<VExprContext*>& probe_exprs);
 
+    /// If `AggState` consumes pre-serialized keys, serialize this block's key columns up
+    /// front (timed by _serialize_key_timer) and hand the resulting keys to `state`.
+    /// For any other hash method the body is discarded at compile time.
+    template <typename AggState, typename AggMethod>
+    void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
+                                    const ColumnRawPtrs& key_columns, const size_t num_rows) {
+        constexpr bool needs_pre_serialization =
+                ColumnsHashing::IsPreSerializedKeysHashMethodTraits<AggState>::value;
+        if constexpr (needs_pre_serialization) {
+            SCOPED_TIMER(_serialize_key_timer);
+            agg_method.serialize_keys(key_columns, num_rows);
+            state.set_serialized_keys(agg_method.keys.data());
+        }
+    }
+
     void release_tracker();
 
     using vectorized_execute = std::function<Status(Block* block)>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to