This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 99024ad7bd7772d51d12578852de3118056ae0e5
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Jan 12 09:11:46 2024 +0800

    [fix](move-memtable) check eos for already closed streams (#29734)
---
 be/src/vec/sink/load_stream_stub.cpp | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index c84ef3b7a0a..59fc29c60f8 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -315,14 +315,13 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
     }
     DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
     std::unique_lock<bthread::Mutex> lock(_close_mutex);
-    if (_is_closed.load()) {
-        return Status::OK();
-    }
-    int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
-    if (ret != 0) {
-        return Status::InternalError(
-                "stream close_wait timeout, error={}, load_id={}, dst_id={}, stream_id={}", ret,
-                print_id(_load_id), _dst_id, _stream_id);
+    if (!_is_closed.load()) {
+        int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
+        if (ret != 0) {
+            return Status::InternalError(
+                    "stream close_wait timeout, error={}, load_id={}, dst_id={}, stream_id={}", ret,
+                    print_id(_load_id), _dst_id, _stream_id);
+        }
     }
     if (!_is_eos.load()) {
         return Status::InternalError(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to