This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 842a5b8e24 [refactor](agg) Abstract the hash operation into a method" (#11399)
842a5b8e24 is described below

commit 842a5b8e243f631d02d20cde68f2b6ed2d069567
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Tue Aug 2 17:27:19 2022 +0800

    [refactor](agg) Abstract the hash operation into a method" (#11399)
---
 be/src/vec/exec/vaggregation_node.cpp | 271 +++++++++-------------------------
 be/src/vec/exec/vaggregation_node.h   |   3 +
 2 files changed, 76 insertions(+), 198 deletions(-)

diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index 6de36c6d1e..1209750203 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -706,6 +706,76 @@ bool AggregationNode::_should_expand_preagg_hash_tables() {
             _agg_data._aggregated_method_variant);
 }
 
+void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, 
ColumnRawPtrs& key_columns,
+                                               const size_t num_rows) {
+    std::visit(
+            [&](auto&& agg_method) -> void {
+                using HashMethodType = std::decay_t<decltype(agg_method)>;
+                using HashTableType = std::decay_t<decltype(agg_method.data)>;
+                using AggState = typename HashMethodType::State;
+                AggState state(key_columns, _probe_key_sz, nullptr);
+
+                _pre_serialize_key_if_need(state, agg_method, key_columns, 
num_rows);
+
+                std::vector<size_t> hash_values;
+
+                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+                    if (hash_values.size() < num_rows) 
hash_values.resize(num_rows);
+                    if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+                                          AggState>::value) {
+                        for (size_t i = 0; i < num_rows; ++i) {
+                            hash_values[i] = 
agg_method.data.hash(agg_method.keys[i]);
+                        }
+                    } else {
+                        for (size_t i = 0; i < num_rows; ++i) {
+                            hash_values[i] =
+                                    
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+                        }
+                    }
+                }
+
+                /// For all rows.
+                for (size_t i = 0; i < num_rows; ++i) {
+                    AggregateDataPtr aggregate_data = nullptr;
+
+                    auto emplace_result = [&]() {
+                        if constexpr 
(HashTableTraits<HashTableType>::is_phmap) {
+                            if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) 
{
+                                if constexpr 
(HashTableTraits<HashTableType>::is_parallel_phmap) {
+                                    
agg_method.data.prefetch_by_key(state.get_key_holder(
+                                            i + HASH_MAP_PREFETCH_DIST, 
_agg_arena_pool));
+                                } else
+                                    agg_method.data.prefetch_by_hash(
+                                            hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
+                            }
+
+                            return state.emplace_key(agg_method.data, 
hash_values[i], i,
+                                                     _agg_arena_pool);
+                        } else {
+                            return state.emplace_key(agg_method.data, i, 
_agg_arena_pool);
+                        }
+                    }();
+
+                    /// If a new key is inserted, initialize the states of the 
aggregate functions, and possibly something related to the key.
+                    if (emplace_result.is_inserted()) {
+                        /// exception-safety - if you can not allocate memory 
or create states, then destructors will not be called.
+                        emplace_result.set_mapped(nullptr);
+
+                        aggregate_data = _agg_arena_pool.aligned_alloc(
+                                _total_size_of_aggregate_states, 
_align_aggregate_states);
+                        _create_agg_status(aggregate_data);
+
+                        emplace_result.set_mapped(aggregate_data);
+                    } else
+                        aggregate_data = emplace_result.get_mapped();
+
+                    places[i] = aggregate_data;
+                    assert(places[i] != nullptr);
+                }
+            },
+            _agg_data._aggregated_method_variant);
+}
+
 Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* 
in_block,
                                                      doris::vectorized::Block* 
out_block) {
     SCOPED_TIMER(_build_timer);
@@ -816,73 +886,7 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
             _agg_data._aggregated_method_variant);
 
     if (!ret_flag) {
-        std::visit(
-                [&](auto&& agg_method) -> void {
-                    using HashMethodType = std::decay_t<decltype(agg_method)>;
-                    using HashTableType = 
std::decay_t<decltype(agg_method.data)>;
-                    using AggState = typename HashMethodType::State;
-                    AggState state(key_columns, _probe_key_sz, nullptr);
-
-                    _pre_serialize_key_if_need(state, agg_method, key_columns, 
rows);
-
-                    std::vector<size_t> hash_values;
-
-                    if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                        if (hash_values.size() < rows) 
hash_values.resize(rows);
-                        if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
-                                              AggState>::value) {
-                            for (size_t i = 0; i < rows; ++i) {
-                                hash_values[i] = 
agg_method.data.hash(agg_method.keys[i]);
-                            }
-                        } else {
-                            for (size_t i = 0; i < rows; ++i) {
-                                hash_values[i] = agg_method.data.hash(
-                                        state.get_key_holder(i, 
_agg_arena_pool));
-                            }
-                        }
-                    }
-
-                    /// For all rows.
-                    for (size_t i = 0; i < rows; ++i) {
-                        AggregateDataPtr aggregate_data = nullptr;
-
-                        auto emplace_result = [&]() {
-                            if constexpr 
(HashTableTraits<HashTableType>::is_phmap) {
-                                if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) 
{
-                                    if constexpr (HashTableTraits<
-                                                          
HashTableType>::is_parallel_phmap) {
-                                        
agg_method.data.prefetch_by_key(state.get_key_holder(
-                                                i + HASH_MAP_PREFETCH_DIST, 
_agg_arena_pool));
-                                    } else
-                                        agg_method.data.prefetch_by_hash(
-                                                hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
-                                }
-
-                                return state.emplace_key(agg_method.data, 
hash_values[i], i,
-                                                         _agg_arena_pool);
-                            } else {
-                                return state.emplace_key(agg_method.data, i, 
_agg_arena_pool);
-                            }
-                        }();
-
-                        /// If a new key is inserted, initialize the states of 
the aggregate functions, and possibly something related to the key.
-                        if (emplace_result.is_inserted()) {
-                            /// exception-safety - if you can not allocate 
memory or create states, then destructors will not be called.
-                            emplace_result.set_mapped(nullptr);
-
-                            aggregate_data = _agg_arena_pool.aligned_alloc(
-                                    _total_size_of_aggregate_states, 
_align_aggregate_states);
-                            _create_agg_status(aggregate_data);
-
-                            emplace_result.set_mapped(aggregate_data);
-                        } else
-                            aggregate_data = emplace_result.get_mapped();
-
-                        places[i] = aggregate_data;
-                        assert(places[i] != nullptr);
-                    }
-                },
-                _agg_data._aggregated_method_variant);
+        _emplace_into_hash_table(places.data(), key_columns, rows);
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
             _aggregate_evaluators[i]->execute_batch_add(in_block, 
_offsets_of_aggregate_states[i],
@@ -914,72 +918,7 @@ Status 
AggregationNode::_execute_with_serialized_key(Block* block) {
     int rows = block->rows();
     PODArray<AggregateDataPtr> places(rows);
 
-    std::visit(
-            [&](auto&& agg_method) -> void {
-                using HashMethodType = std::decay_t<decltype(agg_method)>;
-                using HashTableType = std::decay_t<decltype(agg_method.data)>;
-                using AggState = typename HashMethodType::State;
-                AggState state(key_columns, _probe_key_sz, nullptr);
-
-                _pre_serialize_key_if_need(state, agg_method, key_columns, 
rows);
-
-                std::vector<size_t> hash_values;
-
-                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                    if (hash_values.size() < rows) hash_values.resize(rows);
-                    if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
-                                          AggState>::value) {
-                        for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] = 
agg_method.data.hash(agg_method.keys[i]);
-                        }
-                    } else {
-                        for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] =
-                                    
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
-                        }
-                    }
-                }
-
-                /// For all rows.
-                for (size_t i = 0; i < rows; ++i) {
-                    AggregateDataPtr aggregate_data = nullptr;
-
-                    auto emplace_result = [&]() {
-                        if constexpr 
(HashTableTraits<HashTableType>::is_phmap) {
-                            if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
-                                if constexpr 
(HashTableTraits<HashTableType>::is_parallel_phmap) {
-                                    
agg_method.data.prefetch_by_key(state.get_key_holder(
-                                            i + HASH_MAP_PREFETCH_DIST, 
_agg_arena_pool));
-                                } else
-                                    agg_method.data.prefetch_by_hash(
-                                            hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
-                            }
-
-                            return state.emplace_key(agg_method.data, 
hash_values[i], i,
-                                                     _agg_arena_pool);
-                        } else {
-                            return state.emplace_key(agg_method.data, i, 
_agg_arena_pool);
-                        }
-                    }();
-
-                    /// If a new key is inserted, initialize the states of the 
aggregate functions, and possibly something related to the key.
-                    if (emplace_result.is_inserted()) {
-                        /// exception-safety - if you can not allocate memory 
or create states, then destructors will not be called.
-                        emplace_result.set_mapped(nullptr);
-
-                        aggregate_data = _agg_arena_pool.aligned_alloc(
-                                _total_size_of_aggregate_states, 
_align_aggregate_states);
-                        _create_agg_status(aggregate_data);
-
-                        emplace_result.set_mapped(aggregate_data);
-                    } else
-                        aggregate_data = emplace_result.get_mapped();
-
-                    places[i] = aggregate_data;
-                    assert(places[i] != nullptr);
-                }
-            },
-            _agg_data._aggregated_method_variant);
+    _emplace_into_hash_table(places.data(), key_columns, rows);
 
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         _aggregate_evaluators[i]->execute_batch_add(block, 
_offsets_of_aggregate_states[i],
@@ -1188,71 +1127,7 @@ Status 
AggregationNode::_merge_with_serialized_key(Block* block) {
     int rows = block->rows();
     PODArray<AggregateDataPtr> places(rows);
 
-    std::visit(
-            [&](auto&& agg_method) -> void {
-                using HashMethodType = std::decay_t<decltype(agg_method)>;
-                using HashTableType = std::decay_t<decltype(agg_method.data)>;
-                using AggState = typename HashMethodType::State;
-                AggState state(key_columns, _probe_key_sz, nullptr);
-
-                _pre_serialize_key_if_need(state, agg_method, key_columns, 
rows);
-
-                std::vector<size_t> hash_values;
-
-                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                    if (hash_values.size() < rows) hash_values.resize(rows);
-                    if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
-                                          AggState>::value) {
-                        for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] = 
agg_method.data.hash(agg_method.keys[i]);
-                        }
-                    } else {
-                        for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] =
-                                    
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
-                        }
-                    }
-                }
-
-                /// For all rows.
-                for (size_t i = 0; i < rows; ++i) {
-                    AggregateDataPtr aggregate_data = nullptr;
-
-                    auto emplace_result = [&]() {
-                        if constexpr 
(HashTableTraits<HashTableType>::is_phmap) {
-                            if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
-                                if constexpr 
(HashTableTraits<HashTableType>::is_parallel_phmap) {
-                                    
agg_method.data.prefetch_by_key(state.get_key_holder(
-                                            i + HASH_MAP_PREFETCH_DIST, 
_agg_arena_pool));
-                                } else
-                                    agg_method.data.prefetch_by_hash(
-                                            hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
-                            }
-                            return state.emplace_key(agg_method.data, 
hash_values[i], i,
-                                                     _agg_arena_pool);
-                        } else {
-                            return state.emplace_key(agg_method.data, i, 
_agg_arena_pool);
-                        }
-                    }();
-
-                    /// If a new key is inserted, initialize the states of the 
aggregate functions, and possibly something related to the key.
-                    if (emplace_result.is_inserted()) {
-                        /// exception-safety - if you can not allocate memory 
or create states, then destructors will not be called.
-                        emplace_result.set_mapped(nullptr);
-
-                        aggregate_data = _agg_arena_pool.aligned_alloc(
-                                _total_size_of_aggregate_states, 
_align_aggregate_states);
-                        _create_agg_status(aggregate_data);
-
-                        emplace_result.set_mapped(aggregate_data);
-                    } else
-                        aggregate_data = emplace_result.get_mapped();
-
-                    places[i] = aggregate_data;
-                    assert(places[i] != nullptr);
-                }
-            },
-            _agg_data._aggregated_method_variant);
+    _emplace_into_hash_table(places.data(), key_columns, rows);
 
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index 478cd563ca..ba7cf24b70 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -710,6 +710,9 @@ private:
         }
     }
 
+    void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& 
key_columns,
+                                  const size_t num_rows);
+
     void release_tracker();
 
     using vectorized_execute = std::function<Status(Block* block)>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to