This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.1-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push: new d366476697 [chore](BE) add more log for better tracing for be write (#14425) d366476697 is described below commit d366476697c626470b9a129e5747bbbb173b07e0 Author: AlexYue <yj976240...@gmail.com> AuthorDate: Thu Nov 24 22:51:53 2022 +0800 [chore](BE) add more log for better tracing for be write (#14425) Recently, while tracing a bug that happened in version 1.1.4, I found some places where we can add more logging for better tracing. --- be/src/exec/tablet_sink.cpp | 19 ++++++++++++----- be/src/olap/delta_writer.cpp | 6 +++++- be/src/olap/memtable_flush_executor.cpp | 6 +++++- be/src/runtime/tablets_channel.cpp | 36 +++++++++++++++++++++------------ 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index f7c4268704..8db57b45ff 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -200,6 +200,7 @@ Status NodeChannel::open_wait() { } else if (is_last_rpc) { // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait // will be blocked. 
+ VLOG_PROGRESS << "node channel " << channel_info() << " add_batches_finished"; _add_batches_finished = true; } _add_batch_counter.add_batch_rpc_time_us += _add_batch_closure->watch.elapsed_time() / 1000; @@ -232,6 +233,9 @@ Status NodeChannel::open_wait() { commit_info.backendId = _node_id; _tablet_commit_infos.emplace_back(std::move(commit_info)); } + VLOG_PROGRESS << "node channel " << channel_info() + << " add_batches_finished and handled " + << result.tablet_errors().size() << " tablets errors"; _add_batches_finished = true; } } else { @@ -348,8 +352,7 @@ void NodeChannel::_sleep_if_memory_exceed() { << ", max_pending_batches_bytes = " << _max_pending_batches_bytes << ", is_packet_in_flight = " << _add_batch_closure->is_packet_in_flight() << ", next_packet_seq = " << _next_packet_seq - << ", cur_batch_rows = " << _cur_batch->num_rows() - << ", " << channel_info(); + << ", cur_batch_rows = " << _cur_batch->num_rows() << ", " << channel_info(); } } } @@ -370,7 +373,8 @@ void NodeChannel::mark_close() { DCHECK(_pending_batches.back().second.eos()); _close_time_ms = UnixMillis(); LOG(INFO) << channel_info() - << " mark closed, left pending batch size: " << _pending_batches.size(); + << " mark closed, left pending batch size: " << _pending_batches.size() + << " left pending batch size: " << _pending_batches_bytes; } _eos_is_produced = true; @@ -672,6 +676,8 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) { void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, int64_t tablet_id) { + VLOG_PROGRESS << "mark node_id:" << node_id << " tablet_id: " << tablet_id + << " as failed, err: " << err; const auto& it = _tablets_by_channel.find(node_id); if (it == _tablets_by_channel.end()) { return; @@ -883,6 +889,9 @@ Status OlapTableSink::prepare(RuntimeState* state) { } } auto channel = std::make_shared<IndexChannel>(this, index->index_id); + if (UNLIKELY(tablets.empty())) { + LOG(WARNING) << "load job:" << 
state->load_job_id() << " index: " << index->index_id << " would open 0 tablet"; + } RETURN_IF_ERROR(channel->init(state, tablets)); _channels.emplace_back(channel); } @@ -1109,8 +1118,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { for (auto const& pair : node_add_batch_counter_map) { ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) << ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")(" - << (pair.second.add_batch_rpc_time_us / 1000) << ")(" << pair.second.close_wait_time_ms - << ")(" << pair.second.add_batch_num << ")} "; + << (pair.second.add_batch_rpc_time_us / 1000) << ")(" + << pair.second.close_wait_time_ms << ")(" << pair.second.add_batch_num << ")} "; } LOG(INFO) << ss.str(); } else { diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 7dff080a9d..fb2f69a235 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -295,7 +295,11 @@ OLAPStatus DeltaWriter::close_wait() { } // return error if previous flush failed - RETURN_NOT_OK(_flush_token->wait()); + auto st = _flush_token->wait(); + if (OLAP_UNLIKELY(st != OLAP_SUCCESS)) { + LOG(WARNING) << "previous flush failed tablet " << _tablet->tablet_id(); + return st; + } // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index b63074d282..0a9ad6242e 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -42,7 +42,11 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { OLAPStatus FlushToken::submit(const std::shared_ptr<MemTable>& memtable) { RETURN_NOT_OK(_flush_status.load()); int64_t submit_task_time = MonotonicNanos(); - _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable, submit_task_time)); + auto st = 
_flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable, submit_task_time)); + if (UNLIKELY(!st.ok())) { + VLOG_CRITICAL << "submit func err: " << st.get_error_msg(); + return OLAP_ERR_OTHER_ERROR; + } return OLAP_SUCCESS; } diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 3785eedd10..66a5a3fdc2 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -35,7 +35,10 @@ std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count; TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const std::shared_ptr<MemTracker>& mem_tracker, bool is_high_priority) - : _key(key), _state(kInitialized), _closed_senders(64), _is_high_priority(is_high_priority) { + : _key(key), + _state(kInitialized), + _closed_senders(64), + _is_high_priority(is_high_priority) { _mem_tracker = MemTracker::CreateTracker(-1, "TabletsChannel", mem_tracker); static std::once_flag once_flag; std::call_once(once_flag, [] { @@ -78,26 +81,26 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { } Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request, - PTabletWriterAddBatchResult* response) { + PTabletWriterAddBatchResult* response) { DCHECK(request.tablet_ids_size() == request.row_batch().num_rows()); int64_t cur_seq; { std::lock_guard<std::mutex> l(_lock); if (_state != kOpened) { return _state == kFinished - ? _close_status - : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", - _key.to_string(), _state)); + ? 
_close_status + : Status::InternalError(strings::Substitute( + "TabletsChannel $0 state: $1", _key.to_string(), _state)); } cur_seq = _next_seqs[request.sender_id()]; // check packet if (request.packet_seq() < cur_seq) { LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq - << ", recept_seq=" << request.packet_seq(); + << ", recept_seq=" << request.packet_seq(); return Status::OK(); } else if (request.packet_seq() > cur_seq) { LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq - << ", recept_seq=" << request.packet_seq(); + << ", recept_seq=" << request.packet_seq(); return Status::InternalError("lost data packet"); } } @@ -108,22 +111,24 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request, int64_t tablet_id = request.tablet_ids(i); if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) { // skip broken tablets + VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id; continue; } auto it = tablet_to_rowidxs.find(tablet_id); if (it == tablet_to_rowidxs.end()) { - tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i }); + tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int> {i}); } else { it->second.emplace_back(i); } } - google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = response->mutable_tablet_errors(); + google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = + response->mutable_tablet_errors(); for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) { auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first); if (tablet_writer_it == _tablet_writers.end()) { - return Status::InternalError( - strings::Substitute("unknown tablet to append data, tablet=$0", tablet_to_rowidxs_it.first)); + return Status::InternalError(strings::Substitute( + "unknown tablet to append data, tablet=$0", tablet_to_rowidxs_it.first)); } OLAPStatus st = tablet_writer_it->second->write(&row_batch, tablet_to_rowidxs_it.second); @@ -192,6 +197,8 @@ Status 
TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, // just skip this tablet(writer) and continue to close others continue; } + VLOG_PROGRESS << "cancel tablet writer successfully, tablet_id=" << it.first + << ", transaction_id=" << _txn_id; } } @@ -221,6 +228,8 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, PTabletError* tablet_error = tablet_errors->Add(); tablet_error->set_tablet_id(writer->tablet_id()); tablet_error->set_msg("close wait failed: " + boost::lexical_cast<string>(st)); + VLOG_PROGRESS << "close wait failed tablet " << writer->tablet_id() << " transaction_id " + << _txn_id << "err msg " << st; } } @@ -255,7 +264,7 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { int64_t mem_to_flushed = mem_limit / 3; int counter = 0; - int64_t sum = 0; + int64_t sum = 0; for (auto writer : writers) { if (writer->memtable_consumption() <= 0) { break; @@ -278,7 +287,8 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { } OLAPStatus st = writers[i]->wait_flush(); if (st != OLAP_SUCCESS) { - return Status::InternalError(fmt::format("failed to reduce mem consumption by flushing memtable. err: {}", st)); + return Status::InternalError(fmt::format( + "failed to reduce mem consumption by flushing memtable. err: {}", st)); } } return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org