vagetablechicken commented on a change in pull request #3143:
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r417728936
##########
File path: be/src/exec/tablet_sink.cpp
##########
@@ -128,54 +131,136 @@ Status NodeChannel::open_wait() {
     _open_closure = nullptr;
 
     // add batch closure
-    _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>();
-    _add_batch_closure->ref();
+    _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
+    _add_batch_closure->addFailedHandler([this]() {
+        _cancelled = true;
+        LOG(WARNING) << "NodeChannel add batch req rpc failed, " << print_load_info()
+                     << ", node=" << node_info()->host << ":" << node_info()->brpc_port;
+    });
+
+    _add_batch_closure->addSuccessHandler(
+            [this](const PTabletWriterAddBatchResult& result, bool is_last_rpc) {
+                Status status(result.status());
+                if (status.ok()) {
+                    if (is_last_rpc) {
+                        for (auto& tablet : result.tablet_vec()) {
+                            TTabletCommitInfo commit_info;
+                            commit_info.tabletId = tablet.tablet_id();
+                            commit_info.backendId = _node_id;
+                            _tablet_commit_infos.emplace_back(std::move(commit_info));
+                        }
+                        _add_batches_finished = true;
+                    }
+                } else {
+                    _cancelled = true;
+                    LOG(WARNING) << "NodeChannel add batch req success but status isn't ok, "
+                                 << print_load_info() << ", node=" << node_info()->host << ":"
+                                 << node_info()->brpc_port << ", errmsg=" << status.get_error_msg();
+                }
+
+                if (result.has_execution_time_us()) {
+                    _add_batch_counter.add_batch_execution_time_us += result.execution_time_us();
+                    _add_batch_counter.add_batch_wait_lock_time_us += result.wait_lock_time_us();
+                    _add_batch_counter.add_batch_num++;
+                }
+            });
 
     return status;
 }
 
 Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
-    auto row_no = _batch->add_row();
+    // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
+    auto st = none_of({_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, can't add_row. cancelled/eos: ");
+    }
+
+    // We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
+    // so in the ideal case, mem limit is a matter for _plan node.
+    // But there is still some unfinished things, we do mem limit here temporarily.
+    while (_parent->_mem_tracker->any_limit_exceeded()) {
+        SCOPED_RAW_TIMER(&_mem_exceeded_block_ns);
+        SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+
+    auto row_no = _cur_batch->add_row();
     if (row_no == RowBatch::INVALID_ROW_INDEX) {
-        RETURN_IF_ERROR(_send_cur_batch());
-        row_no = _batch->add_row();
+        {
+            SCOPED_RAW_TIMER(&_queue_push_lock_ns);
+            std::lock_guard<std::mutex> l(_pending_batches_lock);
+            // To simplify the add_row logic, postpone adding batch into req until the time of sending req
+            _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request);
+            _pending_batches_num++;
+        }
+
+        _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker));
+        _cur_add_batch_request.clear_tablet_ids();
+
+        row_no = _cur_batch->add_row();
     }
 
     DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX);
-    auto tuple = input_tuple->deep_copy(*_tuple_desc, _batch->tuple_data_pool());
-    _batch->get_row(row_no)->set_tuple(0, tuple);
-    _batch->commit_last_row();
-    _add_batch_request.add_tablet_ids(tablet_id);
+    auto tuple = input_tuple->deep_copy(*_tuple_desc, _cur_batch->tuple_data_pool());
+    _cur_batch->get_row(row_no)->set_tuple(0, tuple);
+    _cur_batch->commit_last_row();
+    _cur_add_batch_request.add_tablet_ids(tablet_id);
     return Status::OK();
 }
 
-Status NodeChannel::close(RuntimeState* state) {
-    auto st = _close(state);
-    _batch.reset();
-    return st;
-}
+Status NodeChannel::mark_close() {
+    auto st = none_of({_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, can't mark as closed. cancelled/eos: ");
+    }
 
-Status NodeChannel::_close(RuntimeState* state) {
-    return _send_cur_batch(true);
+    _cur_add_batch_request.set_eos(true);
+    {
+        std::lock_guard<std::mutex> l(_pending_batches_lock);
+        _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request);
+        _pending_batches_num++;
+        DCHECK(_pending_batches.back().second.eos());
+    }
+
+    _eos_is_produced = true;
+
+    _cur_batch.reset();
+    return Status::OK();
 }
 
 Status NodeChannel::close_wait(RuntimeState* state) {
-    RETURN_IF_ERROR(_wait_in_flight_packet());
-    Status status(_add_batch_closure->result.status());
-    if (status.ok()) {
-        for (auto& tablet : _add_batch_closure->result.tablet_vec()) {
-            TTabletCommitInfo commit_info;
-            commit_info.tabletId = tablet.tablet_id();
-            commit_info.backendId = _node_id;
-            state->tablet_commit_infos().emplace_back(std::move(commit_info));
-        }
+    auto st = none_of({_cancelled, !_eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, skip waiting for close. cancelled/!eos: ");
     }
-    // clear batch after sent
-    _batch.reset();
-    return status;
+
+    // waiting for finished, it may take a long time, so we couldn't set a timeout
+    // use log to make it easier
+    LOG(INFO) << name() << "start close_wait";
+    while (!_add_batches_finished && !_cancelled) {
+        SleepFor(MonoDelta::FromMilliseconds(1));
+    }
+    LOG(INFO) << name() << "close_wait done";

Review comment:
   We only count the total time of the channel's close(), so a slow BE could hide inside that total. I think these logs are useful for analyzing the state at that time, and they are convenient for developers. How about setting them to vlog?
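   As an aside, a minimal standalone sketch of the vlog idea, assuming glog-style LOG/VLOG macros as exposed by the BE's logging header; the message strings are illustrative only and the verbosity level 1 is an arbitrary choice, not something decided in this PR:

   #include <glog/logging.h>

   int main(int argc, char** argv) {
       google::InitGoogleLogging(argv[0]);

       // LOG(INFO) always reaches the INFO log, even for perfectly healthy loads.
       LOG(INFO) << "node_channel" << " start close_wait";

       // VLOG(1) is compiled in but only emitted when verbose logging is enabled
       // (e.g. by setting the GLOG_v environment variable to 1), so routine
       // per-channel close_wait traces stay out of normal logs while remaining
       // available when a slow BE needs to be diagnosed.
       VLOG(1) << "node_channel" << " start close_wait";
       VLOG(1) << "node_channel" << " close_wait done";

       return 0;
   }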