This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 838225b6beb [fix](move-memtable) wait stream close before releasing streams (#27791)
838225b6beb is described below

commit 838225b6beb8246994e36200ea9feb17aecec4a3
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Nov 30 15:03:07 2023 +0800

    [fix](move-memtable) wait stream close before releasing streams (#27791)
---
 be/src/vec/sink/load_stream_stub.cpp         | 12 ++++++++++--
 be/src/vec/sink/load_stream_stub.h           |  4 ++++
 be/src/vec/sink/load_stream_stub_pool.cpp    | 12 ++++++++++++
 be/src/vec/sink/writer/vtablet_writer_v2.cpp | 11 +++++++----
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 76907713fde..26331053e3a 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -113,10 +113,18 @@ LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
         _tablet_schema_for_index(stub._tablet_schema_for_index),
         _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};

-LoadStreamStub::~LoadStreamStub() {
+LoadStreamStub::~LoadStreamStub() = default;
+
+Status LoadStreamStub::close_stream() {
     if (_is_init.load() && !_handler.is_closed()) {
-        brpc::StreamClose(_stream_id);
+        LOG(INFO) << "closing stream, load_id=" << print_id(_load_id) << ", src_id=" << _src_id
+                  << ", dst_id=" << _dst_id << ", stream_id=" << _stream_id;
+        auto ret = brpc::StreamClose(_stream_id);
+        if (ret != 0) {
+            return Status::InternalError("StreamClose failed, err={}", ret);
+        }
     }
+    return Status::OK();
 }

 // open_load_stream
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index aca2e9ea550..7aae8496a0c 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -174,6 +174,10 @@ public:
     // GET_SCHEMA
     Status get_schema(const std::vector<PTabletID>& tablets);

+    // close stream, usually close is initiated by the remote.
+    // in case of remote failure, we should be able to close stream locally.
+    Status close_stream();
+
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
     Status close_wait(int64_t timeout_ms = 0) {
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index 240e44ef380..bc19bad532c 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -31,6 +31,18 @@ void LoadStreams::release() {
     int num_use = --_use_cnt;
     if (num_use == 0) {
         LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id;
+        for (auto& stream : _streams) {
+            auto st = stream->close_stream();
+            if (!st.ok()) {
+                LOG(WARNING) << "close stream failed " << st;
+            }
+        }
+        for (auto& stream : _streams) {
+            auto st = stream->close_wait();
+            if (!st.ok()) {
+                LOG(WARNING) << "close wait failed " << st;
+            }
+        }
         _pool->erase(_load_id, _dst_id);
     } else {
         LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << _dst_id
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 33bca6a9401..74cc3d2bcf6 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -42,6 +42,7 @@
 #include "runtime/thread_context.h"
 #include "service/brpc.h"
 #include "util/brpc_client_cache.h"
+#include "util/defer_op.h"
 #include "util/doris_metrics.h"
 #include "util/threadpool.h"
 #include "util/thrift_util.h"
@@ -498,10 +499,12 @@ Status VTabletWriterV2::close(Status exec_status) {
     COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
     COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());

-    // release streams from the pool first, to prevent memory leak
-    for (const auto& [_, streams] : _streams_for_node) {
-        streams->release();
-    }
+    // defer stream release to prevent memory leak
+    Defer defer([&] {
+        for (const auto& [_, streams] : _streams_for_node) {
+            streams->release();
+        }
+    });

     {
         SCOPED_TIMER(_close_writer_timer);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org