This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a9a633c8f8c [Fix](load) Fix the channel leak when close wait has been cancelled (#38031) a9a633c8f8c is described below commit a9a633c8f8c8f3b217583a40824340607330d618 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Thu Jul 18 09:53:52 2024 +0800 [Fix](load) Fix the channel leak when close wait has been cancelled (#38031) If close_wait is called on a NodeChannel that has already been marked as cancelled, close_wait still sets _is_closed to true. When the NodeChannel later tries to send the cancel request to the downstream LoadChannel, it finds that _is_closed is already true and skips the RPC, so the downstream LoadChannel is never cancelled and leaks. --- be/src/vec/sink/writer/vtablet_writer.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index c140ddd5f26..d3d6c35fc42 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -895,11 +895,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { Status VNodeChannel::close_wait(RuntimeState* state) { DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemoryReclamation::process_full_gc(); }); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - // set _is_closed to true finally - Defer set_closed {[&]() { - std::lock_guard<std::mutex> l(_closed_lock); - _is_closed = true; - }}; auto st = none_of({_cancelled, !_eos_is_produced}); if (!st.ok()) { @@ -923,8 +918,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) { VLOG_CRITICAL << _parent->_sender_id << " close wait finished"; _close_time_ms = UnixMillis() - _close_time_ms; - if (_cancelled || state->is_cancelled()) { - cancel(state->cancel_reason().to_string()); + if (state->is_cancelled()) { + _cancel_with_msg(state->cancel_reason().to_string()); } if (_add_batches_finished) { @@ -936,6 +931,11 @@ Status 
VNodeChannel::close_wait(RuntimeState* state) { _index_channel->set_error_tablet_in_state(state); _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id); + + std::lock_guard<std::mutex> l(_closed_lock); + // only when normal close, we set _is_closed to true. + // otherwise, we will set it to true in cancel(). + _is_closed = true; return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org