This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.0 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 422d4d2afdf8bccb431e9f7565ed4e9071886f9a Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Fri Mar 18 09:38:16 2022 +0800 [fix](load) fix bug that BE may crash when calling `mark_as_failed` (#8501) 1. The methods of the IndexChannel are called back in the RpcClosure in the NodeChannel. However, this callback may occur after the whole task is finished (e.g. due to network latency), and by that time the IndexChannel may have already been destructed, so we should not call the IndexChannel's methods anymore; otherwise the BE will crash. Therefore, we use the `_is_closed` variable and `_closed_lock` to ensure that the RPC callback function will not call the IndexChannel's methods after the NodeChannel is closed. 2. Do not add the IndexChannel to the ObjectPool. When the IndexChannel is destructed, it may trigger the destruction of its NodeChannels, and destructing a NodeChannel may be time-consuming (waiting for in-flight RPCs to finish). Since the ObjectPool holds a SpinLock while destroying its objects, this could cause the CPU to spin busily. 
--- be/src/exec/tablet_sink.cpp | 32 +++++++++++++++++++++++--------- be/src/exec/tablet_sink.h | 15 +++++++++++++-- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 5b59508..b690f89 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -34,6 +34,7 @@ #include "service/brpc.h" #include "util/brpc_client_cache.h" #include "util/debug/sanitizer_scopes.h" +#include "util/defer_op.h" #include "util/monotime.h" #include "util/proto_util.h" #include "util/threadpool.h" @@ -183,6 +184,12 @@ Status NodeChannel::open_wait() { // add batch closure _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create(); _add_batch_closure->addFailedHandler([this](bool is_last_rpc) { + std::lock_guard<std::mutex> l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call `mark_as_failed`, + // and notice that _index_channel may already be destroyed. + return; + } // If rpc failed, mark all tablets on this node channel as failed _index_channel->mark_as_failed(this->node_id(), this->host(), _add_batch_closure->cntl.ErrorText(), -1); Status st = _index_channel->check_intolerable_failure(); @@ -197,6 +204,12 @@ Status NodeChannel::open_wait() { _add_batch_closure->addSuccessHandler([this](const PTabletWriterAddBatchResult& result, bool is_last_rpc) { + std::lock_guard<std::mutex> l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call the following logic, + // and notice that _index_channel may already be destroyed. 
+ return; + } Status status(result.status()); if (status.ok()) { // if has error tablet, handle them first @@ -329,15 +342,10 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { return Status::OK(); } -Status NodeChannel::mark_close() { +void NodeChannel::mark_close() { auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { - if (_cancelled) { - std::lock_guard<SpinLock> l(_cancel_msg_lock); - return Status::InternalError("mark close failed. " + _cancel_msg); - } else { - return st.clone_and_prepend("already stopped, can't mark as closed. cancelled/eos: "); - } + return; } _cur_add_batch_request.set_eos(true); @@ -354,10 +362,16 @@ Status NodeChannel::mark_close() { } _eos_is_produced = true; - return Status::OK(); + return; } Status NodeChannel::close_wait(RuntimeState* state) { + // set _is_closed to true finally + Defer set_closed {[&]() { + std::lock_guard<std::mutex> l(_closed_lock); + _is_closed = true; + }}; + auto st = none_of({_cancelled, !_eos_is_produced}); if (!st.ok()) { if (_cancelled) { @@ -801,7 +815,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { tablets.emplace_back(std::move(tablet_with_partition)); } } - auto channel = _pool->add(new IndexChannel(this, index->index_id)); + auto channel = std::make_shared<IndexChannel>(this, index->index_id); RETURN_IF_ERROR(channel->init(state, tablets)); _channels.emplace_back(channel); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index c485632..8071d20 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -170,7 +170,7 @@ public: // two ways to stop channel: // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. // 2. 
just cancel() - Status mark_close(); + void mark_close(); Status close_wait(RuntimeState* state); void cancel(const std::string& cancel_msg); @@ -284,6 +284,17 @@ private: // the timestamp when this node channel be marked closed and finished closed uint64_t _close_time_ms = 0; + + // lock to protect _is_closed. + // The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel. + // However, this rpc callback may occur after the whole task is finished (e.g. due to network latency), + // and by that time the IndexChannel may have been destructured, so we should not call the + // IndexChannel methods anymore, otherwise the BE will crash. + // Therefore, we use the _is_closed and _closed_lock to ensure that the RPC callback + // function will not call the IndexChannel method after the NodeChannel is closed. + // The IndexChannel is definitely accessible until the NodeChannel is closed. + std::mutex _closed_lock; + bool _is_closed = false; }; class IndexChannel { @@ -424,7 +435,7 @@ protected: Bitmap _filter_bitmap; // index_channel - std::vector<IndexChannel*> _channels; + std::vector<std::shared_ptr<IndexChannel>> _channels; CountDownLatch _stop_background_threads_latch; scoped_refptr<Thread> _sender_thread; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org