This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 76ad5d17250570da5f205602026091a7ba9aa946
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Mar 30 09:52:11 2022 +0800

    [fix](load) fix bug that NodeChannel can not be destroyed on time (#8705)
    
    After the ReusableClosure is reset, we must not call its join() method,
    or it will block forever.
---
 be/src/exec/tablet_sink.cpp | 6 +++++-
 be/src/exec/tablet_sink.h   | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index c36b574..068daf9 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -494,7 +494,6 @@ void NodeChannel::try_send_batch() {
         }
     }
 
-    _add_batch_closure->reset();
     int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / 
NANOS_PER_MILLIS;
     if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
         if (remain_ms <= 0 && !request.eos()) {
@@ -504,6 +503,11 @@ void NodeChannel::try_send_batch() {
             remain_ms = config::min_load_rpc_timeout_ms;
         }
     }
+
+    // After calling reset(), make sure that the rpc will be called finally.
+    // Otherwise, when calling _add_batch_closure->join(), it will be blocked forever.
+    // and _add_batch_closure->join() will be called in ~NodeChannel().
+    _add_batch_closure->reset();
     _add_batch_closure->cntl.set_timeout_ms(remain_ms);
     if (config::tablet_writer_ignore_eovercrowded) {
         _add_batch_closure->cntl.ignore_eovercrowded();
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 8071d20..2ad45b6 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -100,7 +100,7 @@ public:
     void addSuccessHandler(std::function<void(const T&, bool)> fn) { 
success_handler = fn; }
 
     void join() {
-        if (cid != INVALID_BTHREAD_ID) {
+        if (cid != INVALID_BTHREAD_ID && _packet_in_flight) {
             brpc::Join(cid);
         }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to