This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3553b58ce71 [fix](move-memtable) do not retry open streams (#41550) 3553b58ce71 is described below commit 3553b58ce71e91e8901ab31f365743988faf6625 Author: Kaijie Chen <c...@apache.org> AuthorDate: Mon Oct 14 19:19:34 2024 +0800 [fix](move-memtable) do not retry open streams (#41550) ## Proposed changes Currently, a second sink may retry opening a LoadStreamStub if a previous sink failed to open the stream. In some cases the load stream could receive both opens, causing the actual open stream count to be greater than the expected total streams count, which later leads to a use-after-free. This PR disables the retry of `LoadStreamStub::open()`. --- be/src/vec/sink/load_stream_stub.cpp | 60 ++++++++++++++-------------- be/src/vec/sink/load_stream_stub.h | 6 +-- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +- 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index e3899ce7743..26c8267bb12 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -135,7 +135,7 @@ LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, _is_incremental(incremental) {}; LoadStreamStub::~LoadStreamStub() { - if (_is_init.load() && !_is_closed.load()) { + if (_is_open.load() && !_is_closed.load()) { auto ret = brpc::StreamClose(_stream_id); LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ? 
"success" : "failed"); } @@ -149,8 +149,9 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, int64_t idle_timeout_ms, bool enable_profile) { std::unique_lock<bthread::Mutex> lock(_open_mutex); if (_is_init.load()) { - return _init_st; + return _status; } + _is_init.store(true); _dst_id = node_info.id; brpc::StreamOptions opt; opt.max_buf_size = config::load_stream_max_buf_size; @@ -160,8 +161,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, brpc::Controller cntl; if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) { delete opt.handler; - _init_st = Status::Error<true>(ret, "Failed to create stream"); - return _init_st; + _status = Status::Error<true>(ret, "Failed to create stream"); + return _status; } cntl.set_timeout_ms(config::open_load_stream_timeout_ms); POpenLoadStreamRequest request; @@ -174,8 +175,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, } else if (total_streams > 0) { request.set_total_streams(total_streams); } else { - _init_st = Status::InternalError("total_streams should be greator than 0"); - return _init_st; + _status = Status::InternalError("total_streams should be greator than 0"); + return _status; } request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); @@ -199,13 +200,13 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, } if (cntl.Failed()) { brpc::StreamClose(_stream_id); - _init_st = Status::InternalError("Failed to connect to backend {}: {}", _dst_id, - cntl.ErrorText()); - return _init_st; + _status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id, + cntl.ErrorText()); + return _status; } LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port << ", " << *this; - _is_init.store(true); + _is_open.store(true); return Status::OK(); } @@ -213,9 +214,9 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span<const Slice> data, bool segment_eos, FileType file_type) { - if (!_is_init.load()) { - add_failed_tablet(tablet_id, _init_st); - return _init_st; + if (!_is_open.load()) { + add_failed_tablet(tablet_id, _status); + return _status; } DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { if (segment_id != 0) { @@ -240,9 +241,9 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, const SegmentStatistics& segment_stat, TabletSchemaSPtr flush_schema) { - if (!_is_init.load()) { - add_failed_tablet(tablet_id, _init_st); - return _init_st; + if (!_is_open.load()) { + add_failed_tablet(tablet_id, _status); + return _status; } DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { if (segment_id != 0) { @@ -266,8 +267,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) { - if (!_is_init.load()) { - return _init_st; + if (!_is_open.load()) { + return _status; } PStreamHeader header; *header.mutable_load_id() = _load_id; @@ -276,10 +277,10 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi for (const auto& tablet : tablets_to_commit) { *header.add_tablets() = tablet; } - _close_st = _encode_and_send(header); - if (!_close_st.ok()) { - LOG(WARNING) << "stream " << _stream_id << " close failed: " << _close_st; - return _close_st; + _status = _encode_and_send(header); + if (!_status.ok()) { + LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status; + return _status; } _is_closing.store(true); return Status::OK(); @@ -287,8 +288,8 @@ Status 
LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi // GET_SCHEMA Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) { - if (!_is_init.load()) { - return _init_st; + if (!_is_open.load()) { + return _status; } PStreamHeader header; *header.mutable_load_id() = _load_id; @@ -310,8 +311,8 @@ Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) { Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t timeout_ms) { - if (!_is_init.load()) { - return _init_st; + if (!_is_open.load()) { + return _status; } if (_tablet_schema_for_index->contains(index_id)) { return Status::OK(); @@ -338,11 +339,8 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); - if (!_is_init.load()) { - return _init_st; - } if (!_is_closing.load()) { - return _close_st; + return _status; } if (_is_closed.load()) { return _check_cancel(); @@ -371,7 +369,7 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { void LoadStreamStub::cancel(Status reason) { LOG(WARNING) << *this << " is cancelled because of " << reason; - if (_is_init.load()) { + if (_is_open.load()) { brpc::StreamClose(_stream_id); } { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index e83cbf24c69..d3baef54c5e 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -194,7 +194,7 @@ public: int64_t dst_id() const { return _dst_id; } - bool is_inited() const { return _is_init.load(); } + bool is_open() const { return _is_open.load(); } bool is_incremental() const { return _is_incremental; } @@ -230,6 +230,7 @@ private: protected: std::atomic<bool> _is_init; + std::atomic<bool> _is_open; std::atomic<bool> _is_closing; std::atomic<bool> _is_closed; 
std::atomic<bool> _is_cancelled; @@ -239,8 +240,7 @@ protected: brpc::StreamId _stream_id; int64_t _src_id = -1; // source backend_id int64_t _dst_id = -1; // destination backend_id - Status _init_st = Status::InternalError<false>("Stream is not open"); - Status _close_st; + Status _status = Status::InternalError<false>("Stream is not open"); Status _cancel_st; bthread::Mutex _open_mutex; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 2135e4729fa..ccfca564297 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -390,7 +390,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); auto stream = _load_stream_map->at(node_id)->at(_stream_index); - for (int i = 1; i < _stream_per_node && !stream->is_inited(); i++) { + for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) { stream = _load_stream_map->at(node_id)->at((_stream_index + i) % _stream_per_node); } streams.emplace_back(std::move(stream)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org