This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6512893257 [refactor](vectorized) Remove useless control variables to simplify aggregation node code (#22026) 6512893257 is described below commit 6512893257a9c595a576d61af5fc6a82a7e352fd Author: ZenoYang <cookie...@qq.com> AuthorDate: Fri Jul 21 12:45:23 2023 +0800 [refactor](vectorized) Remove useless control variables to simplify aggregation node code (#22026) * [refactor](vectorized) Remove useless control variables to simplify aggregation node code * fix --- be/src/vec/exec/vaggregation_node.cpp | 178 +++++---------------- be/src/vec/exec/vaggregation_node.h | 15 +- .../org/apache/doris/planner/AggregationNode.java | 1 - gensrc/thrift/PlanNodes.thrift | 2 +- 4 files changed, 41 insertions(+), 155 deletions(-) diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 5f021e9c95..d7ebfe0a51 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -135,9 +135,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode, } _is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase; - _use_fixed_length_serialization_opt = - tnode.agg_node.__isset.use_fixed_length_serialization_opt && - tnode.agg_node.use_fixed_length_serialization_opt; _agg_data = std::make_unique<AggregatedDataVariants>(); _agg_arena_pool = std::make_unique<Arena>(); } @@ -709,34 +706,16 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block MutableColumns value_columns(agg_size); std::vector<DataTypePtr> data_types(agg_size); // will serialize data to string column - if (_use_fixed_length_serialization_opt) { - auto serialize_string_type = std::make_shared<DataTypeString>(); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - data_types[i] = _aggregate_evaluators[i]->function()->get_serialized_type(); - value_columns[i] = 
_aggregate_evaluators[i]->function()->create_serialize_column(); - } - - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize_without_key_to_column( - _agg_data->without_key + _offsets_of_aggregate_states[i], *value_columns[i]); - } - } else { - std::vector<VectorBufferWriter> value_buffer_writers; - auto serialize_string_type = std::make_shared<DataTypeString>(); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - data_types[i] = serialize_string_type; - value_columns[i] = serialize_string_type->create_column(); - value_buffer_writers.emplace_back( - *reinterpret_cast<ColumnString*>(value_columns[i].get())); - } + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + data_types[i] = _aggregate_evaluators[i]->function()->get_serialized_type(); + value_columns[i] = _aggregate_evaluators[i]->function()->create_serialize_column(); + } - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize( - _agg_data->without_key + _offsets_of_aggregate_states[i], - value_buffer_writers[i]); - value_buffer_writers[i].commit(); - } + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize_without_key_to_column( + _agg_data->without_key + _offsets_of_aggregate_states[i], *value_columns[i]); } + { ColumnsWithTypeAndName data_with_schema; for (int i = 0; i < _aggregate_evaluators.size(); ++i) { @@ -774,21 +753,9 @@ Status AggregationNode::_merge_without_key(Block* block) { } SCOPED_TIMER(_deserialize_data_timer); - if (_use_fixed_length_serialization_opt) { - _aggregate_evaluators[i]->function()->deserialize_and_merge_from_column( - _agg_data->without_key + _offsets_of_aggregate_states[i], *column, - _agg_arena_pool.get()); - } else { - const int rows = block->rows(); - for (int j = 0; j < rows; ++j) { - VectorBufferReader buffer_reader( - ((ColumnString*)(column.get()))->get_data_at(j)); - - 
_aggregate_evaluators[i]->function()->deserialize_and_merge( - _agg_data->without_key + _offsets_of_aggregate_states[i], buffer_reader, - _agg_arena_pool.get()); - } - } + _aggregate_evaluators[i]->function()->deserialize_and_merge_from_column( + _agg_data->without_key + _offsets_of_aggregate_states[i], *column, + _agg_arena_pool.get()); } else { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add( block, _agg_data->without_key + _offsets_of_aggregate_states[i], @@ -1127,56 +1094,28 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i std::vector<DataTypePtr> data_types; MutableColumns value_columns; - if (_use_fixed_length_serialization_opt) { - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - auto data_type = - _aggregate_evaluators[i]->function()->get_serialized_type(); - if (mem_reuse) { - value_columns.emplace_back( - std::move(*out_block->get_by_position(i + key_size) - .column) - .mutate()); - } else { - // slot type of value it should always be string type - value_columns.emplace_back(_aggregate_evaluators[i] - ->function() - ->create_serialize_column()); - } - data_types.emplace_back(data_type); - } - - for (int i = 0; i != _aggregate_evaluators.size(); ++i) { - SCOPED_TIMER(_serialize_data_timer); - RETURN_IF_ERROR( - _aggregate_evaluators[i]->streaming_agg_serialize_to_column( - in_block, value_columns[i], rows, - _agg_arena_pool.get())); - } - } else { - std::vector<VectorBufferWriter> value_buffer_writers; - auto serialize_string_type = std::make_shared<DataTypeString>(); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - if (mem_reuse) { - value_columns.emplace_back( - std::move(*out_block->get_by_position(i + key_size) - .column) - .mutate()); - } else { - // slot type of value it should always be string type - value_columns.emplace_back( - serialize_string_type->create_column()); - } - data_types.emplace_back(serialize_string_type); - value_buffer_writers.emplace_back( - 
*reinterpret_cast<ColumnString*>(value_columns[i].get())); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + auto data_type = + _aggregate_evaluators[i]->function()->get_serialized_type(); + if (mem_reuse) { + value_columns.emplace_back( + std::move(*out_block->get_by_position(i + key_size).column) + .mutate()); + } else { + // slot type of value it should always be string type + value_columns.emplace_back(_aggregate_evaluators[i] + ->function() + ->create_serialize_column()); } + data_types.emplace_back(data_type); + } - for (int i = 0; i != _aggregate_evaluators.size(); ++i) { - SCOPED_TIMER(_serialize_data_timer); - RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize( - in_block, value_buffer_writers[i], rows, - _agg_arena_pool.get())); - } + for (int i = 0; i != _aggregate_evaluators.size(); ++i) { + SCOPED_TIMER(_serialize_data_timer); + RETURN_IF_ERROR( + _aggregate_evaluators[i]->streaming_agg_serialize_to_column( + in_block, value_columns[i], rows, + _agg_arena_pool.get())); } if (!mem_reuse) { @@ -1233,17 +1172,9 @@ Status AggregationNode::_serialize_hash_table_to_block(HashTableCtxType& context key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column()); } - if (_use_fixed_length_serialization_opt) { - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - value_data_types[i] = _aggregate_evaluators[i]->function()->get_serialized_type(); - value_columns[i] = _aggregate_evaluators[i]->function()->create_serialize_column(); - } - } else { - auto serialize_string_type = std::make_shared<DataTypeString>(); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - value_data_types[i] = serialize_string_type; - value_columns[i] = serialize_string_type->create_column(); - } + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + value_data_types[i] = _aggregate_evaluators[i]->function()->get_serialized_type(); + value_columns[i] = _aggregate_evaluators[i]->function()->create_serialize_column(); } 
context.init_once(); @@ -1280,21 +1211,9 @@ Status AggregationNode::_serialize_hash_table_to_block(HashTableCtxType& context ++num_rows; } - if (_use_fixed_length_serialization_opt) { - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize_to_column( - _values, _offsets_of_aggregate_states[i], value_columns[i], num_rows); - } - } else { - std::vector<VectorBufferWriter> value_buffer_writers; - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - value_buffer_writers.emplace_back( - *reinterpret_cast<ColumnString*>(value_columns[i].get())); - } - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize_vec( - _values, _offsets_of_aggregate_states[i], value_buffer_writers[i], num_rows); - } + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize_to_column( + _values, _offsets_of_aggregate_states[i], value_columns[i], num_rows); } ColumnsWithTypeAndName columns_with_schema; @@ -1677,7 +1596,7 @@ Status AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeS } } - if (_use_fixed_length_serialization_opt) { + { SCOPED_TIMER(_serialize_data_timer); for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { value_data_types[i] = @@ -1694,27 +1613,6 @@ Status AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeS _values, _offsets_of_aggregate_states[i], value_columns[i], num_rows); } - } else { - SCOPED_TIMER(_serialize_data_timer); - std::vector<VectorBufferWriter> value_buffer_writers; - auto serialize_string_type = std::make_shared<DataTypeString>(); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - value_data_types[i] = serialize_string_type; - if (mem_reuse) { - value_columns[i] = - std::move(*block->get_by_position(i + key_size).column) - .mutate(); - } else { - value_columns[i] = serialize_string_type->create_column(); - } - 
value_buffer_writers.emplace_back( - *reinterpret_cast<ColumnString*>(value_columns[i].get())); - } - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize_vec( - _values, _offsets_of_aggregate_states[i], value_buffer_writers[i], - num_rows); - } } }, _agg_data->_aggregated_method_variant); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index b30d1cfa0c..9d6f4c4979 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -909,7 +909,6 @@ private: bool _needs_finalize; bool _is_merge; bool _is_first_phase; - bool _use_fixed_length_serialization_opt; std::unique_ptr<Arena> _agg_profile_arena; size_t _align_aggregate_states = 1; @@ -1122,15 +1121,10 @@ private: _deserialize_buffer.resize(buffer_size); } - if (_use_fixed_length_serialization_opt) { + { SCOPED_TIMER(_deserialize_data_timer); _aggregate_evaluators[i]->function()->deserialize_from_column( _deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows); - } else { - SCOPED_TIMER(_deserialize_data_timer); - _aggregate_evaluators[i]->function()->deserialize_vec( - _deserialize_buffer.data(), (ColumnString*)(column.get()), - _agg_arena_pool.get(), rows); } DEFER({ @@ -1169,15 +1163,10 @@ private: _deserialize_buffer.resize(buffer_size); } - if (_use_fixed_length_serialization_opt) { + { SCOPED_TIMER(_deserialize_data_timer); _aggregate_evaluators[i]->function()->deserialize_from_column( _deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows); - } else { - SCOPED_TIMER(_deserialize_data_timer); - _aggregate_evaluators[i]->function()->deserialize_vec( - _deserialize_buffer.data(), (ColumnString*)(column.get()), - _agg_arena_pool.get(), rows); } DEFER({ diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index b564bba161..580700303f 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -274,7 +274,6 @@ public class AggregationNode extends PlanNode { msg.agg_node.setAggSortInfos(aggSortInfos); msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg); msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase()); - msg.agg_node.setUseFixedLengthSerializationOpt(true); List<Expr> groupingExprs = aggInfo.getGroupingExprs(); if (groupingExprs != null) { msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs)); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 9dacaac889..da19eb4975 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -792,7 +792,7 @@ struct TAggregationNode { 6: optional bool use_streaming_preaggregation 7: optional list<TSortInfo> agg_sort_infos 8: optional bool is_first_phase - 9: optional bool use_fixed_length_serialization_opt + // 9: optional bool use_fixed_length_serialization_opt } struct TRepeatNode { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org