This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6512893257 [refactor](vectorized) Remove useless control variables to simplify aggregation node code (#22026)
6512893257 is described below

commit 6512893257a9c595a576d61af5fc6a82a7e352fd
Author: ZenoYang <cookie...@qq.com>
AuthorDate: Fri Jul 21 12:45:23 2023 +0800

    [refactor](vectorized) Remove useless control variables to simplify aggregation node code (#22026)
    
    * [refactor](vectorized) Remove useless control variables to simplify aggregation node code
    
    * fix
---
 be/src/vec/exec/vaggregation_node.cpp              | 178 +++++----------------
 be/src/vec/exec/vaggregation_node.h                |  15 +-
 .../org/apache/doris/planner/AggregationNode.java  |   1 -
 gensrc/thrift/PlanNodes.thrift                     |   2 +-
 4 files changed, 41 insertions(+), 155 deletions(-)

diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 5f021e9c95..d7ebfe0a51 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -135,9 +135,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const 
TPlanNode& tnode,
     }
 
     _is_first_phase = tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase;
-    _use_fixed_length_serialization_opt =
-            tnode.agg_node.__isset.use_fixed_length_serialization_opt &&
-            tnode.agg_node.use_fixed_length_serialization_opt;
     _agg_data = std::make_unique<AggregatedDataVariants>();
     _agg_arena_pool = std::make_unique<Arena>();
 }
@@ -709,34 +706,16 @@ Status 
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
     MutableColumns value_columns(agg_size);
     std::vector<DataTypePtr> data_types(agg_size);
     // will serialize data to string column
-    if (_use_fixed_length_serialization_opt) {
-        auto serialize_string_type = std::make_shared<DataTypeString>();
-        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-            data_types[i] = 
_aggregate_evaluators[i]->function()->get_serialized_type();
-            value_columns[i] = 
_aggregate_evaluators[i]->function()->create_serialize_column();
-        }
-
-        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-            
_aggregate_evaluators[i]->function()->serialize_without_key_to_column(
-                    _agg_data->without_key + _offsets_of_aggregate_states[i], 
*value_columns[i]);
-        }
-    } else {
-        std::vector<VectorBufferWriter> value_buffer_writers;
-        auto serialize_string_type = std::make_shared<DataTypeString>();
-        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-            data_types[i] = serialize_string_type;
-            value_columns[i] = serialize_string_type->create_column();
-            value_buffer_writers.emplace_back(
-                    *reinterpret_cast<ColumnString*>(value_columns[i].get()));
-        }
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        data_types[i] = 
_aggregate_evaluators[i]->function()->get_serialized_type();
+        value_columns[i] = 
_aggregate_evaluators[i]->function()->create_serialize_column();
+    }
 
-        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-            _aggregate_evaluators[i]->function()->serialize(
-                    _agg_data->without_key + _offsets_of_aggregate_states[i],
-                    value_buffer_writers[i]);
-            value_buffer_writers[i].commit();
-        }
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        _aggregate_evaluators[i]->function()->serialize_without_key_to_column(
+                _agg_data->without_key + _offsets_of_aggregate_states[i], 
*value_columns[i]);
     }
+
     {
         ColumnsWithTypeAndName data_with_schema;
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
@@ -774,21 +753,9 @@ Status AggregationNode::_merge_without_key(Block* block) {
             }
 
             SCOPED_TIMER(_deserialize_data_timer);
-            if (_use_fixed_length_serialization_opt) {
-                
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
-                        _agg_data->without_key + 
_offsets_of_aggregate_states[i], *column,
-                        _agg_arena_pool.get());
-            } else {
-                const int rows = block->rows();
-                for (int j = 0; j < rows; ++j) {
-                    VectorBufferReader buffer_reader(
-                            ((ColumnString*)(column.get()))->get_data_at(j));
-
-                    
_aggregate_evaluators[i]->function()->deserialize_and_merge(
-                            _agg_data->without_key + 
_offsets_of_aggregate_states[i], buffer_reader,
-                            _agg_arena_pool.get());
-                }
-            }
+            
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
+                    _agg_data->without_key + _offsets_of_aggregate_states[i], 
*column,
+                    _agg_arena_pool.get());
         } else {
             RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
                     block, _agg_data->without_key + 
_offsets_of_aggregate_states[i],
@@ -1127,56 +1094,28 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                         std::vector<DataTypePtr> data_types;
                         MutableColumns value_columns;
-                        if (_use_fixed_length_serialization_opt) {
-                            for (int i = 0; i < _aggregate_evaluators.size(); 
++i) {
-                                auto data_type =
-                                        
_aggregate_evaluators[i]->function()->get_serialized_type();
-                                if (mem_reuse) {
-                                    value_columns.emplace_back(
-                                            
std::move(*out_block->get_by_position(i + key_size)
-                                                               .column)
-                                                    .mutate());
-                                } else {
-                                    // slot type of value it should always be 
string type
-                                    
value_columns.emplace_back(_aggregate_evaluators[i]
-                                                                       
->function()
-                                                                       
->create_serialize_column());
-                                }
-                                data_types.emplace_back(data_type);
-                            }
-
-                            for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
-                                SCOPED_TIMER(_serialize_data_timer);
-                                RETURN_IF_ERROR(
-                                        
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
-                                                in_block, value_columns[i], 
rows,
-                                                _agg_arena_pool.get()));
-                            }
-                        } else {
-                            std::vector<VectorBufferWriter> 
value_buffer_writers;
-                            auto serialize_string_type = 
std::make_shared<DataTypeString>();
-                            for (int i = 0; i < _aggregate_evaluators.size(); 
++i) {
-                                if (mem_reuse) {
-                                    value_columns.emplace_back(
-                                            
std::move(*out_block->get_by_position(i + key_size)
-                                                               .column)
-                                                    .mutate());
-                                } else {
-                                    // slot type of value it should always be 
string type
-                                    value_columns.emplace_back(
-                                            
serialize_string_type->create_column());
-                                }
-                                data_types.emplace_back(serialize_string_type);
-                                value_buffer_writers.emplace_back(
-                                        
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
+                        for (int i = 0; i < _aggregate_evaluators.size(); ++i) 
{
+                            auto data_type =
+                                    
_aggregate_evaluators[i]->function()->get_serialized_type();
+                            if (mem_reuse) {
+                                value_columns.emplace_back(
+                                        
std::move(*out_block->get_by_position(i + key_size).column)
+                                                .mutate());
+                            } else {
+                                // slot type of value it should always be 
string type
+                                
value_columns.emplace_back(_aggregate_evaluators[i]
+                                                                   ->function()
+                                                                   
->create_serialize_column());
                             }
+                            data_types.emplace_back(data_type);
+                        }
 
-                            for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
-                                SCOPED_TIMER(_serialize_data_timer);
-                                
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize(
-                                        in_block, value_buffer_writers[i], 
rows,
-                                        _agg_arena_pool.get()));
-                            }
+                        for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
+                            SCOPED_TIMER(_serialize_data_timer);
+                            RETURN_IF_ERROR(
+                                    
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+                                            in_block, value_columns[i], rows,
+                                            _agg_arena_pool.get()));
                         }
 
                         if (!mem_reuse) {
@@ -1233,17 +1172,9 @@ Status 
AggregationNode::_serialize_hash_table_to_block(HashTableCtxType& context
         
key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column());
     }
 
-    if (_use_fixed_length_serialization_opt) {
-        for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
-            value_data_types[i] = 
_aggregate_evaluators[i]->function()->get_serialized_type();
-            value_columns[i] = 
_aggregate_evaluators[i]->function()->create_serialize_column();
-        }
-    } else {
-        auto serialize_string_type = std::make_shared<DataTypeString>();
-        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-            value_data_types[i] = serialize_string_type;
-            value_columns[i] = serialize_string_type->create_column();
-        }
+    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+        value_data_types[i] = 
_aggregate_evaluators[i]->function()->get_serialized_type();
+        value_columns[i] = 
_aggregate_evaluators[i]->function()->create_serialize_column();
     }
 
     context.init_once();
@@ -1280,21 +1211,9 @@ Status 
AggregationNode::_serialize_hash_table_to_block(HashTableCtxType& context
         ++num_rows;
     }
 
-    if (_use_fixed_length_serialization_opt) {
-        for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
-            _aggregate_evaluators[i]->function()->serialize_to_column(
-                    _values, _offsets_of_aggregate_states[i], 
value_columns[i], num_rows);
-        }
-    } else {
-        std::vector<VectorBufferWriter> value_buffer_writers;
-        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-            value_buffer_writers.emplace_back(
-                    *reinterpret_cast<ColumnString*>(value_columns[i].get()));
-        }
-        for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
-            _aggregate_evaluators[i]->function()->serialize_vec(
-                    _values, _offsets_of_aggregate_states[i], 
value_buffer_writers[i], num_rows);
-        }
+    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+        _aggregate_evaluators[i]->function()->serialize_to_column(
+                _values, _offsets_of_aggregate_states[i], value_columns[i], 
num_rows);
     }
 
     ColumnsWithTypeAndName columns_with_schema;
@@ -1677,7 +1596,7 @@ Status 
AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeS
                     }
                 }
 
-                if (_use_fixed_length_serialization_opt) {
+                {
                     SCOPED_TIMER(_serialize_data_timer);
                     for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
                         value_data_types[i] =
@@ -1694,27 +1613,6 @@ Status 
AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeS
                                 _values, _offsets_of_aggregate_states[i], 
value_columns[i],
                                 num_rows);
                     }
-                } else {
-                    SCOPED_TIMER(_serialize_data_timer);
-                    std::vector<VectorBufferWriter> value_buffer_writers;
-                    auto serialize_string_type = 
std::make_shared<DataTypeString>();
-                    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-                        value_data_types[i] = serialize_string_type;
-                        if (mem_reuse) {
-                            value_columns[i] =
-                                    std::move(*block->get_by_position(i + 
key_size).column)
-                                            .mutate();
-                        } else {
-                            value_columns[i] = 
serialize_string_type->create_column();
-                        }
-                        value_buffer_writers.emplace_back(
-                                
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
-                    }
-                    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
-                        _aggregate_evaluators[i]->function()->serialize_vec(
-                                _values, _offsets_of_aggregate_states[i], 
value_buffer_writers[i],
-                                num_rows);
-                    }
                 }
             },
             _agg_data->_aggregated_method_variant);
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index b30d1cfa0c..9d6f4c4979 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -909,7 +909,6 @@ private:
     bool _needs_finalize;
     bool _is_merge;
     bool _is_first_phase;
-    bool _use_fixed_length_serialization_opt;
     std::unique_ptr<Arena> _agg_profile_arena;
 
     size_t _align_aggregate_states = 1;
@@ -1122,15 +1121,10 @@ private:
                         _deserialize_buffer.resize(buffer_size);
                     }
 
-                    if (_use_fixed_length_serialization_opt) {
+                    {
                         SCOPED_TIMER(_deserialize_data_timer);
                         
_aggregate_evaluators[i]->function()->deserialize_from_column(
                                 _deserialize_buffer.data(), *column, 
_agg_arena_pool.get(), rows);
-                    } else {
-                        SCOPED_TIMER(_deserialize_data_timer);
-                        _aggregate_evaluators[i]->function()->deserialize_vec(
-                                _deserialize_buffer.data(), 
(ColumnString*)(column.get()),
-                                _agg_arena_pool.get(), rows);
                     }
 
                     DEFER({
@@ -1169,15 +1163,10 @@ private:
                         _deserialize_buffer.resize(buffer_size);
                     }
 
-                    if (_use_fixed_length_serialization_opt) {
+                    {
                         SCOPED_TIMER(_deserialize_data_timer);
                         
_aggregate_evaluators[i]->function()->deserialize_from_column(
                                 _deserialize_buffer.data(), *column, 
_agg_arena_pool.get(), rows);
-                    } else {
-                        SCOPED_TIMER(_deserialize_data_timer);
-                        _aggregate_evaluators[i]->function()->deserialize_vec(
-                                _deserialize_buffer.data(), 
(ColumnString*)(column.get()),
-                                _agg_arena_pool.get(), rows);
                     }
 
                     DEFER({
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index b564bba161..580700303f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -274,7 +274,6 @@ public class AggregationNode extends PlanNode {
         msg.agg_node.setAggSortInfos(aggSortInfos);
         msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
         msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
-        msg.agg_node.setUseFixedLengthSerializationOpt(true);
         List<Expr> groupingExprs = aggInfo.getGroupingExprs();
         if (groupingExprs != null) {
             msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 9dacaac889..da19eb4975 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -792,7 +792,7 @@ struct TAggregationNode {
   6: optional bool use_streaming_preaggregation
   7: optional list<TSortInfo> agg_sort_infos
   8: optional bool is_first_phase
-  9: optional bool use_fixed_length_serialization_opt
+  // 9: optional bool use_fixed_length_serialization_opt
 }
 
 struct TRepeatNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to