This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new a49a6c7c644 [fix](merge-iterator) Fix mem leak when get next batch failed (#41671)
a49a6c7c644 is described below

commit a49a6c7c64403d523166355bef23d88a7094c856
Author: lw112 <131352377+felixw...@users.noreply.github.com>
AuthorDate: Mon Oct 14 16:02:14 2024 +0800

    [fix](merge-iterator) Fix mem leak when get next batch failed (#41671)
---
 be/src/vec/olap/vgeneric_iterators.cpp |  5 +++--
 be/src/vec/olap/vgeneric_iterators.h   | 26 ++++++++++----------------
 2 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 75ce50b6c4d..5d9a79d5ef0 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -17,6 +17,7 @@
 
 #include <vec/olap/vgeneric_iterators.h>
 
+#include <algorithm>
 #include <memory>
 #include <queue>
 #include <utility>
@@ -321,13 +322,13 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
     _record_rowids = opts.record_rowids;
 
     for (auto iter : _origin_iters) {
-        auto ctx = std::make_unique<VMergeIteratorContext>(
+        auto ctx = std::make_shared<VMergeIteratorContext>(
                 iter, _sequence_id_idx, _is_unique, _is_reverse, opts.read_orderby_key_columns);
         RETURN_IF_ERROR(ctx->init(opts));
         if (!ctx->valid()) {
             continue;
         }
-        _merge_heap.push(ctx.release());
+        _merge_heap.push(ctx);
     }
 
     _origin_iters.clear();
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index e5bb36d0d34..bd2dc934d98 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
+
 #include "olap/iterators.h"
 #include "olap/row.h"
 #include "olap/row_block2.h"
@@ -181,13 +183,7 @@ public:
               _is_reverse(is_reverse),
               _merged_rows(merged_rows) {}
 
-    ~VMergeIterator() override {
-        while (!_merge_heap.empty()) {
-            auto ctx = _merge_heap.top();
-            _merge_heap.pop();
-            delete ctx;
-        }
-    }
+    ~VMergeIterator() override = default;
 
     Status init(const StorageReadOptions& opts) override;
 
@@ -221,7 +217,7 @@ private:
             _block_row_locations.resize(_block_row_max);
         }
         size_t row_idx = 0;
-        VMergeIteratorContext* pre_ctx = nullptr;
+        std::shared_ptr<VMergeIteratorContext> pre_ctx;
         while (_get_size(block) < _block_row_max) {
             if (_merge_heap.empty()) {
                 break;
@@ -238,7 +234,7 @@ private:
                     }
                     pre_ctx = ctx;
                 }
-                pre_ctx->set_pre_ctx_same(ctx);
+                pre_ctx->set_pre_ctx_same(ctx.get());
                 if (UNLIKELY(_record_rowids)) {
                     _block_row_locations[row_idx] = ctx->current_row_location();
                 }
@@ -261,9 +257,6 @@ private:
             RETURN_IF_ERROR(ctx->advance());
             if (ctx->valid()) {
                 _merge_heap.push(ctx);
-            } else {
-                // Release ctx earlier to reduce resource consumed
-                delete ctx;
             }
         }
         if (!_merge_heap.empty()) {
@@ -284,14 +277,15 @@ private:
     const Schema* _schema = nullptr;
 
     struct VMergeContextComparator {
-        bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const {
+        bool operator()(const std::shared_ptr<VMergeIteratorContext>& lhs,
+                        const std::shared_ptr<VMergeIteratorContext>& rhs) const {
             return lhs->compare(*rhs);
         }
     };
 
-    using VMergeHeap =
-            std::priority_queue<VMergeIteratorContext*, std::vector<VMergeIteratorContext*>,
-                                VMergeContextComparator>;
+    using VMergeHeap = std::priority_queue<std::shared_ptr<VMergeIteratorContext>,
+                                           std::vector<std::shared_ptr<VMergeIteratorContext>>,
+                                           VMergeContextComparator>;
 
     VMergeHeap _merge_heap;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to