This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e4374b7ec [enhancement](agg)remove unnecessary mem alloc and dealloc in agg node (#12535)
8e4374b7ec is described below

commit 8e4374b7ec873585a170d333502eb17450695526
Author: starocean999 <40539150+starocean...@users.noreply.github.com>
AuthorDate: Thu Sep 15 11:07:06 2022 +0800

    [enhancement](agg)remove unnecessary mem alloc and dealloc in agg node (#12535)
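
    The patch hoists the per-batch scratch buffers (the places array, the
    hash_values vector, the values vector, and the merge-time deserialize
    buffer) out of the hot functions and into AggregationNode members that
    are resized only when a larger batch arrives, so steady-state execution
    performs no per-batch heap allocation or deallocation. A minimal
    standalone sketch of the idiom, using hypothetical names
    (BatchProcessor, process_batch) rather than the real Doris types:

        #include <cstddef>
        #include <vector>

        // Hypothetical stand-in for Doris's AggregateDataPtr.
        using AggregateDataPtr = char*;

        class BatchProcessor {
        public:
            // Called once per input batch. Previously the equivalent code
            // allocated a fresh buffer on every call; now the member buffer
            // only grows, so steady-state batches cause no heap traffic.
            void process_batch(size_t num_rows) {
                if (_places.size() < num_rows) {
                    _places.resize(num_rows); // allocate only when batches grow
                }
                // ... fill _places[0..num_rows) and run the aggregate
                // functions over the batch ...
            }

        private:
            // Scratch buffer with node lifetime, mirroring the new _places,
            // _hash_values, _values and _deserialize_buffer members below.
            std::vector<AggregateDataPtr> _places;
        };
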
---
 be/src/vec/exec/vaggregation_node.cpp | 52 +++++++++++++++--------------
 be/src/vec/exec/vaggregation_node.h   | 63 ++++++++++++++++++++++-------------
 2 files changed, 66 insertions(+), 49 deletions(-)

diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 71f06deb24..a403b4edb4 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -772,18 +772,16 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
 
                 _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);
 
-                std::vector<size_t> hash_values;
-
                 if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                    if (hash_values.size() < num_rows) hash_values.resize(num_rows);
+                    if (_hash_values.size() < num_rows) _hash_values.resize(num_rows);
                     if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
                                           AggState>::value) {
                         for (size_t i = 0; i < num_rows; ++i) {
-                            hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                            _hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
                         }
                     } else {
                         for (size_t i = 0; i < num_rows; ++i) {
-                            hash_values[i] =
+                            _hash_values[i] =
                                     agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
                         }
                     }
@@ -802,10 +800,10 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                                             i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool));
                                 } else
                                     agg_method.data.prefetch_by_hash(
-                                            hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                                            _hash_values[i + HASH_MAP_PREFETCH_DIST]);
                             }
 
-                            return state.emplace_key(agg_method.data, hash_values[i], i,
+                            return state.emplace_key(agg_method.data, _hash_values[i], i,
                                                      _agg_arena_pool);
                         } else {
                             return state.emplace_key(agg_method.data, i, _agg_arena_pool);
@@ -843,18 +841,16 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
 
                 _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
 
-                std::vector<size_t> hash_values;
-
                 if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                    if (hash_values.size() < rows) hash_values.resize(rows);
+                    if (_hash_values.size() < rows) _hash_values.resize(rows);
                     if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
                                           AggState>::value) {
                         for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                            _hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
                         }
                     } else {
                         for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] =
+                            _hash_values[i] =
                                     agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
                         }
                     }
@@ -870,10 +866,10 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
                                             i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool));
                                 } else
                                     agg_method.data.prefetch_by_hash(
-                                            hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                                            _hash_values[i + HASH_MAP_PREFETCH_DIST]);
                             }
 
-                            return state.find_key(agg_method.data, hash_values[i], i,
+                            return state.find_key(agg_method.data, _hash_values[i], i,
                                                   _agg_arena_pool);
                         } else {
                             return state.find_key(agg_method.data, i, _agg_arena_pool);
@@ -909,7 +905,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
     }
 
     int rows = in_block->rows();
-    PODArray<AggregateDataPtr> places(rows);
+    if (_places.size() < rows) {
+        _places.resize(rows);
+    }
 
     // Stop expanding hash tables if we're not reducing the input sufficiently. As our
     // hash tables expand out of each level of cache hierarchy, every hash table lookup
@@ -1006,11 +1004,11 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
             _agg_data._aggregated_method_variant);
 
     if (!ret_flag) {
-        _emplace_into_hash_table(places.data(), key_columns, rows);
+        _emplace_into_hash_table(_places.data(), key_columns, rows);
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
             _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],
-                                                        places.data(), &_agg_arena_pool,
+                                                        _places.data(), &_agg_arena_pool,
                                                         _should_expand_hash_table);
         }
     }
@@ -1058,12 +1056,14 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
                 const auto size = std::min(data.size(), size_t(state->batch_size()));
                 using KeyType = std::decay_t<decltype(iter->get_first())>;
                 std::vector<KeyType> keys(size);
-                std::vector<AggregateDataPtr> values(size);
+                if (_values.size() < size) {
+                    _values.resize(size);
+                }
 
                 size_t num_rows = 0;
                 while (iter != data.end() && num_rows < state->batch_size()) {
                     keys[num_rows] = iter->get_first();
-                    values[num_rows] = iter->get_second();
+                    _values[num_rows] = iter->get_second();
                     ++iter;
                     ++num_rows;
                 }
@@ -1072,7 +1072,7 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
 
                 for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
                     _aggregate_evaluators[i]->insert_result_info_vec(
-                            values, _offsets_of_aggregate_states[i], value_columns[i].get(),
+                            _values, _offsets_of_aggregate_states[i], value_columns[i].get(),
                             num_rows);
                 }
 
@@ -1142,12 +1142,14 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                 const auto size = std::min(data.size(), size_t(state->batch_size()));
                 using KeyType = std::decay_t<decltype(iter->get_first())>;
                 std::vector<KeyType> keys(size);
-                std::vector<AggregateDataPtr> values(size + 1);
+                if (_values.size() < size + 1) {
+                    _values.resize(size + 1);
+                }
 
                 size_t num_rows = 0;
                 while (iter != data.end() && num_rows < state->batch_size()) {
                     keys[num_rows] = iter->get_first();
-                    values[num_rows] = iter->get_second();
+                    _values[num_rows] = iter->get_second();
                     ++iter;
                     ++num_rows;
                 }
@@ -1160,7 +1162,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                         DCHECK(key_columns[0]->is_nullable());
                         if (agg_method.data.has_null_key_data()) {
                             key_columns[0]->insert_data(nullptr, 0);
-                            values[num_rows] = agg_method.data.get_null_key_data();
+                            _values[num_rows] = agg_method.data.get_null_key_data();
                             ++num_rows;
                             *eos = true;
                         }
@@ -1183,7 +1185,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                                    _aggregate_evaluators[i]->function()->create_serialize_column();
                        }
                        _aggregate_evaluators[i]->function()->serialize_to_column(
-                                values, _offsets_of_aggregate_states[i], value_columns[i],
+                                _values, _offsets_of_aggregate_states[i], value_columns[i],
                                 num_rows);
                     }
                 } else {
@@ -1204,7 +1206,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                     }
                     for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
                         _aggregate_evaluators[i]->function()->serialize_vec(
-                                values, _offsets_of_aggregate_states[i], value_buffer_writers[i],
+                                _values, _offsets_of_aggregate_states[i], value_buffer_writers[i],
                                 num_rows);
                     }
                 }
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index ddacf96777..d05e6b06ef 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -686,6 +686,11 @@ private:
     bool _should_limit_output = false;
     bool _reach_limit = false;
 
+    PODArray<AggregateDataPtr> _places;
+    std::vector<char> _deserialize_buffer;
+    std::vector<size_t> _hash_values;
+    std::vector<AggregateDataPtr> _values;
+
 private:
     /// Return true if we should keep expanding hash tables in the preagg. If false,
     /// the preagg should pass through any rows it can't fit in its tables.
@@ -744,21 +749,23 @@ private:
         }
 
         int rows = block->rows();
-        PODArray<AggregateDataPtr> places(rows);
+        if (_places.size() < rows) {
+            _places.resize(rows);
+        }
 
         if constexpr (limit) {
-            _find_in_hash_table(places.data(), key_columns, rows);
+            _find_in_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 _aggregate_evaluators[i]->execute_batch_add_selected(
-                        block, _offsets_of_aggregate_states[i], places.data(), &_agg_arena_pool);
+                        block, _offsets_of_aggregate_states[i], _places.data(), &_agg_arena_pool);
             }
         } else {
-            _emplace_into_hash_table(places.data(), key_columns, rows);
+            _emplace_into_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i],
-                                                            places.data(), &_agg_arena_pool);
+                                                            _places.data(), &_agg_arena_pool);
             }
 
             if (_should_limit_output) {
@@ -794,10 +801,12 @@ private:
         }
 
         int rows = block->rows();
-        PODArray<AggregateDataPtr> places(rows);
+        if (_places.size() < rows) {
+            _places.resize(rows);
+        }
 
         if constexpr (limit) {
-            _find_in_hash_table(places.data(), key_columns, rows);
+            _find_in_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 if (_aggregate_evaluators[i]->is_merge()) {
@@ -807,34 +816,37 @@ private:
                        column = ((ColumnNullable*)column.get())->get_nested_column_ptr();
                     }
 
-                    std::unique_ptr<char[]> deserialize_buffer(
-                            new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
+                    size_t buffer_size =
+                            _aggregate_evaluators[i]->function()->size_of_data() * rows;
+                    if (_deserialize_buffer.size() < buffer_size) {
+                        _deserialize_buffer.resize(buffer_size);
+                    }
 
                     if (_use_fixed_length_serialization_opt) {
                        SCOPED_TIMER(_deserialize_data_timer);
                        _aggregate_evaluators[i]->function()->deserialize_from_column(
-                                deserialize_buffer.get(), *column, &_agg_arena_pool, rows);
+                                _deserialize_buffer.data(), *column, &_agg_arena_pool, rows);
                     } else {
                         SCOPED_TIMER(_deserialize_data_timer);
                         _aggregate_evaluators[i]->function()->deserialize_vec(
-                                deserialize_buffer.get(), (ColumnString*)(column.get()),
+                                _deserialize_buffer.data(), (ColumnString*)(column.get()),
                                 &_agg_arena_pool, rows);
                     }
                     _aggregate_evaluators[i]->function()->merge_vec_selected(
-                            places.data(), _offsets_of_aggregate_states[i],
-                            deserialize_buffer.get(), &_agg_arena_pool, rows);
+                            _places.data(), _offsets_of_aggregate_states[i],
+                            _deserialize_buffer.data(), &_agg_arena_pool, rows);
 
-                    _aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+                    _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
                                                                       rows);
 
                 } else {
                     _aggregate_evaluators[i]->execute_batch_add_selected(
-                            block, _offsets_of_aggregate_states[i], places.data(),
+                            block, _offsets_of_aggregate_states[i], _places.data(),
                             &_agg_arena_pool);
                 }
             }
         } else {
-            _emplace_into_hash_table(places.data(), key_columns, rows);
+            _emplace_into_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 if (_aggregate_evaluators[i]->is_merge()) {
@@ -844,30 +856,33 @@ private:
                        column = ((ColumnNullable*)column.get())->get_nested_column_ptr();
                     }
 
-                    std::unique_ptr<char[]> deserialize_buffer(
-                            new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
+                    size_t buffer_size =
+                            _aggregate_evaluators[i]->function()->size_of_data() * rows;
+                    if (_deserialize_buffer.size() < buffer_size) {
+                        _deserialize_buffer.resize(buffer_size);
+                    }
 
                     if (_use_fixed_length_serialization_opt) {
                        SCOPED_TIMER(_deserialize_data_timer);
                        _aggregate_evaluators[i]->function()->deserialize_from_column(
-                                deserialize_buffer.get(), *column, &_agg_arena_pool, rows);
+                                _deserialize_buffer.data(), *column, &_agg_arena_pool, rows);
                     } else {
                         SCOPED_TIMER(_deserialize_data_timer);
                         _aggregate_evaluators[i]->function()->deserialize_vec(
-                                deserialize_buffer.get(), (ColumnString*)(column.get()),
+                                _deserialize_buffer.data(), (ColumnString*)(column.get()),
                                 &_agg_arena_pool, rows);
                     }
                     _aggregate_evaluators[i]->function()->merge_vec(
-                            places.data(), _offsets_of_aggregate_states[i],
-                            deserialize_buffer.get(), &_agg_arena_pool, rows);
+                            _places.data(), _offsets_of_aggregate_states[i],
+                            _deserialize_buffer.data(), &_agg_arena_pool, rows);
 
-                    _aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+                    _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
                                                                       rows);
 
                 } else {
                    _aggregate_evaluators[i]->execute_batch_add(block,
                                                                _offsets_of_aggregate_states[i],
-                                                                places.data(), &_agg_arena_pool);
+                                                                _places.data(), &_agg_arena_pool);
                 }
             }
 


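The same resize-if-needed idiom replaces the per-evaluator
std::unique_ptr<char[]> scratch area used when merging serialized aggregate
states: a single std::vector<char> member is now shared across batches and
evaluators. A sketch under stated assumptions (kStateSize stands in for
_aggregate_evaluators[i]->function()->size_of_data(); MergeNode and
merge_batch are hypothetical):

    #include <cstddef>
    #include <vector>

    constexpr size_t kStateSize = 16; // hypothetical per-row state size

    class MergeNode {
    public:
        void merge_batch(size_t rows) {
            // Before the patch: one heap allocation and one deallocation per
            // evaluator per batch, via
            //   std::unique_ptr<char[]> buf(new char[kStateSize * rows]);
            size_t buffer_size = kStateSize * rows;
            if (_deserialize_buffer.size() < buffer_size) {
                _deserialize_buffer.resize(buffer_size);
            }
            // ... deserialize the incoming states into
            // _deserialize_buffer.data(), merge them into the hash table,
            // then destroy the temporary states ...
        }

    private:
        std::vector<char> _deserialize_buffer; // reused; only ever grows
    };

The trade-off is that the buffers are never shrunk, so each AggregationNode
retains its high-water-mark scratch memory until the node is destroyed. The
pattern also relies on these methods never running concurrently on the same
node instance, since the scratch buffers are now shared state.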