imay commented on a change in pull request #3143: Non blocking OlapTableSink URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r395413391
########## File path: be/src/exec/tablet_sink.cpp ########## @@ -186,80 +257,89 @@ void NodeChannel::cancel() { closure->ref(); closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _stub->tablet_writer_cancel(&closure->cntl, - &request, - &closure->result, - closure); + _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); request.release_id(); - // reset batch - _batch.reset(); + // Beware of the destruct sequence. RowBatches will use mem_trackers(include ancestors). + // Delete RowBatches here is a better choice to reduce the potential of dtor errors. + { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + std::queue<AddBatchReq> empty; + std::swap(_pending_batches, empty); + _cur_batch.reset(); + } } -Status NodeChannel::_wait_in_flight_packet() { - if (!_has_in_flight_packet) { - return Status::OK(); +int NodeChannel::try_send_and_fetch_status() { + auto st = none_of({_rpc_error, _is_cancelled, _send_finished}); + if (!st.ok()) { + return 0; } - SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns()); - _add_batch_closure->join(); - _has_in_flight_packet = false; - if (_add_batch_closure->cntl.Failed()) { - LOG(WARNING) << "failed to send batch, error=" - << berror(_add_batch_closure->cntl.ErrorCode()) - << ", error_text=" << _add_batch_closure->cntl.ErrorText(); - return Status::InternalError("failed to send batch"); - } + if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { + SCOPED_RAW_TIMER(&_actual_consume_ns); + AddBatchReq send_batch; + { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + DCHECK(!_pending_batches.empty()); + send_batch = std::move(_pending_batches.front()); + _pending_batches.pop(); + _pending_batches_num--; + } - if (_add_batch_closure->result.has_execution_time_us()) { - _parent->update_node_add_batch_counter(_node_id, - _add_batch_closure->result.execution_time_us(), - _add_batch_closure->result.wait_lock_time_us()); - } - return 
{_add_batch_closure->result.status()}; -} + auto row_batch = std::move(send_batch.first); + auto request = std::move(send_batch.second); // doesn't need to be saved in heap -Status NodeChannel::_send_cur_batch(bool eos) { - RETURN_IF_ERROR(_wait_in_flight_packet()); + // tablet_ids has already set when add row + request.set_packet_seq(_next_packet_seq); + if (row_batch->num_rows() > 0) { + SCOPED_RAW_TIMER(&_serialize_batch_ns); + row_batch->serialize(request.mutable_row_batch()); + } - // tablet_ids has already set when add row - _add_batch_request.set_eos(eos); - _add_batch_request.set_packet_seq(_next_packet_seq); - if (_batch->num_rows() > 0) { - SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns()); - _batch->serialize(_add_batch_request.mutable_row_batch()); - } + _add_batch_closure->reset(); + _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _add_batch_closure->ref(); - _add_batch_closure->cntl.Reset(); - _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); + if (request.eos()) { + for (auto pid : _parent->_partition_ids) { + request.add_partition_ids(pid); + } - if (eos) { - for (auto pid : _parent->_partition_ids) { - _add_batch_request.add_partition_ids(pid); + // eos request must be the last request + _add_batch_closure->end_mark(); + _send_finished = true; + DCHECK(_pending_batches_num == 0); + LOG(INFO) << name() << " send finished, should wait the last repsonse"; } - } - _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, - &_add_batch_request, - &_add_batch_closure->result, - _add_batch_closure); - _add_batch_request.clear_tablet_ids(); - _add_batch_request.clear_row_batch(); - _add_batch_request.clear_partition_ids(); + _add_batch_closure->set_in_flight(); + _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, + &_add_batch_closure->result, _add_batch_closure); - _has_in_flight_packet = true; - _next_packet_seq++; + _next_packet_seq++; + } - _batch->reset(); - return Status::OK(); + return _send_finished ? 
0 : 1; } -IndexChannel::~IndexChannel() { +Status NodeChannel::none_of(std::initializer_list<bool> vars) { + bool none = true; + std::string vars_str; Review comment: I think most of the scenarios are still normal. It would be better to construct the strings only when the state is abnormal. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org