This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 25d7d0b2554 [fix](move-memtable) abstract multi-streams to one logical stream (#42039) (#42250)
25d7d0b2554 is described below

commit 25d7d0b2554aea3b440c3bb8c58d0837e2740ead
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Tue Oct 22 20:26:42 2024 +0800

    [fix](move-memtable) abstract multi-streams to one logical stream (#42039) (#42250)
    
    backport #42039
---
 be/src/vec/sink/load_stream_map_pool.cpp           |  50 +++----
 be/src/vec/sink/load_stream_map_pool.h             |  12 +-
 be/src/vec/sink/load_stream_stub.cpp               |  66 ++++++++-
 be/src/vec/sink/load_stream_stub.h                 |  69 +++++++++
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       | 160 ++++++++++-----------
 be/src/vec/sink/writer/vtablet_writer_v2.h         |   8 +-
 be/test/vec/sink/vtablet_writer_v2_test.cpp        |   7 +-
 .../test_multi_replica_fault_injection.groovy      |   2 -
 8 files changed, 234 insertions(+), 140 deletions(-)

diff --git a/be/src/vec/sink/load_stream_map_pool.cpp 
b/be/src/vec/sink/load_stream_map_pool.cpp
index e8407f4730d..d6dddcc96dc 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -35,22 +35,20 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t 
src_id, int num_streams,
     DCHECK(num_use > 0) << "use num should be greater than 0";
 }
 
-std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool 
incremental) {
+std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id, 
bool incremental) {
     std::lock_guard<std::mutex> lock(_mutex);
-    std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
+    std::shared_ptr<LoadStreamStubs> streams = _streams_for_node[dst_id];
     if (streams != nullptr) {
         return streams;
     }
-    streams = std::make_shared<Streams>();
-    for (int i = 0; i < _num_streams; i++) {
-        streams->emplace_back(new LoadStreamStub(_load_id, _src_id, 
_tablet_schema_for_index,
-                                                 _enable_unique_mow_for_index, 
incremental));
-    }
+    streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, 
_src_id,
+                                                _tablet_schema_for_index,
+                                                _enable_unique_mow_for_index, 
incremental);
     _streams_for_node[dst_id] = streams;
     return streams;
 }
 
-std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
+std::shared_ptr<LoadStreamStubs> LoadStreamMap::at(int64_t dst_id) {
     std::lock_guard<std::mutex> lock(_mutex);
     return _streams_for_node.at(dst_id);
 }
@@ -60,7 +58,7 @@ bool LoadStreamMap::contains(int64_t dst_id) {
     return _streams_for_node.contains(dst_id);
 }
 
-void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
+void LoadStreamMap::for_each(std::function<void(int64_t, LoadStreamStubs&)> 
fn) {
     decltype(_streams_for_node) snapshot;
     {
         std::lock_guard<std::mutex> lock(_mutex);
@@ -71,7 +69,7 @@ void LoadStreamMap::for_each(std::function<void(int64_t, 
const Streams&)> fn) {
     }
 }
 
-Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const 
Streams&)> fn) {
+Status LoadStreamMap::for_each_st(std::function<Status(int64_t, 
LoadStreamStubs&)> fn) {
     decltype(_streams_for_node) snapshot;
     {
         std::lock_guard<std::mutex> lock(_mutex);
@@ -108,7 +106,10 @@ bool LoadStreamMap::release() {
 }
 
 void LoadStreamMap::close_load(bool incremental) {
-    auto st = for_each_st([this, incremental](int64_t dst_id, const Streams& 
streams) -> Status {
+    for (auto& [dst_id, streams] : _streams_for_node) {
+        if (streams->is_incremental()) {
+            continue;
+        }
         std::vector<PTabletID> tablets_to_commit;
         const auto& tablets = _tablets_to_commit[dst_id];
         tablets_to_commit.reserve(tablets.size());
@@ -116,30 +117,11 @@ void LoadStreamMap::close_load(bool incremental) {
             tablets_to_commit.push_back(tablet);
             
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
         }
-        Status status = Status::OK();
-        bool first = true;
-        for (auto& stream : streams) {
-            if (stream->is_incremental() != incremental) {
-                continue;
-            }
-            if (first) {
-                auto st = stream->close_load(tablets_to_commit);
-                if (!st.ok() && status.ok()) {
-                    status = st;
-                }
-                first = false;
-            } else {
-                auto st = stream->close_load({});
-                if (!st.ok() && status.ok()) {
-                    status = st;
-                }
-            }
+        auto st = streams->close_load(tablets_to_commit);
+        if (!st.ok()) {
+            LOG(WARNING) << "close_load for " << (incremental ? "incremental" 
: "non-incremental")
+                         << " streams failed: " << st << ", load_id=" << 
_load_id;
         }
-        return status;
-    });
-    if (!st.ok()) {
-        LOG(WARNING) << "close_load for " << (incremental ? "incremental" : 
"non-incremental")
-                     << " streams failed: " << st << ", load_id=" << _load_id;
     }
 }
 
diff --git a/be/src/vec/sink/load_stream_map_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
index e5b66aaf9c9..602f1711a94 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -71,22 +71,20 @@ class LoadStreamStub;
 
 class LoadStreamMapPool;
 
-using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-
 class LoadStreamMap {
 public:
     LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int 
num_use,
                   LoadStreamMapPool* pool);
 
-    std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = 
false);
+    std::shared_ptr<LoadStreamStubs> get_or_create(int64_t dst_id, bool 
incremental = false);
 
-    std::shared_ptr<Streams> at(int64_t dst_id);
+    std::shared_ptr<LoadStreamStubs> at(int64_t dst_id);
 
     bool contains(int64_t dst_id);
 
-    void for_each(std::function<void(int64_t, const Streams&)> fn);
+    void for_each(std::function<void(int64_t, LoadStreamStubs&)> fn);
 
-    Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);
+    Status for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn);
 
     void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& 
tablets_to_commit);
 
@@ -107,7 +105,7 @@ private:
     const int _num_streams;
     std::atomic<int> _use_cnt;
     std::mutex _mutex;
-    std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
+    std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> 
_streams_for_node;
     LoadStreamMapPool* _pool = nullptr;
     std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
     std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 1d13ca4b903..672a0be44f7 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -207,7 +207,8 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << 
node_info.brpc_port
               << ", " << *this;
     _is_open.store(true);
-    return Status::OK();
+    _status = Status::OK();
+    return _status;
 }
 
 // APPEND_DATA
@@ -504,4 +505,67 @@ inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamStub& stub)
     return ostr;
 }
 
+Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* 
client_cache,
+                             const NodeInfo& node_info, int64_t txn_id,
+                             const OlapTableSchemaParam& schema,
+                             const std::vector<PTabletID>& tablets_for_schema, 
int total_streams,
+                             int64_t idle_timeout_ms, bool enable_profile) {
+    bool get_schema = true;
+    auto status = Status::OK();
+    for (auto& stream : _streams) {
+        Status st;
+        if (get_schema) {
+            st = stream->open(client_cache, node_info, txn_id, schema, 
tablets_for_schema,
+                              total_streams, idle_timeout_ms, enable_profile);
+        } else {
+            st = stream->open(client_cache, node_info, txn_id, schema, {}, 
total_streams,
+                              idle_timeout_ms, enable_profile);
+        }
+        if (st.ok()) {
+            get_schema = false;
+        } else {
+            LOG(WARNING) << "open stream failed: " << st << "; stream: " << 
*stream;
+            status = st;
+            // no break here to try get schema from the rest streams
+        }
+    }
+    // only mark open when all streams open success
+    _open_success.store(status.ok());
+    // cancel all streams if open failed
+    if (!status.ok()) {
+        cancel(status);
+    }
+    return status;
+}
+
+Status LoadStreamStubs::close_load(const std::vector<PTabletID>& 
tablets_to_commit) {
+    if (!_open_success.load()) {
+        return Status::InternalError("streams not open");
+    }
+    bool first = true;
+    auto status = Status::OK();
+    for (auto& stream : _streams) {
+        Status st;
+        if (first) {
+            st = stream->close_load(tablets_to_commit);
+            first = false;
+        } else {
+            st = stream->close_load({});
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "close_load failed: " << st << "; stream: " << 
*stream;
+        }
+    }
+    return status;
+}
+
+Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
+    MonotonicStopWatch watch;
+    watch.start();
+    for (auto& stream : _streams) {
+        RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - 
watch.elapsed_time() / 1000 / 1000));
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 223babb42e3..241d7e612ce 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -266,4 +266,73 @@ protected:
     bool _is_incremental = false;
 };
 
+// a collection of LoadStreams connect to the same node
+class LoadStreamStubs {
+public:
+    LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
+                    std::shared_ptr<IndexToTabletSchema> schema_map,
+                    std::shared_ptr<IndexToEnableMoW> mow_map, bool 
incremental = false)
+            : _is_incremental(incremental) {
+        _streams.reserve(num_streams);
+        for (size_t i = 0; i < num_streams; i++) {
+            _streams.emplace_back(
+                    new LoadStreamStub(load_id, src_id, schema_map, mow_map, 
incremental));
+        }
+    }
+
+    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
+                int64_t txn_id, const OlapTableSchemaParam& schema,
+                const std::vector<PTabletID>& tablets_for_schema, int 
total_streams,
+                int64_t idle_timeout_ms, bool enable_profile);
+
+    bool is_incremental() const { return _is_incremental; }
+
+    size_t size() const { return _streams.size(); }
+
+    // for UT only
+    void mark_open() { _open_success.store(true); }
+
+    std::shared_ptr<LoadStreamStub> select_one_stream() {
+        if (!_open_success.load()) {
+            return nullptr;
+        }
+        size_t i = _select_index.fetch_add(1);
+        return _streams[i % _streams.size()];
+    }
+
+    void cancel(Status reason) {
+        for (auto& stream : _streams) {
+            stream->cancel(reason);
+        }
+    }
+
+    Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+
+    Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
+
+    std::unordered_set<int64_t> success_tablets() {
+        std::unordered_set<int64_t> s;
+        for (auto& stream : _streams) {
+            auto v = stream->success_tablets();
+            std::copy(v.begin(), v.end(), std::inserter(s, s.end()));
+        }
+        return s;
+    }
+
+    std::unordered_map<int64_t, Status> failed_tablets() {
+        std::unordered_map<int64_t, Status> m;
+        for (auto& stream : _streams) {
+            auto v = stream->failed_tablets();
+            m.insert(v.begin(), v.end());
+        }
+        return m;
+    }
+
+private:
+    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
+    std::atomic<bool> _open_success = false;
+    std::atomic<size_t> _select_index = 0;
+    const bool _is_incremental;
+};
+
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 16c11b1cf42..c693e20c3a8 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -269,6 +269,8 @@ Status VTabletWriterV2::open(RuntimeState* state, 
RuntimeProfile* profile) {
 
 Status VTabletWriterV2::_open_streams() {
     bool fault_injection_skip_be = true;
+    bool any_backend = false;
+    bool any_success = false;
     for (auto& [dst_id, _] : _tablets_for_node) {
         auto streams = _load_stream_map->get_or_create(dst_id);
         DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
@@ -277,12 +279,17 @@ Status VTabletWriterV2::_open_streams() {
                 continue;
             }
         });
-        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
+        auto st = _open_streams_to_backend(dst_id, *streams);
+        any_backend = true;
+        any_success = any_success || st.ok();
+    }
+    if (any_backend && !any_success) {
+        return Status::InternalError("failed to open streams to any BE");
     }
     return Status::OK();
 }
 
-Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& 
streams) {
+Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, 
LoadStreamStubs& streams) {
     const auto* node_info = _nodes_info->find_node(dst_id);
     DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
                     { node_info = nullptr; });
@@ -293,26 +300,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t 
dst_id, Streams& stream
     std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
     
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams",
                     { tablets_for_schema.clear(); });
-    int fault_injection_skip_cnt = 0;
-    for (auto& stream : streams) {
-        
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure",
 {
-            if (fault_injection_skip_cnt < 1) {
-                fault_injection_skip_cnt++;
-                continue;
-            }
-        });
-        auto st = 
stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info,
-                               _txn_id, *_schema, tablets_for_schema, 
_total_streams,
-                               idle_timeout_ms, _state->enable_profile());
-        if (st.ok()) {
-            // get tablet schema from each backend only in the 1st stream
-            tablets_for_schema.clear();
-        } else {
-            LOG(WARNING) << "failed to open stream to backend " << dst_id
-                         << ", load_id=" << print_id(_load_id);
-        }
+    auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(), 
*node_info, _txn_id,
+                           *_schema, tablets_for_schema, _total_streams, 
idle_timeout_ms,
+                           _state->enable_profile());
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to open stream to backend " << dst_id
+                     << ", load_id=" << print_id(_load_id) << ", err=" << st;
     }
-    return Status::OK();
+    return st;
 }
 
 Status VTabletWriterV2::_build_tablet_node_mapping() {
@@ -375,7 +370,7 @@ void 
VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r
 }
 
 Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t 
partition_id, int64_t index_id,
-                                        Streams& streams) {
+                                        
std::vector<std::shared_ptr<LoadStreamStub>>& streams) {
     const auto* location = _location->find_tablet(tablet_id);
     DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { 
location = nullptr; });
     if (location == nullptr) {
@@ -388,13 +383,16 @@ Status VTabletWriterV2::_select_streams(int64_t 
tablet_id, int64_t partition_id,
         tablet.set_tablet_id(tablet_id);
         VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, 
index_id, tablet_id);
         _tablets_for_node[node_id].emplace(tablet_id, tablet);
-        auto stream = _load_stream_map->at(node_id)->at(_stream_index);
-        for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) {
-            stream = _load_stream_map->at(node_id)->at((_stream_index + i) % 
_stream_per_node);
+        auto stream = _load_stream_map->at(node_id)->select_one_stream();
+        if (stream == nullptr) {
+            continue;
         }
         streams.emplace_back(std::move(stream));
     }
-    _stream_index = (_stream_index + 1) % _stream_per_node;
+    if (streams.size() <= location->node_ids.size() / 2) {
+        return Status::InternalError("not enough streams {}/{}", 
streams.size(),
+                                     location->node_ids.size());
+    }
     Status st;
     for (auto& stream : streams) {
         st = stream->wait_for_schema(partition_id, index_id, tablet_id);
@@ -458,9 +456,10 @@ Status VTabletWriterV2::write(Block& input_block) {
 
 Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> 
block, int64_t tablet_id,
                                         const Rows& rows) {
+    auto st = Status::OK();
     auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, 
[&]() {
-        Streams streams;
-        auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id, 
streams);
+        std::vector<std::shared_ptr<LoadStreamStub>> streams;
+        st = _select_streams(tablet_id, rows.partition_id, rows.index_id, 
streams);
         if (!st.ok()) [[unlikely]] {
             LOG(WARNING) << "select stream failed, " << st << ", load_id=" << 
print_id(_load_id);
             return std::unique_ptr<DeltaWriterV2>(nullptr);
@@ -487,7 +486,8 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
         }
         DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found",
                         { index_not_found = true; });
-        if (index_not_found) {
+        if (index_not_found) [[unlikely]] {
+            st = Status::InternalError("no index {} in schema", rows.index_id);
             LOG(WARNING) << "index " << rows.index_id
                          << " not found in schema, load_id=" << 
print_id(_load_id);
             return std::unique_ptr<DeltaWriterV2>(nullptr);
@@ -496,15 +496,15 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
     });
     if (delta_writer == nullptr) {
         LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
-                     << ", load_id=" << print_id(_load_id);
-        return Status::InternalError("failed to open DeltaWriter for tablet 
{}", tablet_id);
+                     << ", load_id=" << print_id(_load_id) << ", err: " << st;
+        return Status::InternalError("failed to open DeltaWriter {}: {}", 
tablet_id, st.msg());
     }
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
         
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
     }
     SCOPED_TIMER(_write_memtable_timer);
-    auto st = delta_writer->write(block.get(), rows.row_idxes);
+    st = delta_writer->write(block.get(), rows.row_idxes);
     return st;
 }
 
@@ -517,11 +517,8 @@ void VTabletWriterV2::_cancel(Status status) {
         _delta_writer_for_tablet.reset();
     }
     if (_load_stream_map) {
-        _load_stream_map->for_each([status](int64_t dst_id, const Streams& 
streams) {
-            for (auto& stream : streams) {
-                stream->cancel(status);
-            }
-        });
+        _load_stream_map->for_each(
+                [status](int64_t dst_id, LoadStreamStubs& streams) { 
streams.cancel(status); });
         _load_stream_map->release();
     }
 }
@@ -624,17 +621,14 @@ Status VTabletWriterV2::close(Status exec_status) {
             DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
                 auto streams = 
_load_stream_map->at(_tablets_for_node.begin()->first);
                 int64_t tablet_id = -1;
-                for (auto& stream : *streams) {
-                    const auto& tablets = stream->success_tablets();
-                    if (tablets.size() > 0) {
-                        tablet_id = tablets[0];
-                        break;
-                    }
+                for (auto tablet : streams->success_tablets()) {
+                    tablet_id = tablet;
+                    break;
                 }
                 if (tablet_id != -1) {
                     LOG(INFO) << "fault injection: adding failed tablet_id: " 
<< tablet_id;
-                    streams->front()->add_failed_tablet(tablet_id,
-                                                        
Status::InternalError("fault injection"));
+                    streams->select_one_stream()->add_failed_tablet(
+                            tablet_id, Status::InternalError("fault 
injection"));
                 } else {
                     LOG(INFO) << "fault injection: failed to inject failed 
tablet_id";
                 }
@@ -672,26 +666,24 @@ Status VTabletWriterV2::close(Status exec_status) {
 void VTabletWriterV2::_close_wait(bool incremental) {
     SCOPED_TIMER(_close_load_timer);
     auto st = _load_stream_map->for_each_st(
-            [this, incremental](int64_t dst_id, const Streams& streams) -> 
Status {
-                Status status = Status::OK();
-                for (auto& stream : streams) {
-                    if (stream->is_incremental() != incremental) {
-                        continue;
-                    }
-                    int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
-                                        _timeout_watch.elapsed_time() / 1000 / 
1000;
-                    
DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
-                    if (remain_ms <= 0) {
-                        LOG(WARNING) << "load timed out before close waiting, 
load_id="
-                                     << print_id(_load_id);
-                        return Status::TimedOut("load timed out before close 
waiting");
-                    }
-                    auto st = stream->close_wait(_state, remain_ms);
-                    if (!st.ok() && status.ok()) {
-                        status = st;
-                    }
+            [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> 
Status {
+                if (streams.is_incremental() != incremental) {
+                    return Status::OK();
+                }
+                int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                                    _timeout_watch.elapsed_time() / 1000 / 
1000;
+                DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { 
remain_ms = 0; });
+                if (remain_ms <= 0) {
+                    LOG(WARNING) << "load timed out before close waiting, 
load_id="
+                                 << print_id(_load_id);
+                    return Status::TimedOut("load timed out before close 
waiting");
                 }
-                return status;
+                auto st = streams.close_wait(_state, remain_ms);
+                if (!st.ok()) {
+                    LOG(WARNING) << "close_wait timeout on streams to dst_id=" 
<< dst_id
+                                 << ", load_id=" << print_id(_load_id) << ": " 
<< st;
+                }
+                return st;
             });
     if (!st.ok()) {
         LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << 
print_id(_load_id);
@@ -730,31 +722,23 @@ Status 
VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tabl
                                             int num_replicas) {
     std::unordered_map<int64_t, int> failed_tablets;
     std::unordered_map<int64_t, Status> failed_reason;
-    load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
-        std::unordered_set<int64_t> known_tablets;
-        for (const auto& stream : streams) {
-            LOG(INFO) << "stream " << stream->stream_id()
-                      << " success tablets: " << 
stream->success_tablets().size()
-                      << ", failed tablets: " << 
stream->failed_tablets().size();
-            for (auto [tablet_id, reason] : stream->failed_tablets()) {
-                if (known_tablets.contains(tablet_id)) {
-                    continue;
-                }
-                known_tablets.insert(tablet_id);
-                failed_tablets[tablet_id]++;
-                failed_reason[tablet_id] = reason;
-            }
-            for (auto tablet_id : stream->success_tablets()) {
-                if (known_tablets.contains(tablet_id)) {
-                    continue;
-                }
-                known_tablets.insert(tablet_id);
-                TTabletCommitInfo commit_info;
-                commit_info.tabletId = tablet_id;
-                commit_info.backendId = dst_id;
-                tablet_commit_infos.emplace_back(std::move(commit_info));
-            }
+    load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
+        size_t num_success_tablets = 0;
+        size_t num_failed_tablets = 0;
+        for (auto [tablet_id, reason] : streams.failed_tablets()) {
+            failed_tablets[tablet_id]++;
+            failed_reason[tablet_id] = reason;
+            num_failed_tablets++;
+        }
+        for (auto tablet_id : streams.success_tablets()) {
+            TTabletCommitInfo commit_info;
+            commit_info.tabletId = tablet_id;
+            commit_info.backendId = dst_id;
+            tablet_commit_infos.emplace_back(std::move(commit_info));
+            num_success_tablets++;
         }
+        LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " 
<< num_success_tablets
+                  << ", failed tablets: " << num_failed_tablets;
     });
 
     for (auto [tablet_id, replicas] : failed_tablets) {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index f65e0c8f3cd..b50044ede93 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -69,6 +69,7 @@
 namespace doris {
 class DeltaWriterV2;
 class LoadStreamStub;
+class LoadStreamStubs;
 class LoadStreamMap;
 class ObjectPool;
 class RowDescriptor;
@@ -85,8 +86,6 @@ class OlapTabletFinder;
 class VTabletWriterV2;
 class DeltaWriterV2Map;
 
-using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-
 struct Rows {
     int64_t partition_id;
     int64_t index_id;
@@ -128,7 +127,7 @@ private:
 
     Status _open_streams();
 
-    Status _open_streams_to_backend(int64_t dst_id, Streams& streams);
+    Status _open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams);
 
     Status _incremental_open_streams(const std::vector<TOlapTablePartition>& 
partitions);
 
@@ -143,7 +142,7 @@ private:
                            const Rows& rows);
 
     Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t 
index_id,
-                           Streams& streams);
+                           std::vector<std::shared_ptr<LoadStreamStub>>& 
streams);
 
     void _calc_tablets_to_commit();
 
@@ -226,7 +225,6 @@ private:
 
     std::shared_ptr<LoadStreamMap> _load_stream_map;
 
-    size_t _stream_index = 0;
     std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
 
     VRowDistribution _row_distribution;
diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp 
b/be/test/vec/sink/vtablet_writer_v2_test.cpp
index 6289896c75f..67dc9d089ab 100644
--- a/be/test/vec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp
@@ -37,12 +37,13 @@ const int64_t src_id = 1000;
 static void add_stream(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t 
node_id,
                        std::vector<int64_t> success_tablets,
                        std::unordered_map<int64_t, Status> failed_tablets) {
-    auto stub = load_stream_map->get_or_create(node_id);
+    auto streams = load_stream_map->get_or_create(node_id);
+    streams->mark_open();
     for (const auto& tablet_id : success_tablets) {
-        stub->at(0)->add_success_tablet(tablet_id);
+        streams->select_one_stream()->add_success_tablet(tablet_id);
     }
     for (const auto& [tablet_id, reason] : failed_tablets) {
-        stub->at(0)->add_failed_tablet(tablet_id, reason);
+        streams->select_one_stream()->add_failed_tablet(tablet_id, reason);
     }
 }
 
diff --git 
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
index 33f7e28dbc9..2f6afd5ca69 100644
--- 
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -97,8 +97,6 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") {
         
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
 "failed to send segment data to any replicas")
         // test segment num check when LoadStreamStub missed tail segments
         load_with_injection("LoadStreamStub.only_send_segment_0", "segment num 
mismatch")
-        // test 1st stream to each backend failure
-        
load_with_injection("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure",
 "success")
         // test one backend open failure
         load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", 
"success")
         sql """ set enable_memtable_on_sink_node=false """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to