This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5c3ca0fbc2bd94af4b58b77829ec369612941c06
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Sat Mar 23 22:33:59 2024 +0800

    [fix](move-memtable) fix load timeout caused by lost wakeup (#32720)
---
 be/src/vec/sink/load_stream_stub.cpp | 25 ++++++++++---------------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index da0276fdd06..6eb91e46853 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -321,21 +321,16 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
     }
     DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
     std::unique_lock<bthread::Mutex> lock(_close_mutex);
-    if (!_is_closed.load()) {
-        auto timeout_sec = timeout_ms / 1000;
-        while (!state->get_query_ctx()->is_cancelled() && timeout_sec > 0) {
-            //the query maybe cancel, so need check after wait 1s
-            timeout_sec = timeout_sec - 1;
-            int ret = _close_cv.wait_for(lock, 1000000);
-            if (ret == 0) {
-                break;
-            }
-            if (timeout_sec <= 0) {
-                return Status::InternalError(
-                        "stream close_wait timeout, timeout_ms={}, load_id={}, dst_id={}, "
-                        "stream_id={}",
-                        timeout_ms, print_id(_load_id), _dst_id, _stream_id);
-            }
+    auto timeout_sec = timeout_ms / 1000;
+    while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
+        //the query maybe cancel, so need check after wait 1s
+        timeout_sec = timeout_sec - 1;
+        int ret = _close_cv.wait_for(lock, 1000000);
+        if (ret != 0 && timeout_sec <= 0) {
+            return Status::InternalError(
+                    "stream close_wait timeout, error={}, timeout_ms={}, load_id={}, dst_id={}, "
+                    "stream_id={}",
+                    ret, timeout_ms, print_id(_load_id), _dst_id, _stream_id);
         }
     }
     RETURN_IF_ERROR(_check_cancel());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to