This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 99024ad7bd7772d51d12578852de3118056ae0e5
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Jan 12 09:11:46 2024 +0800

    [fix](move-memtable) check eos for already closed streams (#29734)
---
 be/src/vec/sink/load_stream_stub.cpp | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index c84ef3b7a0a..59fc29c60f8 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -315,14 +315,13 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
     }
     DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
     std::unique_lock<bthread::Mutex> lock(_close_mutex);
-    if (_is_closed.load()) {
-        return Status::OK();
-    }
-    int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
-    if (ret != 0) {
-        return Status::InternalError(
-                "stream close_wait timeout, error={}, load_id={}, dst_id={}, stream_id={}", ret,
-                print_id(_load_id), _dst_id, _stream_id);
+    if (!_is_closed.load()) {
+        int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
+        if (ret != 0) {
+            return Status::InternalError(
+                    "stream close_wait timeout, error={}, load_id={}, dst_id={}, stream_id={}", ret,
+                    print_id(_load_id), _dst_id, _stream_id);
+        }
     }
     if (!_is_eos.load()) {
         return Status::InternalError(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org