This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f7a340a2dfe8250f4cced44eac9faa25adc5de84
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Tue Jan 16 20:08:26 2024 +0800

    [improve](move-memtable) add cancel method to load stream stub (#29994)
---
 be/src/vec/sink/load_stream_stub.cpp            | 25 +++++++++++++++++++++++--
 be/src/vec/sink/load_stream_stub.h              | 15 +++++++++++++++
 be/src/vec/sink/load_stream_stub_pool.cpp       |  2 +-
 be/src/vec/sink/load_stream_stub_pool.h         |  4 ++--
 be/src/vec/sink/writer/vtablet_writer_v2.cpp    |  7 +++++--
 be/test/vec/exec/load_stream_stub_pool_test.cpp |  7 +++----
 6 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 40ce75d24e6..347acb1b6f6 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -294,6 +294,7 @@ Status LoadStreamStub::wait_for_schema(int64_t 
partition_id, int64_t index_id, i
     watch.start();
     while (!_tablet_schema_for_index->contains(index_id) &&
            watch.elapsed_time() / 1000 / 1000 < timeout_ms) {
+        RETURN_IF_ERROR(_check_cancel());
         static_cast<void>(wait_for_new_schema(100));
     }
 
@@ -308,8 +309,12 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
         while (true) {
         };
     });
-    if (!_is_init.load() || _is_closed.load()) {
-        return Status::OK();
+    if (!_is_init.load()) {
+        return Status::InternalError("stream {} is not opened, load_id={}", 
_stream_id,
+                                     print_id(_load_id));
+    }
+    if (_is_closed.load()) {
+        return _check_cancel();
     }
     if (timeout_ms <= 0) {
         timeout_ms = config::close_load_stream_timeout_ms;
@@ -324,6 +329,7 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
                     print_id(_load_id), _dst_id, _stream_id);
         }
     }
+    RETURN_IF_ERROR(_check_cancel());
     if (!_is_eos.load()) {
         return Status::InternalError(
                 "stream closed without eos, load_id={}, dst_id={}, 
stream_id={}",
@@ -332,6 +338,20 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
     return Status::OK();
 }
 
+void LoadStreamStub::cancel(Status reason) {
+    LOG(WARNING) << *this << " is cancelled because of " << reason;
+    {
+        std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
+        _cancel_reason = reason;
+        _is_cancelled.store(true);
+    }
+    {
+        std::lock_guard<bthread::Mutex> lock(_close_mutex);
+        _is_closed.store(true);
+        _close_cv.notify_all();
+    }
+}
+
 Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const 
Slice> data) {
     butil::IOBuf buf;
     size_t header_len = header.ByteSizeLong();
@@ -365,6 +385,7 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, 
bool sync) {
 
 Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
     for (;;) {
+        RETURN_IF_ERROR(_check_cancel());
         int ret;
         {
             
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 6aae778dc93..c91f1016d35 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -153,6 +153,9 @@ public:
     // if timeout_ms <= 0, will fallback to 
config::close_load_stream_timeout_ms
     Status close_wait(int64_t timeout_ms = 0);
 
+    // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
+    void cancel(Status reason);
+
     Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t 
tablet_id,
                            int64_t timeout_ms = 60000);
 
@@ -197,9 +200,19 @@ private:
     Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
     Status _send_with_retry(butil::IOBuf& buf);
 
+    Status _check_cancel() {
+        if (!_is_cancelled.load()) {
+            return Status::OK();
+        }
+        std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
+        return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id),
+                                 _cancel_reason.to_string_no_stack());
+    }
+
 protected:
     std::atomic<bool> _is_init;
     std::atomic<bool> _is_closed;
+    std::atomic<bool> _is_cancelled;
     std::atomic<bool> _is_eos;
     std::atomic<int> _use_cnt;
 
@@ -207,9 +220,11 @@ protected:
     brpc::StreamId _stream_id;
     int64_t _src_id = -1; // source backend_id
     int64_t _dst_id = -1; // destination backend_id
+    Status _cancel_reason;
 
     bthread::Mutex _open_mutex;
     bthread::Mutex _close_mutex;
+    bthread::Mutex _cancel_mutex;
     bthread::ConditionVariable _close_cv;
 
     std::mutex _tablets_to_commit_mutex;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp 
b/be/src/vec/sink/load_stream_stub_pool.cpp
index 1baa903f2ee..d76402b57d5 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -26,7 +26,7 @@ class TExpr;
 LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, 
LoadStreamStubPool* pool)
         : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
 
-void LoadStreams::release(Status status) {
+void LoadStreams::release() {
     int num_use = --_use_cnt;
     DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
     if (num_use == 0) {
diff --git a/be/src/vec/sink/load_stream_stub_pool.h 
b/be/src/vec/sink/load_stream_stub_pool.h
index b34383b25f9..662fc5bc1a1 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -76,7 +76,7 @@ class LoadStreams {
 public:
     LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, 
LoadStreamStubPool* pool);
 
-    void release(Status status);
+    void release();
 
     Streams& streams() { return _streams; }
 
@@ -116,4 +116,4 @@ private:
     std::unordered_map<std::pair<UniqueId, int64_t>, 
std::shared_ptr<LoadStreams>> _pool;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 02b40549253..e23fe761ecf 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -457,7 +457,10 @@ Status VTabletWriterV2::_cancel(Status status) {
         _delta_writer_for_tablet.reset();
     }
     for (const auto& [_, streams] : _streams_for_node) {
-        streams->release(status);
+        for (const auto& stream : streams->streams()) {
+            stream->cancel(status);
+        }
+        streams->release();
     }
     return Status::OK();
 }
@@ -514,7 +517,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         // defer stream release to prevent memory leak
         Defer defer([&] {
             for (const auto& [_, streams] : _streams_for_node) {
-                streams->release(status);
+                streams->release();
             }
             _streams_for_node.clear();
         });
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp 
b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index bea5443b4ff..24da3bb6999 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -32,7 +32,6 @@ TEST_F(LoadStreamStubPoolTest, test) {
     LoadStreamStubPool pool;
     int64_t src_id = 100;
     PUniqueId load_id;
-    Status st = Status::OK();
     load_id.set_hi(1);
     load_id.set_hi(2);
     auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
@@ -42,9 +41,9 @@ TEST_F(LoadStreamStubPoolTest, test) {
     EXPECT_EQ(1, pool.templates_size());
     EXPECT_EQ(streams1, streams3);
     EXPECT_NE(streams1, streams2);
-    streams1->release(st);
-    streams2->release(st);
-    streams3->release(st);
+    streams1->release();
+    streams2->release();
+    streams3->release();
     EXPECT_EQ(0, pool.size());
     EXPECT_EQ(0, pool.templates_size());
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to