This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 8e4374b7ec  [enhancement](agg)remove unnessasery mem alloc and dealloc in agg node (#12535)
8e4374b7ec is described below

commit 8e4374b7ec873585a170d333502eb17450695526
Author: starocean999 <40539150+starocean...@users.noreply.github.com>
AuthorDate: Thu Sep 15 11:07:06 2022 +0800

    [enhancement](agg)remove unnessasery mem alloc and dealloc in agg node (#12535)
---
 be/src/vec/exec/vaggregation_node.cpp | 52 +++++++++++++++--------------
 be/src/vec/exec/vaggregation_node.h   | 63 ++++++++++++++++++++++-------------
 2 files changed, 66 insertions(+), 49 deletions(-)
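The whole change boils down to one pattern: replace a per-batch heap allocation with a member buffer that grows on demand and is reused across batches. Below is a minimal, self-contained C++ sketch of that pattern; the class and member names are illustrative stand-ins, not the actual Doris types.

    #include <cstddef>
    #include <vector>

    // Stand-in for Doris's AggregateDataPtr (a pointer into arena memory).
    using AggregateDataPtr = char*;

    class AggNodeSketch {
    public:
        // Before this commit: a fresh buffer per batch (alloc + dealloc each call).
        void process_batch_before(size_t rows) {
            std::vector<AggregateDataPtr> places(rows);
            use(places.data(), rows);
        }

        // After this commit: grow a member buffer on demand and reuse it, so
        // steady-state batches perform no heap allocation at all.
        void process_batch_after(size_t rows) {
            if (_places.size() < rows) {
                _places.resize(rows); // capacity is kept between calls
            }
            use(_places.data(), rows); // only the first `rows` slots are valid
        }

    private:
        // Placeholder for the real fill-and-aggregate work.
        static void use(AggregateDataPtr* /*places*/, size_t /*rows*/) {}

        std::vector<AggregateDataPtr> _places; // lives as long as the node
    };

The diff applies this same grow-and-reuse idiom to four buffers: _places, _hash_values, _values, and _deserialize_buffer.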
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 71f06deb24..a403b4edb4 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -772,18 +772,16 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
 
                 _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);
 
-                std::vector<size_t> hash_values;
-
                 if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                    if (hash_values.size() < num_rows) hash_values.resize(num_rows);
+                    if (_hash_values.size() < num_rows) _hash_values.resize(num_rows);
 
                     if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
                                           AggState>::value) {
                         for (size_t i = 0; i < num_rows; ++i) {
-                            hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                            _hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
                         }
                     } else {
                         for (size_t i = 0; i < num_rows; ++i) {
-                            hash_values[i] =
+                            _hash_values[i] =
                                     agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
                         }
                     }
@@ -802,10 +800,10 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                                         i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool));
                         } else
                             agg_method.data.prefetch_by_hash(
-                                    hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                                    _hash_values[i + HASH_MAP_PREFETCH_DIST]);
                     }
 
-                    return state.emplace_key(agg_method.data, hash_values[i], i,
+                    return state.emplace_key(agg_method.data, _hash_values[i], i,
                                              _agg_arena_pool);
                 } else {
                     return state.emplace_key(agg_method.data, i, _agg_arena_pool);
                 }
@@ -843,18 +841,16 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
 
                 _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
 
-                std::vector<size_t> hash_values;
-
                 if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                    if (hash_values.size() < rows) hash_values.resize(rows);
+                    if (_hash_values.size() < rows) _hash_values.resize(rows);
 
                     if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
                                           AggState>::value) {
                         for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                            _hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
                         }
                     } else {
                         for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] =
+                            _hash_values[i] =
                                     agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
                         }
                     }
@@ -870,10 +866,10 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
                                         i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool));
                         } else
                             agg_method.data.prefetch_by_hash(
-                                    hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                                    _hash_values[i + HASH_MAP_PREFETCH_DIST]);
                     }
 
-                    return state.find_key(agg_method.data, hash_values[i], i,
+                    return state.find_key(agg_method.data, _hash_values[i], i,
                                           _agg_arena_pool);
                 } else {
                     return state.find_key(agg_method.data, i, _agg_arena_pool);
                 }
@@ -909,7 +905,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
     }
 
     int rows = in_block->rows();
-    PODArray<AggregateDataPtr> places(rows);
+    if (_places.size() < rows) {
+        _places.resize(rows);
+    }
 
     // Stop expanding hash tables if we're not reducing the input sufficiently. As our
     // hash tables expand out of each level of cache hierarchy, every hash table lookup
@@ -1006,11 +1004,11 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
             _agg_data._aggregated_method_variant);
 
     if (!ret_flag) {
-        _emplace_into_hash_table(places.data(), key_columns, rows);
+        _emplace_into_hash_table(_places.data(), key_columns, rows);
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
             _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],
-                                                        places.data(), &_agg_arena_pool,
+                                                        _places.data(), &_agg_arena_pool,
                                                         _should_expand_hash_table);
         }
     }
@@ -1058,12 +1056,14 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
                 const auto size = std::min(data.size(), size_t(state->batch_size()));
                 using KeyType = std::decay_t<decltype(iter->get_first())>;
                 std::vector<KeyType> keys(size);
-                std::vector<AggregateDataPtr> values(size);
+                if (_values.size() < size) {
+                    _values.resize(size);
+                }
 
                 size_t num_rows = 0;
                 while (iter != data.end() && num_rows < state->batch_size()) {
                     keys[num_rows] = iter->get_first();
-                    values[num_rows] = iter->get_second();
+                    _values[num_rows] = iter->get_second();
                     ++iter;
                     ++num_rows;
                 }
@@ -1072,7 +1072,7 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
 
                 for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
                     _aggregate_evaluators[i]->insert_result_info_vec(
-                            values, _offsets_of_aggregate_states[i], value_columns[i].get(),
+                            _values, _offsets_of_aggregate_states[i], value_columns[i].get(),
                             num_rows);
                 }
@@ -1142,12 +1142,14 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                 const auto size = std::min(data.size(), size_t(state->batch_size()));
                 using KeyType = std::decay_t<decltype(iter->get_first())>;
                 std::vector<KeyType> keys(size);
-                std::vector<AggregateDataPtr> values(size + 1);
+                if (_values.size() < size + 1) {
+                    _values.resize(size + 1);
+                }
 
                 size_t num_rows = 0;
                 while (iter != data.end() && num_rows < state->batch_size()) {
                     keys[num_rows] = iter->get_first();
-                    values[num_rows] = iter->get_second();
+                    _values[num_rows] = iter->get_second();
                     ++iter;
                     ++num_rows;
                 }
@@ -1160,7 +1162,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                     DCHECK(key_columns[0]->is_nullable());
                     if (agg_method.data.has_null_key_data()) {
                         key_columns[0]->insert_data(nullptr, 0);
-                        values[num_rows] = agg_method.data.get_null_key_data();
+                        _values[num_rows] = agg_method.data.get_null_key_data();
                         ++num_rows;
                         *eos = true;
                     }
@@ -1183,7 +1185,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                                     _aggregate_evaluators[i]->function()->create_serialize_column();
                         }
                         _aggregate_evaluators[i]->function()->serialize_to_column(
-                                values, _offsets_of_aggregate_states[i], value_columns[i],
+                                _values, _offsets_of_aggregate_states[i], value_columns[i],
                                 num_rows);
                     }
                 } else {
@@ -1204,7 +1206,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                 }
                 for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
                     _aggregate_evaluators[i]->function()->serialize_vec(
-                            values, _offsets_of_aggregate_states[i], value_buffer_writers[i],
+                            _values, _offsets_of_aggregate_states[i], value_buffer_writers[i],
                             num_rows);
                 }
             }
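A caveat implied by the hunks above (my reading, not stated in the commit): because the reused buffers only ever grow, their size() can exceed the current batch's row count, so every consumer must bound its loops by rows / num_rows rather than by buffer size. A compilable sketch of the hashing loop under that assumption, with hash_row as a hypothetical stand-in for agg_method.data.hash(...):

    #include <cstddef>
    #include <functional>
    #include <vector>

    // Hypothetical stand-in for agg_method.data.hash(key) in the real code.
    static size_t hash_row(size_t i) {
        return std::hash<size_t>{}(i);
    }

    // Mirrors the grow-if-smaller idiom from the diff. Only
    // hash_values[0, num_rows) is meaningful after the call; slots past
    // num_rows may hold stale hashes from an earlier, larger batch.
    void compute_hashes(std::vector<size_t>& hash_values, size_t num_rows) {
        if (hash_values.size() < num_rows) hash_values.resize(num_rows);
        for (size_t i = 0; i < num_rows; ++i) {
            hash_values[i] = hash_row(i);
        }
    }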
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index ddacf96777..d05e6b06ef 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -686,6 +686,11 @@ private:
     bool _should_limit_output = false;
     bool _reach_limit = false;
 
+    PODArray<AggregateDataPtr> _places;
+    std::vector<char> _deserialize_buffer;
+    std::vector<size_t> _hash_values;
+    std::vector<AggregateDataPtr> _values;
+
 private:
     /// Return true if we should keep expanding hash tables in the preagg. If false,
     /// the preagg should pass through any rows it can't fit in its tables.
@@ -744,21 +749,23 @@ private:
         }
 
         int rows = block->rows();
-        PODArray<AggregateDataPtr> places(rows);
+        if (_places.size() < rows) {
+            _places.resize(rows);
+        }
 
         if constexpr (limit) {
-            _find_in_hash_table(places.data(), key_columns, rows);
+            _find_in_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 _aggregate_evaluators[i]->execute_batch_add_selected(
-                        block, _offsets_of_aggregate_states[i], places.data(), &_agg_arena_pool);
+                        block, _offsets_of_aggregate_states[i], _places.data(), &_agg_arena_pool);
             }
         } else {
-            _emplace_into_hash_table(places.data(), key_columns, rows);
+            _emplace_into_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i],
-                                                            places.data(), &_agg_arena_pool);
+                                                            _places.data(), &_agg_arena_pool);
             }
 
             if (_should_limit_output) {
@@ -794,10 +801,12 @@ private:
         }
 
         int rows = block->rows();
-        PODArray<AggregateDataPtr> places(rows);
+        if (_places.size() < rows) {
+            _places.resize(rows);
+        }
 
         if constexpr (limit) {
-            _find_in_hash_table(places.data(), key_columns, rows);
+            _find_in_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 if (_aggregate_evaluators[i]->is_merge()) {
@@ -807,34 +816,37 @@ private:
                         column = ((ColumnNullable*)column.get())->get_nested_column_ptr();
                     }
 
-                    std::unique_ptr<char[]> deserialize_buffer(
-                            new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
+                    size_t buffer_size =
+                            _aggregate_evaluators[i]->function()->size_of_data() * rows;
+                    if (_deserialize_buffer.size() < buffer_size) {
+                        _deserialize_buffer.resize(buffer_size);
+                    }
 
                     if (_use_fixed_length_serialization_opt) {
                         SCOPED_TIMER(_deserialize_data_timer);
                         _aggregate_evaluators[i]->function()->deserialize_from_column(
-                                deserialize_buffer.get(), *column, &_agg_arena_pool, rows);
+                                _deserialize_buffer.data(), *column, &_agg_arena_pool, rows);
                     } else {
                         SCOPED_TIMER(_deserialize_data_timer);
                         _aggregate_evaluators[i]->function()->deserialize_vec(
-                                deserialize_buffer.get(), (ColumnString*)(column.get()),
+                                _deserialize_buffer.data(), (ColumnString*)(column.get()),
                                 &_agg_arena_pool, rows);
                     }
 
                     _aggregate_evaluators[i]->function()->merge_vec_selected(
-                            places.data(), _offsets_of_aggregate_states[i],
-                            deserialize_buffer.get(), &_agg_arena_pool, rows);
+                            _places.data(), _offsets_of_aggregate_states[i],
+                            _deserialize_buffer.data(), &_agg_arena_pool, rows);
 
-                    _aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+                    _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
                                                                       rows);
                 } else {
                     _aggregate_evaluators[i]->execute_batch_add_selected(
-                            block, _offsets_of_aggregate_states[i], places.data(),
+                            block, _offsets_of_aggregate_states[i], _places.data(),
                            &_agg_arena_pool);
                 }
             }
         } else {
-            _emplace_into_hash_table(places.data(), key_columns, rows);
+            _emplace_into_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
                 if (_aggregate_evaluators[i]->is_merge()) {
@@ -844,30 +856,33 @@ private:
                         column = ((ColumnNullable*)column.get())->get_nested_column_ptr();
                     }
 
-                    std::unique_ptr<char[]> deserialize_buffer(
-                            new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
+                    size_t buffer_size =
+                            _aggregate_evaluators[i]->function()->size_of_data() * rows;
+                    if (_deserialize_buffer.size() < buffer_size) {
+                        _deserialize_buffer.resize(buffer_size);
+                    }
 
                     if (_use_fixed_length_serialization_opt) {
                         SCOPED_TIMER(_deserialize_data_timer);
                         _aggregate_evaluators[i]->function()->deserialize_from_column(
-                                deserialize_buffer.get(), *column, &_agg_arena_pool, rows);
+                                _deserialize_buffer.data(), *column, &_agg_arena_pool, rows);
                     } else {
                         SCOPED_TIMER(_deserialize_data_timer);
                         _aggregate_evaluators[i]->function()->deserialize_vec(
-                                deserialize_buffer.get(), (ColumnString*)(column.get()),
+                                _deserialize_buffer.data(), (ColumnString*)(column.get()),
                                 &_agg_arena_pool, rows);
                     }
 
                     _aggregate_evaluators[i]->function()->merge_vec(
-                            places.data(), _offsets_of_aggregate_states[i],
-                            deserialize_buffer.get(), &_agg_arena_pool, rows);
+                            _places.data(), _offsets_of_aggregate_states[i],
+                            _deserialize_buffer.data(), &_agg_arena_pool, rows);
 
-                    _aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+                    _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
                                                                       rows);
                 } else {
                     _aggregate_evaluators[i]->execute_batch_add(block,
                                                                 _offsets_of_aggregate_states[i],
-                                                                places.data(), &_agg_arena_pool);
+                                                                _places.data(), &_agg_arena_pool);
                 }
             }
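The header hunks above also swap the per-batch std::unique_ptr<char[]> holding deserialized aggregate states for a reusable std::vector<char> member. A hedged before/after sketch (illustrative names; the deserialize/merge/destroy calls are collapsed into a placeholder):

    #include <cstddef>
    #include <cstring>
    #include <memory>
    #include <vector>

    class MergeSketch {
    public:
        // Before: one heap allocation per aggregate evaluator per batch.
        void merge_before(size_t state_size, size_t rows) {
            std::unique_ptr<char[]> buf(new char[state_size * rows]);
            touch(buf.get(), state_size * rows);
        }

        // After: grow the member buffer once, then reuse it. resize() may
        // value-initialize new bytes, but the real code overwrites the buffer
        // via deserialize_from_column / deserialize_vec before reading it.
        void merge_after(size_t state_size, size_t rows) {
            size_t buffer_size = state_size * rows;
            if (_deserialize_buffer.size() < buffer_size) {
                _deserialize_buffer.resize(buffer_size);
            }
            touch(_deserialize_buffer.data(), buffer_size);
        }

    private:
        // Placeholder for deserialize + merge + destroy in the real code.
        static void touch(char* p, size_t n) { std::memset(p, 0, n); }

        std::vector<char> _deserialize_buffer;
    };

The trade-off is explicit: the buffers persist for the lifetime of the node (their peak sizes are never given back), in exchange for allocation-free steady-state batches.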
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org