This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fb02bb5  [Load] Fix mem limit in NodeChannel (#3643)
fb02bb5 is described below

commit fb02bb5cd9ed04834efda9f150d843b05885307d
Author: HuangWei <huangw...@xiaomi.com>
AuthorDate: Fri May 22 09:11:59 2020 +0800

    [Load] Fix mem limit in NodeChannel (#3643)
---
 be/src/exec/tablet_sink.cpp | 10 +++++++++-
 be/src/exec/tablet_sink.h   |  1 +
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 45df64f..4941bb3 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -59,6 +59,7 @@ Status NodeChannel::init(RuntimeState* state) {
     if (_node_info == nullptr) {
         std::stringstream ss;
         ss << "unknown node id, id=" << _node_id;
+        _cancelled = true;
         return Status::InternalError(ss.str());
     }
 
@@ -131,6 +132,11 @@ Status NodeChannel::open_wait() {
     }
     _open_closure = nullptr;
 
+    if (!status.ok()) {
+        _cancelled = true;
+        return status;
+    }
+
     // add batch closure
     _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
     _add_batch_closure->addFailedHandler([this]() {
@@ -179,7 +185,9 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
     // We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
     // so in the ideal case, mem limit is a matter for _plan node.
     // But there is still some unfinished things, we do mem limit here temporarily.
-    while (_parent->_mem_tracker->any_limit_exceeded()) {
+    // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
+    // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
+    while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) {
         SCOPED_RAW_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
     }
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 9ff18cf..47c9cfe 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -166,6 +166,7 @@ public:
     // 0: stopped, send finished(eos request has been sent), or any internal error;
     // 1: running, haven't reach eos.
     // only allow 1 rpc in flight
+    // plz make sure, this func should be called after open_wait().
     int try_send_and_fetch_status();
 
     void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to