This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 88c2c590b5 [improvement](load) print more load log (#11925)
88c2c590b5 is described below

commit 88c2c590b56e730f7bd499f83882b71febc0e5dd
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Fri Aug 19 17:47:53 2022 +0800

    [improvement](load) print more load log (#11925)
---
 be/src/exec/tablet_sink.cpp  | 42 +++++++++++++++++++++---------------------
 be/src/exec/tablet_sink.h    |  1 +
 be/src/olap/delta_writer.cpp |  6 +++---
 be/src/olap/memtable.cpp     |  6 ++----
 4 files changed, 27 insertions(+), 28 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index be4cdca5e6..560189c03b 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -169,9 +169,7 @@ Status NodeChannel::open_wait() {
         _cancelled = true;
 
         LOG(WARNING) << ss.str() << " " << channel_info();
-        return Status::InternalError("failed to open tablet writer, error={}, error_text={}",
-                                     berror(_open_closure->cntl.ErrorCode()),
-                                     _open_closure->cntl.ErrorText());
+        return Status::InternalError(ss.str());
     }
     Status status(_open_closure->result.status());
     if (_open_closure->unref()) {
@@ -260,15 +258,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
         }
     }
 
-    // We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
-    // so in the ideal case, mem limit is a matter for _plan node.
-    // But there is still some unfinished things, we do mem limit here temporarily.
-    // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
-    // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
-    while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) {
-        SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
-        SleepFor(MonoDelta::FromMilliseconds(10));
-    }
+    _sleep_if_memory_exceed();
 
     auto row_no = _cur_batch->add_row();
     if (row_no == RowBatch::INVALID_ROW_INDEX) {
@@ -309,15 +299,8 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
         }
     }
 
-    // We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
-    // so in the ideal case, mem limit is a matter for _plan node.
-    // But there is still some unfinished things, we do mem limit here temporarily.
-    // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
-    // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
-    while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) {
-        SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
-        SleepFor(MonoDelta::FromMilliseconds(10));
-    }
+    _sleep_if_memory_exceed();
+
     constexpr size_t BATCH_SIZE_FOR_SEND = 2 * 1024 * 1024; //2M
     auto row_no = _cur_batch->add_row();
     if (row_no == RowBatch::INVALID_ROW_INDEX ||
@@ -346,6 +329,23 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
     return Status::OK();
 }
 
+void NodeChannel::_sleep_if_memory_exceed() {
+    size_t begin_us = _mem_exceeded_block_ns / 1000;
+    while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) {
+        SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
+        SleepFor(MonoDelta::FromMilliseconds(10));
+        if (_mem_exceeded_block_ns / 1000 - begin_us > 5000000) {
+            begin_us = _mem_exceeded_block_ns / 1000;
+            LOG(INFO) << "sink sleeps too long, pending_batches_bytes = " << _pending_batches_bytes
+                      << ", max_pending_batches_bytes = " << _max_pending_batches_bytes
+                      << ", is_packet_in_flight = " << _add_batch_closure->is_packet_in_flight()
+                      << ", next_packet_seq = " << _next_packet_seq
+                      << ", cur_batch_rows = " << _cur_batch->num_rows()
+                      << ", " << channel_info();
+        }
+    }
+}
+
 void NodeChannel::mark_close() {
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index d3e083363d..34abf496c0 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -233,6 +233,7 @@ public:
 
 private:
     void _cancel_with_msg(const std::string& msg);
+    void _sleep_if_memory_exceed();
 
 private:
     OlapTableSink* _parent = nullptr;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index e85cf77f5c..b33b5b215b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -314,9 +314,9 @@ OLAPStatus DeltaWriter::close_wait() {
     _delta_written_success = true;
 
     const FlushStatistic& stat = _flush_token->get_stats();
-    VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id() 
-                  << ", load id: " << print_id(_req.load_id)
-                  << ", stats: " << stat;
+    LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() 
+              << ", load id: " << print_id(_req.load_id)
+              << ", stats: " << stat;
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index a7d6728ebe..671bb02d52 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -134,8 +134,6 @@ void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
 }
 
 OLAPStatus MemTable::flush() {
-    VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
-                  << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
@@ -157,8 +155,8 @@ OLAPStatus MemTable::flush() {
     }
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
-    VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id
-                  << ", flushsize: " << _flush_size;
+    LOG(INFO) << "flush memtable, tablet: " << _tablet_id << ", memsize: " << memory_usage()
+              << ", rows: " << _rows << ", flushsize: " << _flush_size << ", duration_us: " << duration_ns / 1000;
     return OLAP_SUCCESS;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to