This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
     new 8cce760493 [hotfix](dev-1.0.1) fix agg node produce nullable value bug (#10430)
8cce760493 is described below

commit 8cce760493b4a598c0ed93fbd5b7ffd2c6bdc642
Author: starocean999 <40539150+starocean...@users.noreply.github.com>
AuthorDate: Tue Jun 28 16:29:51 2022 +0800

    [hotfix](dev-1.0.1) fix agg node produce nullable value bug (#10430)
---
 be/src/vec/exec/vaggregation_node.cpp | 118 +++++++++++++---------------------
 be/src/vec/exec/vaggregation_node.h   |   9 +--
 2 files changed, 48 insertions(+), 79 deletions(-)

diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index def5f56ff7..37be1f5648 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -224,15 +224,22 @@ Status AggregationNode::prepare(RuntimeState* state) {
         auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
         if (nullable_output != nullable_input) {
             DCHECK(nullable_output);
-            _make_nullable_keys.emplace_back(i);
+            _make_nullable_output_column_pos.emplace_back(i);
         }
     }
+    int probe_expr_count = _probe_expr_ctxs.size();
     for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
         SlotDescriptor* intermediate_slot_desc = 
_intermediate_tuple_desc->slots()[j];
         SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
         RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, 
child(0)->row_desc(),
                                                           _mem_pool.get(), 
intermediate_slot_desc,
                                                           output_slot_desc, 
mem_tracker()));
+        auto nullable_output = output_slot_desc->is_nullable();
+        auto nullable_agg_output = 
_aggregate_evaluators[i]->data_type()->is_nullable();
+        if (nullable_output != nullable_agg_output) {
+            DCHECK(nullable_output);
+            _make_nullable_output_column_pos.emplace_back(i + 
probe_expr_count);
+        }
     }
 
     // set profile timer to evaluators
@@ -384,11 +391,11 @@ Status AggregationNode::get_next(RuntimeState* state, 
Block* block, bool* eos) {
         }
         // pre stream agg need use _num_row_return to decide whether to do pre 
stream agg
         _num_rows_returned += block->rows();
-        _make_nullable_output_key(block);
+        _make_nullable_output_column(block);
         COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     } else {
         RETURN_IF_ERROR(_executor.get_result(state, block, eos));
-        _make_nullable_output_key(block);
+        _make_nullable_output_column(block);
         // dispose the having clause, should not be execute in prestreaming agg
         RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, 
block->columns()));
         reached_limit(block, eos);
@@ -526,10 +533,12 @@ Status AggregationNode::_merge_without_key(Block* block) {
     std::unique_ptr<char[]> deserialize_buffer(new 
char[_total_size_of_aggregate_states]);
     int rows = block->rows();
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
-               
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
-        int col_id =
-                
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())->column_id();
+        int col_id = i;
+        if (_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
+            
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()) {
+            col_id = 
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())
+                             ->column_id();
+        }
         if (_aggregate_evaluators[i]->is_merge()) {
             auto column = block->get_by_position(col_id).column;
             if (column->is_nullable()) {
@@ -569,13 +578,16 @@ void AggregationNode::_close_without_key() {
     release_tracker();
 }
 
-void AggregationNode::_make_nullable_output_key(Block* block) {
+void AggregationNode::_make_nullable_output_column(Block* block) {
     if (block->rows() != 0) {
-        for (auto cid : _make_nullable_keys) {
-            block->get_by_position(cid).column =
-                    make_nullable(block->get_by_position(cid).column);
-            block->get_by_position(cid).type =
-                    make_nullable(block->get_by_position(cid).type);
+        for (auto cid : _make_nullable_output_column_pos) {
+            if (!block->get_by_position(cid).column->is_nullable()) {
+                block->get_by_position(cid).column =
+                        make_nullable(block->get_by_position(cid).column);
+            }
+            if (!block->get_by_position(cid).type->is_nullable()) {
+                block->get_by_position(cid).type = 
make_nullable(block->get_by_position(cid).type);
+            }
         }
     }
 }
@@ -688,7 +700,7 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                         // will serialize value data to string column
                         std::vector<VectorBufferWriter> value_buffer_writers;
-                        bool mem_reuse = out_block->mem_reuse();
+                        bool mem_reuse = out_block->mem_reuse() && 
_make_nullable_output_column_pos.empty();
                         auto serialize_string_type = 
std::make_shared<DataTypeString>();
                         MutableColumns value_columns;
                         for (int i = 0; i < _aggregate_evaluators.size(); ++i) 
{
@@ -839,49 +851,42 @@ Status 
AggregationNode::_execute_with_serialized_key(Block* block) {
 
 Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, 
Block* block,
                                                         bool* eos) {
-    bool mem_reuse = block->mem_reuse();
+    bool mem_reuse = block->mem_reuse() && 
_make_nullable_output_column_pos.empty();
     auto column_withschema = 
VectorizedUtils::create_columns_with_type_and_name(row_desc());
     int key_size = _probe_expr_ctxs.size();
 
     MutableColumns key_columns;
     for (int i = 0; i < key_size; ++i) {
         if (!mem_reuse) {
-            
key_columns.emplace_back(column_withschema[i].type->create_column());
+            
key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column());
         } else {
             
key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         }
     }
 
-    MutableColumns temp_key_columns = _create_temp_key_columns();
-    DCHECK(temp_key_columns.size() == key_size);
-
     MutableColumns value_columns;
     for (int i = key_size; i < column_withschema.size(); ++i) {
         if (!mem_reuse) {
-            
value_columns.emplace_back(column_withschema[i].type->create_column());
+            value_columns.emplace_back(
+                    _aggregate_evaluators[i - 
key_size]->data_type()->create_column());
         } else {
             
value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         }
     }
 
-    MutableColumns temp_value_columns = _create_temp_value_columns();
-    DCHECK(temp_value_columns.size() == _aggregate_evaluators.size() &&
-           _aggregate_evaluators.size() == column_withschema.size() - 
key_size);
-
     SCOPED_TIMER(_get_results_timer);
     std::visit(
             [&](auto&& agg_method) -> void {
                 auto& data = agg_method.data;
                 auto& iter = agg_method.iterator;
                 agg_method.init_once();
-                while (iter != data.end() && temp_key_columns[0]->size() < 
state->batch_size()) {
+                while (iter != data.end() && key_columns[0]->size() < 
state->batch_size()) {
                     const auto& key = iter->get_first();
                     auto& mapped = iter->get_second();
-                    agg_method.insert_key_into_columns(key, temp_key_columns, 
_probe_key_sz);
+                    agg_method.insert_key_into_columns(key, key_columns, 
_probe_key_sz);
                     for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
                         _aggregate_evaluators[i]->insert_result_info(
-                                mapped + _offsets_of_aggregate_states[i],
-                                temp_value_columns[i].get());
+                                mapped + _offsets_of_aggregate_states[i], 
value_columns[i].get());
 
                     ++iter;
                 }
@@ -889,15 +894,15 @@ Status 
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
                     if (agg_method.data.has_null_key_data()) {
                         // only one key of group by support wrap null key
                         // here need additional processing logic on the null 
key / value
-                        DCHECK(temp_key_columns.size() == 1);
-                        DCHECK(temp_key_columns[0]->is_nullable());
-                        if (temp_key_columns[0]->size() < state->batch_size()) 
{
-                            temp_key_columns[0]->insert_data(nullptr, 0);
+                        DCHECK(key_columns.size() == 1);
+                        DCHECK(key_columns[0]->is_nullable());
+                        if (key_columns[0]->size() < state->batch_size()) {
+                            key_columns[0]->insert_data(nullptr, 0);
                             auto mapped = agg_method.data.get_null_key_data();
                             for (size_t i = 0; i < 
_aggregate_evaluators.size(); ++i)
                                 _aggregate_evaluators[i]->insert_result_info(
                                         mapped + 
_offsets_of_aggregate_states[i],
-                                        temp_value_columns[i].get());
+                                        value_columns[i].get());
                             *eos = true;
                         }
                     } else {
@@ -907,25 +912,6 @@ Status 
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
             },
             _agg_data._aggregated_method_variant);
 
-    for (int i = 0; i < key_size; ++i) {
-        if (key_columns[i]->is_nullable() xor 
temp_key_columns[i]->is_nullable()) {
-            DCHECK(key_columns[i]->is_nullable() && 
!temp_key_columns[i]->is_nullable());
-            key_columns[i] = 
(*std::move(make_nullable(std::move(temp_key_columns[i])))).mutate();
-        } else {
-            key_columns[i] = std::move(temp_key_columns[i]);
-        }
-    }
-
-    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        if (value_columns[i]->is_nullable() xor 
temp_value_columns[i]->is_nullable()) {
-            DCHECK(value_columns[i]->is_nullable() && 
!temp_value_columns[i]->is_nullable());
-            value_columns[i] =
-                    
(*std::move(make_nullable(std::move(temp_value_columns[i])))).mutate();
-        } else {
-            value_columns[i] = std::move(temp_value_columns[i]);
-        }
-    }
-
     if (!mem_reuse) {
         *block = column_withschema;
         MutableColumns columns(block->columns());
@@ -949,7 +935,7 @@ Status 
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
     MutableColumns value_columns(agg_size);
     DataTypes value_data_types(agg_size);
 
-    bool mem_reuse = block->mem_reuse();
+    bool mem_reuse = block->mem_reuse() && 
_make_nullable_output_column_pos.empty();
 
     MutableColumns key_columns;
     for (int i = 0; i < key_size; ++i) {
@@ -1078,10 +1064,12 @@ Status 
AggregationNode::_merge_with_serialized_key(Block* block) {
     std::unique_ptr<char[]> deserialize_buffer(new 
char[_total_size_of_aggregate_states]);
 
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
-               
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
-        int col_id =
-                
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())->column_id();
+        int col_id = i + key_size;
+        if (_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
+            
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()) {
+            col_id = 
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())
+                             ->column_id();
+        }
         if (_aggregate_evaluators[i]->is_merge()) {
             auto column = block->get_by_position(col_id).column;
             if (column->is_nullable()) {
@@ -1143,20 +1131,4 @@ void AggregationNode::release_tracker() {
     mem_tracker()->Release(_mem_usage_record.used_in_state + 
_mem_usage_record.used_in_arena);
 }
 
-MutableColumns AggregationNode::_create_temp_key_columns() {
-    MutableColumns key_columns;
-    for (const auto& expr_ctx : _probe_expr_ctxs) {
-        key_columns.push_back(expr_ctx->root()->data_type()->create_column());
-    }
-    return key_columns;
-}
-
-MutableColumns AggregationNode::_create_temp_value_columns() {
-    MutableColumns key_columns;
-    for (const auto& agg : _aggregate_evaluators) {
-        key_columns.push_back(agg->data_type()->create_column());
-    }
-    return key_columns;
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index d2f580d327..149836ade7 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -417,9 +417,9 @@ public:
 private:
     // group by k1,k2
     std::vector<VExprContext*> _probe_expr_ctxs;
-    // left / full join will change the key nullable make output/input solt
+    // left / full join will change the output nullable make output/input solt
     // nullable diff. so we need make nullable of it.
-    std::vector<size_t> _make_nullable_keys;
+    std::vector<size_t> _make_nullable_output_column_pos;
     std::vector<size_t> _probe_key_sz;
 
     std::vector<AggFnEvaluator*> _aggregate_evaluators;
@@ -461,7 +461,7 @@ private:
     /// the preagg should pass through any rows it can't fit in its tables.
     bool _should_expand_preagg_hash_tables();
 
-    void _make_nullable_output_key(Block* block);
+    void _make_nullable_output_column(Block* block);
 
     Status _create_agg_status(AggregateDataPtr data);
     Status _destory_agg_status(AggregateDataPtr data);
@@ -484,9 +484,6 @@ private:
 
     void release_tracker();
 
-    MutableColumns _create_temp_key_columns();
-    MutableColumns _create_temp_value_columns();
-
     using vectorized_execute = std::function<Status(Block* block)>;
     using vectorized_pre_agg = std::function<Status(Block* in_block, Block* 
out_block)>;
     using vectorized_get_result =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to