This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b74f36e009 [improvement]Use phmap for aggregation with integer keys (#11175) b74f36e009 is described below commit b74f36e009880b0afd7a87b0c7171f08708c88f2 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Wed Jul 27 13:58:20 2022 +0800 [improvement]Use phmap for aggregation with integer keys (#11175) --- be/src/vec/common/columns_hashing_impl.h | 1 + be/src/vec/common/hash_table/hash.h | 6 ++ be/src/vec/common/hash_table/hash_table_utils.h | 25 +++++ be/src/vec/common/hash_table/ph_hash_map.h | 56 ++++++---- be/src/vec/exec/vaggregation_node.cpp | 137 ++++++++++++++++++------ be/src/vec/exec/vaggregation_node.h | 132 +++++++++++++++++++---- 6 files changed, 285 insertions(+), 72 deletions(-) diff --git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h index 8aa1b1e96d..2abfa3b8e8 100644 --- a/be/src/vec/common/columns_hashing_impl.h +++ b/be/src/vec/common/columns_hashing_impl.h @@ -25,6 +25,7 @@ #include "vec/common/aggregation_common.h" #include "vec/common/assert_cast.h" #include "vec/common/hash_table/hash_table_key_holder.h" +#include "vec/common/hash_table/ph_hash_map.h" // #include <Interpreters/AggregationCommon.h> namespace doris::vectorized { diff --git a/be/src/vec/common/hash_table/hash.h b/be/src/vec/common/hash_table/hash.h index fb47657809..ab99563625 100644 --- a/be/src/vec/common/hash_table/hash.h +++ b/be/src/vec/common/hash_table/hash.h @@ -22,6 +22,7 @@ #include <type_traits> +#include "parallel_hashmap/phmap_utils.h" #include "vec/common/uint128.h" #include "vec/core/types.h" @@ -140,6 +141,11 @@ DEFINE_HASH(doris::vectorized::Float64) #undef DEFINE_HASH +template <typename Key, typename Hash = HashCRC32<Key>> +struct HashMixWrapper { + size_t operator()(Key key) const { return phmap::phmap_mix<sizeof(size_t)>()(Hash()(key)); } +}; + template <> struct HashCRC32<doris::vectorized::UInt256> { size_t operator()(const doris::vectorized::UInt256& x) const { diff --git a/be/src/vec/common/hash_table/hash_table_utils.h b/be/src/vec/common/hash_table/hash_table_utils.h new file mode 100644 index 0000000000..e437f07099 --- /dev/null +++ b/be/src/vec/common/hash_table/hash_table_utils.h @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/HashTable.h +// and modified by Doris + +template <typename T> +struct HashTableTraits { + static constexpr bool is_phmap = false; + static constexpr bool is_parallel_phmap = false; +}; diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h index 2883f58f17..5f06f6a09f 100644 --- a/be/src/vec/common/hash_table/ph_hash_map.h +++ b/be/src/vec/common/hash_table/ph_hash_map.h @@ -19,26 +19,28 @@ #include <parallel_hashmap/phmap.h> -ALWAYS_INLINE inline char** lookup_result_get_mapped(std::pair<const StringRef, char*>* it) { +#include "vec/common/hash_table/hash.h" +#include "vec/common/hash_table/hash_table_utils.h" + +template <typename Key, typename Mapped> +ALWAYS_INLINE inline auto lookup_result_get_mapped(std::pair<const Key, Mapped>* it) { return &(it->second); } -template <typename Map> -struct IsPhmapTraits { - constexpr static bool value = false; -}; - -template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>> +template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>, + bool use_parallel = false> class PHHashMap : private boost::noncopyable { public: using Self = PHHashMap; - using HashMapImpl = phmap::flat_hash_map<Key, Mapped, Hash>; + using HashMapImpl = + std::conditional_t<use_parallel, phmap::parallel_flat_hash_map<Key, Mapped, Hash>, + phmap::flat_hash_map<Key, Mapped, Hash>>; using key_type = Key; using mapped_type = Mapped; - using value_type = Key; + using value_type = std::pair<const Key, Mapped>; - using LookupResult = typename HashMapImpl::value_type*; + using LookupResult = std::pair<const Key, Mapped>*; using const_iterator_impl = typename HashMapImpl::const_iterator; using iterator_impl = typename HashMapImpl::iterator; @@ -121,17 +123,30 @@ public: bool& inserted) { const auto& key = key_holder_get_key(key_holder); inserted = false; - auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) { - inserted = true; - key_holder_persist_key(key_holder); - ctor(key, nullptr); - }); - it = &*it_; + if constexpr (use_parallel) { + auto it_ = _hash_map.lazy_emplace_with_hash(hash_value, key, [&](const auto& ctor) { + inserted = true; + key_holder_persist_key(key_holder); + ctor(key, nullptr); + }); + it = &*it_; + } else { + auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) { + inserted = true; + key_holder_persist_key(key_holder); + ctor(key, nullptr); + }); + it = &*it_; + } } size_t hash(const Key& x) const { return _hash_map.hash(x); } - void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { _hash_map.prefetch_hash(hash_value); } + void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { + if constexpr (!use_parallel) _hash_map.prefetch_hash(hash_value); + } + + void ALWAYS_INLINE prefetch_by_key(Key key) { _hash_map.prefetch(key); } /// Call func(const Key &, Mapped &) for each hash map element. template <typename Func> @@ -164,7 +179,8 @@ public: HashMapImpl _hash_map; }; -template <typename Key, typename Mapped, typename Hash> -struct IsPhmapTraits<PHHashMap<Key, Mapped, Hash>> { - constexpr static bool value = true; +template <typename Key, typename Mapped, typename Hash, bool use_parallel> +struct HashTableTraits<PHHashMap<Key, Mapped, Hash, use_parallel>> { + static constexpr bool is_phmap = true; + static constexpr bool is_parallel_phmap = use_parallel; }; diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 3301a7ed32..e7008957eb 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -142,16 +142,28 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs) case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: - _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable); + else + _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable); return; case TYPE_BIGINT: case TYPE_DOUBLE: case TYPE_DATE: case TYPE_DATETIME: case TYPE_DATETIMEV2: - _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable); + else + _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable); return; - case TYPE_LARGEINT: + case TYPE_LARGEINT: { + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable); + else + _agg_data.init(AggregatedDataVariants::Type::int128_key, is_nullable); + return; + } case TYPE_DECIMALV2: case TYPE_DECIMAL32: case TYPE_DECIMAL64: @@ -163,11 +175,20 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs) : type_ptr->get_type_id(); WhichDataType which(idx); if (which.is_decimal32()) { - _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable); + else + _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable); } else if (which.is_decimal64()) { - _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable); + else + _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable); } else { - _agg_data.init(AggregatedDataVariants::Type::int128_key, is_nullable); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable); + else + _agg_data.init(AggregatedDataVariants::Type::int128_key, is_nullable); } return; } @@ -208,20 +229,38 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs) if (use_fixed_key) { if (has_null) { if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <= sizeof(UInt64)) { - _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null); + else + _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null); } else if (std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <= sizeof(UInt128)) { - _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null); + else + _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null); } else { - _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null); + else + _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null); } } else { if (key_byte_size <= sizeof(UInt64)) { - _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null); + else + _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null); } else if (key_byte_size <= sizeof(UInt128)) { - _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null); + else + _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null); } else { - _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null); + if (_is_merge) + _agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null); + else + _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null); } } } else { @@ -794,10 +833,18 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i std::vector<size_t> hash_values; - if constexpr (IsPhmapTraits<HashTableType>::value) { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { if (hash_values.size() < rows) hash_values.resize(rows); - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + AggState>::value) { + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } else { + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = agg_method.data.hash( + state.get_key_holder(i, _agg_arena_pool)); + } } } @@ -806,10 +853,15 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i AggregateDataPtr aggregate_data = nullptr; auto emplace_result = [&]() { - if constexpr (IsPhmapTraits<HashTableType>::value) { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { - agg_method.data.prefetch_by_hash( - hash_values[i + HASH_MAP_PREFETCH_DIST]); + if constexpr (HashTableTraits< + HashTableType>::is_parallel_phmap) { + agg_method.data.prefetch_by_key(state.get_key_holder( + i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); + } else + agg_method.data.prefetch_by_hash( + hash_values[i + HASH_MAP_PREFETCH_DIST]); } return state.emplace_key(agg_method.data, hash_values[i], i, @@ -879,10 +931,18 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) { std::vector<size_t> hash_values; - if constexpr (IsPhmapTraits<HashTableType>::value) { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { if (hash_values.size() < rows) hash_values.resize(rows); - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + AggState>::value) { + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } else { + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = + agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool)); + } } } @@ -891,10 +951,14 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) { AggregateDataPtr aggregate_data = nullptr; auto emplace_result = [&]() { - if constexpr (IsPhmapTraits<HashTableType>::value) { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { - agg_method.data.prefetch_by_hash( - hash_values[i + HASH_MAP_PREFETCH_DIST]); + if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) { + agg_method.data.prefetch_by_key(state.get_key_holder( + i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); + } else + agg_method.data.prefetch_by_hash( + hash_values[i + HASH_MAP_PREFETCH_DIST]); } return state.emplace_key(agg_method.data, hash_values[i], i, @@ -1141,10 +1205,18 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) { std::vector<size_t> hash_values; - if constexpr (IsPhmapTraits<HashTableType>::value) { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { if (hash_values.size() < rows) hash_values.resize(rows); - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + AggState>::value) { + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } else { + for (size_t i = 0; i < rows; ++i) { + hash_values[i] = + agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool)); + } } } @@ -1153,12 +1225,15 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) { AggregateDataPtr aggregate_data = nullptr; auto emplace_result = [&]() { - if constexpr (IsPhmapTraits<HashTableType>::value) { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { - agg_method.data.prefetch_by_hash( - hash_values[i + HASH_MAP_PREFETCH_DIST]); + if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) { + agg_method.data.prefetch_by_key(state.get_key_holder( + i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); + } else + agg_method.data.prefetch_by_hash( + hash_values[i + HASH_MAP_PREFETCH_DIST]); } - return state.emplace_key(agg_method.data, hash_values[i], i, _agg_arena_pool); } else { diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index fd6b67f255..53eecb025f 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -157,7 +157,7 @@ struct AggregationMethodStringNoCache { /// For the case where there is one numeric key. /// FieldType is UInt8/16/32/64 for any type with corresponding bit width. -template <typename FieldType, typename TData, bool consecutive_keys_optimization = true> +template <typename FieldType, typename TData, bool consecutive_keys_optimization = false> struct AggregationMethodOneNumber { using Data = TData; using Key = typename Data::key_type; @@ -247,7 +247,7 @@ struct AggregationMethodKeysFixed { AggregationMethodKeysFixed(const Other& other) : data(other.data) {} using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, Mapped, - has_nullable_keys>; + has_nullable_keys, false>; static void insert_key_into_columns(const Key& key, MutableColumns& key_columns, const Sizes& key_sizes) { @@ -356,38 +356,61 @@ struct AggregationMethodSingleNullableColumn : public SingleColumnMethod { using AggregatedDataWithUInt8Key = FixedImplicitZeroHashMapWithCalculatedSize<UInt8, AggregateDataPtr>; using AggregatedDataWithUInt16Key = FixedImplicitZeroHashMap<UInt16, AggregateDataPtr>; -using AggregatedDataWithUInt32Key = HashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>; -using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>; -using AggregatedDataWithUInt128Key = HashMap<UInt128, AggregateDataPtr, HashCRC32<UInt128>>; -using AggregatedDataWithUInt256Key = HashMap<UInt256, AggregateDataPtr, HashCRC32<UInt256>>; +using AggregatedDataWithUInt32Key = PHHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>; +using AggregatedDataWithUInt64Key = PHHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>; +using AggregatedDataWithUInt128Key = PHHashMap<UInt128, AggregateDataPtr, HashCRC32<UInt128>>; +using AggregatedDataWithUInt256Key = PHHashMap<UInt256, AggregateDataPtr, HashCRC32<UInt256>>; +using AggregatedDataWithUInt32KeyPhase2 = + PHHashMap<UInt32, AggregateDataPtr, HashMixWrapper<UInt32>>; +using AggregatedDataWithUInt64KeyPhase2 = + PHHashMap<UInt64, AggregateDataPtr, HashMixWrapper<UInt64>>; +using AggregatedDataWithUInt128KeyPhase2 = + PHHashMap<UInt128, AggregateDataPtr, HashMixWrapper<UInt128>>; +using AggregatedDataWithUInt256KeyPhase2 = + PHHashMap<UInt256, AggregateDataPtr, HashMixWrapper<UInt256>>; using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>; using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>; using AggregatedDataWithNullableUInt32Key = AggregationDataWithNullKey<AggregatedDataWithUInt32Key>; using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>; +using AggregatedDataWithNullableUInt32KeyPhase2 = + AggregationDataWithNullKey<AggregatedDataWithUInt32KeyPhase2>; +using AggregatedDataWithNullableUInt64KeyPhase2 = + AggregationDataWithNullKey<AggregatedDataWithUInt64KeyPhase2>; using AggregatedDataWithNullableShortStringKey = AggregationDataWithNullKey<AggregatedDataWithShortStringKey>; using AggregatedDataWithNullableUInt128Key = AggregationDataWithNullKey<AggregatedDataWithUInt128Key>; +using AggregatedDataWithNullableUInt128KeyPhase2 = + AggregationDataWithNullKey<AggregatedDataWithUInt128KeyPhase2>; using AggregatedMethodVariants = std::variant< AggregationMethodSerialized<AggregatedDataWithStringKey>, - AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>, - AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>, + AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>, + AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>, AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>, AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>, AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>, AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>, + AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>, + AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>, + AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128KeyPhase2>, AggregationMethodSingleNullableColumn< - AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>, + AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key>>, AggregationMethodSingleNullableColumn< - AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>, + AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key>>, AggregationMethodSingleNullableColumn< AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key>>, AggregationMethodSingleNullableColumn< AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>, + AggregationMethodSingleNullableColumn< + AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>, + AggregationMethodSingleNullableColumn< + AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>, AggregationMethodSingleNullableColumn< AggregationMethodOneNumber<UInt128, AggregatedDataWithNullableUInt128Key>>, + AggregationMethodSingleNullableColumn< + AggregationMethodOneNumber<UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>, AggregationMethodSingleNullableColumn< AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKey>>, AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>, @@ -395,7 +418,13 @@ using AggregatedMethodVariants = std::variant< AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>, AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, true>, AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>, - AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>>; + AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>, + AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>, + AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>, + AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>, + AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>, + AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>, + AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>>; struct AggregatedDataVariants { AggregatedDataVariants() = default; @@ -412,11 +441,17 @@ struct AggregatedDataVariants { int8_key, int16_key, int32_key, + int32_key_phase2, int64_key, + int64_key_phase2, int128_key, + int128_key_phase2, int64_keys, + int64_keys_phase2, int128_keys, + int128_keys_phase2, int256_keys, + int256_keys_phase2, string_key, }; @@ -433,22 +468,20 @@ struct AggregatedDataVariants { break; case Type::int8_key: if (is_nullable) { - _aggregated_method_variant - .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber< - UInt8, AggregatedDataWithNullableUInt8Key, false>>>(); + _aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn< + AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key>>>(); } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>>(); + _aggregated_method_variant + .emplace<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>>(); } break; case Type::int16_key: if (is_nullable) { - _aggregated_method_variant - .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber< - UInt16, AggregatedDataWithNullableUInt16Key, false>>>(); + _aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn< + AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key>>>(); } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>>(); + _aggregated_method_variant + .emplace<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>>(); } break; case Type::int32_key: @@ -460,6 +493,16 @@ struct AggregatedDataVariants { .emplace<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>>(); } break; + case Type::int32_key_phase2: + if (is_nullable) { + _aggregated_method_variant + .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber< + UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>>(); + } else { + _aggregated_method_variant.emplace< + AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>>(); + } + break; case Type::int64_key: if (is_nullable) { _aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn< @@ -469,6 +512,16 @@ struct AggregatedDataVariants { .emplace<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>(); } break; + case Type::int64_key_phase2: + if (is_nullable) { + _aggregated_method_variant + .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber< + UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>>(); + } else { + _aggregated_method_variant.emplace< + AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>>(); + } + break; case Type::int128_key: if (is_nullable) { _aggregated_method_variant @@ -479,6 +532,16 @@ struct AggregatedDataVariants { AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>>(); } break; + case Type::int128_key_phase2: + if (is_nullable) { + _aggregated_method_variant + .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber< + UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>>(); + } else { + _aggregated_method_variant.emplace< + AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128KeyPhase2>>(); + } + break; case Type::int64_keys: if (is_nullable) { _aggregated_method_variant @@ -488,6 +551,15 @@ struct AggregatedDataVariants { .emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>>(); } break; + case Type::int64_keys_phase2: + if (is_nullable) { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>>(); + } else { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>>(); + } + break; case Type::int128_keys: if (is_nullable) { _aggregated_method_variant @@ -497,6 +569,15 @@ struct AggregatedDataVariants { .emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>>(); } break; + case Type::int128_keys_phase2: + if (is_nullable) { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>>(); + } else { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>>(); + } + break; case Type::int256_keys: if (is_nullable) { _aggregated_method_variant @@ -506,6 +587,15 @@ struct AggregatedDataVariants { .emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>>(); } break; + case Type::int256_keys_phase2: + if (is_nullable) { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>>(); + } else { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>>(); + } + break; case Type::string_key: if (is_nullable) { _aggregated_method_variant.emplace< --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org