This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new b3bd16257e [impovement](sink) print load_id when sink fails (#11893) b3bd16257e is described below commit b3bd16257e81a3a55c32ca891b99ed6f3dab8409 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Fri Aug 19 08:48:02 2022 +0800 [impovement](sink) print load_id when sink fails (#11893) --- be/src/exec/tablet_sink.cpp | 27 +++++++++++++++++---------- be/src/runtime/fragment_mgr.cpp | 7 ++++--- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 1aa0cdf692..be4cdca5e6 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -45,8 +45,7 @@ namespace doris { namespace stream_load { NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id) - : _parent(parent), _index_channel(index_channel), _node_id(node_id) { -} + : _parent(parent), _index_channel(index_channel), _node_id(node_id) {} NodeChannel::~NodeChannel() noexcept { if (_open_closure != nullptr) { @@ -79,6 +78,9 @@ Status NodeChannel::init(RuntimeState* state) { _node_info = *node; + _load_info = "load_id=" + print_id(_parent->_load_id) + + ", txn_id=" + std::to_string(_parent->_txn_id); + _row_desc.reset(new RowDescriptor(_tuple_desc, false)); _batch_size = state->batch_size(); _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker.get())); @@ -87,7 +89,7 @@ Status NodeChannel::init(RuntimeState* state) { _node_info.brpc_port); if (_stub == nullptr) { LOG(WARNING) << "Get rpc stub failed, host=" << _node_info.host - << ", port=" << _node_info.brpc_port; + << ", port=" << _node_info.brpc_port << ", " << channel_info(); _cancelled = true; return Status::InternalError("get rpc stub failed"); } @@ -143,7 +145,7 @@ void NodeChannel::open() { } void NodeChannel::_cancel_with_msg(const std::string& msg) { - LOG(WARNING) << msg; + LOG(WARNING) << channel_info() << ", " << msg; { std::lock_guard<SpinLock> l(_cancel_msg_lock); if (_cancel_msg == "") { @@ -165,8 +167,11 @@ Status NodeChannel::open_wait() { ss << "failed to open tablet writer, error=" << berror(_open_closure->cntl.ErrorCode()) << ", error_text=" << _open_closure->cntl.ErrorText(); _cancelled = true; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); + + LOG(WARNING) << ss.str() << " " << channel_info(); + return Status::InternalError("failed to open tablet writer, error={}, error_text={}", + berror(_open_closure->cntl.ErrorCode()), + _open_closure->cntl.ErrorText()); } Status status(_open_closure->result.status()); if (_open_closure->unref()) { @@ -443,7 +448,8 @@ void NodeChannel::cancel(const std::string& cancel_msg) { request.release_id(); } -int NodeChannel::try_send_and_fetch_status(RuntimeState* state, std::unique_ptr<ThreadPoolToken>& thread_pool_token) { +int NodeChannel::try_send_and_fetch_status(RuntimeState* state, + std::unique_ptr<ThreadPoolToken>& thread_pool_token) { auto st = none_of({_cancelled, _send_finished}); if (!st.ok()) { return 0; @@ -902,8 +908,8 @@ Status OlapTableSink::open(RuntimeState* state) { _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism); RETURN_IF_ERROR(Thread::create( - "OlapTableSink", "send_batch_process", [this, state]() { this->_send_batch_process(state); }, - &_sender_thread)); + "OlapTableSink", "send_batch_process", + [this, state]() { this->_send_batch_process(state); }, &_sender_thread)); return Status::OK(); } @@ -1300,7 +1306,8 @@ void OlapTableSink::_send_batch_process(RuntimeState* state) { do { int running_channels_num = 0; for (auto index_channel : _channels) { - index_channel->for_each_node_channel([&running_channels_num, this, state](const std::shared_ptr<NodeChannel>& ch) { + index_channel->for_each_node_channel([&running_channels_num, this, + state](const std::shared_ptr<NodeChannel>& ch) { running_channels_num += ch->try_send_and_fetch_status(state, this->_send_batch_thread_pool_token); }); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index bad2c50c55..a70d744447 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -300,7 +300,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status); if (!coord_status.ok()) { std::stringstream ss; - ss << "couldn't get a client for " << _coord_addr; + ss << "couldn't get a client for " << _coord_addr << ", reason: " << coord_status; + LOG(WARNING) << "query_id: " << _query_id << ", " << ss.str(); update_status(Status::InternalError(ss.str())); return; } @@ -387,8 +388,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil TReportExecStatusResult res; Status rpc_status; - VLOG_ROW << "debug: reportExecStatus params is " - << apache::thrift::ThriftDebugString(params).c_str(); + VLOG_DEBUG << "reportExecStatus params is " + << apache::thrift::ThriftDebugString(params).c_str(); try { try { coord->reportExecStatus(res, params); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org