imay commented on a change in pull request #3143: Non blocking OlapTableSink
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r395413391
 
 

 ##########
 File path: be/src/exec/tablet_sink.cpp
 ##########
 @@ -186,80 +257,89 @@ void NodeChannel::cancel() {
 
     closure->ref();
     closure->cntl.set_timeout_ms(_rpc_timeout_ms);
-    _stub->tablet_writer_cancel(&closure->cntl,
-                                &request,
-                                &closure->result,
-                                closure);
+    _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, 
closure);
     request.release_id();
 
-    // reset batch
-    _batch.reset();
+    // Beware of the destruct sequence. RowBatches will use 
mem_trackers(include ancestors).
+    // Delete RowBatches here is a better choice to reduce the potential of 
dtor errors.
+    {
+        std::lock_guard<std::mutex> lg(_pending_batches_lock);
+        std::queue<AddBatchReq> empty;
+        std::swap(_pending_batches, empty);
+        _cur_batch.reset();
+    }
 }
 
-Status NodeChannel::_wait_in_flight_packet() {
-    if (!_has_in_flight_packet) {
-        return Status::OK();
+int NodeChannel::try_send_and_fetch_status() {
+    auto st = none_of({_rpc_error, _is_cancelled, _send_finished});
+    if (!st.ok()) {
+        return 0;
     }
 
-    SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns());
-    _add_batch_closure->join();
-    _has_in_flight_packet = false;
-    if (_add_batch_closure->cntl.Failed()) {
-        LOG(WARNING) << "failed to send batch, error="
-            << berror(_add_batch_closure->cntl.ErrorCode())
-            << ", error_text=" << _add_batch_closure->cntl.ErrorText();
-        return Status::InternalError("failed to send batch");
-    }
+    if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 
0) {
+        SCOPED_RAW_TIMER(&_actual_consume_ns);
+        AddBatchReq send_batch;
+        {
+            std::lock_guard<std::mutex> lg(_pending_batches_lock);
+            DCHECK(!_pending_batches.empty());
+            send_batch = std::move(_pending_batches.front());
+            _pending_batches.pop();
+            _pending_batches_num--;
+        }
 
-    if (_add_batch_closure->result.has_execution_time_us()) {
-        _parent->update_node_add_batch_counter(_node_id,
-                _add_batch_closure->result.execution_time_us(),
-                _add_batch_closure->result.wait_lock_time_us());
-    }
-    return {_add_batch_closure->result.status()};
-}
+        auto row_batch = std::move(send_batch.first);
+        auto request = std::move(send_batch.second); // doesn't need to be 
saved in heap
 
-Status NodeChannel::_send_cur_batch(bool eos) {
-    RETURN_IF_ERROR(_wait_in_flight_packet());
+        // tablet_ids has already set when add row
+        request.set_packet_seq(_next_packet_seq);
+        if (row_batch->num_rows() > 0) {
+            SCOPED_RAW_TIMER(&_serialize_batch_ns);
+            row_batch->serialize(request.mutable_row_batch());
+        }
 
-    // tablet_ids has already set when add row
-    _add_batch_request.set_eos(eos);
-    _add_batch_request.set_packet_seq(_next_packet_seq);
-    if (_batch->num_rows() > 0) {
-        SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns());
-        _batch->serialize(_add_batch_request.mutable_row_batch());
-    }
+        _add_batch_closure->reset();
+        _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms);
 
-    _add_batch_closure->ref();
-    _add_batch_closure->cntl.Reset();
-    _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms);
+        if (request.eos()) {
+            for (auto pid : _parent->_partition_ids) {
+                request.add_partition_ids(pid);
+            }
 
-    if (eos) {
-        for (auto pid : _parent->_partition_ids) {
-            _add_batch_request.add_partition_ids(pid);
+            // eos request must be the last request
+            _add_batch_closure->end_mark();
+            _send_finished = true;
+            DCHECK(_pending_batches_num == 0);
+            LOG(INFO) << name() << " send finished, should wait the last 
repsonse";
         }
-    }
 
-    _stub->tablet_writer_add_batch(&_add_batch_closure->cntl,
-                                   &_add_batch_request,
-                                   &_add_batch_closure->result,
-                                   _add_batch_closure);
-    _add_batch_request.clear_tablet_ids();
-    _add_batch_request.clear_row_batch();
-    _add_batch_request.clear_partition_ids();
+        _add_batch_closure->set_in_flight();
+        _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
+                                       &_add_batch_closure->result, 
_add_batch_closure);
 
-    _has_in_flight_packet = true;
-    _next_packet_seq++;
+        _next_packet_seq++;
+    }
 
-    _batch->reset();
-    return Status::OK();
+    return _send_finished ? 0 : 1;
 }
 
-IndexChannel::~IndexChannel() {
+Status NodeChannel::none_of(std::initializer_list<bool> vars) {
+    bool none = true;
+    std::string vars_str;
 
 Review comment:
   I think most calls will hit the normal case, so it would be better to 
construct the string only when the state is abnormal.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to