This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 88c2c590b5 [improvement](load) print more load log (#11925) 88c2c590b5 is described below commit 88c2c590b56e730f7bd499f83882b71febc0e5dd Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Fri Aug 19 17:47:53 2022 +0800 [improvement](load) print more load log (#11925) --- be/src/exec/tablet_sink.cpp | 42 +++++++++++++++++++++--------------------- be/src/exec/tablet_sink.h | 1 + be/src/olap/delta_writer.cpp | 6 +++--- be/src/olap/memtable.cpp | 6 ++---- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index be4cdca5e6..560189c03b 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -169,9 +169,7 @@ Status NodeChannel::open_wait() { _cancelled = true; LOG(WARNING) << ss.str() << " " << channel_info(); - return Status::InternalError("failed to open tablet writer, error={}, error_text={}", - berror(_open_closure->cntl.ErrorCode()), - _open_closure->cntl.ErrorText()); + return Status::InternalError(ss.str()); } Status status(_open_closure->result.status()); if (_open_closure->unref()) { @@ -260,15 +258,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { } } - // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, - // so in the ideal case, mem limit is a matter for _plan node. - // But there is still some unfinished things, we do mem limit here temporarily. - // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. - // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). - while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) { - SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); - SleepFor(MonoDelta::FromMilliseconds(10)); - } + _sleep_if_memory_exceed(); auto row_no = _cur_batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX) { @@ -309,15 +299,8 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { } } - // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, - // so in the ideal case, mem limit is a matter for _plan node. - // But there is still some unfinished things, we do mem limit here temporarily. - // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. - // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). - while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) { - SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); - SleepFor(MonoDelta::FromMilliseconds(10)); - } + _sleep_if_memory_exceed(); + constexpr size_t BATCH_SIZE_FOR_SEND = 2 * 1024 * 1024; //2M auto row_no = _cur_batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX || @@ -346,6 +329,23 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { return Status::OK(); } +void NodeChannel::_sleep_if_memory_exceed() { + size_t begin_us = _mem_exceeded_block_ns / 1000; + while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) { + SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); + SleepFor(MonoDelta::FromMilliseconds(10)); + if (_mem_exceeded_block_ns / 1000 - begin_us > 5000000) { + begin_us = _mem_exceeded_block_ns / 1000; + LOG(INFO) << "sink sleeps too long, pending_batches_bytes = " << _pending_batches_bytes + << ", max_pending_batches_bytes = " << _max_pending_batches_bytes + << ", is_packet_in_flight = " << _add_batch_closure->is_packet_in_flight() + << ", next_packet_seq = " << _next_packet_seq + << ", cur_batch_rows = " << _cur_batch->num_rows() + << ", " << channel_info(); + } + } +} + void NodeChannel::mark_close() { auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index d3e083363d..34abf496c0 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -233,6 +233,7 @@ public: private: void _cancel_with_msg(const std::string& msg); + void _sleep_if_memory_exceed(); private: OlapTableSink* _parent = nullptr; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index e85cf77f5c..b33b5b215b 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -314,9 +314,9 @@ OLAPStatus DeltaWriter::close_wait() { _delta_written_success = true; const FlushStatistic& stat = _flush_token->get_stats(); - VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id() - << ", load id: " << print_id(_req.load_id) - << ", stats: " << stat; + LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() + << ", load id: " << print_id(_req.load_id) + << ", stats: " << stat; return OLAP_SUCCESS; } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index a7d6728ebe..671bb02d52 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -134,8 +134,6 @@ void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row, } OLAPStatus MemTable::flush() { - VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id - << ", memsize: " << memory_usage() << ", rows: " << _rows; int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); @@ -157,8 +155,8 @@ OLAPStatus MemTable::flush() { } DorisMetrics::instance()->memtable_flush_total->increment(1); DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); - VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id - << ", flushsize: " << _flush_size; + LOG(INFO) << "flush memtable, tablet: " << _tablet_id << ", memsize: " << memory_usage() + << ", rows: " << _rows << ", flushsize: " << _flush_size << ", duration_us: " << duration_ns / 1000; return OLAP_SUCCESS; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org