This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 842a5b8e24 [refactor](agg) Abstract the hash operation into a method" (#11399) 842a5b8e24 is described below commit 842a5b8e243f631d02d20cde68f2b6ed2d069567 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Tue Aug 2 17:27:19 2022 +0800 [refactor](agg) Abstract the hash operation into a method" (#11399) --- be/src/vec/exec/vaggregation_node.cpp | 271 +++++++++------------------------- be/src/vec/exec/vaggregation_node.h | 3 + 2 files changed, 76 insertions(+), 198 deletions(-) diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 6de36c6d1e..1209750203 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -706,6 +706,76 @@ bool AggregationNode::_should_expand_preagg_hash_tables() { _agg_data._aggregated_method_variant); } +void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, + const size_t num_rows) { + std::visit( + [&](auto&& agg_method) -> void { + using HashMethodType = std::decay_t<decltype(agg_method)>; + using HashTableType = std::decay_t<decltype(agg_method.data)>; + using AggState = typename HashMethodType::State; + AggState state(key_columns, _probe_key_sz, nullptr); + + _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); + + std::vector<size_t> hash_values; + + if constexpr (HashTableTraits<HashTableType>::is_phmap) { + if (hash_values.size() < num_rows) hash_values.resize(num_rows); + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + AggState>::value) { + for (size_t i = 0; i < num_rows; ++i) { + hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } else { + for (size_t i = 0; i < num_rows; ++i) { + hash_values[i] = + agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool)); + } + } + } + + /// For all rows. + for (size_t i = 0; i < num_rows; ++i) { + AggregateDataPtr aggregate_data = nullptr; + + auto emplace_result = [&]() { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { + if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) { + agg_method.data.prefetch_by_key(state.get_key_holder( + i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); + } else + agg_method.data.prefetch_by_hash( + hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + + return state.emplace_key(agg_method.data, hash_values[i], i, + _agg_arena_pool); + } else { + return state.emplace_key(agg_method.data, i, _agg_arena_pool); + } + }(); + + /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. + if (emplace_result.is_inserted()) { + /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. + emplace_result.set_mapped(nullptr); + + aggregate_data = _agg_arena_pool.aligned_alloc( + _total_size_of_aggregate_states, _align_aggregate_states); + _create_agg_status(aggregate_data); + + emplace_result.set_mapped(aggregate_data); + } else + aggregate_data = emplace_result.get_mapped(); + + places[i] = aggregate_data; + assert(places[i] != nullptr); + } + }, + _agg_data._aggregated_method_variant); +} + Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) { SCOPED_TIMER(_build_timer); @@ -816,73 +886,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i _agg_data._aggregated_method_variant); if (!ret_flag) { - std::visit( - [&](auto&& agg_method) -> void { - using HashMethodType = std::decay_t<decltype(agg_method)>; - using HashTableType = std::decay_t<decltype(agg_method.data)>; - using AggState = typename HashMethodType::State; - AggState state(key_columns, _probe_key_sz, nullptr); - - _pre_serialize_key_if_need(state, agg_method, key_columns, rows); - - std::vector<size_t> hash_values; - - if constexpr (HashTableTraits<HashTableType>::is_phmap) { - if (hash_values.size() < rows) hash_values.resize(rows); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = agg_method.data.hash(agg_method.keys[i]); - } - } else { - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = agg_method.data.hash( - state.get_key_holder(i, _agg_arena_pool)); - } - } - } - - /// For all rows. - for (size_t i = 0; i < rows; ++i) { - AggregateDataPtr aggregate_data = nullptr; - - auto emplace_result = [&]() { - if constexpr (HashTableTraits<HashTableType>::is_phmap) { - if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { - if constexpr (HashTableTraits< - HashTableType>::is_parallel_phmap) { - agg_method.data.prefetch_by_key(state.get_key_holder( - i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); - } else - agg_method.data.prefetch_by_hash( - hash_values[i + HASH_MAP_PREFETCH_DIST]); - } - - return state.emplace_key(agg_method.data, hash_values[i], i, - _agg_arena_pool); - } else { - return state.emplace_key(agg_method.data, i, _agg_arena_pool); - } - }(); - - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. - if (emplace_result.is_inserted()) { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. - emplace_result.set_mapped(nullptr); - - aggregate_data = _agg_arena_pool.aligned_alloc( - _total_size_of_aggregate_states, _align_aggregate_states); - _create_agg_status(aggregate_data); - - emplace_result.set_mapped(aggregate_data); - } else - aggregate_data = emplace_result.get_mapped(); - - places[i] = aggregate_data; - assert(places[i] != nullptr); - } - }, - _agg_data._aggregated_method_variant); + _emplace_into_hash_table(places.data(), key_columns, rows); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i], @@ -914,72 +918,7 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) { int rows = block->rows(); PODArray<AggregateDataPtr> places(rows); - std::visit( - [&](auto&& agg_method) -> void { - using HashMethodType = std::decay_t<decltype(agg_method)>; - using HashTableType = std::decay_t<decltype(agg_method.data)>; - using AggState = typename HashMethodType::State; - AggState state(key_columns, _probe_key_sz, nullptr); - - _pre_serialize_key_if_need(state, agg_method, key_columns, rows); - - std::vector<size_t> hash_values; - - if constexpr (HashTableTraits<HashTableType>::is_phmap) { - if (hash_values.size() < rows) hash_values.resize(rows); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = agg_method.data.hash(agg_method.keys[i]); - } - } else { - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool)); - } - } - } - - /// For all rows. - for (size_t i = 0; i < rows; ++i) { - AggregateDataPtr aggregate_data = nullptr; - - auto emplace_result = [&]() { - if constexpr (HashTableTraits<HashTableType>::is_phmap) { - if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { - if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) { - agg_method.data.prefetch_by_key(state.get_key_holder( - i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); - } else - agg_method.data.prefetch_by_hash( - hash_values[i + HASH_MAP_PREFETCH_DIST]); - } - - return state.emplace_key(agg_method.data, hash_values[i], i, - _agg_arena_pool); - } else { - return state.emplace_key(agg_method.data, i, _agg_arena_pool); - } - }(); - - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. - if (emplace_result.is_inserted()) { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. - emplace_result.set_mapped(nullptr); - - aggregate_data = _agg_arena_pool.aligned_alloc( - _total_size_of_aggregate_states, _align_aggregate_states); - _create_agg_status(aggregate_data); - - emplace_result.set_mapped(aggregate_data); - } else - aggregate_data = emplace_result.get_mapped(); - - places[i] = aggregate_data; - assert(places[i] != nullptr); - } - }, - _agg_data._aggregated_method_variant); + _emplace_into_hash_table(places.data(), key_columns, rows); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i], @@ -1188,71 +1127,7 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) { int rows = block->rows(); PODArray<AggregateDataPtr> places(rows); - std::visit( - [&](auto&& agg_method) -> void { - using HashMethodType = std::decay_t<decltype(agg_method)>; - using HashTableType = std::decay_t<decltype(agg_method.data)>; - using AggState = typename HashMethodType::State; - AggState state(key_columns, _probe_key_sz, nullptr); - - _pre_serialize_key_if_need(state, agg_method, key_columns, rows); - - std::vector<size_t> hash_values; - - if constexpr (HashTableTraits<HashTableType>::is_phmap) { - if (hash_values.size() < rows) hash_values.resize(rows); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = agg_method.data.hash(agg_method.keys[i]); - } - } else { - for (size_t i = 0; i < rows; ++i) { - hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool)); - } - } - } - - /// For all rows. - for (size_t i = 0; i < rows; ++i) { - AggregateDataPtr aggregate_data = nullptr; - - auto emplace_result = [&]() { - if constexpr (HashTableTraits<HashTableType>::is_phmap) { - if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) { - if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) { - agg_method.data.prefetch_by_key(state.get_key_holder( - i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool)); - } else - agg_method.data.prefetch_by_hash( - hash_values[i + HASH_MAP_PREFETCH_DIST]); - } - return state.emplace_key(agg_method.data, hash_values[i], i, - _agg_arena_pool); - } else { - return state.emplace_key(agg_method.data, i, _agg_arena_pool); - } - }(); - - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. - if (emplace_result.is_inserted()) { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. - emplace_result.set_mapped(nullptr); - - aggregate_data = _agg_arena_pool.aligned_alloc( - _total_size_of_aggregate_states, _align_aggregate_states); - _create_agg_status(aggregate_data); - - emplace_result.set_mapped(aggregate_data); - } else - aggregate_data = emplace_result.get_mapped(); - - places[i] = aggregate_data; - assert(places[i] != nullptr); - } - }, - _agg_data._aggregated_method_variant); + _emplace_into_hash_table(places.data(), key_columns, rows); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 && diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 478cd563ca..ba7cf24b70 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -710,6 +710,9 @@ private: } } + void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, + const size_t num_rows); + void release_tracker(); using vectorized_execute = std::function<Status(Block* block)>; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org