This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:

     new dae8f607e17 [Improvement](set) change set_operator's hash map to phmap (#43273)

dae8f607e17 is described below

commit dae8f607e176480306471754d0391df3daeee8a7
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Nov 11 14:18:02 2024 +0800

    [Improvement](set) change set_operator's hash map to phmap (#43273)

    ### What problem does this PR solve?

    1. Change set_operator's hash map to phmap.
    2. Optimize the logic of refreshing the hash table.

    ```sql
    +-------+--------------+------+-------+---------+-------+
    | Field | Type         | Null | Key   | Default | Extra |
    +-------+--------------+------+-------+---------+-------+
    | k1    | int          | Yes  | true  | NULL    |       |
    | k2    | int          | No   | false | NULL    | NONE  |
    | k3    | bigint       | Yes  | false | NULL    | NONE  |
    | k4    | varchar(100) | Yes  | false | NULL    | NONE  |
    +-------+--------------+------+-------+---------+-------+
    ```

    ```sql
    select count(*) from (select * from a_table intersect select * from b_table)t;
    before:Time(ms)=37768|CpuTimeMS=61754|PeakMemoryBytes=129921088
    after: Time(ms)=25218|CpuTimeMS=44571|PeakMemoryBytes=129902624
    ```

    ```sql
    select count(*) from (select k1 from a_table intersect select k1 from b_table)t;
    before: Time(ms)=7964|CpuTimeMS=13431|PeakMemoryBytes=130062400
    after: Time(ms)=7459|CpuTimeMS=11976|PeakMemoryBytes=130091048
    ```
---
 be/src/pipeline/common/set_utils.h | 74 +++++++++---------------
 be/src/pipeline/exec/join/join_op.h | 18 ++----
 be/src/pipeline/exec/set_probe_sink_operator.cpp | 74 +++++++++++++-----------
 be/src/pipeline/exec/set_source_operator.cpp | 17 +++++-
 be/src/vec/common/hash_table/hash_map.h | 5 --
 be/src/vec/common/hash_table/hash_map_context.h | 7 +--
 be/src/vec/common/hash_table/ph_hash_map.h | 11 ++--
 be/src/vec/common/hash_table/ph_hash_set.h | 3 +-
 be/src/vec/common/hash_table/string_hash_map.h | 1 +
 9 files changed, 96 insertions(+), 114 deletions(-)

diff --git a/be/src/pipeline/common/set_utils.h b/be/src/pipeline/common/set_utils.h
index 
ed64035fb42..2caf5b7d0b8 100644 --- a/be/src/pipeline/common/set_utils.h +++ b/be/src/pipeline/common/set_utils.h @@ -25,21 +25,32 @@ namespace doris { -template <class Key> -using SetFixedKeyHashTableContext = - vectorized::MethodKeysFixed<HashMap<Key, pipeline::RowRefListWithFlags, HashCRC32<Key>>>; +template <typename T> +using SetData = PHHashMap<T, RowRefListWithFlags, HashCRC32<T>>; -template <class T> -using SetPrimaryTypeHashTableContext = - vectorized::MethodOneNumber<T, HashMap<T, pipeline::RowRefListWithFlags, HashCRC32<T>>>; +template <typename T> +using SetFixedKeyHashTableContext = vectorized::MethodKeysFixed<SetData<T>>; + +template <typename T> +using SetPrimaryTypeHashTableContext = vectorized::MethodOneNumber<T, SetData<T>>; + +template <typename T> +using SetPrimaryTypeHashTableContextNullable = vectorized::MethodSingleNullableColumn< + vectorized::MethodOneNumber<T, vectorized::DataWithNullKey<SetData<T>>>>; using SetSerializedHashTableContext = - vectorized::MethodSerialized<HashMap<StringRef, pipeline::RowRefListWithFlags>>; + vectorized::MethodSerialized<PHHashMap<StringRef, RowRefListWithFlags>>; using SetMethodOneString = - vectorized::MethodStringNoCache<HashMap<StringRef, pipeline::RowRefListWithFlags>>; + vectorized::MethodStringNoCache<PHHashMap<StringRef, RowRefListWithFlags>>; using SetHashTableVariants = std::variant<std::monostate, SetSerializedHashTableContext, SetMethodOneString, + SetPrimaryTypeHashTableContextNullable<vectorized::UInt8>, + SetPrimaryTypeHashTableContextNullable<vectorized::UInt16>, + SetPrimaryTypeHashTableContextNullable<vectorized::UInt32>, + SetPrimaryTypeHashTableContextNullable<vectorized::UInt64>, + SetPrimaryTypeHashTableContextNullable<vectorized::UInt128>, + SetPrimaryTypeHashTableContextNullable<vectorized::UInt256>, SetPrimaryTypeHashTableContext<vectorized::UInt8>, SetPrimaryTypeHashTableContext<vectorized::UInt16>, SetPrimaryTypeHashTableContext<vectorized::UInt32>, @@ -51,9 +62,9 @@ using 
SetHashTableVariants = SetFixedKeyHashTableContext<vectorized::UInt256>, SetFixedKeyHashTableContext<vectorized::UInt136>>; -struct SetDataVariants { - SetHashTableVariants method_variant; - +struct SetDataVariants + : public DataVariants<SetHashTableVariants, vectorized::MethodSingleNullableColumn, + vectorized::MethodOneNumber, vectorized::DataWithNullKey> { void init(const std::vector<vectorized::DataTypePtr>& data_types, HashKeyType type) { bool nullable = data_types.size() == 1 && data_types[0]->is_nullable(); switch (type) { @@ -61,51 +72,22 @@ struct SetDataVariants { method_variant.emplace<SetSerializedHashTableContext>(); break; case HashKeyType::int8_key: - if (nullable) { - method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>( - get_key_sizes(data_types)); - } else { - method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt8>>(); - } + emplace_single<vectorized::UInt8, SetData<vectorized::UInt8>>(nullable); break; case HashKeyType::int16_key: - if (nullable) { - method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>( - get_key_sizes(data_types)); - } else { - method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt16>>(); - } + emplace_single<vectorized::UInt16, SetData<vectorized::UInt16>>(nullable); break; case HashKeyType::int32_key: - if (nullable) { - method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>( - get_key_sizes(data_types)); - } else { - method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt32>>(); - } + emplace_single<vectorized::UInt32, SetData<vectorized::UInt32>>(nullable); break; case HashKeyType::int64_key: - if (nullable) { - method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt128>>( - get_key_sizes(data_types)); - } else { - method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt64>>(); - } + emplace_single<vectorized::UInt64, SetData<vectorized::UInt64>>(nullable); break; case HashKeyType::int128_key: - 
if (nullable) { - method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt136>>( - get_key_sizes(data_types)); - } else { - method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt128>>(); - } + emplace_single<vectorized::UInt128, SetData<vectorized::UInt128>>(nullable); break; case HashKeyType::int256_key: - if (nullable) { - method_variant.emplace<SetSerializedHashTableContext>(); - } else { - method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt256>>(); - } + emplace_single<vectorized::UInt256, SetData<vectorized::UInt256>>(nullable); break; case HashKeyType::string_key: method_variant.emplace<SetMethodOneString>(); diff --git a/be/src/pipeline/exec/join/join_op.h b/be/src/pipeline/exec/join/join_op.h index 616753b72de..f3bd47a911e 100644 --- a/be/src/pipeline/exec/join/join_op.h +++ b/be/src/pipeline/exec/join/join_op.h @@ -20,7 +20,7 @@ #include "vec/common/columns_hashing.h" #include "vec/core/block.h" -namespace doris::pipeline { +namespace doris { /** * Now we have different kinds of RowRef for join operation. 
Overall, RowRef is the base class and * the class inheritance is below: @@ -129,12 +129,10 @@ struct RowRefList : RowRef { RowRefList() = default; RowRefList(size_t row_num_) : RowRef(row_num_) {} - ForwardIterator<RowRefList> begin() { return ForwardIterator<RowRefList>(this); } + ForwardIterator<RowRefList> begin() { return {this}; } /// insert element after current one - void insert(RowRefType&& row_ref, vectorized::Arena& pool) { - next.emplace_back(std::move(row_ref)); - } + void insert(RowRefType&& row_ref, vectorized::Arena& pool) { next.emplace_back(row_ref); } void clear() { next.clear(); } @@ -149,9 +147,7 @@ struct RowRefListWithFlag : RowRef { RowRefListWithFlag() = default; RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {} - ForwardIterator<RowRefListWithFlag> const begin() { - return ForwardIterator<RowRefListWithFlag>(this); - } + ForwardIterator<RowRefListWithFlag> begin() { return {this}; } /// insert element after current one void insert(RowRefType&& row_ref, vectorized::Arena& pool) { next.emplace_back(row_ref); } @@ -171,9 +167,7 @@ struct RowRefListWithFlags : RowRefWithFlag { RowRefListWithFlags() = default; RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {} - ForwardIterator<RowRefListWithFlags> const begin() { - return ForwardIterator<RowRefListWithFlags>(this); - } + ForwardIterator<RowRefListWithFlags> begin() { return {this}; } /// insert element after current one void insert(RowRefType&& row_ref, vectorized::Arena& pool) { next.emplace_back(row_ref); } @@ -185,4 +179,4 @@ private: std::vector<RowRefType> next; }; -} // namespace doris::pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 813dad3ad79..4c250d5603b 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -210,46 +210,52 @@ void SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table( 
[&](auto&& arg) { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - auto tmp_hash_table = - std::make_shared<typename HashTableCtxType::HashMapType>(); - bool is_need_shrink = - arg.hash_table->should_be_shrink(valid_element_in_hash_tbl); - if (is_intersect || is_need_shrink) { - tmp_hash_table->init_buf_size(size_t( - valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1)); - } - arg.init_iterator(); auto& iter = arg.iterator; auto iter_end = arg.hash_table->end(); - std::visit( - [&](auto is_need_shrink_const) { - while (iter != iter_end) { - auto& mapped = iter->get_second(); - auto it = mapped.begin(); - - if constexpr (is_intersect) { //intersected - if (it->visited) { - it->visited = false; - tmp_hash_table->insert(iter->get_value()); - } - ++iter; - } else { //except - if constexpr (is_need_shrink_const) { - if (!it->visited) { - tmp_hash_table->insert(iter->get_value()); - } - } - ++iter; - } - } - }, - vectorized::make_bool_variant(is_need_shrink)); - arg.reset(); - if (is_intersect || is_need_shrink) { + constexpr double need_shrink_ratio = 0.25; + bool is_need_shrink = + is_intersect + ? 
(valid_element_in_hash_tbl < + arg.hash_table + ->size()) // When intersect, shrink as long as the element decreases + : (valid_element_in_hash_tbl < + arg.hash_table->size() * + need_shrink_ratio); // When except, element decreases need to within the 'need_shrink_ratio' before shrinking + + if (is_need_shrink) { + auto tmp_hash_table = + std::make_shared<typename HashTableCtxType::HashMapType>(); + tmp_hash_table->reserve( + local_state._shared_state->valid_element_in_hash_tbl); + while (iter != iter_end) { + auto& mapped = iter->get_second(); + auto it = mapped.begin(); + + if constexpr (is_intersect) { + if (it->visited) { + it->visited = false; + tmp_hash_table->insert(iter->get_first(), iter->get_second()); + } + } else { + if (!it->visited) { + tmp_hash_table->insert(iter->get_first(), iter->get_second()); + } + } + ++iter; + } arg.hash_table = std::move(tmp_hash_table); + } else if (is_intersect) { + while (iter != iter_end) { + auto& mapped = iter->get_second(); + auto it = mapped.begin(); + it->visited = false; + ++iter; + } } + + arg.reset(); } else { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index ebcd13ddf14..91c98288d8b 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -18,6 +18,7 @@ #include "set_source_operator.h" #include <memory> +#include <type_traits> #include "common/status.h" #include "pipeline/exec/operator.h" @@ -124,11 +125,9 @@ Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable( vectorized::Block* output_block, const int batch_size, bool* eos) { size_t left_col_len = local_state._left_table_data_types.size(); hash_table_ctx.init_iterator(); - auto& iter = hash_table_ctx.iterator; auto block_size = 0; - for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) { - auto& value = iter->get_second(); + auto 
add_result = [&local_state, &block_size, this](auto value) { auto it = value.begin(); if constexpr (is_intersect) { if (it->visited) { //intersected: have done probe, so visited values it's the result @@ -139,9 +138,21 @@ Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable( _add_result_columns(local_state, value, block_size); } } + }; + + auto& iter = hash_table_ctx.iterator; + for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) { + add_result(iter->get_second()); } *eos = iter == hash_table_ctx.hash_table->end(); + if (*eos && hash_table_ctx.hash_table->has_null_key_data()) { + auto value = hash_table_ctx.hash_table->template get_null_key_data<RowRefListWithFlags>(); + if constexpr (std::is_same_v<RowRefListWithFlags, std::decay_t<decltype(value)>>) { + add_result(value); + } + } + if (!output_block->mem_reuse()) { for (int i = 0; i < left_col_len; ++i) { output_block->insert( diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 448ddd5b7c5..8cb02d6a80d 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -201,11 +201,6 @@ using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, template <typename Key, typename Hash = DefaultHash<Key>> using JoinHashMap = JoinHashTable<Key, Hash>; -template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>, - typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator> -using HashMapWithSavedHash = - HashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash, Grower, Allocator>; - template <typename Key, typename Mapped, typename Hash, size_t initial_size_degree> using HashMapWithStackMemory = HashMapTable< Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash, diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 16a793d7500..875c035b425 100644 --- 
a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -33,10 +33,6 @@ #include "vec/core/types.h" #include "vec/utils/util.hpp" -namespace doris::pipeline { -struct RowRefListWithFlags; -} - namespace doris::vectorized { constexpr auto BITSIZE = 8; @@ -587,8 +583,7 @@ struct DataWithNullKey : public Base { private: bool has_null_key = false; - // null_key_data store AggregateDataPtr on agg node, store PartitionBlocks on partition sort node. - void* null_key_data = nullptr; + Base::Value null_key_data; }; /// Single low cardinality column. diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h index de320425223..414624c6e1a 100644 --- a/be/src/vec/common/hash_table/ph_hash_map.h +++ b/be/src/vec/common/hash_table/ph_hash_map.h @@ -40,6 +40,7 @@ public: using key_type = Key; using mapped_type = Mapped; + using Value = Mapped; using value_type = std::pair<const Key, Mapped>; using LookupResult = std::pair<const Key, Mapped>*; @@ -154,10 +155,8 @@ public: [&](const auto& ctor) { f(ctor, key, key); }); } - void ALWAYS_INLINE insert(const Key& key, size_t hash_value, const Mapped& value) { - auto it = &*_hash_map.lazy_emplace_with_hash(key, hash_value, - [&](const auto& ctor) { ctor(key, value); }); - it->second = value; + void ALWAYS_INLINE insert(const Key& key, const Mapped& value) { + _hash_map.lazy_emplace(key, [&](const auto& ctor) { ctor(key, value); }); } template <typename KeyHolder> @@ -190,8 +189,6 @@ public: return capacity * sizeof(typename HashMapImpl::slot_type); } - size_t get_buffer_size_in_cells() const { return _hash_map.capacity(); } - bool add_elem_size_overflow(size_t row) const { const auto capacity = _hash_map.capacity(); // phmap use 7/8th as maximum load factor. 
@@ -209,7 +206,7 @@ public: void clear_and_shrink() { _hash_map.clear(); } - void expanse_for_add_elem(size_t num_elem) { _hash_map.reserve(num_elem); } + void reserve(size_t num_elem) { _hash_map.reserve(num_elem); } private: HashMapImpl _hash_map; diff --git a/be/src/vec/common/hash_table/ph_hash_set.h b/be/src/vec/common/hash_table/ph_hash_set.h index 6ace649b7bb..79faca9d670 100644 --- a/be/src/vec/common/hash_table/ph_hash_set.h +++ b/be/src/vec/common/hash_table/ph_hash_set.h @@ -36,6 +36,7 @@ public: using key_type = Key; using mapped_type = void; using value_type = void; + using Value = void*; using LookupResult = void*; @@ -104,7 +105,7 @@ public: void clear_and_shrink() { _hash_set.clear(); } - void expanse_for_add_elem(size_t num_elem) { _hash_set.reserve(num_elem); } + void reserve(size_t num_elem) { _hash_set.reserve(num_elem); } private: HashSetImpl _hash_set; diff --git a/be/src/vec/common/hash_table/string_hash_map.h b/be/src/vec/common/hash_table/string_hash_map.h index 6c7a9e74dca..cffe32e82ce 100644 --- a/be/src/vec/common/hash_table/string_hash_map.h +++ b/be/src/vec/common/hash_table/string_hash_map.h @@ -114,6 +114,7 @@ public: using Base = StringHashTable<StringHashMapSubMaps<TMapped, Allocator>>; using Self = StringHashMap; using LookupResult = typename Base::LookupResult; + using Value = TMapped; using Base::Base; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org