This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new d366476697 [chore](BE) add more log for better tracing for be write 
(#14425)
d366476697 is described below

commit d366476697c626470b9a129e5747bbbb173b07e0
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Thu Nov 24 22:51:53 2022 +0800

    [chore](BE) add more log for better tracing for be write (#14425)
    
    Recently, while tracing a bug that occurred in version 1.1.4, I found
some places where we can add more logging to enable better tracing.
---
 be/src/exec/tablet_sink.cpp             | 19 ++++++++++++-----
 be/src/olap/delta_writer.cpp            |  6 +++++-
 be/src/olap/memtable_flush_executor.cpp |  6 +++++-
 be/src/runtime/tablets_channel.cpp      | 36 +++++++++++++++++++++------------
 4 files changed, 47 insertions(+), 20 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f7c4268704..8db57b45ff 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -200,6 +200,7 @@ Status NodeChannel::open_wait() {
         } else if (is_last_rpc) {
             // if this is last rpc, will must set _add_batches_finished. 
otherwise, node channel's close_wait
             // will be blocked.
+            VLOG_PROGRESS << "node channel " << channel_info() << " 
add_batches_finished";
             _add_batches_finished = true;
         }
         _add_batch_counter.add_batch_rpc_time_us += 
_add_batch_closure->watch.elapsed_time() / 1000;
@@ -232,6 +233,9 @@ Status NodeChannel::open_wait() {
                     commit_info.backendId = _node_id;
                     _tablet_commit_infos.emplace_back(std::move(commit_info));
                 }
+                VLOG_PROGRESS << "node channel " << channel_info()
+                              << " add_batches_finished and handled "
+                              << result.tablet_errors().size() << " tablets 
errors";
                 _add_batches_finished = true;
             }
         } else {
@@ -348,8 +352,7 @@ void NodeChannel::_sleep_if_memory_exceed() {
                       << ", max_pending_batches_bytes = " << 
_max_pending_batches_bytes
                       << ", is_packet_in_flight = " << 
_add_batch_closure->is_packet_in_flight()
                       << ", next_packet_seq = " << _next_packet_seq
-                      << ", cur_batch_rows = " << _cur_batch->num_rows()
-                      << ", " << channel_info();
+                      << ", cur_batch_rows = " << _cur_batch->num_rows() << ", 
" << channel_info();
         }
     }
 }
@@ -370,7 +373,8 @@ void NodeChannel::mark_close() {
         DCHECK(_pending_batches.back().second.eos());
         _close_time_ms = UnixMillis();
         LOG(INFO) << channel_info()
-                  << " mark closed, left pending batch size: " << 
_pending_batches.size();
+                  << " mark closed, left pending batch size: " << 
_pending_batches.size()
+                  << " left pending batch size: " << _pending_batches_bytes;
     }
 
     _eos_is_produced = true;
@@ -672,6 +676,8 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t 
tablet_id) {
 
 void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, 
const std::string& err,
                                   int64_t tablet_id) {
+    VLOG_PROGRESS << "mark node_id:" << node_id << " tablet_id: " << tablet_id
+                  << " as failed, err: " << err;
     const auto& it = _tablets_by_channel.find(node_id);
     if (it == _tablets_by_channel.end()) {
         return;
@@ -883,6 +889,9 @@ Status OlapTableSink::prepare(RuntimeState* state) {
             }
         }
         auto channel = std::make_shared<IndexChannel>(this, index->index_id);
+        if (UNLIKELY(tablets.empty())) {
+            LOG(WARNING) << "load job:" << state->load_job_id() << " index: " 
<< index->index_id << " would open 0 tablet";
+        }
         RETURN_IF_ERROR(channel->init(state, tablets));
         _channels.emplace_back(channel);
     }
@@ -1109,8 +1118,8 @@ Status OlapTableSink::close(RuntimeState* state, Status 
close_status) {
         for (auto const& pair : node_add_batch_counter_map) {
             ss << "{" << pair.first << ":(" << 
(pair.second.add_batch_execution_time_us / 1000)
                << ")(" << (pair.second.add_batch_wait_execution_time_us / 
1000) << ")("
-               << (pair.second.add_batch_rpc_time_us / 1000) << ")(" << 
pair.second.close_wait_time_ms
-               << ")(" << pair.second.add_batch_num << ")} ";
+               << (pair.second.add_batch_rpc_time_us / 1000) << ")("
+               << pair.second.close_wait_time_ms << ")(" << 
pair.second.add_batch_num << ")} ";
         }
         LOG(INFO) << ss.str();
     } else {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 7dff080a9d..fb2f69a235 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -295,7 +295,11 @@ OLAPStatus DeltaWriter::close_wait() {
     }
 
     // return error if previous flush failed
-    RETURN_NOT_OK(_flush_token->wait());
+    auto st = _flush_token->wait();
+    if (OLAP_UNLIKELY(st != OLAP_SUCCESS)) {
+        LOG(WARNING) << "previous flush failed tablet " << 
_tablet->tablet_id();
+        return st;
+    }
 
     // use rowset meta manager to save meta
     _cur_rowset = _rowset_writer->build();
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index b63074d282..0a9ad6242e 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -42,7 +42,11 @@ std::ostream& operator<<(std::ostream& os, const 
FlushStatistic& stat) {
 OLAPStatus FlushToken::submit(const std::shared_ptr<MemTable>& memtable) {
     RETURN_NOT_OK(_flush_status.load());
     int64_t submit_task_time = MonotonicNanos();
-    _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, 
memtable, submit_task_time));
+    auto st = 
_flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, 
memtable, submit_task_time));
+    if (UNLIKELY(!st.ok())) {
+        VLOG_CRITICAL << "submit func err: " << st.get_error_msg();
+        return OLAP_ERR_OTHER_ERROR;
+    }
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 3785eedd10..66a5a3fdc2 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -35,7 +35,10 @@ std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
 TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
                                const std::shared_ptr<MemTracker>& mem_tracker,
                                bool is_high_priority)
-        : _key(key), _state(kInitialized), _closed_senders(64), 
_is_high_priority(is_high_priority) {
+        : _key(key),
+          _state(kInitialized),
+          _closed_senders(64),
+          _is_high_priority(is_high_priority) {
     _mem_tracker = MemTracker::CreateTracker(-1, "TabletsChannel", 
mem_tracker);
     static std::once_flag once_flag;
     std::call_once(once_flag, [] {
@@ -78,26 +81,26 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& 
request) {
 }
 
 Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request,
-        PTabletWriterAddBatchResult* response) {
+                                 PTabletWriterAddBatchResult* response) {
     DCHECK(request.tablet_ids_size() == request.row_batch().num_rows());
     int64_t cur_seq;
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_state != kOpened) {
             return _state == kFinished
-                ? _close_status
-                : Status::InternalError(strings::Substitute("TabletsChannel $0 
state: $1",
-                            _key.to_string(), _state));
+                           ? _close_status
+                           : Status::InternalError(strings::Substitute(
+                                     "TabletsChannel $0 state: $1", 
_key.to_string(), _state));
         }
         cur_seq = _next_seqs[request.sender_id()];
         // check packet
         if (request.packet_seq() < cur_seq) {
             LOG(INFO) << "packet has already recept before, expect_seq=" << 
cur_seq
-                << ", recept_seq=" << request.packet_seq();
+                      << ", recept_seq=" << request.packet_seq();
             return Status::OK();
         } else if (request.packet_seq() > cur_seq) {
             LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
-                << ", recept_seq=" << request.packet_seq();
+                         << ", recept_seq=" << request.packet_seq();
             return Status::InternalError("lost data packet");
         }
     }
@@ -108,22 +111,24 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBatchRequest& request,
         int64_t tablet_id = request.tablet_ids(i);
         if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) {
             // skip broken tablets
+            VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id;
             continue;
         }
         auto it = tablet_to_rowidxs.find(tablet_id);
         if (it == tablet_to_rowidxs.end()) {
-            tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i 
});
+            tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int> 
{i});
         } else {
             it->second.emplace_back(i);
         }
     }
 
-    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = 
response->mutable_tablet_errors(); 
+    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
+            response->mutable_tablet_errors();
     for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
         auto tablet_writer_it = 
_tablet_writers.find(tablet_to_rowidxs_it.first);
         if (tablet_writer_it == _tablet_writers.end()) {
-            return Status::InternalError(
-                    strings::Substitute("unknown tablet to append data, 
tablet=$0", tablet_to_rowidxs_it.first));
+            return Status::InternalError(strings::Substitute(
+                    "unknown tablet to append data, tablet=$0", 
tablet_to_rowidxs_it.first));
         }
 
         OLAPStatus st = tablet_writer_it->second->write(&row_batch, 
tablet_to_rowidxs_it.second);
@@ -192,6 +197,8 @@ Status TabletsChannel::close(int sender_id, int64_t 
backend_id, bool* finished,
                     // just skip this tablet(writer) and continue to close 
others
                     continue;
                 }
+                VLOG_PROGRESS << "cancel tablet writer successfully, 
tablet_id=" << it.first
+                              << ", transaction_id=" << _txn_id;
             }
         }
 
@@ -221,6 +228,8 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
         PTabletError* tablet_error = tablet_errors->Add();
         tablet_error->set_tablet_id(writer->tablet_id());
         tablet_error->set_msg("close wait failed: " + 
boost::lexical_cast<string>(st));
+        VLOG_PROGRESS << "close wait failed tablet " << writer->tablet_id() << 
" transaction_id "
+                      << _txn_id << "err msg " << st;
     }
 }
 
@@ -255,7 +264,7 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
 
     int64_t mem_to_flushed = mem_limit / 3;
     int counter = 0;
-    int64_t  sum = 0;
+    int64_t sum = 0;
     for (auto writer : writers) {
         if (writer->memtable_consumption() <= 0) {
             break;
@@ -278,7 +287,8 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
         }
         OLAPStatus st = writers[i]->wait_flush();
         if (st != OLAP_SUCCESS) {
-            return Status::InternalError(fmt::format("failed to reduce mem 
consumption by flushing memtable. err: {}", st));
+            return Status::InternalError(fmt::format(
+                    "failed to reduce mem consumption by flushing memtable. 
err: {}", st));
         }
     }
     return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to