This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 0e0facf356acd83316fc6dcd98b933a78d874fc1
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Mar 30 09:52:11 2022 +0800

    [fix](load) fix bug that NodeChannel can not be destroyed on time (#8705)
    
    After the ReusableClosure is reset, we must not call its join() method, or it will block forever.
---
 be/src/exec/tablet_sink.cpp | 6 +++++-
 be/src/exec/tablet_sink.h   | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 080db73..6a5f9c6 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -496,7 +496,6 @@ void NodeChannel::try_send_batch() {
         }
     }
 
-    _add_batch_closure->reset();
     int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
     if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
         if (remain_ms <= 0 && !request.eos()) {
@@ -506,6 +505,11 @@ void NodeChannel::try_send_batch() {
             remain_ms = config::min_load_rpc_timeout_ms;
         }
     }
+
+    // After calling reset(), make sure that the rpc will be called finally.
+    // Otherwise, when calling _add_batch_closure->join(), it will be blocked forever.
+    // and _add_batch_closure->join() will be called in ~NodeChannel().
+    _add_batch_closure->reset();
     _add_batch_closure->cntl.set_timeout_ms(remain_ms);
     if (config::tablet_writer_ignore_eovercrowded) {
         _add_batch_closure->cntl.ignore_eovercrowded();
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index b5c10b7..41d6521 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -100,7 +100,7 @@ public:
     void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; }
 
     void join() {
-        if (cid != INVALID_BTHREAD_ID) {
+        if (cid != INVALID_BTHREAD_ID && _packet_in_flight) {
             brpc::Join(cid);
         }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to