imay commented on a change in pull request #3143: URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r417744700
########## File path: be/src/exec/tablet_sink.cpp ########## @@ -128,54 +131,136 @@ Status NodeChannel::open_wait() { _open_closure = nullptr; // add batch closure - _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>(); - _add_batch_closure->ref(); + _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create(); + _add_batch_closure->addFailedHandler([this]() { + _cancelled = true; + LOG(WARNING) << "NodeChannel add batch req rpc failed, " << print_load_info() + << ", node=" << node_info()->host << ":" << node_info()->brpc_port; + }); + + _add_batch_closure->addSuccessHandler( + [this](const PTabletWriterAddBatchResult& result, bool is_last_rpc) { + Status status(result.status()); + if (status.ok()) { + if (is_last_rpc) { + for (auto& tablet : result.tablet_vec()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet.tablet_id(); + commit_info.backendId = _node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + } + _add_batches_finished = true; + } + } else { + _cancelled = true; + LOG(WARNING) << "NodeChannel add batch req success but status isn't ok, " + << print_load_info() << ", node=" << node_info()->host << ":" + << node_info()->brpc_port << ", errmsg=" << status.get_error_msg(); + } + + if (result.has_execution_time_us()) { + _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); + _add_batch_counter.add_batch_wait_lock_time_us += result.wait_lock_time_us(); + _add_batch_counter.add_batch_num++; + } + }); return status; } Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { - auto row_no = _batch->add_row(); + // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. + auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, can't add_row. 
cancelled/eos: "); + } + + // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, + // so in the ideal case, mem limit is a matter for _plan node. + // But there is still some unfinished things, we do mem limit here temporarily. + while (_parent->_mem_tracker->any_limit_exceeded()) { + SCOPED_RAW_TIMER(&_mem_exceeded_block_ns); + SleepFor(MonoDelta::FromMilliseconds(10)); + } + + auto row_no = _cur_batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX) { - RETURN_IF_ERROR(_send_cur_batch()); - row_no = _batch->add_row(); + { + SCOPED_RAW_TIMER(&_queue_push_lock_ns); + std::lock_guard<std::mutex> l(_pending_batches_lock); + //To simplify the add_row logic, postpone adding batch into req until the time of sending req + _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); + _pending_batches_num++; + } + + _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker)); + _cur_add_batch_request.clear_tablet_ids(); + + row_no = _cur_batch->add_row(); } DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX); - auto tuple = input_tuple->deep_copy(*_tuple_desc, _batch->tuple_data_pool()); - _batch->get_row(row_no)->set_tuple(0, tuple); - _batch->commit_last_row(); - _add_batch_request.add_tablet_ids(tablet_id); + auto tuple = input_tuple->deep_copy(*_tuple_desc, _cur_batch->tuple_data_pool()); + _cur_batch->get_row(row_no)->set_tuple(0, tuple); + _cur_batch->commit_last_row(); + _cur_add_batch_request.add_tablet_ids(tablet_id); return Status::OK(); } -Status NodeChannel::close(RuntimeState* state) { - auto st = _close(state); - _batch.reset(); - return st; -} +Status NodeChannel::mark_close() { + auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, can't mark as closed. 
cancelled/eos: "); + } -Status NodeChannel::_close(RuntimeState* state) { - return _send_cur_batch(true); + _cur_add_batch_request.set_eos(true); + { + std::lock_guard<std::mutex> l(_pending_batches_lock); + _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); + _pending_batches_num++; + DCHECK(_pending_batches.back().second.eos()); + } + + _eos_is_produced = true; + + _cur_batch.reset(); + return Status::OK(); } Status NodeChannel::close_wait(RuntimeState* state) { - RETURN_IF_ERROR(_wait_in_flight_packet()); - Status status(_add_batch_closure->result.status()); - if (status.ok()) { - for (auto& tablet : _add_batch_closure->result.tablet_vec()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet.tablet_id(); - commit_info.backendId = _node_id; - state->tablet_commit_infos().emplace_back(std::move(commit_info)); - } + auto st = none_of({_cancelled, !_eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, skip waiting for close. cancelled/!eos: "); } - // clear batch after sendt - _batch.reset(); - return status; + + // waiting for finished, it may take a long time, so we could't set a timeout + // use log to make it easier + LOG(INFO) << name() << "start close_wait"; + while (!_add_batches_finished && !_cancelled) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + LOG(INFO) << name() << "close_wait done"; Review comment: If you want to, you can measure the elapsed time with a StopWatch and report it in a single log line instead of two. I suggest using VLOG for it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org