This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 25d7d0b2554 [fix](move-memtable) abstract multi-streams to one logical stream (#42039) (#42250) 25d7d0b2554 is described below commit 25d7d0b2554aea3b440c3bb8c58d0837e2740ead Author: Kaijie Chen <c...@apache.org> AuthorDate: Tue Oct 22 20:26:42 2024 +0800 [fix](move-memtable) abstract multi-streams to one logical stream (#42039) (#42250) backport #42039 --- be/src/vec/sink/load_stream_map_pool.cpp | 50 +++---- be/src/vec/sink/load_stream_map_pool.h | 12 +- be/src/vec/sink/load_stream_stub.cpp | 66 ++++++++- be/src/vec/sink/load_stream_stub.h | 69 +++++++++ be/src/vec/sink/writer/vtablet_writer_v2.cpp | 160 ++++++++++----------- be/src/vec/sink/writer/vtablet_writer_v2.h | 8 +- be/test/vec/sink/vtablet_writer_v2_test.cpp | 7 +- .../test_multi_replica_fault_injection.groovy | 2 - 8 files changed, 234 insertions(+), 140 deletions(-) diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index e8407f4730d..d6dddcc96dc 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -35,22 +35,20 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, DCHECK(num_use > 0) << "use num should be greater than 0"; } -std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) { +std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) { std::lock_guard<std::mutex> lock(_mutex); - std::shared_ptr<Streams> streams = _streams_for_node[dst_id]; + std::shared_ptr<LoadStreamStubs> streams = _streams_for_node[dst_id]; if (streams != nullptr) { return streams; } - streams = std::make_shared<Streams>(); - for (int i = 0; i < _num_streams; i++) { - streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index, - _enable_unique_mow_for_index, incremental)); - } + streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, _src_id, + _tablet_schema_for_index, + _enable_unique_mow_for_index, incremental); _streams_for_node[dst_id] = streams; return streams; } -std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) { +std::shared_ptr<LoadStreamStubs> LoadStreamMap::at(int64_t dst_id) { std::lock_guard<std::mutex> lock(_mutex); return _streams_for_node.at(dst_id); } @@ -60,7 +58,7 @@ bool LoadStreamMap::contains(int64_t dst_id) { return _streams_for_node.contains(dst_id); } -void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) { +void LoadStreamMap::for_each(std::function<void(int64_t, LoadStreamStubs&)> fn) { decltype(_streams_for_node) snapshot; { std::lock_guard<std::mutex> lock(_mutex); @@ -71,7 +69,7 @@ void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) { } } -Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) { +Status LoadStreamMap::for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn) { decltype(_streams_for_node) snapshot; { std::lock_guard<std::mutex> lock(_mutex); @@ -108,7 +106,10 @@ bool LoadStreamMap::release() { } void LoadStreamMap::close_load(bool incremental) { - auto st = for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { + for (auto& [dst_id, streams] : _streams_for_node) { + if (streams->is_incremental()) { + continue; + } std::vector<PTabletID> tablets_to_commit; const auto& tablets = _tablets_to_commit[dst_id]; tablets_to_commit.reserve(tablets.size()); @@ -116,30 +117,11 @@ void LoadStreamMap::close_load(bool incremental) { tablets_to_commit.push_back(tablet); tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); } - Status status = Status::OK(); - bool first = true; - for (auto& stream : streams) { - if (stream->is_incremental() != incremental) { - continue; - } - if (first) { - auto st = stream->close_load(tablets_to_commit); - if (!st.ok() && status.ok()) { - status = st; - } - first = false; - } else { - auto st = stream->close_load({}); - if (!st.ok() && status.ok()) { - status = st; - } - } + auto st = streams->close_load(tablets_to_commit); + if (!st.ok()) { + LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental") + << " streams failed: " << st << ", load_id=" << _load_id; } - return status; - }); - if (!st.ok()) { - LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental") - << " streams failed: " << st << ", load_id=" << _load_id; } } diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index e5b66aaf9c9..602f1711a94 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -71,22 +71,20 @@ class LoadStreamStub; class LoadStreamMapPool; -using Streams = std::vector<std::shared_ptr<LoadStreamStub>>; - class LoadStreamMap { public: LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, LoadStreamMapPool* pool); - std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = false); + std::shared_ptr<LoadStreamStubs> get_or_create(int64_t dst_id, bool incremental = false); - std::shared_ptr<Streams> at(int64_t dst_id); + std::shared_ptr<LoadStreamStubs> at(int64_t dst_id); bool contains(int64_t dst_id); - void for_each(std::function<void(int64_t, const Streams&)> fn); + void for_each(std::function<void(int64_t, LoadStreamStubs&)> fn); - Status for_each_st(std::function<Status(int64_t, const Streams&)> fn); + Status for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn); void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& tablets_to_commit); @@ -107,7 +105,7 @@ private: const int _num_streams; std::atomic<int> _use_cnt; std::mutex _mutex; - std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node; + std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> _streams_for_node; LoadStreamMapPool* _pool = nullptr; std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index; std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 1d13ca4b903..672a0be44f7 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -207,7 +207,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port << ", " << *this; _is_open.store(true); - return Status::OK(); + _status = Status::OK(); + return _status; } // APPEND_DATA @@ -504,4 +505,67 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub) return ostr; } +Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache, + const NodeInfo& node_info, int64_t txn_id, + const OlapTableSchemaParam& schema, + const std::vector<PTabletID>& tablets_for_schema, int total_streams, + int64_t idle_timeout_ms, bool enable_profile) { + bool get_schema = true; + auto status = Status::OK(); + for (auto& stream : _streams) { + Status st; + if (get_schema) { + st = stream->open(client_cache, node_info, txn_id, schema, tablets_for_schema, + total_streams, idle_timeout_ms, enable_profile); + } else { + st = stream->open(client_cache, node_info, txn_id, schema, {}, total_streams, + idle_timeout_ms, enable_profile); + } + if (st.ok()) { + get_schema = false; + } else { + LOG(WARNING) << "open stream failed: " << st << "; stream: " << *stream; + status = st; + // no break here to try get schema from the rest streams + } + } + // only mark open when all streams open success + _open_success.store(status.ok()); + // cancel all streams if open failed + if (!status.ok()) { + cancel(status); + } + return status; +} + +Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit) { + if (!_open_success.load()) { + return Status::InternalError("streams not open"); + } + bool first = true; + auto status = Status::OK(); + for (auto& stream : _streams) { + Status st; + if (first) { + st = stream->close_load(tablets_to_commit); + first = false; + } else { + st = stream->close_load({}); + } + if (!st.ok()) { + LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream; + } + } + return status; +} + +Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) { + MonotonicStopWatch watch; + watch.start(); + for (auto& stream : _streams) { + RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000)); + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 223babb42e3..241d7e612ce 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -266,4 +266,73 @@ protected: bool _is_incremental = false; }; +// a collection of LoadStreams connect to the same node +class LoadStreamStubs { +public: + LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id, + std::shared_ptr<IndexToTabletSchema> schema_map, + std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false) + : _is_incremental(incremental) { + _streams.reserve(num_streams); + for (size_t i = 0; i < num_streams; i++) { + _streams.emplace_back( + new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental)); + } + } + + Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, + int64_t txn_id, const OlapTableSchemaParam& schema, + const std::vector<PTabletID>& tablets_for_schema, int total_streams, + int64_t idle_timeout_ms, bool enable_profile); + + bool is_incremental() const { return _is_incremental; } + + size_t size() const { return _streams.size(); } + + // for UT only + void mark_open() { _open_success.store(true); } + + std::shared_ptr<LoadStreamStub> select_one_stream() { + if (!_open_success.load()) { + return nullptr; + } + size_t i = _select_index.fetch_add(1); + return _streams[i % _streams.size()]; + } + + void cancel(Status reason) { + for (auto& stream : _streams) { + stream->cancel(reason); + } + } + + Status close_load(const std::vector<PTabletID>& tablets_to_commit); + + Status close_wait(RuntimeState* state, int64_t timeout_ms = 0); + + std::unordered_set<int64_t> success_tablets() { + std::unordered_set<int64_t> s; + for (auto& stream : _streams) { + auto v = stream->success_tablets(); + std::copy(v.begin(), v.end(), std::inserter(s, s.end())); + } + return s; + } + + std::unordered_map<int64_t, Status> failed_tablets() { + std::unordered_map<int64_t, Status> m; + for (auto& stream : _streams) { + auto v = stream->failed_tablets(); + m.insert(v.begin(), v.end()); + } + return m; + } + +private: + std::vector<std::shared_ptr<LoadStreamStub>> _streams; + std::atomic<bool> _open_success = false; + std::atomic<size_t> _select_index = 0; + const bool _is_incremental; +}; + } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 16c11b1cf42..c693e20c3a8 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -269,6 +269,8 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { Status VTabletWriterV2::_open_streams() { bool fault_injection_skip_be = true; + bool any_backend = false; + bool any_success = false; for (auto& [dst_id, _] : _tablets_for_node) { auto streams = _load_stream_map->get_or_create(dst_id); DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", { @@ -277,12 +279,17 @@ Status VTabletWriterV2::_open_streams() { continue; } }); - RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); + auto st = _open_streams_to_backend(dst_id, *streams); + any_backend = true; + any_success = any_success || st.ok(); + } + if (any_backend && !any_success) { + return Status::InternalError("failed to open streams to any BE"); } return Status::OK(); } -Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& streams) { +Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams) { const auto* node_info = _nodes_info->find_node(dst_id); DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null", { node_info = nullptr; }); @@ -293,26 +300,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id]; DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams", { tablets_for_schema.clear(); }); - int fault_injection_skip_cnt = 0; - for (auto& stream : streams) { - DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure", { - if (fault_injection_skip_cnt < 1) { - fault_injection_skip_cnt++; - continue; - } - }); - auto st = stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, - _txn_id, *_schema, tablets_for_schema, _total_streams, - idle_timeout_ms, _state->enable_profile()); - if (st.ok()) { - // get tablet schema from each backend only in the 1st stream - tablets_for_schema.clear(); - } else { - LOG(WARNING) << "failed to open stream to backend " << dst_id - << ", load_id=" << print_id(_load_id); - } + auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, + *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, + _state->enable_profile()); + if (!st.ok()) { + LOG(WARNING) << "failed to open stream to backend " << dst_id + << ", load_id=" << print_id(_load_id) << ", err=" << st; } - return Status::OK(); + return st; } Status VTabletWriterV2::_build_tablet_node_mapping() { @@ -375,7 +370,7 @@ void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r } Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams) { + std::vector<std::shared_ptr<LoadStreamStub>>& streams) { const auto* location = _location->find_tablet(tablet_id); DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; }); if (location == nullptr) { @@ -388,13 +383,16 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, tablet.set_tablet_id(tablet_id); VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); - auto stream = _load_stream_map->at(node_id)->at(_stream_index); - for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) { - stream = _load_stream_map->at(node_id)->at((_stream_index + i) % _stream_per_node); + auto stream = _load_stream_map->at(node_id)->select_one_stream(); + if (stream == nullptr) { + continue; } streams.emplace_back(std::move(stream)); } - _stream_index = (_stream_index + 1) % _stream_per_node; + if (streams.size() <= location->node_ids.size() / 2) { + return Status::InternalError("not enough streams {}/{}", streams.size(), + location->node_ids.size()); + } Status st; for (auto& stream : streams) { st = stream->wait_for_schema(partition_id, index_id, tablet_id); @@ -458,9 +456,10 @@ Status VTabletWriterV2::write(Block& input_block) { Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, const Rows& rows) { + auto st = Status::OK(); auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { - Streams streams; - auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams); + std::vector<std::shared_ptr<LoadStreamStub>> streams; + st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams); if (!st.ok()) [[unlikely]] { LOG(WARNING) << "select stream failed, " << st << ", load_id=" << print_id(_load_id); return std::unique_ptr<DeltaWriterV2>(nullptr); @@ -487,7 +486,8 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block } DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found", { index_not_found = true; }); - if (index_not_found) { + if (index_not_found) [[unlikely]] { + st = Status::InternalError("no index {} in schema", rows.index_id); LOG(WARNING) << "index " << rows.index_id << " not found in schema, load_id=" << print_id(_load_id); return std::unique_ptr<DeltaWriterV2>(nullptr); @@ -496,15 +496,15 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block }); if (delta_writer == nullptr) { LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id - << ", load_id=" << print_id(_load_id); - return Status::InternalError("failed to open DeltaWriter for tablet {}", tablet_id); + << ", load_id=" << print_id(_load_id) << ", err: " << st; + return Status::InternalError("failed to open DeltaWriter {}: {}", tablet_id, st.msg()); } { SCOPED_TIMER(_wait_mem_limit_timer); ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(); } SCOPED_TIMER(_write_memtable_timer); - auto st = delta_writer->write(block.get(), rows.row_idxes); + st = delta_writer->write(block.get(), rows.row_idxes); return st; } @@ -517,11 +517,8 @@ void VTabletWriterV2::_cancel(Status status) { _delta_writer_for_tablet.reset(); } if (_load_stream_map) { - _load_stream_map->for_each([status](int64_t dst_id, const Streams& streams) { - for (auto& stream : streams) { - stream->cancel(status); - } - }); + _load_stream_map->for_each( + [status](int64_t dst_id, LoadStreamStubs& streams) { streams.cancel(status); }); _load_stream_map->release(); } } @@ -624,17 +621,14 @@ Status VTabletWriterV2::close(Status exec_status) { DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", { auto streams = _load_stream_map->at(_tablets_for_node.begin()->first); int64_t tablet_id = -1; - for (auto& stream : *streams) { - const auto& tablets = stream->success_tablets(); - if (tablets.size() > 0) { - tablet_id = tablets[0]; - break; - } + for (auto tablet : streams->success_tablets()) { + tablet_id = tablet; + break; } if (tablet_id != -1) { LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id; - streams->front()->add_failed_tablet(tablet_id, - Status::InternalError("fault injection")); + streams->select_one_stream()->add_failed_tablet( + tablet_id, Status::InternalError("fault injection")); } else { LOG(INFO) << "fault injection: failed to inject failed tablet_id"; } @@ -672,26 +666,24 @@ Status VTabletWriterV2::close(Status exec_status) { void VTabletWriterV2::_close_wait(bool incremental) { SCOPED_TIMER(_close_load_timer); auto st = _load_stream_map->for_each_st( - [this, incremental](int64_t dst_id, const Streams& streams) -> Status { - Status status = Status::OK(); - for (auto& stream : streams) { - if (stream->is_incremental() != incremental) { - continue; - } - int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - auto st = stream->close_wait(_state, remain_ms); - if (!st.ok() && status.ok()) { - status = st; - } + [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status { + if (streams.is_incremental() != incremental) { + return Status::OK(); + } + int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" + << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); } - return status; + auto st = streams.close_wait(_state, remain_ms); + if (!st.ok()) { + LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id + << ", load_id=" << print_id(_load_id) << ": " << st; + } + return st; }); if (!st.ok()) { LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); @@ -730,31 +722,23 @@ Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tabl int num_replicas) { std::unordered_map<int64_t, int> failed_tablets; std::unordered_map<int64_t, Status> failed_reason; - load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) { - std::unordered_set<int64_t> known_tablets; - for (const auto& stream : streams) { - LOG(INFO) << "stream " << stream->stream_id() - << " success tablets: " << stream->success_tablets().size() - << ", failed tablets: " << stream->failed_tablets().size(); - for (auto [tablet_id, reason] : stream->failed_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; - } - known_tablets.insert(tablet_id); - failed_tablets[tablet_id]++; - failed_reason[tablet_id] = reason; - } - for (auto tablet_id : stream->success_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; - } - known_tablets.insert(tablet_id); - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_id; - commit_info.backendId = dst_id; - tablet_commit_infos.emplace_back(std::move(commit_info)); - } + load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) { + size_t num_success_tablets = 0; + size_t num_failed_tablets = 0; + for (auto [tablet_id, reason] : streams.failed_tablets()) { + failed_tablets[tablet_id]++; + failed_reason[tablet_id] = reason; + num_failed_tablets++; + } + for (auto tablet_id : streams.success_tablets()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_id; + commit_info.backendId = dst_id; + tablet_commit_infos.emplace_back(std::move(commit_info)); + num_success_tablets++; } + LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " << num_success_tablets + << ", failed tablets: " << num_failed_tablets; }); for (auto [tablet_id, replicas] : failed_tablets) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index f65e0c8f3cd..b50044ede93 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -69,6 +69,7 @@ namespace doris { class DeltaWriterV2; class LoadStreamStub; +class LoadStreamStubs; class LoadStreamMap; class ObjectPool; class RowDescriptor; @@ -85,8 +86,6 @@ class OlapTabletFinder; class VTabletWriterV2; class DeltaWriterV2Map; -using Streams = std::vector<std::shared_ptr<LoadStreamStub>>; - struct Rows { int64_t partition_id; int64_t index_id; @@ -128,7 +127,7 @@ private: Status _open_streams(); - Status _open_streams_to_backend(int64_t dst_id, Streams& streams); + Status _open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams); Status _incremental_open_streams(const std::vector<TOlapTablePartition>& partitions); @@ -143,7 +142,7 @@ private: const Rows& rows); Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams); + std::vector<std::shared_ptr<LoadStreamStub>>& streams); void _calc_tablets_to_commit(); @@ -226,7 +225,6 @@ private: std::shared_ptr<LoadStreamMap> _load_stream_map; - size_t _stream_index = 0; std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet; VRowDistribution _row_distribution; diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp b/be/test/vec/sink/vtablet_writer_v2_test.cpp index 6289896c75f..67dc9d089ab 100644 --- a/be/test/vec/sink/vtablet_writer_v2_test.cpp +++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp @@ -37,12 +37,13 @@ const int64_t src_id = 1000; static void add_stream(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t node_id, std::vector<int64_t> success_tablets, std::unordered_map<int64_t, Status> failed_tablets) { - auto stub = load_stream_map->get_or_create(node_id); + auto streams = load_stream_map->get_or_create(node_id); + streams->mark_open(); for (const auto& tablet_id : success_tablets) { - stub->at(0)->add_success_tablet(tablet_id); + streams->select_one_stream()->add_success_tablet(tablet_id); } for (const auto& [tablet_id, reason] : failed_tablets) { - stub->at(0)->add_failed_tablet(tablet_id, reason); + streams->select_one_stream()->add_failed_tablet(tablet_id, reason); } } diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index 33f7e28dbc9..2f6afd5ca69 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -97,8 +97,6 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas") // test segment num check when LoadStreamStub missed tail segments load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") - // test 1st stream to each backend failure - load_with_injection("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure", "success") // test one backend open failure load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success") sql """ set enable_memtable_on_sink_node=false """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org