This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new a49a6c7c644 [fix](merge-iterator) Fix mem leak when get next batch failed (#41671) a49a6c7c644 is described below commit a49a6c7c64403d523166355bef23d88a7094c856 Author: lw112 <131352377+felixw...@users.noreply.github.com> AuthorDate: Mon Oct 14 16:02:14 2024 +0800 [fix](merge-iterator) Fix mem leak when get next batch failed (#41671) --- be/src/vec/olap/vgeneric_iterators.cpp | 5 +++-- be/src/vec/olap/vgeneric_iterators.h | 26 ++++++++++---------------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 75ce50b6c4d..5d9a79d5ef0 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -17,6 +17,7 @@ #include <vec/olap/vgeneric_iterators.h> +#include <algorithm> #include <memory> #include <queue> #include <utility> @@ -321,13 +322,13 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { _record_rowids = opts.record_rowids; for (auto iter : _origin_iters) { - auto ctx = std::make_unique<VMergeIteratorContext>( + auto ctx = std::make_shared<VMergeIteratorContext>( iter, _sequence_id_idx, _is_unique, _is_reverse, opts.read_orderby_key_columns); RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { continue; } - _merge_heap.push(ctx.release()); + _merge_heap.push(ctx); } _origin_iters.clear(); diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h index e5bb36d0d34..bd2dc934d98 100644 --- a/be/src/vec/olap/vgeneric_iterators.h +++ b/be/src/vec/olap/vgeneric_iterators.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+#include <memory> + #include "olap/iterators.h" #include "olap/row.h" #include "olap/row_block2.h" @@ -181,13 +183,7 @@ public: _is_reverse(is_reverse), _merged_rows(merged_rows) {} - ~VMergeIterator() override { - while (!_merge_heap.empty()) { - auto ctx = _merge_heap.top(); - _merge_heap.pop(); - delete ctx; - } - } + ~VMergeIterator() override = default; Status init(const StorageReadOptions& opts) override; @@ -221,7 +217,7 @@ private: _block_row_locations.resize(_block_row_max); } size_t row_idx = 0; - VMergeIteratorContext* pre_ctx = nullptr; + std::shared_ptr<VMergeIteratorContext> pre_ctx; while (_get_size(block) < _block_row_max) { if (_merge_heap.empty()) { break; @@ -238,7 +234,7 @@ private: } pre_ctx = ctx; } - pre_ctx->set_pre_ctx_same(ctx); + pre_ctx->set_pre_ctx_same(ctx.get()); if (UNLIKELY(_record_rowids)) { _block_row_locations[row_idx] = ctx->current_row_location(); } @@ -261,9 +257,6 @@ private: RETURN_IF_ERROR(ctx->advance()); if (ctx->valid()) { _merge_heap.push(ctx); - } else { - // Release ctx earlier to reduce resource consumed - delete ctx; } } if (!_merge_heap.empty()) { @@ -284,14 +277,15 @@ private: const Schema* _schema = nullptr; struct VMergeContextComparator { - bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const { + bool operator()(const std::shared_ptr<VMergeIteratorContext>& lhs, + const std::shared_ptr<VMergeIteratorContext>& rhs) const { return lhs->compare(*rhs); } }; - using VMergeHeap = - std::priority_queue<VMergeIteratorContext*, std::vector<VMergeIteratorContext*>, - VMergeContextComparator>; + using VMergeHeap = std::priority_queue<std::shared_ptr<VMergeIteratorContext>, + std::vector<std::shared_ptr<VMergeIteratorContext>>, + VMergeContextComparator>; VMergeHeap _merge_heap; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org