This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 81bab55d43 [Bug](function) catch function calculation error on aggregate node to avoid core dump (#15903) 81bab55d43 is described below commit 81bab55d43d072916cdbe1b5a4309efa7224ee88 Author: Pxl <pxl...@qq.com> AuthorDate: Mon Jan 16 11:21:28 2023 +0800 [Bug](function) catch function calculation error on aggregate node to avoid core dump (#15903) --- be/src/vec/exec/vaggregation_node.cpp | 36 ++++++++++---------- be/src/vec/exec/vaggregation_node.h | 35 ++++++++++++-------- be/src/vec/exprs/vectorized_agg_fn.cpp | 38 +++++++++++++--------- be/src/vec/exprs/vectorized_agg_fn.h | 20 ++++++------ .../bitmap_functions/test_bitmap_function.out | 3 ++ .../bitmap_functions/test_bitmap_function.groovy | 16 +++++++++ 6 files changed, 91 insertions(+), 57 deletions(-) diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index ecf4d87081..89504b20ef 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -80,12 +80,11 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), - _intermediate_tuple_desc(NULL), + _intermediate_tuple_desc(nullptr), _output_tuple_id(tnode.agg_node.output_tuple_id), - _output_tuple_desc(NULL), + _output_tuple_desc(nullptr), _needs_finalize(tnode.agg_node.need_finalize), _is_merge(false), - _agg_data(), _build_timer(nullptr), _serialize_key_timer(nullptr), _exec_timer(nullptr), @@ -714,9 +713,9 @@ Status AggregationNode::_execute_without_key(Block* block) { DCHECK(_agg_data->without_key != nullptr); SCOPED_TIMER(_build_timer); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->execute_single_add( + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add( block, _agg_data->without_key + _offsets_of_aggregate_states[i], - _agg_arena_pool.get()); + _agg_arena_pool.get())); } return Status::OK(); } @@ -749,9 +748,9 @@ Status AggregationNode::_merge_without_key(Block* block) { } } } else { - _aggregate_evaluators[i]->execute_single_add( + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add( block, _agg_data->without_key + _offsets_of_aggregate_states[i], - _agg_arena_pool.get()); + _agg_arena_pool.get())); } } return Status::OK(); @@ -1019,8 +1018,8 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i // to avoid wasting memory. // But for fixed hash map, it never need to expand bool ret_flag = false; - std::visit( - [&](auto&& agg_method) -> void { + RETURN_IF_ERROR(std::visit( + [&](auto&& agg_method) -> Status { if (auto& hash_tbl = agg_method.data; hash_tbl.add_elem_size_overflow(rows)) { // do not try to do agg, just init and serialize directly return the out_block if (!_should_expand_preagg_hash_tables()) { @@ -1052,8 +1051,10 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i for (int i = 0; i != _aggregate_evaluators.size(); ++i) { SCOPED_TIMER(_serialize_data_timer); - _aggregate_evaluators[i]->streaming_agg_serialize_to_column( - in_block, value_columns[i], rows, _agg_arena_pool.get()); + RETURN_IF_ERROR( + _aggregate_evaluators[i]->streaming_agg_serialize_to_column( + in_block, value_columns[i], rows, + _agg_arena_pool.get())); } } else { std::vector<VectorBufferWriter> value_buffer_writers; @@ -1076,9 +1077,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i for (int i = 0; i != _aggregate_evaluators.size(); ++i) { SCOPED_TIMER(_serialize_data_timer); - _aggregate_evaluators[i]->streaming_agg_serialize( + RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize( in_block, value_buffer_writers[i], rows, - _agg_arena_pool.get()); + _agg_arena_pool.get())); } } @@ -1104,16 +1105,17 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i } } } + return Status::OK(); }, - _agg_data->_aggregated_method_variant); + _agg_data->_aggregated_method_variant)); if (!ret_flag) { RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows)); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i], - _places.data(), _agg_arena_pool.get(), - _should_expand_hash_table); + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( + in_block, _offsets_of_aggregate_states[i], _places.data(), + _agg_arena_pool.get(), _should_expand_hash_table)); } } diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 520e6b1511..7d9a32960b 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -116,12 +116,16 @@ struct AggregationMethodSerialized { static void insert_key_into_columns(const StringRef& key, MutableColumns& key_columns, const Sizes&) { auto pos = key.data; - for (auto& column : key_columns) pos = column->deserialize_and_insert_from_arena(pos); + for (auto& column : key_columns) { + pos = column->deserialize_and_insert_from_arena(pos); + } } static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns, const size_t num_rows, const Sizes&) { - for (auto& column : key_columns) column->deserialize_vec(keys, num_rows); + for (auto& column : key_columns) { + column->deserialize_vec(keys, num_rows); + } } void init_once() { @@ -270,7 +274,7 @@ struct AggregationMethodKeysFixed { Iterator iterator; bool inited = false; - AggregationMethodKeysFixed() {} + AggregationMethodKeysFixed() = default; template <typename Other> AggregationMethodKeysFixed(const Other& other) : data(other.data) {} @@ -292,7 +296,9 @@ struct AggregationMethodKeysFixed { ColumnUInt8* null_map; bool column_nullable = false; - if constexpr (has_nullable_keys) column_nullable = is_column_nullable(*key_columns[i]); + if constexpr (has_nullable_keys) { + column_nullable = is_column_nullable(*key_columns[i]); + } /// If we have a nullable column, get its nested column and its null map. if (column_nullable) { @@ -315,9 +321,9 @@ struct AggregationMethodKeysFixed { is_null = val == 1; } - if (has_nullable_keys && is_null) + if (has_nullable_keys && is_null) { observed_column->insert_default(); - else { + } else { size_t size = key_sizes[i]; observed_column->insert_data(reinterpret_cast<const char*>(&key) + pos, size); pos += size; @@ -928,16 +934,17 @@ private: _find_in_hash_table(_places.data(), key_columns, rows); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->execute_batch_add_selected( + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected( block, _offsets_of_aggregate_states[i], _places.data(), - _agg_arena_pool.get()); + _agg_arena_pool.get())); } } else { _emplace_into_hash_table(_places.data(), key_columns, rows); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i], - _places.data(), _agg_arena_pool.get()); + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( + block, _offsets_of_aggregate_states[i], _places.data(), + _agg_arena_pool.get())); } if (_should_limit_output) { @@ -1012,9 +1019,9 @@ private: rows); } else { - _aggregate_evaluators[i]->execute_batch_add_selected( + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected( block, _offsets_of_aggregate_states[i], _places.data(), - _agg_arena_pool.get()); + _agg_arena_pool.get())); } } } else { @@ -1052,9 +1059,9 @@ private: rows); } else { - _aggregate_evaluators[i]->execute_batch_add( + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( block, _offsets_of_aggregate_states[i], _places.data(), - _agg_arena_pool.get()); + _agg_arena_pool.get())); } } diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index 55ca05a4ab..bdeba41688 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -153,38 +153,43 @@ void AggFnEvaluator::destroy(AggregateDataPtr place) { _function->destroy(place); } -void AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) { - _calc_argment_columns(block); +Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) { + RETURN_IF_ERROR(_calc_argment_columns(block)); SCOPED_TIMER(_exec_timer); _function->add_batch_single_place(block->rows(), place, _agg_columns.data(), arena); + return Status::OK(); } -void AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places, - Arena* arena, bool agg_many) { - _calc_argment_columns(block); +Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places, + Arena* arena, bool agg_many) { + RETURN_IF_ERROR(_calc_argment_columns(block)); SCOPED_TIMER(_exec_timer); _function->add_batch(block->rows(), places, offset, _agg_columns.data(), arena, agg_many); + return Status::OK(); } -void AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset, - AggregateDataPtr* places, Arena* arena) { - _calc_argment_columns(block); +Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset, + AggregateDataPtr* places, Arena* arena) { + RETURN_IF_ERROR(_calc_argment_columns(block)); SCOPED_TIMER(_exec_timer); _function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena); + return Status::OK(); } -void AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf, - const size_t num_rows, Arena* arena) { - _calc_argment_columns(block); +Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf, + const size_t num_rows, Arena* arena) { + RETURN_IF_ERROR(_calc_argment_columns(block)); SCOPED_TIMER(_exec_timer); _function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, arena); + return Status::OK(); } -void AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst, - const size_t num_rows, Arena* arena) { - _calc_argment_columns(block); +Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) { + RETURN_IF_ERROR(_calc_argment_columns(block)); SCOPED_TIMER(_exec_timer); _function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena); + return Status::OK(); } void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn* column) { @@ -219,19 +224,20 @@ std::string AggFnEvaluator::debug_string() const { return out.str(); } -void AggFnEvaluator::_calc_argment_columns(Block* block) { +Status AggFnEvaluator::_calc_argment_columns(Block* block) { SCOPED_TIMER(_expr_timer); _agg_columns.resize(_input_exprs_ctxs.size()); int column_ids[_input_exprs_ctxs.size()]; for (int i = 0; i < _input_exprs_ctxs.size(); ++i) { int column_id = -1; - _input_exprs_ctxs[i]->execute(block, &column_id); + RETURN_IF_ERROR(_input_exprs_ctxs[i]->execute(block, &column_id)); column_ids[i] = column_id; } materialize_block_inplace(*block, column_ids, column_ids + _input_exprs_ctxs.size()); for (int i = 0; i < _input_exprs_ctxs.size(); ++i) { _agg_columns[i] = block->get_by_position(column_ids[i]).column.get(); } + return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index bcc0b7e1ef..2a7f572af1 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -53,19 +53,19 @@ public: void destroy(AggregateDataPtr place); // agg_function - void execute_single_add(Block* block, AggregateDataPtr place, Arena* arena = nullptr); + Status execute_single_add(Block* block, AggregateDataPtr place, Arena* arena = nullptr); - void execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places, - Arena* arena = nullptr, bool agg_many = false); + Status execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places, + Arena* arena = nullptr, bool agg_many = false); - void execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places, - Arena* arena = nullptr); + Status execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places, + Arena* arena = nullptr); - void streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows, - Arena* arena); + Status streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows, + Arena* arena); - void streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst, - const size_t num_rows, Arena* arena); + Status streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena); void insert_result_info(AggregateDataPtr place, IColumn* column); @@ -89,7 +89,7 @@ private: AggFnEvaluator(const TExprNode& desc); - void _calc_argment_columns(Block* block); + Status _calc_argment_columns(Block* block); DataTypes _argument_types_with_sort; DataTypes _real_argument_types; diff --git a/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out b/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out index ed771ea325..d7b68d929b 100644 --- a/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out +++ b/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out @@ -271,3 +271,6 @@ false -- !sql -- 20221103 +-- !sql -- +\N + diff --git a/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy b/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy index d3cca363d3..d31d96bd1a 100644 --- a/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy @@ -188,4 +188,20 @@ suite("test_bitmap_function") { qt_sql """ select bitmap_to_string(sub_bitmap(bitmap_from_string('1'), 0, 3)) value; """ qt_sql """ select bitmap_to_string(bitmap_subset_limit(bitmap_from_string('100'), 0, 3)) value; """ qt_sql """ select bitmap_to_string(bitmap_subset_in_range(bitmap_from_string('20221103'), 0, 20221104)) date_list_bitmap; """ + + sql "drop table if exists d_table;" + sql """ + create table d_table ( + k1 int null, + k2 int not null, + k3 bigint null, + k4 varchar(100) null + ) + duplicate key (k1,k2,k3) + distributed BY hash(k1) buckets 3 + properties("replication_num" = "1"); + """ + sql "insert into d_table select -4,-4,-4,'d';" + try_sql "select bitmap_union(to_bitmap_with_check(k2)) from d_table;" + qt_sql "select bitmap_union(to_bitmap(k2)) from d_table;" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org