This is an automated email from the ASF dual-hosted git repository. zhaoc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new fb02bb5 [Load] Fix mem limit in NodeChannel (#3643) fb02bb5 is described below commit fb02bb5cd9ed04834efda9f150d843b05885307d Author: HuangWei <huangw...@xiaomi.com> AuthorDate: Fri May 22 09:11:59 2020 +0800 [Load] Fix mem limit in NodeChannel (#3643) --- be/src/exec/tablet_sink.cpp | 10 +++++++++- be/src/exec/tablet_sink.h | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 45df64f..4941bb3 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -59,6 +59,7 @@ Status NodeChannel::init(RuntimeState* state) { if (_node_info == nullptr) { std::stringstream ss; ss << "unknown node id, id=" << _node_id; + _cancelled = true; return Status::InternalError(ss.str()); } @@ -131,6 +132,11 @@ Status NodeChannel::open_wait() { } _open_closure = nullptr; + if (!status.ok()) { + _cancelled = true; + return status; + } + // add batch closure _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create(); _add_batch_closure->addFailedHandler([this]() { @@ -179,7 +185,9 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, // so in the ideal case, mem limit is a matter for _plan node. // But there is still some unfinished things, we do mem limit here temporarily. - while (_parent->_mem_tracker->any_limit_exceeded()) { + // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. + // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). + while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) { SCOPED_RAW_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 9ff18cf..47c9cfe 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -166,6 +166,7 @@ public: // 0: stopped, send finished(eos request has been sent), or any internal error; // 1: running, haven't reach eos. // only allow 1 rpc in flight + // plz make sure, this func should be called after open_wait(). int try_send_and_fetch_status(); void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org