This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new b3bd16257e [impovement](sink) print load_id when sink fails (#11893)
b3bd16257e is described below

commit b3bd16257e81a3a55c32ca891b99ed6f3dab8409
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Fri Aug 19 08:48:02 2022 +0800

    [impovement](sink) print load_id when sink fails (#11893)
---
 be/src/exec/tablet_sink.cpp     | 27 +++++++++++++++++----------
 be/src/runtime/fragment_mgr.cpp |  7 ++++---
 2 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 1aa0cdf692..be4cdca5e6 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -45,8 +45,7 @@ namespace doris {
 namespace stream_load {
 
 NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id)
-        : _parent(parent), _index_channel(index_channel), _node_id(node_id) {
-}
+        : _parent(parent), _index_channel(index_channel), _node_id(node_id) {}
 
 NodeChannel::~NodeChannel() noexcept {
     if (_open_closure != nullptr) {
@@ -79,6 +78,9 @@ Status NodeChannel::init(RuntimeState* state) {
 
     _node_info = *node;
 
+    _load_info = "load_id=" + print_id(_parent->_load_id) +
+                 ", txn_id=" + std::to_string(_parent->_txn_id);
+
     _row_desc.reset(new RowDescriptor(_tuple_desc, false));
     _batch_size = state->batch_size();
     _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker.get()));
@@ -87,7 +89,7 @@ Status NodeChannel::init(RuntimeState* state) {
                                                                         
_node_info.brpc_port);
     if (_stub == nullptr) {
         LOG(WARNING) << "Get rpc stub failed, host=" << _node_info.host
-                     << ", port=" << _node_info.brpc_port;
+                     << ", port=" << _node_info.brpc_port << ", " << channel_info();
         _cancelled = true;
         return Status::InternalError("get rpc stub failed");
     }
@@ -143,7 +145,7 @@ void NodeChannel::open() {
 }
 
 void NodeChannel::_cancel_with_msg(const std::string& msg) {
-    LOG(WARNING) << msg;
+    LOG(WARNING) << channel_info() << ", " << msg;
     {
         std::lock_guard<SpinLock> l(_cancel_msg_lock);
         if (_cancel_msg == "") {
@@ -165,8 +167,11 @@ Status NodeChannel::open_wait() {
         ss << "failed to open tablet writer, error=" << berror(_open_closure->cntl.ErrorCode())
            << ", error_text=" << _open_closure->cntl.ErrorText();
         _cancelled = true;
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+
+        LOG(WARNING) << ss.str() << " " << channel_info();
+        return Status::InternalError("failed to open tablet writer, error={}, error_text={}",
+                                     berror(_open_closure->cntl.ErrorCode()),
+                                     _open_closure->cntl.ErrorText());
     }
     Status status(_open_closure->result.status());
     if (_open_closure->unref()) {
@@ -443,7 +448,8 @@ void NodeChannel::cancel(const std::string& cancel_msg) {
     request.release_id();
 }
 
-int NodeChannel::try_send_and_fetch_status(RuntimeState* state, std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
+int NodeChannel::try_send_and_fetch_status(RuntimeState* state,
+                                           std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
     auto st = none_of({_cancelled, _send_finished});
     if (!st.ok()) {
         return 0;
@@ -902,8 +908,8 @@ Status OlapTableSink::open(RuntimeState* state) {
     _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token(
             ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
     RETURN_IF_ERROR(Thread::create(
-            "OlapTableSink", "send_batch_process", [this, state]() { this->_send_batch_process(state); },
-            &_sender_thread));
+            "OlapTableSink", "send_batch_process",
+            [this, state]() { this->_send_batch_process(state); }, &_sender_thread));
 
     return Status::OK();
 }
@@ -1300,7 +1306,8 @@ void OlapTableSink::_send_batch_process(RuntimeState* state) {
     do {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
-            index_channel->for_each_node_channel([&running_channels_num, this, state](const std::shared_ptr<NodeChannel>& ch) {
+            index_channel->for_each_node_channel([&running_channels_num, this,
+                                                  state](const std::shared_ptr<NodeChannel>& ch) {
                 running_channels_num +=
                         ch->try_send_and_fetch_status(state, this->_send_batch_thread_pool_token);
             });
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index bad2c50c55..a70d744447 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -300,7 +300,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
     FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
     if (!coord_status.ok()) {
         std::stringstream ss;
-        ss << "couldn't get a client for " << _coord_addr;
+        ss << "couldn't get a client for " << _coord_addr << ", reason: " << coord_status;
+        LOG(WARNING) << "query_id: " << _query_id << ", " << ss.str();
         update_status(Status::InternalError(ss.str()));
         return;
     }
@@ -387,8 +388,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
     TReportExecStatusResult res;
     Status rpc_status;
 
-    VLOG_ROW << "debug: reportExecStatus params is "
-             << apache::thrift::ThriftDebugString(params).c_str();
+    VLOG_DEBUG << "reportExecStatus params is "
+               << apache::thrift::ThriftDebugString(params).c_str();
     try {
         try {
             coord->reportExecStatus(res, params);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to