vagetablechicken commented on a change in pull request #3143: Non blocking 
OlapTableSink
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r395963965
 
 

 ##########
 File path: be/src/exec/tablet_sink.cpp
 ##########
 @@ -128,55 +129,128 @@ Status NodeChannel::open_wait() {
     _open_closure = nullptr;
 
     // add batch closure
-    _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>();
-    _add_batch_closure->ref();
+    _add_batch_closure = 
ReusableClosure<PTabletWriterAddBatchResult>::create();
+    _add_batch_closure->addFailedHandler([this]() {
+        _rpc_error = true;
+        LOG(WARNING) << "NodeChannel add batch req rpc failed, load_id=" << 
_parent->_load_id
+                     << ", node=" << node_info()->host << ":" << 
node_info()->brpc_port;
+    });
+
+    _add_batch_closure->addSuccessHandler(
+            [this](const PTabletWriterAddBatchResult& result, bool 
is_last_rpc) {
+                Status status(result.status());
+                if (status.ok()) {
+                    if (is_last_rpc) {
+                        for (auto& tablet : result.tablet_vec()) {
+                            TTabletCommitInfo commit_info;
+                            commit_info.tabletId = tablet.tablet_id();
+                            commit_info.backendId = _node_id;
+                            
_tablet_commit_infos.emplace_back(std::move(commit_info));
+                        }
+                        _add_batches_finished = true;
+                        LOG(INFO) << name() << " last rpc has responsed";
+                    }
+                } else {
+                    _rpc_error = true;
+                    LOG(WARNING) << "NodeChannel add batch req success but 
status not ok, load_id="
+                                 << _parent->_load_id << ", node=" << 
node_info()->host << ":"
+                                 << node_info()->brpc_port << ", errmsg=" << 
status.get_error_msg();
+                }
+
+                if (result.has_execution_time_us()) {
+                    _add_batch_counter.add_batch_execution_time_us += 
result.execution_time_us();
+                    _add_batch_counter.add_batch_wait_lock_time_us += 
result.wait_lock_time_us();
+                    _add_batch_counter.add_batch_num++;
+                }
+            });
 
     return status;
 }
 
 Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
-    auto row_no = _batch->add_row();
+    // If add_row() when _eos_is_produced==true, there must be sth wrong, we 
can only mark this channel as failed.
+    auto st = none_of({_rpc_error, _is_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, can't add_row. 
rpc_error/cancelled/eos: ");
+    }
+
+    auto row_no = _cur_batch->add_row();
     if (row_no == RowBatch::INVALID_ROW_INDEX) {
-        RETURN_IF_ERROR(_send_cur_batch());
-        row_no = _batch->add_row();
+        {
+            SCOPED_RAW_TIMER(&_queue_push_lock_ns);
+            std::lock_guard<std::mutex> l(_pending_batches_lock);
+            //To simplify the add_row logic, postpone adding batch into req 
until the time of sending req
+            _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
 
 Review comment:
   _plan node in PlanFragmentExec may not limit the mem, so I add mem limit 
check in NodeChannel::add_row, it'll block the main thread if mem limit 
exceeded.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to