This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.0.1 by this push: new 8cce760493 [hotfix](dev-1.0.1) fix agg node produce nullable value bug (#10430) 8cce760493 is described below commit 8cce760493b4a598c0ed93fbd5b7ffd2c6bdc642 Author: starocean999 <40539150+starocean...@users.noreply.github.com> AuthorDate: Tue Jun 28 16:29:51 2022 +0800 [hotfix](dev-1.0.1) fix agg node produce nullable value bug (#10430) --- be/src/vec/exec/vaggregation_node.cpp | 118 +++++++++++++--------------------- be/src/vec/exec/vaggregation_node.h | 9 +-- 2 files changed, 48 insertions(+), 79 deletions(-) diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index def5f56ff7..37be1f5648 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -224,15 +224,22 @@ Status AggregationNode::prepare(RuntimeState* state) { auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable(); if (nullable_output != nullable_input) { DCHECK(nullable_output); - _make_nullable_keys.emplace_back(i); + _make_nullable_output_column_pos.emplace_back(i); } } + int probe_expr_count = _probe_expr_ctxs.size(); for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(), _mem_pool.get(), intermediate_slot_desc, output_slot_desc, mem_tracker())); + auto nullable_output = output_slot_desc->is_nullable(); + auto nullable_agg_output = _aggregate_evaluators[i]->data_type()->is_nullable(); + if (nullable_output != nullable_agg_output) { + DCHECK(nullable_output); + _make_nullable_output_column_pos.emplace_back(i + probe_expr_count); + } } // set profile timer to evaluators @@ -384,11 +391,11 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { } // pre stream agg need use 
_num_row_return to decide whether to do pre stream agg _num_rows_returned += block->rows(); - _make_nullable_output_key(block); + _make_nullable_output_column(block); COUNTER_SET(_rows_returned_counter, _num_rows_returned); } else { RETURN_IF_ERROR(_executor.get_result(state, block, eos)); - _make_nullable_output_key(block); + _make_nullable_output_column(block); // dispose the having clause, should not be execute in prestreaming agg RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns())); reached_limit(block, eos); @@ -526,10 +533,12 @@ Status AggregationNode::_merge_without_key(Block* block) { std::unique_ptr<char[]> deserialize_buffer(new char[_total_size_of_aggregate_states]); int rows = block->rows(); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 && - _aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()); - int col_id = - ((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())->column_id(); + int col_id = i; + if (_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 && + _aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()) { + col_id = ((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()) + ->column_id(); + } if (_aggregate_evaluators[i]->is_merge()) { auto column = block->get_by_position(col_id).column; if (column->is_nullable()) { @@ -569,13 +578,16 @@ void AggregationNode::_close_without_key() { release_tracker(); } -void AggregationNode::_make_nullable_output_key(Block* block) { +void AggregationNode::_make_nullable_output_column(Block* block) { if (block->rows() != 0) { - for (auto cid : _make_nullable_keys) { - block->get_by_position(cid).column = - make_nullable(block->get_by_position(cid).column); - block->get_by_position(cid).type = - make_nullable(block->get_by_position(cid).type); + for (auto cid : _make_nullable_output_column_pos) { + if 
(!block->get_by_position(cid).column->is_nullable()) { + block->get_by_position(cid).column = + make_nullable(block->get_by_position(cid).column); + } + if (!block->get_by_position(cid).type->is_nullable()) { + block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); + } } } } @@ -688,7 +700,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i // will serialize value data to string column std::vector<VectorBufferWriter> value_buffer_writers; - bool mem_reuse = out_block->mem_reuse(); + bool mem_reuse = out_block->mem_reuse() && _make_nullable_output_column_pos.empty(); auto serialize_string_type = std::make_shared<DataTypeString>(); MutableColumns value_columns; for (int i = 0; i < _aggregate_evaluators.size(); ++i) { @@ -839,49 +851,42 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) { Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos) { - bool mem_reuse = block->mem_reuse(); + bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty(); auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc()); int key_size = _probe_expr_ctxs.size(); MutableColumns key_columns; for (int i = 0; i < key_size; ++i) { if (!mem_reuse) { - key_columns.emplace_back(column_withschema[i].type->create_column()); + key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column()); } else { key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); } } - MutableColumns temp_key_columns = _create_temp_key_columns(); - DCHECK(temp_key_columns.size() == key_size); - MutableColumns value_columns; for (int i = key_size; i < column_withschema.size(); ++i) { if (!mem_reuse) { - value_columns.emplace_back(column_withschema[i].type->create_column()); + value_columns.emplace_back( + _aggregate_evaluators[i - key_size]->data_type()->create_column()); } else { 
value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); } } - MutableColumns temp_value_columns = _create_temp_value_columns(); - DCHECK(temp_value_columns.size() == _aggregate_evaluators.size() && - _aggregate_evaluators.size() == column_withschema.size() - key_size); - SCOPED_TIMER(_get_results_timer); std::visit( [&](auto&& agg_method) -> void { auto& data = agg_method.data; auto& iter = agg_method.iterator; agg_method.init_once(); - while (iter != data.end() && temp_key_columns[0]->size() < state->batch_size()) { + while (iter != data.end() && key_columns[0]->size() < state->batch_size()) { const auto& key = iter->get_first(); auto& mapped = iter->get_second(); - agg_method.insert_key_into_columns(key, temp_key_columns, _probe_key_sz); + agg_method.insert_key_into_columns(key, key_columns, _probe_key_sz); for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) _aggregate_evaluators[i]->insert_result_info( - mapped + _offsets_of_aggregate_states[i], - temp_value_columns[i].get()); + mapped + _offsets_of_aggregate_states[i], value_columns[i].get()); ++iter; } @@ -889,15 +894,15 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo if (agg_method.data.has_null_key_data()) { // only one key of group by support wrap null key // here need additional processing logic on the null key / value - DCHECK(temp_key_columns.size() == 1); - DCHECK(temp_key_columns[0]->is_nullable()); - if (temp_key_columns[0]->size() < state->batch_size()) { - temp_key_columns[0]->insert_data(nullptr, 0); + DCHECK(key_columns.size() == 1); + DCHECK(key_columns[0]->is_nullable()); + if (key_columns[0]->size() < state->batch_size()) { + key_columns[0]->insert_data(nullptr, 0); auto mapped = agg_method.data.get_null_key_data(); for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) _aggregate_evaluators[i]->insert_result_info( mapped + _offsets_of_aggregate_states[i], - temp_value_columns[i].get()); + value_columns[i].get()); *eos = 
true; } } else { @@ -907,25 +912,6 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo }, _agg_data._aggregated_method_variant); - for (int i = 0; i < key_size; ++i) { - if (key_columns[i]->is_nullable() xor temp_key_columns[i]->is_nullable()) { - DCHECK(key_columns[i]->is_nullable() && !temp_key_columns[i]->is_nullable()); - key_columns[i] = (*std::move(make_nullable(std::move(temp_key_columns[i])))).mutate(); - } else { - key_columns[i] = std::move(temp_key_columns[i]); - } - } - - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - if (value_columns[i]->is_nullable() xor temp_value_columns[i]->is_nullable()) { - DCHECK(value_columns[i]->is_nullable() && !temp_value_columns[i]->is_nullable()); - value_columns[i] = - (*std::move(make_nullable(std::move(temp_value_columns[i])))).mutate(); - } else { - value_columns[i] = std::move(temp_value_columns[i]); - } - } - if (!mem_reuse) { *block = column_withschema; MutableColumns columns(block->columns()); @@ -949,7 +935,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat MutableColumns value_columns(agg_size); DataTypes value_data_types(agg_size); - bool mem_reuse = block->mem_reuse(); + bool mem_reuse = block->mem_reuse() && _make_nullable_output_column_pos.empty(); MutableColumns key_columns; for (int i = 0; i < key_size; ++i) { @@ -1078,10 +1064,12 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) { std::unique_ptr<char[]> deserialize_buffer(new char[_total_size_of_aggregate_states]); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 && - _aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()); - int col_id = - ((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())->column_id(); + int col_id = i + key_size; + if (_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 && + 
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()) { + col_id = ((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()) + ->column_id(); + } if (_aggregate_evaluators[i]->is_merge()) { auto column = block->get_by_position(col_id).column; if (column->is_nullable()) { @@ -1143,20 +1131,4 @@ void AggregationNode::release_tracker() { mem_tracker()->Release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena); } -MutableColumns AggregationNode::_create_temp_key_columns() { - MutableColumns key_columns; - for (const auto& expr_ctx : _probe_expr_ctxs) { - key_columns.push_back(expr_ctx->root()->data_type()->create_column()); - } - return key_columns; -} - -MutableColumns AggregationNode::_create_temp_value_columns() { - MutableColumns key_columns; - for (const auto& agg : _aggregate_evaluators) { - key_columns.push_back(agg->data_type()->create_column()); - } - return key_columns; -} - } // namespace doris::vectorized diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index d2f580d327..149836ade7 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -417,9 +417,9 @@ public: private: // group by k1,k2 std::vector<VExprContext*> _probe_expr_ctxs; - // left / full join will change the key nullable make output/input solt + // left / full join will change the output nullable make output/input solt // nullable diff. so we need make nullable of it. - std::vector<size_t> _make_nullable_keys; + std::vector<size_t> _make_nullable_output_column_pos; std::vector<size_t> _probe_key_sz; std::vector<AggFnEvaluator*> _aggregate_evaluators; @@ -461,7 +461,7 @@ private: /// the preagg should pass through any rows it can't fit in its tables. 
bool _should_expand_preagg_hash_tables(); - void _make_nullable_output_key(Block* block); + void _make_nullable_output_column(Block* block); Status _create_agg_status(AggregateDataPtr data); Status _destory_agg_status(AggregateDataPtr data); @@ -484,9 +484,6 @@ private: void release_tracker(); - MutableColumns _create_temp_key_columns(); - MutableColumns _create_temp_value_columns(); - using vectorized_execute = std::function<Status(Block* block)>; using vectorized_pre_agg = std::function<Status(Block* in_block, Block* out_block)>; using vectorized_get_result = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org