This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d1573e1a4a [improvement]Use phmap for aggregation with serialized key (#10821) d1573e1a4a is described below commit d1573e1a4a715e34e7886810e792fb47c7e0e15d Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Thu Jul 14 11:26:09 2022 +0800 [improvement]Use phmap for aggregation with serialized key (#10821) --- be/src/vec/common/columns_hashing.h | 1 + be/src/vec/common/columns_hashing_impl.h | 50 +++++++++ be/src/vec/common/hash_table/ph_hash_map.h | 170 +++++++++++++++++++++++++++++ be/src/vec/exec/vaggregation_node.cpp | 76 ++++++++++++- be/src/vec/exec/vaggregation_node.h | 2 +- 5 files changed, 294 insertions(+), 5 deletions(-) diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h index 300f7d70e0..9e4ed78f4e 100644 --- a/be/src/vec/common/columns_hashing.h +++ b/be/src/vec/common/columns_hashing.h @@ -28,6 +28,7 @@ #include "vec/common/columns_hashing_impl.h" #include "vec/common/hash_table/hash_table.h" #include "vec/common/hash_table/hash_table_key_holder.h" +#include "vec/common/hash_table/ph_hash_map.h" #include "vec/common/unaligned.h" namespace doris::vectorized { diff --git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h index 3ba2f630f2..8aa1b1e96d 100644 --- a/be/src/vec/common/columns_hashing_impl.h +++ b/be/src/vec/common/columns_hashing_impl.h @@ -132,6 +132,13 @@ public: return emplaceImpl(key_holder, data); } + template <typename Data> + ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, size_t row, + Arena& pool) { + auto key_holder = static_cast<Derived&>(*this).get_key_holder(row, pool); + return emplaceImpl(key_holder, hash_value, data); + } + template <typename Data> ALWAYS_INLINE FindResult find_key(Data& data, size_t row, Arena& pool) { auto key_holder = static_cast<Derived&>(*this).get_key_holder(row, pool); @@ -207,6 +214,49 @@ protected: return EmplaceResult(inserted); } + template <typename Data, typename KeyHolder> + ALWAYS_INLINE EmplaceResult emplaceImpl(KeyHolder& key_holder, size_t hash_value, Data& data) { + if constexpr (Cache::consecutive_keys_optimization) { + if (cache.found && cache.check(key_holder_get_key(key_holder))) { + if constexpr (has_mapped) + return EmplaceResult(cache.value.second, cache.value.second, false); + else + return EmplaceResult(false); + } + } + + typename Data::LookupResult it; + bool inserted = false; + data.emplace(key_holder, it, hash_value, inserted); + + [[maybe_unused]] Mapped* cached = nullptr; + if constexpr (has_mapped) cached = lookup_result_get_mapped(it); + + if (inserted) { + if constexpr (has_mapped) { + new (lookup_result_get_mapped(it)) Mapped(); + } + } + + if constexpr (consecutive_keys_optimization) { + cache.found = true; + cache.empty = false; + + if constexpr (has_mapped) { + cache.value.first = *lookup_result_get_key(it); + cache.value.second = *lookup_result_get_mapped(it); + cached = &cache.value.second; + } else { + cache.value = *lookup_result_get_key(it); + } + } + + if constexpr (has_mapped) + return EmplaceResult(*lookup_result_get_mapped(it), *cached, inserted); + else + return EmplaceResult(inserted); + } + template <typename Data, typename Key> ALWAYS_INLINE FindResult find_key_impl(Key key, Data& data) { if constexpr (Cache::consecutive_keys_optimization) { diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h new file mode 100644 index 0000000000..2883f58f17 --- /dev/null +++ b/be/src/vec/common/hash_table/ph_hash_map.h @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <parallel_hashmap/phmap.h> + +ALWAYS_INLINE inline char** lookup_result_get_mapped(std::pair<const StringRef, char*>* it) { + return &(it->second); +} + +template <typename Map> +struct IsPhmapTraits { + constexpr static bool value = false; +}; + +template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>> +class PHHashMap : private boost::noncopyable { +public: + using Self = PHHashMap; + using HashMapImpl = phmap::flat_hash_map<Key, Mapped, Hash>; + + using key_type = Key; + using mapped_type = Mapped; + using value_type = Key; + + using LookupResult = typename HashMapImpl::value_type*; + + using const_iterator_impl = typename HashMapImpl::const_iterator; + using iterator_impl = typename HashMapImpl::iterator; + + template <typename Derived, bool is_const> + class iterator_base { + using BaseIterator = std::conditional_t<is_const, const_iterator_impl, iterator_impl>; + + BaseIterator base_iterator; + friend class PHHashMap; + + public: + iterator_base() {} + iterator_base(BaseIterator it) : base_iterator(it) {} + + bool operator==(const iterator_base& rhs) const { + return base_iterator == rhs.base_iterator; + } + bool operator!=(const iterator_base& rhs) const { + return base_iterator != rhs.base_iterator; + } + + Derived& operator++() { + base_iterator++; + return static_cast<Derived&>(*this); + } + + auto& operator*() const { return *this; } + auto* operator->() const { return this; } + + auto& operator*() { return *this; } + auto* operator->() { return this; } + + const auto& get_first() const { return base_iterator->first; } + + const auto& get_second() const { return base_iterator->second; } + + auto& get_second() { return base_iterator->second; } + + auto get_ptr() const { return *base_iterator; } + size_t get_hash() const { return base_iterator->get_hash(); } + + size_t get_collision_chain_length() const { return 0; } + }; + + class iterator : public iterator_base<iterator, false> { + public: + using iterator_base<iterator, false>::iterator_base; + }; + + class const_iterator : public iterator_base<const_iterator, true> { + public: + using iterator_base<const_iterator, true>::iterator_base; + }; + + const_iterator begin() const { return const_iterator(_hash_map.cbegin()); } + + const_iterator cbegin() const { return const_iterator(_hash_map.cbegin()); } + + iterator begin() { return iterator(_hash_map.begin()); } + + const_iterator end() const { return const_iterator(_hash_map.cend()); } + const_iterator cend() const { return const_iterator(_hash_map.cend()); } + iterator end() { return iterator(_hash_map.end()); } + + template <typename KeyHolder> + void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted) { + const auto& key = key_holder_get_key(key_holder); + inserted = false; + auto it_ = _hash_map.lazy_emplace(key, [&](const auto& ctor) { + inserted = true; + key_holder_persist_key(key_holder); + ctor(key_holder_get_key(key_holder), nullptr); + }); + it = &*it_; + } + + template <typename KeyHolder> + void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value, + bool& inserted) { + const auto& key = key_holder_get_key(key_holder); + inserted = false; + auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) { + inserted = true; + key_holder_persist_key(key_holder); + ctor(key, nullptr); + }); + it = &*it_; + } + + size_t hash(const Key& x) const { return _hash_map.hash(x); } + + void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { _hash_map.prefetch_hash(hash_value); } + + /// Call func(const Key &, Mapped &) for each hash map element. + template <typename Func> + void for_each_value(Func&& func) { + for (auto& v : *this) func(v.get_first(), v.get_second()); + } + + /// Call func(Mapped &) for each hash map element. + template <typename Func> + void for_each_mapped(Func&& func) { + for (auto& v : *this) func(v.get_second()); + } + + size_t get_buffer_size_in_bytes() const { + const auto capacity = _hash_map.capacity(); + return capacity * sizeof(typename HashMapImpl::slot_type); + } + + bool add_elem_size_overflow(size_t row) const { + const auto capacity = _hash_map.capacity(); + // phmap use 7/8th as maximum load factor. + return (_hash_map.size() + row) > (capacity * 7 / 8); + } + + size_t size() const { return _hash_map.size(); } + + char* get_null_key_data() { return nullptr; } + bool has_null_key_data() const { return false; } + + HashMapImpl _hash_map; +}; + +template <typename Key, typename Mapped, typename Hash> +struct IsPhmapTraits<PHHashMap<Key, Mapped, Hash>> { + constexpr static bool value = true; +}; diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index cbecc624ad..e152e7d477 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -32,6 +32,9 @@ namespace doris::vectorized { +// Here is an empirical value. +static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; + /// The minimum reduction factor (input rows divided by output rows) to grow hash tables /// in a streaming preaggregation, given that the hash tables are currently the given /// size or above. The sizes roughly correspond to hash table sizes where the bucket @@ -778,17 +781,38 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i std::visit( [&](auto&& agg_method) -> void { using HashMethodType = std::decay_t<decltype(agg_method)>; + using HashTableType = std::decay_t<decltype(agg_method.data)>; using AggState = typename HashMethodType::State; AggState state(key_columns, _probe_key_sz, nullptr); _pre_serialize_key_if_need(state, agg_method, key_columns, rows); + std::vector<size_t> hash_values; + + if constexpr (IsPhmapTraits<HashTableType>::value) { + if (hash_values.size() < rows) hash_values.resize(rows); + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } + /// For all rows. for (size_t i = 0; i < rows; ++i) { AggregateDataPtr aggregate_data = nullptr; - auto emplace_result = - state.emplace_key(agg_method.data, i, _agg_arena_pool); + auto emplace_result = [&]() { + if constexpr (IsPhmapTraits<HashTableType>::value) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { + agg_method.data.prefetch_by_hash( + hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + + return state.emplace_key(agg_method.data, hash_values[i], i, + _agg_arena_pool); + } else { + return state.emplace_key(agg_method.data, i, _agg_arena_pool); + } + }(); /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. if (emplace_result.is_inserted()) { @@ -842,16 +866,38 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) { std::visit( [&](auto&& agg_method) -> void { using HashMethodType = std::decay_t<decltype(agg_method)>; + using HashTableType = std::decay_t<decltype(agg_method.data)>; using AggState = typename HashMethodType::State; AggState state(key_columns, _probe_key_sz, nullptr); _pre_serialize_key_if_need(state, agg_method, key_columns, rows); + std::vector<size_t> hash_values; + + if constexpr (IsPhmapTraits<HashTableType>::value) { + if (hash_values.size() < rows) hash_values.resize(rows); + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } + /// For all rows. for (size_t i = 0; i < rows; ++i) { AggregateDataPtr aggregate_data = nullptr; - auto emplace_result = state.emplace_key(agg_method.data, i, _agg_arena_pool); + auto emplace_result = [&]() { + if constexpr (IsPhmapTraits<HashTableType>::value) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { + agg_method.data.prefetch_by_hash( + hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + + return state.emplace_key(agg_method.data, hash_values[i], i, + _agg_arena_pool); + } else { + return state.emplace_key(agg_method.data, i, _agg_arena_pool); + } + }(); /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. if (emplace_result.is_inserted()) { @@ -1064,16 +1110,38 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) { std::visit( [&](auto&& agg_method) -> void { using HashMethodType = std::decay_t<decltype(agg_method)>; + using HashTableType = std::decay_t<decltype(agg_method.data)>; using AggState = typename HashMethodType::State; AggState state(key_columns, _probe_key_sz, nullptr); _pre_serialize_key_if_need(state, agg_method, key_columns, rows); + std::vector<size_t> hash_values; + + if constexpr (IsPhmapTraits<HashTableType>::value) { + if (hash_values.size() < rows) hash_values.resize(rows); + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } + /// For all rows. for (size_t i = 0; i < rows; ++i) { AggregateDataPtr aggregate_data = nullptr; - auto emplace_result = state.emplace_key(agg_method.data, i, _agg_arena_pool); + auto emplace_result = [&]() { + if constexpr (IsPhmapTraits<HashTableType>::value) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { + agg_method.data.prefetch_by_hash( + hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + + return state.emplace_key(agg_method.data, hash_values[i], i, + _agg_arena_pool); + } else { + return state.emplace_key(agg_method.data, i, _agg_arena_pool); + } + }(); /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. if (emplace_result.is_inserted()) { diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 0da44ec5dd..c2111905dc 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -106,7 +106,7 @@ private: }; using AggregatedDataWithoutKey = AggregateDataPtr; -using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>; +using AggregatedDataWithStringKey = PHHashMap<StringRef, AggregateDataPtr, DefaultHash<StringRef>>; /// For the case where there is one numeric key. /// FieldType is UInt8/16/32/64 for any type with corresponding bit width. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org