vagetablechicken commented on a change in pull request #3143: Non blocking 
OlapTableSink
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r395956321
 
 

 ##########
 File path: be/src/exec/tablet_sink.cpp
 ##########
 @@ -128,55 +129,128 @@ Status NodeChannel::open_wait() {
     _open_closure = nullptr;
 
     // add batch closure
-    _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>();
-    _add_batch_closure->ref();
+    _add_batch_closure = 
ReusableClosure<PTabletWriterAddBatchResult>::create();
+    _add_batch_closure->addFailedHandler([this]() {
+        _rpc_error = true;
+        LOG(WARNING) << "NodeChannel add batch req rpc failed, load_id=" << 
_parent->_load_id
+                     << ", node=" << node_info()->host << ":" << 
node_info()->brpc_port;
+    });
+
+    _add_batch_closure->addSuccessHandler(
+            [this](const PTabletWriterAddBatchResult& result, bool 
is_last_rpc) {
+                Status status(result.status());
+                if (status.ok()) {
+                    if (is_last_rpc) {
+                        for (auto& tablet : result.tablet_vec()) {
+                            TTabletCommitInfo commit_info;
+                            commit_info.tabletId = tablet.tablet_id();
+                            commit_info.backendId = _node_id;
+                            
_tablet_commit_infos.emplace_back(std::move(commit_info));
+                        }
+                        _add_batches_finished = true;
+                        LOG(INFO) << name() << " last rpc has responsed";
+                    }
+                } else {
+                    _rpc_error = true;
+                    LOG(WARNING) << "NodeChannel add batch req success but 
status not ok, load_id="
+                                 << _parent->_load_id << ", node=" << 
node_info()->host << ":"
+                                 << node_info()->brpc_port << ", errmsg=" << 
status.get_error_msg();
+                }
+
+                if (result.has_execution_time_us()) {
+                    _add_batch_counter.add_batch_execution_time_us += 
result.execution_time_us();
+                    _add_batch_counter.add_batch_wait_lock_time_us += 
result.wait_lock_time_us();
+                    _add_batch_counter.add_batch_num++;
+                }
+            });
 
     return status;
 }
 
 Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
-    auto row_no = _batch->add_row();
+    // If add_row() when _eos_is_produced==true, there must be sth wrong, we 
can only mark this channel as failed.
+    auto st = none_of({_rpc_error, _is_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, can't add_row. 
rpc_error/cancelled/eos: ");
+    }
+
+    auto row_no = _cur_batch->add_row();
     if (row_no == RowBatch::INVALID_ROW_INDEX) {
-        RETURN_IF_ERROR(_send_cur_batch());
-        row_no = _batch->add_row();
+        {
+            SCOPED_RAW_TIMER(&_queue_push_lock_ns);
+            std::lock_guard<std::mutex> l(_pending_batches_lock);
+            //To simplify the add_row logic, postpone adding batch into req 
until the time of sending req
+            _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
 
 Review comment:
   As I commented here:
https://github.com/apache/incubator-doris/pull/3143/files#diff-4007834d7219c2282c8ae5c454f01dbaR67
   the scan node can enforce the memory limit. I should add more comments to make this clear.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to