This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 838225b6beb [fix](move-memtable) wait stream close before releasing streams (#27791)
838225b6beb is described below

commit 838225b6beb8246994e36200ea9feb17aecec4a3
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Nov 30 15:03:07 2023 +0800

    [fix](move-memtable) wait stream close before releasing streams (#27791)
---
 be/src/vec/sink/load_stream_stub.cpp         | 12 ++++++++++--
 be/src/vec/sink/load_stream_stub.h           |  4 ++++
 be/src/vec/sink/load_stream_stub_pool.cpp    | 12 ++++++++++++
 be/src/vec/sink/writer/vtablet_writer_v2.cpp | 11 +++++++----
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 76907713fde..26331053e3a 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -113,10 +113,18 @@ LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
           _tablet_schema_for_index(stub._tablet_schema_for_index),
           _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
 
-LoadStreamStub::~LoadStreamStub() {
+LoadStreamStub::~LoadStreamStub() = default;
+
+Status LoadStreamStub::close_stream() {
     if (_is_init.load() && !_handler.is_closed()) {
-        brpc::StreamClose(_stream_id);
+        LOG(INFO) << "closing stream, load_id=" << print_id(_load_id) << ", src_id=" << _src_id
+                  << ", dst_id=" << _dst_id << ", stream_id=" << _stream_id;
+        auto ret = brpc::StreamClose(_stream_id);
+        if (ret != 0) {
+            return Status::InternalError("StreamClose failed, err={}", ret);
+        }
     }
+    return Status::OK();
 }
 
 // open_load_stream
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index aca2e9ea550..7aae8496a0c 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -174,6 +174,10 @@ public:
     // GET_SCHEMA
     Status get_schema(const std::vector<PTabletID>& tablets);
 
+    // close stream, usually close is initiated by the remote.
+    // in case of remote failure, we should be able to close stream locally.
+    Status close_stream();
+
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
     Status close_wait(int64_t timeout_ms = 0) {
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index 240e44ef380..bc19bad532c 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -31,6 +31,18 @@ void LoadStreams::release() {
     int num_use = --_use_cnt;
     if (num_use == 0) {
         LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id;
+        for (auto& stream : _streams) {
+            auto st = stream->close_stream();
+            if (!st.ok()) {
+                LOG(WARNING) << "close stream failed " << st;
+            }
+        }
+        for (auto& stream : _streams) {
+            auto st = stream->close_wait();
+            if (!st.ok()) {
+                LOG(WARNING) << "close wait failed " << st;
+            }
+        }
         _pool->erase(_load_id, _dst_id);
     } else {
         LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << _dst_id
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 33bca6a9401..74cc3d2bcf6 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -42,6 +42,7 @@
 #include "runtime/thread_context.h"
 #include "service/brpc.h"
 #include "util/brpc_client_cache.h"
+#include "util/defer_op.h"
 #include "util/doris_metrics.h"
 #include "util/threadpool.h"
 #include "util/thrift_util.h"
@@ -498,10 +499,12 @@ Status VTabletWriterV2::close(Status exec_status) {
         COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
         COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
 
-        // release streams from the pool first, to prevent memory leak
-        for (const auto& [_, streams] : _streams_for_node) {
-            streams->release();
-        }
+        // defer stream release to prevent memory leak
+        Defer defer([&] {
+            for (const auto& [_, streams] : _streams_for_node) {
+                streams->release();
+            }
+        });
 
         {
             SCOPED_TIMER(_close_writer_timer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to