This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0-alpha in repository https://gitbox.apache.org/repos/asf/doris.git
commit fd2f8a6baa8770b61de9c78e489b415e3f553f2d Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Apr 27 14:58:32 2023 +0800 [fix](memory) Fix AggFunc memory leak due to incorrect destroy (#19126) --- be/src/olap/memtable.cpp | 18 ++++++++--- be/src/olap/schema_change.cpp | 37 ++++++++++++++-------- be/src/util/defer_op.h | 5 +++ .../vec/aggregate_functions/aggregate_function.h | 31 ++++++++++++++---- .../aggregate_function_distinct.h | 2 +- .../aggregate_function_java_udaf.h | 7 +++- .../aggregate_functions/aggregate_function_rpc.h | 3 +- .../aggregate_functions/aggregate_function_sort.h | 2 +- .../aggregate_function_window.h | 16 +++++----- be/src/vec/exec/vaggregation_node.cpp | 9 +++++- be/src/vec/exec/vaggregation_node.h | 20 +++++++----- be/src/vec/exec/vanalytic_eval_node.cpp | 12 +++++-- be/src/vec/exec/vanalytic_eval_node.h | 1 + be/src/vec/olap/block_reader.cpp | 8 +++-- be/src/vec/olap/vertical_block_reader.cpp | 8 +++-- 15 files changed, 126 insertions(+), 53 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 2b4bb79324..8a0686b21d 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -205,11 +205,19 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) { row_in_block->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16), _offsets_of_aggregate_states.data()); for (auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++) { - auto col_ptr = _input_mutable_block.mutable_columns()[cid].get(); - auto data = row_in_block->agg_places(cid); - _agg_functions[cid]->create(data); - _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), - row_in_block->_row_pos, nullptr); + try { + auto col_ptr = _input_mutable_block.mutable_columns()[cid].get(); + auto data = row_in_block->agg_places(cid); + _agg_functions[cid]->create(data); + _agg_functions[cid]->add(data, + const_cast<const doris::vectorized::IColumn**>(&col_ptr), + row_in_block->_row_pos, nullptr); + } catch (...) { + for (size_t i = _schema->num_key_columns(); i < cid; ++i) { + _agg_functions[i]->destroy(row_in_block->agg_places(i)); + } + throw; + } } _vec_skip_list->InsertWithHint(row_in_block, is_exist, &_vec_hint); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 86c7a86f8a..1fee5f744a 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -89,16 +89,32 @@ public: std::vector<vectorized::AggregateDataPtr> agg_places; for (int i = key_number; i < columns; i++) { - vectorized::AggregateFunctionPtr function = - tablet_schema->column(i).get_aggregate_function( - {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX); - agg_functions.push_back(function); - // create aggregate data - vectorized::AggregateDataPtr place = new char[function->size_of_data()]; - function->create(place); - agg_places.push_back(place); + try { + vectorized::AggregateFunctionPtr function = + tablet_schema->column(i).get_aggregate_function( + {finalized_block.get_data_type(i)}, + vectorized::AGG_LOAD_SUFFIX); + agg_functions.push_back(function); + // create aggregate data + vectorized::AggregateDataPtr place = new char[function->size_of_data()]; + function->create(place); + agg_places.push_back(place); + } catch (...) { + for (int j = 0; j < i - key_number; ++j) { + agg_functions[j]->destroy(agg_places[j]); + delete[] agg_places[j]; + } + throw; + } } + DEFER({ + for (int i = 0; i < columns - key_number; i++) { + agg_functions[i]->destroy(agg_places[i]); + delete[] agg_places[i]; + } + }); + for (int i = 0; i < rows; i++) { auto row_ref = row_refs[i]; @@ -130,11 +146,6 @@ public: } } } - - for (int i = 0; i < columns - key_number; i++) { - agg_functions[i]->destroy(agg_places[i]); - delete[] agg_places[i]; - } } else { std::vector<RowRef> pushed_row_refs; if (_tablet->keys_type() == KeysType::DUP_KEYS) { diff --git a/be/src/util/defer_op.h b/be/src/util/defer_op.h index bb0732c227..cf86af5781 100644 --- a/be/src/util/defer_op.h +++ b/be/src/util/defer_op.h @@ -40,4 +40,9 @@ private: T _closure; }; +// Nested use Defer, variable name concat line number +#define DEFER_CONCAT(n, ...) const auto defer##n = Defer([&]() { __VA_ARGS__; }) +#define DEFER_FWD(n, ...) DEFER_CONCAT(n, __VA_ARGS__) +#define DEFER(...) DEFER_FWD(__LINE__, __VA_ARGS__) + } // namespace doris diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index ab0a7dde72..e02c2c16be 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -20,6 +20,7 @@ #pragma once +#include "util/defer_op.h" #include "vec/columns/column_complex.h" #include "vec/common/hash_table/phmap_fwd_decl.h" #include "vec/core/block.h" @@ -46,6 +47,16 @@ using DataTypes = std::vector<DataTypePtr>; using AggregateDataPtr = char*; using ConstAggregateDataPtr = const char*; +#define SAFE_CREATE(create, destroy) \ + do { \ + try { \ + create; \ + } catch (...) { \ + destroy; \ + throw; \ + } \ + } while (0) + /** Aggregate functions interface. * Instances of classes with this interface do not contain the data itself for aggregation, * but contain only metadata (description) of the aggregate function, @@ -306,10 +317,10 @@ public: char place[size_of_data()]; for (size_t i = 0; i != num_rows; ++i) { static_cast<const Derived*>(this)->create(place); + DEFER({ static_cast<const Derived*>(this)->destroy(place); }); static_cast<const Derived*>(this)->add(place, columns, i, arena); static_cast<const Derived*>(this)->serialize(place, buf); buf.commit(); - static_cast<const Derived*>(this)->destroy(place); } } @@ -330,10 +341,18 @@ public: size_t num_rows) const override { const auto size_of_data = static_cast<const Derived*>(this)->size_of_data(); for (size_t i = 0; i != num_rows; ++i) { - auto place = places + size_of_data * i; - VectorBufferReader buffer_reader(column->get_data_at(i)); - static_cast<const Derived*>(this)->create(place); - static_cast<const Derived*>(this)->deserialize(place, buffer_reader, arena); + try { + auto place = places + size_of_data * i; + VectorBufferReader buffer_reader(column->get_data_at(i)); + static_cast<const Derived*>(this)->create(place); + static_cast<const Derived*>(this)->deserialize(place, buffer_reader, arena); + } catch (...) { + for (int j = 0; j < i; ++j) { + auto place = places + size_of_data * j; + static_cast<const Derived*>(this)->destroy(place); + } + throw; + } } } @@ -402,9 +421,9 @@ public: auto derived = static_cast<const Derived*>(this); derived->create(deserialized_place); + DEFER({ derived->destroy(deserialized_place); }); derived->deserialize(deserialized_place, buf, arena); derived->merge(place, deserialized_place, arena); - derived->destroy(deserialized_place); } void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_distinct.h index 3961e32447..ef467a615b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h +++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h @@ -219,7 +219,7 @@ public: void create(AggregateDataPtr __restrict place) const override { new (place) Data; - nested_func->create(get_nested_place(place)); + SAFE_CREATE(nested_func->create(get_nested_place(place)), this->data(place).~Data()); } void destroy(AggregateDataPtr __restrict place) const noexcept override { diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index 1295e54fb8..09c2428ff0 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -433,7 +433,12 @@ public: if (_first_created) { new (place) Data(argument_types.size()); Status status = Status::OK(); - RETURN_IF_STATUS_ERROR(status, this->data(place).init_udaf(_fn, _local_location)); + SAFE_CREATE(RETURN_IF_STATUS_ERROR(status, + this->data(place).init_udaf(_fn, _local_location)), + { + this->data(place).destroy(); + this->data(place).~Data(); + }); _first_created = false; _exec_place = place; } diff --git a/be/src/vec/aggregate_functions/aggregate_function_rpc.h b/be/src/vec/aggregate_functions/aggregate_function_rpc.h index 937c936d86..bff4e20a4f 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_rpc.h +++ b/be/src/vec/aggregate_functions/aggregate_function_rpc.h @@ -358,7 +358,8 @@ public: void create(AggregateDataPtr __restrict place) const override { new (place) Data(argument_types.size()); Status status = Status::OK(); - RETURN_IF_STATUS_ERROR(status, data(place).init(_fn)); + SAFE_CREATE(RETURN_IF_STATUS_ERROR(status, data(place).init(_fn)), + this->data(place).~Data()); } String get_name() const override { return _fn.name.function_name; } diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h index 4abdb9c980..1c78c8fb2e 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h @@ -158,7 +158,7 @@ public: void create(AggregateDataPtr __restrict place) const override { new (place) Data(_sort_desc, _block); - _nested_func->create(get_nested_place(place)); + SAFE_CREATE(_nested_func->create(get_nested_place(place)), this->data(place).~Data()); } void destroy(AggregateDataPtr __restrict place) const noexcept override { diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h index 83a4586ab2..6ab756cc50 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window.h @@ -34,7 +34,7 @@ namespace doris::vectorized { struct RowNumberData { - int64_t count; + int64_t count = 0; }; class WindowFunctionRowNumber final @@ -71,9 +71,9 @@ public: }; struct RankData { - int64_t rank; - int64_t count; - int64_t peer_group_start; + int64_t rank = 0; + int64_t count = 0; + int64_t peer_group_start = 0; }; class WindowFunctionRank final : public IAggregateFunctionDataHelper<RankData, WindowFunctionRank> { @@ -116,8 +116,8 @@ public: }; struct DenseRankData { - int64_t rank; - int64_t peer_group_start; + int64_t rank = 0; + int64_t peer_group_start = 0; }; class WindowFunctionDenseRank final : public IAggregateFunctionDataHelper<DenseRankData, WindowFunctionDenseRank> { @@ -157,8 +157,8 @@ public: }; struct NTileData { - int64_t bucket_index; - int64_t rows; + int64_t bucket_index = 0; + int64_t rows = 0; }; class WindowFunctionNTile final diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index c19a4b7730..91ddedd22a 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -603,7 +603,14 @@ Status AggregationNode::close(RuntimeState* state) { Status AggregationNode::_create_agg_status(AggregateDataPtr data) { for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->create(data + _offsets_of_aggregate_states[i]); + try { + _aggregate_evaluators[i]->create(data + _offsets_of_aggregate_states[i]); + } catch (...) { + for (int j = 0; j < i; ++j) { + _aggregate_evaluators[j]->destroy(data + _offsets_of_aggregate_states[j]); + } + throw; + } } return Status::OK(); } diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 34bcb7efed..90c2eaff5a 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -1217,13 +1217,15 @@ private: _deserialize_buffer.data(), (ColumnString*)(column.get()), _agg_arena_pool.get(), rows); } + + DEFER({ + _aggregate_evaluators[i]->function()->destroy_vec( + _deserialize_buffer.data(), rows); + }); + _aggregate_evaluators[i]->function()->merge_vec_selected( _places.data(), _offsets_of_aggregate_states[i], _deserialize_buffer.data(), _agg_arena_pool.get(), rows); - - _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(), - rows); - } else { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected( block, _offsets_of_aggregate_states[i], _places.data(), @@ -1257,13 +1259,15 @@ private: _deserialize_buffer.data(), (ColumnString*)(column.get()), _agg_arena_pool.get(), rows); } + + DEFER({ + _aggregate_evaluators[i]->function()->destroy_vec( + _deserialize_buffer.data(), rows); + }); + _aggregate_evaluators[i]->function()->merge_vec( _places.data(), _offsets_of_aggregate_states[i], _deserialize_buffer.data(), _agg_arena_pool.get(), rows); - - _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(), - rows); - } else { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( block, _offsets_of_aggregate_states[i], _places.data(), diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 932baa370d..881928be42 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -712,13 +712,21 @@ Status VAnalyticEvalNode::_reset_agg_status() { Status VAnalyticEvalNode::_create_agg_status() { for (size_t i = 0; i < _agg_functions_size; ++i) { - _agg_functions[i]->create(_fn_place_ptr + _offsets_of_aggregate_states[i]); + try { + _agg_functions[i]->create(_fn_place_ptr + _offsets_of_aggregate_states[i]); + } catch (...) { + for (int j = 0; j < i; ++j) { + _agg_functions[j]->destroy(_fn_place_ptr + _offsets_of_aggregate_states[j]); + } + throw; + } } + _agg_functions_created = true; return Status::OK(); } Status VAnalyticEvalNode::_destroy_agg_status() { - if (UNLIKELY(_fn_place_ptr == nullptr)) { + if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) { return Status::OK(); } for (size_t i = 0; i < _agg_functions_size; ++i) { diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h index ca7d723974..62a64a448d 100644 --- a/be/src/vec/exec/vanalytic_eval_node.h +++ b/be/src/vec/exec/vanalytic_eval_node.h @@ -142,6 +142,7 @@ private: int64_t _rows_start_offset = 0; int64_t _rows_end_offset = 0; size_t _agg_functions_size = 0; + bool _agg_functions_created = false; /// The offset of the n-th functions. std::vector<size_t> _offsets_of_aggregate_states; diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index a9dbcd6226..aed8eabe11 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -129,7 +129,10 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) { _agg_functions.push_back(function); // create aggregate data AggregateDataPtr place = new char[function->size_of_data()]; - function->create(place); + SAFE_CREATE(function->create(place), { + _agg_functions.pop_back(); + delete[] place; + }); _agg_places.push_back(place); // calculate `has_string` tag. @@ -438,8 +441,7 @@ void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end, if (is_close) { function->insert_result_into(place, *columns[_return_columns_loc[idx]]); // reset aggregate data - function->destroy(place); - function->create(place); + function->reset(place); } } } diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 097ddb7513..e5ef2a403c 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -154,7 +154,10 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) { _agg_functions.push_back(function); // create aggregate data AggregateDataPtr place = new char[function->size_of_data()]; - function->create(place); + SAFE_CREATE(function->create(place), { + _agg_functions.pop_back(); + delete[] place; + }); _agg_places.push_back(place); // calculate `has_string` tag. @@ -271,8 +274,7 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin, if (is_close) { function->insert_result_into(place, *columns[idx]); // reset aggregate data - function->destroy(place); - function->create(place); + function->reset(place); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org