imay commented on a change in pull request #3143:
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r417744700



##########
File path: be/src/exec/tablet_sink.cpp
##########
@@ -128,54 +131,136 @@ Status NodeChannel::open_wait() {
     _open_closure = nullptr;
 
     // add batch closure
-    _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>();
-    _add_batch_closure->ref();
+    _add_batch_closure = 
ReusableClosure<PTabletWriterAddBatchResult>::create();
+    _add_batch_closure->addFailedHandler([this]() {
+        _cancelled = true;
+        LOG(WARNING) << "NodeChannel add batch req rpc failed, " << 
print_load_info()
+                     << ", node=" << node_info()->host << ":" << 
node_info()->brpc_port;
+    });
+
+    _add_batch_closure->addSuccessHandler(
+            [this](const PTabletWriterAddBatchResult& result, bool 
is_last_rpc) {
+                Status status(result.status());
+                if (status.ok()) {
+                    if (is_last_rpc) {
+                        for (auto& tablet : result.tablet_vec()) {
+                            TTabletCommitInfo commit_info;
+                            commit_info.tabletId = tablet.tablet_id();
+                            commit_info.backendId = _node_id;
+                            
_tablet_commit_infos.emplace_back(std::move(commit_info));
+                        }
+                        _add_batches_finished = true;
+                    }
+                } else {
+                    _cancelled = true;
+                    LOG(WARNING) << "NodeChannel add batch req success but 
status isn't ok, "
+                                 << print_load_info() << ", node=" << 
node_info()->host << ":"
+                                 << node_info()->brpc_port << ", errmsg=" << 
status.get_error_msg();
+                }
+
+                if (result.has_execution_time_us()) {
+                    _add_batch_counter.add_batch_execution_time_us += 
result.execution_time_us();
+                    _add_batch_counter.add_batch_wait_lock_time_us += 
result.wait_lock_time_us();
+                    _add_batch_counter.add_batch_num++;
+                }
+            });
 
     return status;
 }
 
 Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
-    auto row_no = _batch->add_row();
+    // If add_row() when _eos_is_produced==true, there must be sth wrong, we 
can only mark this channel as failed.
+    auto st = none_of({_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, can't add_row. 
cancelled/eos: ");
+    }
+
+    // We use OlapTableSink mem_tracker which has the same ancestor of _plan 
node,
+    // so in the ideal case, mem limit is a matter for _plan node.
+    // But there is still some unfinished things, we do mem limit here 
temporarily.
+    while (_parent->_mem_tracker->any_limit_exceeded()) {
+        SCOPED_RAW_TIMER(&_mem_exceeded_block_ns);
+        SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+
+    auto row_no = _cur_batch->add_row();
     if (row_no == RowBatch::INVALID_ROW_INDEX) {
-        RETURN_IF_ERROR(_send_cur_batch());
-        row_no = _batch->add_row();
+        {
+            SCOPED_RAW_TIMER(&_queue_push_lock_ns);
+            std::lock_guard<std::mutex> l(_pending_batches_lock);
+            //To simplify the add_row logic, postpone adding batch into req 
until the time of sending req
+            _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
+            _pending_batches_num++;
+        }
+
+        _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, 
_parent->_mem_tracker));
+        _cur_add_batch_request.clear_tablet_ids();
+
+        row_no = _cur_batch->add_row();
     }
     DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX);
-    auto tuple = input_tuple->deep_copy(*_tuple_desc, 
_batch->tuple_data_pool());
-    _batch->get_row(row_no)->set_tuple(0, tuple);
-    _batch->commit_last_row();
-    _add_batch_request.add_tablet_ids(tablet_id);
+    auto tuple = input_tuple->deep_copy(*_tuple_desc, 
_cur_batch->tuple_data_pool());
+    _cur_batch->get_row(row_no)->set_tuple(0, tuple);
+    _cur_batch->commit_last_row();
+    _cur_add_batch_request.add_tablet_ids(tablet_id);
     return Status::OK();
 }
 
-Status NodeChannel::close(RuntimeState* state) {
-    auto st = _close(state);
-    _batch.reset();
-    return st;
-}
+Status NodeChannel::mark_close() {
+    auto st = none_of({_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, can't mark as closed. 
cancelled/eos: ");
+    }
 
-Status NodeChannel::_close(RuntimeState* state) {
-    return _send_cur_batch(true);
+    _cur_add_batch_request.set_eos(true);
+    {
+        std::lock_guard<std::mutex> l(_pending_batches_lock);
+        _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
+        _pending_batches_num++;
+        DCHECK(_pending_batches.back().second.eos());
+    }
+
+    _eos_is_produced = true;
+
+    _cur_batch.reset();
+    return Status::OK();
 }
 
 Status NodeChannel::close_wait(RuntimeState* state) {
-    RETURN_IF_ERROR(_wait_in_flight_packet());
-    Status status(_add_batch_closure->result.status());
-    if (status.ok()) {
-        for (auto& tablet : _add_batch_closure->result.tablet_vec()) {
-            TTabletCommitInfo commit_info;
-            commit_info.tabletId = tablet.tablet_id();
-            commit_info.backendId = _node_id;
-            state->tablet_commit_infos().emplace_back(std::move(commit_info));
-        }
+    auto st = none_of({_cancelled, !_eos_is_produced});
+    if (!st.ok()) {
+        return st.clone_and_prepend("already stopped, skip waiting for close. 
cancelled/!eos: ");
     }
-    // clear batch after sendt
-    _batch.reset();
-    return status;
+
+    // waiting for finished, it may take a long time, so we could't set a 
timeout
+    // use log to make it easier
+    LOG(INFO) << name() << "start close_wait";
+    while (!_add_batches_finished && !_cancelled) {
+        SleepFor(MonoDelta::FromMilliseconds(1));
+    }
+    LOG(INFO) << name() << "close_wait done";

Review comment:
       If you want to, you can measure the elapsed time with a StopWatch and 
report it in a single log message instead of logging separately at start and end.
   I suggest using VLOG rather than LOG(INFO) here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to