This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new e293fbd277 [improvement]pre-serialize aggregation keys (#10700)
e293fbd277 is described below

commit e293fbd277bb84813b713e4980a375de0f80e0ac
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Sat Jul 9 06:21:56 2022 +0800

    [improvement]pre-serialize aggregation keys (#10700)
---
 be/src/vec/columns/column.h            | 17 ++++++++++++
 be/src/vec/columns/column_const.h      | 13 +++++++++
 be/src/vec/columns/column_nullable.cpp | 18 +++++++++++++
 be/src/vec/columns/column_nullable.h   |  3 +++
 be/src/vec/columns/column_string.cpp   | 39 +++++++++++++++++++++++++++
 be/src/vec/columns/column_string.h     |  9 +++++++
 be/src/vec/columns/column_vector.cpp   | 26 ++++++++++++++++++
 be/src/vec/columns/column_vector.h     |  9 +++++++
 be/src/vec/common/columns_hashing.h    | 33 ++++++++++++++++++-----
 be/src/vec/exec/vaggregation_node.cpp  | 11 ++++++++
 be/src/vec/exec/vaggregation_node.h    | 48 ++++++++++++++++++++++++++++++++--
 11 files changed, 217 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index b0c5bee895..a2a015721a 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -246,6 +246,23 @@ public:
     /// Returns pointer to the position after the read data.
     virtual const char* deserialize_and_insert_from_arena(const char* pos) = 0;
 
+    /// Return the size of largest row.
+    /// This is for calculating the memory size for vectorized serialization of aggregation keys.
+    virtual size_t get_max_row_byte_size() const {
+        LOG(FATAL) << "get_max_row_byte_size not supported";
+    }
+
+    virtual void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                               size_t max_row_byte_size) const {
+        LOG(FATAL) << "serialize_vec not supported";
+    }
+
+    virtual void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                             const uint8_t* null_map,
+                                             size_t max_row_byte_size) const {
+        LOG(FATAL) << "serialize_vec_with_null_map not supported";
+    }
+
     /// Update state of hash function with value of n-th element.
     /// On subsequent calls of this method for sequence of column values of arbitrary types,
     /// passed bytes to hash must identify sequence of values unambiguously.
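The three virtuals above define the vectorized key-serialization contract: the caller hands every key column a vector of per-row StringRef slots, and each column appends its bytes at keys[i].data + keys[i].size and then bumps the size. A minimal standalone sketch of that contract follows; SimpleStringRef and serialize_fixed_width are illustrative names, not Doris types.

// Sketch of the serialize_vec contract: fixed-width slots, columns appended in order.
#include <cstdint>
#include <cstring>
#include <cstddef>
#include <iostream>
#include <vector>

struct SimpleStringRef {
    char* data = nullptr;
    size_t size = 0;
};

// Append a fixed-width column's values to each row's key, mirroring the ColumnVector pattern.
template <typename T>
void serialize_fixed_width(std::vector<SimpleStringRef>& keys, const std::vector<T>& column) {
    for (size_t i = 0; i < column.size(); ++i) {
        std::memcpy(keys[i].data + keys[i].size, &column[i], sizeof(T));
        keys[i].size += sizeof(T);
    }
}

int main() {
    std::vector<int32_t> c0 = {1, 2, 3};
    std::vector<int64_t> c1 = {10, 20, 30};

    // max_row_byte_size would come from summing get_max_row_byte_size() over key columns.
    const size_t max_row_byte_size = sizeof(int32_t) + sizeof(int64_t);
    std::vector<char> buffer(max_row_byte_size * c0.size());

    std::vector<SimpleStringRef> keys(c0.size());
    for (size_t i = 0; i < keys.size(); ++i) {
        keys[i].data = buffer.data() + i * max_row_byte_size; // fixed-width slot per row
        keys[i].size = 0;
    }

    serialize_fixed_width(keys, c0); // each row key is now [int32]
    serialize_fixed_width(keys, c1); // each row key is now [int32][int64]

    for (const auto& k : keys) std::cout << "key bytes: " << k.size << "\n"; // prints 12
    return 0;
}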
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index b557b68f86..b29d91ddf3 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -112,6 +112,19 @@ public:
         return res;
     }
 
+    size_t get_max_row_byte_size() const override { return data->get_max_row_byte_size(); }
+
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override {
+        data->serialize_vec(keys, num_rows, max_row_byte_size);
+    }
+
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override {
+        data->serialize_vec_with_null_map(keys, num_rows, null_map, max_row_byte_size);
+    }
+
     void update_hash_with_value(size_t, SipHash& hash) const override {
         data->update_hash_with_value(0, hash);
     }
diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp
index 215ced2383..e7a972a37c 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -134,6 +134,24 @@ const char* ColumnNullable::deserialize_and_insert_from_arena(const char* pos) {
     return pos;
 }
 
+size_t ColumnNullable::get_max_row_byte_size() const {
+    constexpr auto flag_size = sizeof(NullMap::value_type);
+    return flag_size + get_nested_column().get_max_row_byte_size();
+}
+
+void ColumnNullable::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                   size_t max_row_byte_size) const {
+    const auto& arr = get_null_map_data();
+    static constexpr auto s = sizeof(arr[0]);
+    for (size_t i = 0; i < num_rows; ++i) {
+        auto* val = const_cast<char*>(keys[i].data + keys[i].size);
+        *val = (arr[i] ? 1 : 0);
+        keys[i].size += s;
+    }
+
+    get_nested_column().serialize_vec_with_null_map(keys, num_rows, arr.data(), max_row_byte_size);
+}
+
 void ColumnNullable::insert_range_from(const IColumn& src, size_t start, size_t length) {
     const ColumnNullable& nullable_col = assert_cast<const ColumnNullable&>(src);
     get_null_map_column().insert_range_from(*nullable_col.null_map, start, length);
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index 1361711d8c..34d87bd0b7 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -89,6 +89,9 @@ public:
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
+    size_t get_max_row_byte_size() const override;
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override;
     void insert_range_from(const IColumn& src, size_t start, size_t length) override;
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp
index ad5873d4f1..296924aea2 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -189,6 +189,45 @@ const char* ColumnString::deserialize_and_insert_from_arena(const char* pos) {
     return pos + string_size;
 }
 
+size_t ColumnString::get_max_row_byte_size() const {
+    size_t max_size = 0;
+    size_t num_rows = offsets.size();
+    for (size_t i = 0; i < num_rows; ++i) {
+        max_size = std::max(max_size, size_at(i));
+    }
+
+    return max_size + sizeof(size_t);
+}
+
+void ColumnString::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                 size_t max_row_byte_size) const {
+    for (size_t i = 0; i < num_rows; ++i) {
+        size_t offset = offset_at(i);
+        size_t string_size = size_at(i);
+
+        auto* ptr = const_cast<char*>(keys[i].data + keys[i].size);
+        memcpy(ptr, &string_size, sizeof(string_size));
+        memcpy(ptr + sizeof(string_size), &chars[offset], string_size);
+        keys[i].size += sizeof(string_size) + string_size;
+    }
+}
+
+void ColumnString::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                               const uint8_t* null_map,
+                                               size_t max_row_byte_size) const {
+    for (size_t i = 0; i < num_rows; ++i) {
+        if (null_map[i] == 0) {
+            size_t offset = offset_at(i);
+            size_t string_size = size_at(i);
+
+            auto* ptr = const_cast<char*>(keys[i].data + keys[i].size);
+            memcpy(ptr, &string_size, sizeof(string_size));
+            memcpy(ptr + sizeof(string_size), &chars[offset], string_size);
+            keys[i].size += sizeof(string_size) + string_size;
+        }
+    }
+}
+
 template <typename Type>
 ColumnPtr ColumnString::index_impl(const PaddedPODArray<Type>& indexes, size_t limit) const {
     if (limit == 0) return ColumnString::create();
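Taken together, ColumnNullable and ColumnString serialize one row of a nullable string key as [1-byte null flag][length][string bytes], and the nested payload is skipped entirely for NULL rows, so all NULL keys serialize identically. A small illustrative sketch of that layout, assuming a 64-bit build; serialize_nullable_string is not Doris code.

// Byte layout produced for a nullable string key: flag, then (length, bytes) if non-null.
#include <cstring>
#include <cstddef>
#include <iostream>
#include <string>

// Appends one nullable string value at dst and returns the number of bytes written.
size_t serialize_nullable_string(char* dst, bool is_null, const std::string& value) {
    size_t pos = 0;
    dst[pos++] = is_null ? 1 : 0; // null flag, as in ColumnNullable::serialize_vec
    if (!is_null) {               // nested column writes only for non-null rows
        size_t len = value.size();
        std::memcpy(dst + pos, &len, sizeof(len));
        std::memcpy(dst + pos + sizeof(len), value.data(), len);
        pos += sizeof(len) + len;
    }
    return pos;
}

int main() {
    char buf[64];
    std::cout << serialize_nullable_string(buf, false, "doris") << " bytes\n";  // 1 + 8 + 5 = 14
    std::cout << serialize_nullable_string(buf, true, "ignored") << " bytes\n"; // 1
    return 0;
}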
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index 913ccc2312..6d7b55da2d 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -208,6 +208,15 @@ public:
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
+    size_t get_max_row_byte_size() const override;
+
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override;
+
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override;
+
     void update_hash_with_value(size_t n, SipHash& hash) const override {
         size_t string_size = size_at(n);
         size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index dde2f033a7..3a198fef20 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -52,6 +52,32 @@ const char* ColumnVector<T>::deserialize_and_insert_from_arena(const char* pos)
     return pos + sizeof(T);
 }
 
+template <typename T>
+size_t ColumnVector<T>::get_max_row_byte_size() const {
+    return sizeof(T);
+}
+
+template <typename T>
+void ColumnVector<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                                    size_t max_row_byte_size) const {
+    for (size_t i = 0; i < num_rows; ++i) {
+        memcpy(const_cast<char*>(keys[i].data + keys[i].size), &data[i], sizeof(T));
+        keys[i].size += sizeof(T);
+    }
+}
+
+template <typename T>
+void ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                                  const uint8_t* null_map,
+                                                  size_t max_row_byte_size) const {
+    for (size_t i = 0; i < num_rows; ++i) {
+        if (null_map[i] == 0) {
+            memcpy(const_cast<char*>(keys[i].data + keys[i].size), &data[i], sizeof(T));
+            keys[i].size += sizeof(T);
+        }
+    }
+}
+
 template <typename T>
 void ColumnVector<T>::update_hash_with_value(size_t n, SipHash& hash) const {
     hash.update(data[n]);
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index 1d42455c76..544cf8bc4a 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -222,6 +222,15 @@ public:
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
+    size_t get_max_row_byte_size() const override;
+
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const override;
+
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map,
+                                     size_t max_row_byte_size) const override;
+
     void update_hash_with_value(size_t n, SipHash& hash) const override;
 
     size_t byte_size() const override { return data.size() * sizeof(data[0]); }
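Fixed-width columns contribute exactly sizeof(T) bytes per row, and the per-row slot width used later by the aggregation node is the sum of get_max_row_byte_size() over all key columns. A rough worked example of that sizing, using hypothetical key columns that are not taken from the patch:

// Per-row buffer width for a composite key of (Nullable(INT), STRING), 64-bit build assumed.
#include <cstddef>
#include <cstdint>
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

int main() {
    std::vector<std::string> strings = {"a", "doris", "aggregation"};

    size_t longest = 0;
    for (const auto& s : strings) longest = std::max(longest, s.size());

    size_t nullable_int = sizeof(uint8_t) + sizeof(int32_t); // null flag + value   -> 5
    size_t string_col = sizeof(size_t) + longest;            // length prefix + max -> 8 + 11
    size_t max_row_byte_size = nullable_int + string_col;    // 24 here

    std::cout << "max_row_byte_size = " << max_row_byte_size << "\n";
    // A block of 4096 rows then reserves 4096 * max_row_byte_size bytes up front.
    return 0;
}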
diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h
index afb6a685b4..300f7d70e0 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -111,29 +111,48 @@ protected:
  * That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
  * Therefore, when aggregating by several strings, there is no ambiguity.
  */
-template <typename Value, typename Mapped>
+template <typename Value, typename Mapped, bool keys_pre_serialized = false>
 struct HashMethodSerialized
-        : public columns_hashing_impl::HashMethodBase<HashMethodSerialized<Value, Mapped>, Value,
-                                                      Mapped, false> {
-    using Self = HashMethodSerialized<Value, Mapped>;
+        : public columns_hashing_impl::HashMethodBase<
+                  HashMethodSerialized<Value, Mapped, keys_pre_serialized>, Value, Mapped, false> {
+    using Self = HashMethodSerialized<Value, Mapped, keys_pre_serialized>;
     using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
+    using KeyHolderType =
+            std::conditional_t<keys_pre_serialized, ArenaKeyHolder, SerializedKeyHolder>;
 
     ColumnRawPtrs key_columns;
     size_t keys_size;
+    const StringRef* keys;
 
     HashMethodSerialized(const ColumnRawPtrs& key_columns_, const Sizes& /*key_sizes*/,
                          const HashMethodContextPtr&)
             : key_columns(key_columns_), keys_size(key_columns_.size()) {}
 
+    void set_serialized_keys(const StringRef* keys_) { keys = keys_; }
+
 protected:
     friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
 
-    ALWAYS_INLINE SerializedKeyHolder get_key_holder(size_t row, Arena& pool) const {
-        return SerializedKeyHolder {
-                serialize_keys_to_pool_contiguous(row, keys_size, key_columns, pool), pool};
+    ALWAYS_INLINE KeyHolderType get_key_holder(size_t row, Arena& pool) const {
+        if constexpr (keys_pre_serialized) {
+            return KeyHolderType {keys[row], pool};
+        } else {
+            return KeyHolderType {
+                    serialize_keys_to_pool_contiguous(row, keys_size, key_columns, pool), pool};
+        }
     }
 };
 
+template <typename HashMethod>
+struct IsPreSerializedKeysHashMethodTraits {
+    constexpr static bool value = false;
+};
+
+template <typename Value, typename Mapped>
+struct IsPreSerializedKeysHashMethodTraits<HashMethodSerialized<Value, Mapped, true>> {
+    constexpr static bool value = true;
+};
+
 /// For the case when there is one string key.
 template <typename Value, typename Mapped, bool use_cache = true>
 struct HashMethodHashed
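The keys_pre_serialized template flag switches the key holder at compile time: the pre-serialized path hands the hash table an already-built StringRef via an ArenaKeyHolder, while the default path keeps serializing each row into the pool as before. A minimal analogue of that switch with stand-in holder types; PreSerializedHolder and OnTheFlyHolder are illustrative names, not the real key holders.

// Compile-time selection of the key holder, mirroring the std::conditional_t + if constexpr idea.
#include <cstddef>
#include <iostream>
#include <string>
#include <type_traits>

struct PreSerializedHolder { std::string key; }; // stand-in for ArenaKeyHolder
struct OnTheFlyHolder { std::string key; };      // stand-in for SerializedKeyHolder

template <bool keys_pre_serialized>
struct SerializedMethod {
    using KeyHolderType =
            std::conditional_t<keys_pre_serialized, PreSerializedHolder, OnTheFlyHolder>;

    std::string pre_serialized = "prebuilt-key";

    KeyHolderType get_key_holder(size_t row) const {
        if constexpr (keys_pre_serialized) {
            return KeyHolderType{pre_serialized}; // just reuse the prebuilt buffer
        } else {
            return KeyHolderType{"serialized-row-" + std::to_string(row)}; // build per row
        }
    }
};

int main() {
    SerializedMethod<true> fast;
    SerializedMethod<false> classic;
    std::cout << fast.get_key_holder(0).key << "\n" << classic.get_key_holder(0).key << "\n";
    return 0;
}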
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index bbd94134fb..8725297e75 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -84,6 +84,7 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
           _is_merge(false),
           _agg_data(),
           _build_timer(nullptr),
+          _serialize_key_timer(nullptr),
           _exec_timer(nullptr),
           _merge_timer(nullptr) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
@@ -206,6 +207,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTimer");
     _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
     _merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
     _expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
@@ -754,6 +756,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
     using HashMethodType = std::decay_t<decltype(agg_method)>;
     using AggState = typename HashMethodType::State;
     AggState state(key_columns, _probe_key_sz, nullptr);
+
+    _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+
     /// For all rows.
     for (size_t i = 0; i < rows; ++i) {
         AggregateDataPtr aggregate_data = nullptr;
@@ -815,6 +820,9 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
     using HashMethodType = std::decay_t<decltype(agg_method)>;
     using AggState = typename HashMethodType::State;
     AggState state(key_columns, _probe_key_sz, nullptr);
+
+    _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+
     /// For all rows.
     for (size_t i = 0; i < rows; ++i) {
         AggregateDataPtr aggregate_data = nullptr;
@@ -1034,6 +1042,9 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) {
     using HashMethodType = std::decay_t<decltype(agg_method)>;
     using AggState = typename HashMethodType::State;
     AggState state(key_columns, _probe_key_sz, nullptr);
+
+    _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+
     /// For all rows.
     for (size_t i = 0; i < rows; ++i) {
         AggregateDataPtr aggregate_data = nullptr;
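All three call sites follow the same pattern: build the AggState, let _pre_serialize_key_if_need run one vectorized serialization pass when the hash method advertises support for pre-serialized keys, then probe the hash table row by row. A simplified sketch of that trait-guarded dispatch; PlainMethod, PreSerializedMethod, and pre_serialize_if_needed are illustrative names, not the patch's classes.

// Trait-guarded dispatch: the extra pass compiles away for methods without pre-serialized keys.
#include <iostream>

struct PlainMethod {};
struct PreSerializedMethod {
    void serialize_keys() { std::cout << "serialize all keys once per block\n"; }
};

template <typename Method>
struct IsPreSerializedKeysMethod { static constexpr bool value = false; };
template <>
struct IsPreSerializedKeysMethod<PreSerializedMethod> { static constexpr bool value = true; };

template <typename Method>
void pre_serialize_if_needed(Method& method) {
    if constexpr (IsPreSerializedKeysMethod<Method>::value) {
        method.serialize_keys(); // one vectorized pass over the key columns
    }                            // otherwise: discarded at compile time, per-row path stays
}

int main() {
    PlainMethod plain;
    PreSerializedMethod pre;
    pre_serialize_if_needed(plain); // prints nothing
    pre_serialize_if_needed(pre);   // prints once
    return 0;
}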
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 31b925bbfe..0da44ec5dd 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -50,13 +50,41 @@ struct AggregationMethodSerialized {
     Data data;
     Iterator iterator;
     bool inited = false;
+    std::vector<StringRef> keys;
+    AggregationMethodSerialized()
+            : _serialized_key_buffer_size(0),
+              _serialized_key_buffer(nullptr),
+              _mem_pool(new MemPool) {}
 
-    AggregationMethodSerialized() = default;
+    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>;
 
     template <typename Other>
     explicit AggregationMethodSerialized(const Other& other) : data(other.data) {}
 
-    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>;
+    void serialize_keys(const ColumnRawPtrs& key_columns, const size_t num_rows) {
+        size_t max_one_row_byte_size = 0;
+        for (const auto& column : key_columns) {
+            max_one_row_byte_size += column->get_max_row_byte_size();
+        }
+
+        if ((max_one_row_byte_size * num_rows) > _serialized_key_buffer_size) {
+            _serialized_key_buffer_size = max_one_row_byte_size * num_rows;
+            _mem_pool->clear();
+            _serialized_key_buffer = _mem_pool->allocate(_serialized_key_buffer_size);
+        }
+
+        if (keys.size() < num_rows) keys.resize(num_rows);
+
+        for (size_t i = 0; i < num_rows; ++i) {
+            keys[i].data =
+                    reinterpret_cast<char*>(_serialized_key_buffer + i * max_one_row_byte_size);
+            keys[i].size = 0;
+        }
+
+        for (const auto& column : key_columns) {
+            column->serialize_vec(keys, num_rows, max_one_row_byte_size);
+        }
+    }
 
     static void insert_key_into_columns(const StringRef& key, MutableColumns& key_columns,
                                         const Sizes&) {
@@ -70,6 +98,11 @@ struct AggregationMethodSerialized {
             iterator = data.begin();
         }
     }
+
+private:
+    size_t _serialized_key_buffer_size;
+    uint8_t* _serialized_key_buffer;
+    std::unique_ptr<MemPool> _mem_pool;
 };
 
 using AggregatedDataWithoutKey = AggregateDataPtr;
@@ -448,6 +481,7 @@ private:
     Arena _agg_arena_pool;
 
     RuntimeProfile::Counter* _build_timer;
+    RuntimeProfile::Counter* _serialize_key_timer;
     RuntimeProfile::Counter* _exec_timer;
     RuntimeProfile::Counter* _merge_timer;
     RuntimeProfile::Counter* _expr_timer;
@@ -484,6 +518,16 @@ private:
     void _close_with_serialized_key();
     void _init_hash_method(std::vector<VExprContext*>& probe_exprs);
 
+    template <typename AggState, typename AggMethod>
+    void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
+                                    const ColumnRawPtrs& key_columns, const size_t num_rows) {
+        if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<AggState>::value) {
+            SCOPED_TIMER(_serialize_key_timer);
+            agg_method.serialize_keys(key_columns, num_rows);
+            state.set_serialized_keys(agg_method.keys.data());
+        }
+    }
+
     void release_tracker();
 
     using vectorized_execute = std::function<Status(Block* block)>;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org