This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9fbe6f5294e [improve](move-memtable) add more info in LoadStreamStub errors (#33618) 9fbe6f5294e is described below commit 9fbe6f5294ed32ea5b6f11129209ddadce5a2aeb Author: Kaijie Chen <c...@apache.org> AuthorDate: Mon Apr 15 10:14:46 2024 +0800 [improve](move-memtable) add more info in LoadStreamStub errors (#33618) --- be/src/vec/sink/load_stream_stub.cpp | 24 +++++++++++++----------- be/src/vec/sink/load_stream_stub.h | 2 ++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 6d661a4c88e..78e1bc691cc 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -293,8 +293,7 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); if (!_is_init.load()) { - return Status::InternalError("stream {} is not opened, load_id={}", _stream_id, - print_id(_load_id)); + return Status::InternalError("stream {} is not opened, {}", _stream_id, to_string()); } if (_is_closed.load()) { return _check_cancel(); @@ -310,17 +309,13 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { << ", is_cancelled=" << state->get_query_ctx()->is_cancelled(); int ret = _close_cv.wait_for(lock, 1000000); if (ret != 0 && timeout_sec <= 0) { - return Status::InternalError( - "stream close_wait timeout, error={}, timeout_ms={}, load_id={}, dst_id={}, " - "stream_id={}", - ret, timeout_ms, print_id(_load_id), _dst_id, _stream_id); + return Status::InternalError("stream close_wait timeout, error={}, timeout_ms={}, {}", + ret, timeout_ms, to_string()); } } RETURN_IF_ERROR(_check_cancel()); if (!_is_eos.load()) { - return Status::InternalError( - "stream closed without eos, load_id={}, dst_id={}, 
stream_id={}", - print_id(_load_id), _dst_id, _stream_id); + return Status::InternalError("stream closed without eos, {}", to_string()); } return Status::OK(); } @@ -391,16 +386,23 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { const timespec time = butil::seconds_from_now(config::load_stream_eagain_wait_seconds); int wait_ret = brpc::StreamWait(_stream_id, &time); if (wait_ret != 0) { - return Status::InternalError("StreamWait failed, err={}", wait_ret); + return Status::InternalError("StreamWait failed, err={}, {}", wait_ret, + to_string()); } break; } default: - return Status::InternalError("StreamWrite failed, err={}", ret); + return Status::InternalError("StreamWrite failed, err={}, {}", ret, to_string()); } } } +std::string LoadStreamStub::to_string() { + std::ostringstream ss; + ss << *this; + return ss.str(); +} + inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub) { ostr << "LoadStreamStub load_id=" << print_id(stub._load_id) << ", src_id=" << stub._src_id << ", dst_id=" << stub._dst_id << ", stream_id=" << stub._stream_id; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index aa8b850760e..5ebec9f9d78 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -198,6 +198,8 @@ public: friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); + std::string to_string(); + private: Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {}); Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org