This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 422d4d2afdf8bccb431e9f7565ed4e9071886f9a
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Fri Mar 18 09:38:16 2022 +0800

    [fix](load) fix bug that BE may crash when calling `mark_as_failed` (#8501)
    
    1.
    The IndexChannel's methods are invoked from the RpcClosure callbacks of the
    NodeChannel.
    However, such a callback may run after the whole task has finished (e.g. due
    to network latency), and by that time the IndexChannel may already have been
    destroyed, so we must not call the IndexChannel's methods anymore; otherwise
    the BE will crash.
    
    Therefore, we use the `_is_closed` flag, protected by `_closed_lock`, to
    ensure that the RPC callback functions will not call the IndexChannel's
    methods after the NodeChannel is closed.
    
    2.
    Do not add the IndexChannel to the ObjectPool.
    Destroying an IndexChannel may invoke the destructor of the NodeChannel,
    and the NodeChannel destructor may be time-consuming (it waits for RPCs to
    finish). However, the ObjectPool holds a SpinLock while destroying its
    objects, so this may cause high CPU usage.
---
 be/src/exec/tablet_sink.cpp | 32 +++++++++++++++++++++++---------
 be/src/exec/tablet_sink.h   | 15 +++++++++++++--
 2 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 5b59508..b690f89 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -34,6 +34,7 @@
 #include "service/brpc.h"
 #include "util/brpc_client_cache.h"
 #include "util/debug/sanitizer_scopes.h"
+#include "util/defer_op.h"
 #include "util/monotime.h"
 #include "util/proto_util.h"
 #include "util/threadpool.h"
@@ -183,6 +184,12 @@ Status NodeChannel::open_wait() {
     // add batch closure
     _add_batch_closure = 
ReusableClosure<PTabletWriterAddBatchResult>::create();
     _add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
+        std::lock_guard<std::mutex> l(this->_closed_lock);
+        if (this->_is_closed) {
+            // if the node channel is closed, no need to call `mark_as_failed`,
+            // and notice that _index_channel may already be destroyed.
+            return;
+        }
         // If rpc failed, mark all tablets on this node channel as failed
         _index_channel->mark_as_failed(this->node_id(), this->host(), 
_add_batch_closure->cntl.ErrorText(), -1);
         Status st = _index_channel->check_intolerable_failure();
@@ -197,6 +204,12 @@ Status NodeChannel::open_wait() {
 
     _add_batch_closure->addSuccessHandler([this](const 
PTabletWriterAddBatchResult& result,
                                                  bool is_last_rpc) {
+        std::lock_guard<std::mutex> l(this->_closed_lock);
+        if (this->_is_closed) {
+            // if the node channel is closed, no need to call the following 
logic,
+            // and notice that _index_channel may already be destroyed.
+            return;
+        }
         Status status(result.status());
         if (status.ok()) {
             // if has error tablet, handle them first
@@ -329,15 +342,10 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t 
tablet_id) {
     return Status::OK();
 }
 
-Status NodeChannel::mark_close() {
+void NodeChannel::mark_close() {
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
-        if (_cancelled) {
-            std::lock_guard<SpinLock> l(_cancel_msg_lock);
-            return Status::InternalError("mark close failed. " + _cancel_msg);
-        } else {
-            return st.clone_and_prepend("already stopped, can't mark as 
closed. cancelled/eos: ");
-        }
+        return;
     }
 
     _cur_add_batch_request.set_eos(true);
@@ -354,10 +362,16 @@ Status NodeChannel::mark_close() {
     }
 
     _eos_is_produced = true;
-    return Status::OK();
+    return;
 }
 
 Status NodeChannel::close_wait(RuntimeState* state) {
+    // set _is_closed to true finally
+    Defer set_closed {[&]() {
+        std::lock_guard<std::mutex> l(_closed_lock);
+        _is_closed = true;
+    }};
+
     auto st = none_of({_cancelled, !_eos_is_produced});
     if (!st.ok()) {
         if (_cancelled) {
@@ -801,7 +815,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
                 tablets.emplace_back(std::move(tablet_with_partition));
             }
         }
-        auto channel = _pool->add(new IndexChannel(this, index->index_id));
+        auto channel = std::make_shared<IndexChannel>(this, index->index_id);
         RETURN_IF_ERROR(channel->init(state, tablets));
         _channels.emplace_back(channel);
     }
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index c485632..8071d20 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -170,7 +170,7 @@ public:
     // two ways to stop channel:
     // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
     // 2. just cancel()
-    Status mark_close();
+    void mark_close();
     Status close_wait(RuntimeState* state);
 
     void cancel(const std::string& cancel_msg);
@@ -284,6 +284,17 @@ private:
 
     // the timestamp when this node channel be marked closed and finished 
closed
     uint64_t _close_time_ms = 0;
+
+    // lock to protect _is_closed.
+    // The methods in the IndexChannel are called back in the RpcClosure in 
the NodeChannel.
+    // However, this rpc callback may occur after the whole task is finished 
(e.g. due to network latency),
+    // and by that time the IndexChannel may have been destructured, so we 
should not call the
+    // IndexChannel methods anymore, otherwise the BE will crash.
+    // Therefore, we use the _is_closed and _closed_lock to ensure that the 
RPC callback
+    // function will not call the IndexChannel method after the NodeChannel is 
closed.
+    // The IndexChannel is definitely accessible until the NodeChannel is 
closed.
+    std::mutex _closed_lock;
+    bool _is_closed = false;
 };
 
 class IndexChannel {
@@ -424,7 +435,7 @@ protected:
     Bitmap _filter_bitmap;
 
     // index_channel
-    std::vector<IndexChannel*> _channels;
+    std::vector<std::shared_ptr<IndexChannel>> _channels;
 
     CountDownLatch _stop_background_threads_latch;
     scoped_refptr<Thread> _sender_thread;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to