imay commented on a change in pull request #3143: Non blocking OlapTableSink URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r395953272
########## File path: be/src/exec/tablet_sink.cpp ########## @@ -186,80 +260,90 @@ void NodeChannel::cancel() { closure->ref(); closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _stub->tablet_writer_cancel(&closure->cntl, - &request, - &closure->result, - closure); + _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); request.release_id(); - // reset batch - _batch.reset(); + // Beware of the destruct sequence. RowBatches will use mem_trackers(include ancestors). + // Delete RowBatches here is a better choice to reduce the potential of dtor errors. + { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + std::queue<AddBatchReq> empty; + std::swap(_pending_batches, empty); + _cur_batch.reset(); + } } -Status NodeChannel::_wait_in_flight_packet() { - if (!_has_in_flight_packet) { - return Status::OK(); +int NodeChannel::try_send_and_fetch_status() { + auto st = none_of({_rpc_error, _is_cancelled, _send_finished}); + if (!st.ok()) { + return 0; } - SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns()); - _add_batch_closure->join(); - _has_in_flight_packet = false; - if (_add_batch_closure->cntl.Failed()) { - LOG(WARNING) << "failed to send batch, error=" - << berror(_add_batch_closure->cntl.ErrorCode()) - << ", error_text=" << _add_batch_closure->cntl.ErrorText(); - return Status::InternalError("failed to send batch"); - } + if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { + SCOPED_RAW_TIMER(&_actual_consume_ns); + AddBatchReq send_batch; + { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + DCHECK(!_pending_batches.empty()); + send_batch = std::move(_pending_batches.front()); + _pending_batches.pop(); + _pending_batches_num--; + } - if (_add_batch_closure->result.has_execution_time_us()) { - _parent->update_node_add_batch_counter(_node_id, - _add_batch_closure->result.execution_time_us(), - _add_batch_closure->result.wait_lock_time_us()); - } - return {_add_batch_closure->result.status()}; -} + auto row_batch = std::move(send_batch.first); + auto request = std::move(send_batch.second); // doesn't need to be saved in heap -Status NodeChannel::_send_cur_batch(bool eos) { - RETURN_IF_ERROR(_wait_in_flight_packet()); + // tablet_ids has already set when add row + request.set_packet_seq(_next_packet_seq); + if (row_batch->num_rows() > 0) { + SCOPED_RAW_TIMER(&_serialize_batch_ns); + row_batch->serialize(request.mutable_row_batch()); + } - // tablet_ids has already set when add row - _add_batch_request.set_eos(eos); - _add_batch_request.set_packet_seq(_next_packet_seq); - if (_batch->num_rows() > 0) { - SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns()); - _batch->serialize(_add_batch_request.mutable_row_batch()); - } + _add_batch_closure->reset(); + _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _add_batch_closure->ref(); - _add_batch_closure->cntl.Reset(); - _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); + if (request.eos()) { + for (auto pid : _parent->_partition_ids) { + request.add_partition_ids(pid); + } - if (eos) { - for (auto pid : _parent->_partition_ids) { - _add_batch_request.add_partition_ids(pid); + // eos request must be the last request + _add_batch_closure->end_mark(); + _send_finished = true; + DCHECK(_pending_batches_num == 0); + LOG(INFO) << name() << " send finished, should wait the last repsonse"; Review comment: should remove this trace log ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org