vagetablechicken commented on a change in pull request #3143: Non blocking OlapTableSink URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r395956321
########## File path: be/src/exec/tablet_sink.cpp ########## @@ -128,55 +129,128 @@ Status NodeChannel::open_wait() { _open_closure = nullptr; // add batch closure - _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>(); - _add_batch_closure->ref(); + _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create(); + _add_batch_closure->addFailedHandler([this]() { + _rpc_error = true; + LOG(WARNING) << "NodeChannel add batch req rpc failed, load_id=" << _parent->_load_id + << ", node=" << node_info()->host << ":" << node_info()->brpc_port; + }); + + _add_batch_closure->addSuccessHandler( + [this](const PTabletWriterAddBatchResult& result, bool is_last_rpc) { + Status status(result.status()); + if (status.ok()) { + if (is_last_rpc) { + for (auto& tablet : result.tablet_vec()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet.tablet_id(); + commit_info.backendId = _node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + } + _add_batches_finished = true; + LOG(INFO) << name() << " last rpc has responsed"; + } + } else { + _rpc_error = true; + LOG(WARNING) << "NodeChannel add batch req success but status not ok, load_id=" + << _parent->_load_id << ", node=" << node_info()->host << ":" + << node_info()->brpc_port << ", errmsg=" << status.get_error_msg(); + } + + if (result.has_execution_time_us()) { + _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); + _add_batch_counter.add_batch_wait_lock_time_us += result.wait_lock_time_us(); + _add_batch_counter.add_batch_num++; + } + }); return status; } Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { - auto row_no = _batch->add_row(); + // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. + auto st = none_of({_rpc_error, _is_cancelled, _eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, can't add_row. 
rpc_error/cancelled/eos: "); + } + + auto row_no = _cur_batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX) { - RETURN_IF_ERROR(_send_cur_batch()); - row_no = _batch->add_row(); + { + SCOPED_RAW_TIMER(&_queue_push_lock_ns); + std::lock_guard<std::mutex> l(_pending_batches_lock); + //To simplify the add_row logic, postpone adding batch into req until the time of sending req + _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); Review comment: As I commented here https://github.com/apache/incubator-doris/pull/3143/files#diff-4007834d7219c2282c8ae5c454f01dbaR67 , the scan node can enforce the memory limit. I should add more comments to explain this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org