This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b3a9c247af [refactor](move-memtable) add load stream stub (#23642)
b3a9c247af is described below

commit b3a9c247af89fa9b5eaa7e8463cf589e4b02d9fd
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Aug 31 19:39:34 2023 +0800

    [refactor](move-memtable) add load stream stub (#23642)
---
 be/src/io/fs/stream_sink_file_writer.cpp       |  60 ++-----
 be/src/io/fs/stream_sink_file_writer.h         |  36 +---
 be/src/olap/delta_writer_context.h             |   6 -
 be/src/olap/delta_writer_v2.cpp                |  24 ++-
 be/src/olap/delta_writer_v2.h                  |  10 +-
 be/src/olap/rowset/beta_rowset_writer_v2.cpp   |  22 +--
 be/src/olap/rowset/beta_rowset_writer_v2.h     |   6 +-
 be/src/olap/rowset/rowset_writer_context.h     |   3 -
 be/src/vec/sink/load_stream_stub.cpp           | 232 +++++++++++++++++++++++++
 be/src/vec/sink/load_stream_stub.h             | 192 ++++++++++++++++++++
 be/src/vec/sink/vtablet_sink_v2.cpp            | 228 +++++-------------------
 be/src/vec/sink/vtablet_sink_v2.h              |  20 +--
 be/test/io/fs/stream_sink_file_writer_test.cpp | 161 ++++++-----------
 13 files changed, 578 insertions(+), 422 deletions(-)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index ff2667b9fa..6726933513 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -21,6 +21,7 @@
 
 #include "olap/olap_common.h"
 #include "olap/rowset/beta_rowset_writer.h"
+#include "vec/sink/load_stream_stub.h"
 
 namespace doris {
 namespace io {
@@ -35,38 +36,24 @@ void StreamSinkFileWriter::init(PUniqueId load_id, int64_t 
partition_id, int64_t
     _index_id = index_id;
     _tablet_id = tablet_id;
     _segment_id = segment_id;
-
-    _header.set_src_id(_sender_id);
-    *_header.mutable_load_id() = _load_id;
-    _header.set_partition_id(_partition_id);
-    _header.set_index_id(_index_id);
-    _header.set_tablet_id(_tablet_id);
-    _header.set_segment_id(_segment_id);
-    _header.set_opcode(doris::PStreamHeader::APPEND_DATA);
-    _append_header();
 }
 
 Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
+    // Sends the slices straight to every stream stub via LoadStreamStub::append_data
+    // (one call per stub, in order); the first failing stub aborts the append and its
+    // status is returned. Nothing is buffered in this writer any more: only the running
+    // byte count (_bytes_appended) is kept locally.
     size_t bytes_req = 0;
     for (int i = 0; i < data_cnt; i++) {
         bytes_req += data[i].get_size();
-        _buf.append(data[i].get_data(), data[i].get_size());
     }
-    _pending_bytes += bytes_req;
     _bytes_appended += bytes_req;
 
     VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string()
                << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
-               << ", segment_id: " << _segment_id << ", data_length: " << 
bytes_req
-               << ", current batched bytes: " << _pending_bytes;
+               << ", segment_id: " << _segment_id << ", data_length: " << 
bytes_req;
 
-    if (_pending_bytes >= _max_pending_bytes) {
-        RETURN_IF_ERROR(_stream_sender(_buf));
-        _buf.clear();
-        _append_header();
-        _pending_bytes = 0;
+    // Non-owning view over the caller's slices; append_data copies the bytes into its
+    // own outgoing buffer before returning.
+    std::span<const Slice> slices {data, data_cnt};
+    for (auto& stream : _streams) {
+        RETURN_IF_ERROR(
+                stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id, slices));
     }
-
     return Status::OK();
 }
 
@@ -75,38 +62,11 @@ Status StreamSinkFileWriter::finalize() {
                << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
                << ", segment_id: " << _segment_id;
     // TODO(zhengyu): update get_inverted_index_file_size into stat
-    Status status = _stream_sender(_buf);
-    // send eos
-    _buf.clear();
-    _header.set_segment_eos(true);
-    _append_header();
-    status = _stream_sender(_buf);
-    return status;
-}
-
-void StreamSinkFileWriter::_append_header() {
-    size_t header_len = _header.ByteSizeLong();
-    _buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
-    _buf.append(_header.SerializeAsString());
-}
-
-Status StreamSinkFileWriter::send_with_retry(brpc::StreamId stream, 
butil::IOBuf buf) {
-    while (true) {
-        int ret = brpc::StreamWrite(stream, buf);
-        if (ret == EAGAIN) {
-            const timespec time = butil::seconds_from_now(60);
-            int wait_result = brpc::StreamWait(stream, &time);
-            if (wait_result == 0) {
-                continue;
-            } else {
-                return Status::InternalError("fail to send data when wait 
stream");
-            }
-        } else if (ret == EINVAL) {
-            return Status::InternalError("fail to send data when stream 
write");
-        } else {
-            return Status::OK();
-        }
+    for (auto& stream : _streams) {
+        RETURN_IF_ERROR(
+                stream->append_data(_partition_id, _index_id, _tablet_id, 
_segment_id, {}, true));
     }
+    return Status::OK();
 }
 
 Status StreamSinkFileWriter::close() {
diff --git a/be/src/io/fs/stream_sink_file_writer.h 
b/be/src/io/fs/stream_sink_file_writer.h
index 074a2f0f5a..6c40543b93 100644
--- a/be/src/io/fs/stream_sink_file_writer.h
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -27,18 +27,16 @@
 
 namespace doris {
 
+class LoadStreamStub;
+
 struct RowsetId;
 struct SegmentStatistics;
 
 namespace io {
 class StreamSinkFileWriter : public FileWriter {
 public:
-    StreamSinkFileWriter(int sender_id, std::vector<brpc::StreamId> stream_id)
-            : _sender_id(sender_id), _streams(stream_id) {}
-
-    static void deleter(void* data) { ::free(data); }
-
-    static Status send_with_retry(brpc::StreamId stream, butil::IOBuf buf);
+    StreamSinkFileWriter(std::vector<std::shared_ptr<LoadStreamStub>>& streams)
+            : _streams(streams) {}
 
     void init(PUniqueId load_id, int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
               int32_t segment_id);
@@ -58,30 +56,10 @@ public:
     }
 
 private:
-    Status _stream_sender(butil::IOBuf buf) const {
-        for (auto stream : _streams) {
-            LOG(INFO) << "writer flushing, load_id: " << 
UniqueId(_load_id).to_string()
-                      << ", index_id: " << _index_id << ", tablet_id: " << 
_tablet_id
-                      << ", segment_id: " << _segment_id << ", stream id: " << 
stream
-                      << ", buf size: " << buf.size();
-
-            RETURN_IF_ERROR(send_with_retry(stream, buf));
-        }
-        return Status::OK();
-    }
-
-    void _append_header();
-
-private:
-    PStreamHeader _header;
-    butil::IOBuf _buf;
-
-    std::queue<Slice> _pending_slices;
-    size_t _max_pending_bytes = config::brpc_streaming_client_batch_bytes;
-    size_t _pending_bytes;
+    template <bool eos>
+    Status _flush();
 
-    int _sender_id;
-    std::vector<brpc::StreamId> _streams;
+    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
 
     PUniqueId _load_id;
     int64_t _partition_id;
diff --git a/be/src/olap/delta_writer_context.h 
b/be/src/olap/delta_writer_context.h
index 680f2d0b6f..da506b10b5 100644
--- a/be/src/olap/delta_writer_context.h
+++ b/be/src/olap/delta_writer_context.h
@@ -28,7 +28,6 @@ namespace doris {
 class TupleDescriptor;
 class SlotDescriptor;
 class OlapTableSchemaParam;
-class TabletSchema;
 
 struct WriteRequest {
     int64_t tablet_id;
@@ -42,11 +41,6 @@ struct WriteRequest {
     bool is_high_priority = false;
     OlapTableSchemaParam* table_schema_param;
     int64_t index_id = 0;
-    // for DeltaWriterV2
-    std::shared_ptr<TabletSchema> tablet_schema;
-    bool enable_unique_key_merge_on_write = false;
-    int sender_id = 0;
-    std::vector<brpc::StreamId> streams;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 6f42ae068e..ff1a341f5e 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -58,23 +58,27 @@
 #include "util/stopwatch.hpp"
 #include "util/time.h"
 #include "vec/core/block.h"
+#include "vec/sink/load_stream_stub.h"
 
 namespace doris {
 using namespace ErrorCode;
 
-Status DeltaWriterV2::open(WriteRequest* req, DeltaWriterV2** writer, 
RuntimeProfile* profile) {
-    *writer = new DeltaWriterV2(req, StorageEngine::instance(), profile);
+// Factory: allocates a DeltaWriterV2 with raw `new`, bound to `streams` and the global
+// StorageEngine, and hands it back through *writer (presumably the caller owns it —
+// confirm against callers). Always returns OK.
+Status DeltaWriterV2::open(WriteRequest* req,
+                           const std::vector<std::shared_ptr<LoadStreamStub>>& 
streams,
+                           DeltaWriterV2** writer, RuntimeProfile* profile) {
+    *writer = new DeltaWriterV2(req, streams, StorageEngine::instance(), 
profile);
     return Status::OK();
 }
 
-DeltaWriterV2::DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine,
-                             RuntimeProfile* profile)
+// Copies *req and keeps its own copy of the stream stub pointers; init() reads the
+// tablet schema and merge-on-write flag from _streams[0] and then clears _streams.
+// Note: storage_engine is not used by this constructor.
+DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
+                             const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
+                             StorageEngine* storage_engine, RuntimeProfile* 
profile)
         : _req(*req),
           _tablet_schema(new TabletSchema),
           _profile(profile->create_child(fmt::format("DeltaWriterV2 {}", 
_req.tablet_id), true,
                                          true)),
           _memtable_writer(new MemTableWriter(*req)),
-          _streams(req->streams) {
+          _streams(streams) {
     _init_profile(profile);
 }
 
@@ -97,7 +101,8 @@ Status DeltaWriterV2::init() {
         return Status::OK();
     }
     // build tablet schema in request level
-    _build_current_tablet_schema(_req.index_id, _req.table_schema_param, 
*_req.tablet_schema.get());
+    _build_current_tablet_schema(_req.index_id, _req.table_schema_param,
+                                 *_streams[0]->tablet_schema(_req.index_id));
     RowsetWriterContext context;
     context.txn_id = _req.txn_id;
     context.load_id = _req.load_id;
@@ -112,17 +117,18 @@ Status DeltaWriterV2::init() {
     context.tablet_id = _req.tablet_id;
     context.partition_id = _req.partition_id;
     context.tablet_schema_hash = _req.schema_hash;
-    context.enable_unique_key_merge_on_write = 
_req.enable_unique_key_merge_on_write;
+    context.enable_unique_key_merge_on_write = 
_streams[0]->enable_unique_mow(_req.index_id);
     context.rowset_type = RowsetTypePB::BETA_ROWSET;
     context.rowset_id = StorageEngine::instance()->next_rowset_id();
     context.data_dir = nullptr;
-    context.sender_id = _req.sender_id;
 
     _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
     _rowset_writer->init(context);
-    _memtable_writer->init(_rowset_writer, _tablet_schema, 
_req.enable_unique_key_merge_on_write);
+    _memtable_writer->init(_rowset_writer, _tablet_schema,
+                           _streams[0]->enable_unique_mow(_req.index_id));
     
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
     _is_init = true;
+    _streams.clear();
     return Status::OK();
 }
 
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 0d78162035..741d939fa8 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -52,6 +52,7 @@ class TupleDescriptor;
 class SlotDescriptor;
 class OlapTableSchemaParam;
 class BetaRowsetWriterV2;
+class LoadStreamStub;
 
 namespace vectorized {
 class Block;
@@ -61,7 +62,9 @@ class Block;
 // This class is NOT thread-safe, external synchronization is required.
 class DeltaWriterV2 {
 public:
-    static Status open(WriteRequest* req, DeltaWriterV2** writer, 
RuntimeProfile* profile);
+    static Status open(WriteRequest* req,
+                       const std::vector<std::shared_ptr<LoadStreamStub>>& 
streams,
+                       DeltaWriterV2** writer, RuntimeProfile* profile);
 
     ~DeltaWriterV2();
 
@@ -95,7 +98,8 @@ public:
     int64_t total_received_rows() const { return _total_received_rows; }
 
 private:
-    DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine, 
RuntimeProfile* profile);
+    DeltaWriterV2(WriteRequest* req, const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
+                  StorageEngine* storage_engine, RuntimeProfile* profile);
 
     void _build_current_tablet_schema(int64_t index_id,
                                       const OlapTableSchemaParam* 
table_schema_param,
@@ -122,7 +126,7 @@ private:
     std::shared_ptr<MemTableWriter> _memtable_writer;
     MonotonicStopWatch _lock_watch;
 
-    std::vector<brpc::StreamId> _streams;
+    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp 
b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index d7a641b6e0..a4bc32e32b 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -54,11 +54,12 @@
 #include "util/time.h"
 #include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
 #include "vec/core/block.h"
+#include "vec/sink/load_stream_stub.h"
 
 namespace doris {
 using namespace ErrorCode;
 
-BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector<brpc::StreamId>& 
streams)
+BetaRowsetWriterV2::BetaRowsetWriterV2(const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams)
         : _next_segment_id(0),
           _num_segment(0),
           _num_rows_written(0),
@@ -78,33 +79,20 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext& 
rowset_writer_context
 
 Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, 
io::FileWriterPtr& file_writer) {
+    // Each segment gets its own StreamSinkFileWriter over this rowset writer's stream
+    // stubs; that writer forwards every append to all of them.
     auto partition_id = _context.partition_id;
-    auto sender_id = _context.sender_id;
     auto index_id = _context.index_id;
     auto tablet_id = _context.tablet_id;
     auto load_id = _context.load_id;
 
-    auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(sender_id, 
_streams);
+    auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(_streams);
     stream_writer->init(load_id, partition_id, index_id, tablet_id, 
segment_id);
     file_writer = std::move(stream_writer);
     return Status::OK();
 }
 
 Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, SegmentStatistics& 
segstat) {
-    butil::IOBuf buf;
-    PStreamHeader header;
-    header.set_src_id(_context.sender_id);
-    *header.mutable_load_id() = _context.load_id;
-    header.set_partition_id(_context.partition_id);
-    header.set_index_id(_context.index_id);
-    header.set_tablet_id(_context.tablet_id);
-    header.set_segment_id(segment_id);
-    header.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
-    segstat.to_pb(header.mutable_segment_statistics());
-    size_t header_len = header.ByteSizeLong();
-    buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
-    buf.append(header.SerializeAsString());
+    // Report the finished segment's statistics to every stream stub (ADD_SEGMENT).
+    // Unlike the removed code, which ignored send_with_retry's result, the first
+    // failure is now returned to the caller.
     for (const auto& stream : _streams) {
-        io::StreamSinkFileWriter::send_with_retry(stream, buf);
+        RETURN_IF_ERROR(stream->add_segment(_context.partition_id, 
_context.index_id,
+                                            _context.tablet_id, segment_id, 
segstat));
     }
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h 
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index 919c128607..a982272217 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -60,9 +60,11 @@ namespace vectorized::schema_util {
 class LocalSchemaChangeRecorder;
 }
 
+class LoadStreamStub;
+
 class BetaRowsetWriterV2 : public RowsetWriter {
 public:
-    BetaRowsetWriterV2(const std::vector<brpc::StreamId>& streams);
+    BetaRowsetWriterV2(const std::vector<std::shared_ptr<LoadStreamStub>>& 
streams);
 
     ~BetaRowsetWriterV2() override;
 
@@ -157,7 +159,7 @@ private:
 
     fmt::memory_buffer vlog_buffer;
 
-    std::vector<brpc::StreamId> _streams;
+    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
 
     int64_t _delete_bitmap_ns = 0;
     int64_t _segment_writer_ns = 0;
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h
index b8bfd1225c..cb78a1233a 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -45,7 +45,6 @@ struct RowsetWriterContext {
               rowset_type(BETA_ROWSET),
               rowset_state(PREPARED),
               version(Version(0, 0)),
-              sender_id(0),
               txn_id(0),
               tablet_uid(0, 0),
               segments_overlap(OVERLAP_UNKNOWN) {
@@ -68,8 +67,6 @@ struct RowsetWriterContext {
     // properties for non-pending rowset
     Version version;
 
-    int sender_id;
-
     // properties for pending rowset
     int64_t txn_id;
     PUniqueId load_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
new file mode 100644
index 0000000000..1fbc8228b2
--- /dev/null
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/load_stream_stub.h"
+
+#include "olap/rowset/rowset_writer.h"
+#include "util/brpc_client_cache.h"
+#include "util/network_util.h"
+#include "util/thrift_util.h"
+
+namespace doris {
+
+// Handles replies pushed back by the destination backend on this stream. Each message
+// is parsed as a PWriteStreamSinkResponse; its success / failed tablet ids are appended
+// to the owning stub (under the respective mutexes) and a summary line is logged.
+// Messages that fail to parse are logged and skipped. Always returns 0.
+int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId id,
+                                                                 butil::IOBuf* const messages[],
+                                                                 size_t size) {
+    for (size_t i = 0; i < size; i++) {
+        butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
+        PWriteStreamSinkResponse response;
+        if (!response.ParseFromZeroCopyStream(&wrapper)) {
+            // A truncated/corrupt reply must not be treated as a valid (default, OK)
+            // response: skip it rather than log a bogus status.
+            LOG(WARNING) << "failed to parse load stream response from backend "
+                         << _stub->_dst_id << ", stream id: " << id;
+            continue;
+        }
+
+        Status st = Status::create(response.status());
+
+        std::stringstream ss;
+        ss << "received response from backend " << _stub->_dst_id;
+        if (response.success_tablet_ids_size() > 0) {
+            ss << ", success tablet ids:";
+            for (auto tablet_id : response.success_tablet_ids()) {
+                ss << " " << tablet_id;
+            }
+            std::lock_guard<bthread::Mutex> lock(_stub->_success_tablets_mutex);
+            for (auto tablet_id : response.success_tablet_ids()) {
+                _stub->_success_tablets.push_back(tablet_id);
+            }
+        }
+        if (response.failed_tablet_ids_size() > 0) {
+            ss << ", failed tablet ids:";
+            for (auto tablet_id : response.failed_tablet_ids()) {
+                ss << " " << tablet_id;
+            }
+            std::lock_guard<bthread::Mutex> lock(_stub->_failed_tablets_mutex);
+            for (auto tablet_id : response.failed_tablet_ids()) {
+                _stub->_failed_tablets.push_back(tablet_id);
+            }
+        }
+        ss << ", status: " << st;
+        LOG(INFO) << ss.str();
+
+        if (response.has_load_stream_profile()) {
+            TRuntimeProfileTree tprofile;
+            const uint8_t* buf =
+                    reinterpret_cast<const uint8_t*>(response.load_stream_profile().data());
+            uint32_t len = response.load_stream_profile().size();
+            auto status = deserialize_thrift_msg(buf, &len, false, &tprofile);
+            if (status.ok()) {
+                // TODO
+                //_sink->_state->load_channel_profile()->update(tprofile);
+            } else {
+                LOG(WARNING) << "load stream TRuntimeProfileTree deserialize failed, errmsg="
+                             << status;
+            }
+        }
+    }
+    return 0;
+}
+
+// StreamInputHandler::on_closed override: marks the owning stub closed and wakes every
+// thread blocked in close_wait(). The flag update and notification both happen while
+// the stub's mutex is held.
+void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
+    LoadStreamStub* const owner = _stub;
+    std::scoped_lock guard(owner->_mutex);
+    owner->_is_closed = true;
+    owner->_close_cv.notify_all();
+}
+
+// Creates a fresh stub for `load_id` sent from backend `src_id`, with its own, initially
+// empty, per-index tablet-schema and merge-on-write maps.
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
+        : _load_id(load_id),
+          _src_id(src_id),
+          _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
+          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {}
+
+// Creates a stub that shares `stub`'s load id, source id and per-index maps (the maps
+// are shared_ptr copies, so entries added by any open() are visible through both).
+// Stream state (stream id, destination id, init/closed flags, tablet lists) is not
+// copied; the reply handler is re-created pointing at the new object.
+LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
+        : _load_id(stub._load_id),
+          _src_id(stub._src_id),
+          _tablet_schema_for_index(stub._tablet_schema_for_index),
+          _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {}
+
+// Closes the brpc stream if open() succeeded and no close has been observed yet. The
+// stub's mutex is held for the whole check-and-close.
+// NOTE(review): _handler holds a raw pointer back to this object. If brpc delivers
+// on_closed() asynchronously after StreamClose() returns, that callback would run
+// against a destroyed stub — confirm brpc's callback timing.
+LoadStreamStub::~LoadStreamStub() {
+    std::unique_lock<bthread::Mutex> guard(_mutex);
+    const bool still_open = _is_init && !_is_closed;
+    if (!still_open) {
+        return;
+    }
+    brpc::StreamClose(_stream_id);
+}
+
+// open_stream_sink: creates a brpc stream to `node_info` and issues the
+// open_stream_sink RPC carrying load id, source id, txn id, schema and
+// `tablets_for_schema` (sent as request.tablets; presumably the tablets whose schemas
+// the remote should return — confirm against the service).
+// Idempotent: returns OK immediately once initialized. On success, the per-index tablet
+// schemas and merge-on-write flags from the response are added to the maps shared with
+// copies of this stub.
+Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
+                            const NodeInfo& node_info, int64_t txn_id,
+                            const OlapTableSchemaParam& schema,
+                            const std::vector<PTabletID>& tablets_for_schema,
+                            bool enable_profile) {
+    std::unique_lock<bthread::Mutex> lock(_mutex);
+    if (_is_init) {
+        return Status::OK();
+    }
+    _dst_id = node_info.id;
+    std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
+    brpc::StreamOptions opt;
+    opt.max_buf_size = 20 << 20; // 20MB
+    opt.idle_timeout_ms = 30000;
+    opt.messages_in_batch = 128;
+    opt.handler = &_handler;
+    brpc::Controller cntl;
+    if (int ret = StreamCreate(&_stream_id, cntl, &opt)) {
+        return Status::Error<true>(ret, "Failed to create stream");
+    }
+    cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
+    POpenStreamSinkRequest request;
+    *request.mutable_load_id() = _load_id;
+    request.set_src_id(_src_id);
+    request.set_txn_id(txn_id);
+    request.set_enable_profile(enable_profile);
+    schema.to_protobuf(request.mutable_schema());
+    for (auto& tablet : tablets_for_schema) {
+        *request.add_tablets() = tablet;
+    }
+    POpenStreamSinkResponse response;
+    // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc,
+    // see: https://github.com/apache/brpc/issues/392
+    const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "pooled");
+    stub->open_stream_sink(&cntl, &request, &response, nullptr);
+    // Check the RPC outcome before consuming the response: the contents of a failed
+    // call must not leak into the schema maps shared with other stubs.
+    if (cntl.Failed()) {
+        return Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
+                                     cntl.ErrorText());
+    }
+    for (const auto& resp : response.tablet_schemas()) {
+        auto tablet_schema = std::make_unique<TabletSchema>();
+        tablet_schema->init_from_pb(resp.tablet_schema());
+        _tablet_schema_for_index->emplace(resp.index_id(), std::move(tablet_schema));
+        _enable_unique_mow_for_index->emplace(resp.index_id(),
+                                              resp.enable_unique_key_merge_on_write());
+    }
+    LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id << " (" << host_port
+              << ")";
+    _is_init = true;
+    return Status::OK();
+}
+
+// APPEND_DATA: sends one chunk of segment `segment_id` of tablet `tablet_id`; the
+// slices in `data` are copied into the outgoing message right after the header.
+// `segment_eos` marks the last message of the segment (StreamSinkFileWriter::finalize
+// sends an empty payload with segment_eos = true).
+Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
+                                   int64_t segment_id, std::span<const Slice> data,
+                                   bool segment_eos) {
+    PStreamHeader header;
+    *header.mutable_load_id() = _load_id;
+    header.set_src_id(_src_id);
+    header.set_opcode(doris::PStreamHeader::APPEND_DATA);
+    header.set_partition_id(partition_id);
+    header.set_index_id(index_id);
+    header.set_tablet_id(tablet_id);
+    header.set_segment_id(segment_id);
+    header.set_segment_eos(segment_eos);
+    return _encode_and_send(header, data);
+}
+
+// ADD_SEGMENT: announces a finished segment together with its statistics
+// (serialized into the header via SegmentStatistics::to_pb). Carries no payload.
+Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
+                                   int64_t segment_id, SegmentStatistics& segment_stat) {
+    PStreamHeader header;
+    *header.mutable_load_id() = _load_id;
+    header.set_src_id(_src_id);
+    header.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
+    header.set_partition_id(partition_id);
+    header.set_index_id(index_id);
+    header.set_tablet_id(tablet_id);
+    header.set_segment_id(segment_id);
+    auto* stats_pb = header.mutable_segment_statistics();
+    segment_stat.to_pb(stats_pb);
+    return _encode_and_send(header);
+}
+
+// CLOSE_LOAD: tells the remote this sender is finished and lists the tablets to commit.
+// Per the close_wait() contract, the remote closes the stream after receiving it.
+Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
+    PStreamHeader header;
+    header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
+    header.set_src_id(_src_id);
+    *header.mutable_load_id() = _load_id;
+    for (const PTabletID& tablet : tablets_to_commit) {
+        header.add_tablets_to_commit()->CopyFrom(tablet);
+    }
+    return _encode_and_send(header);
+}
+
+// Frames one message as [header length: raw size_t in native byte order]
+// [serialized PStreamHeader][payload slices, copied in order] and sends it.
+Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
+    const size_t header_len = header.ByteSizeLong();
+    butil::IOBuf buf;
+    buf.append(reinterpret_cast<const uint8_t*>(&header_len), sizeof(header_len));
+    buf.append(header.SerializeAsString());
+    for (const Slice& chunk : data) {
+        buf.append(chunk.get_data(), chunk.get_size());
+    }
+    return _send_with_retry(buf);
+}
+
+// Writes `buf` to the stream. Whenever StreamWrite returns EAGAIN, it blocks in
+// StreamWait for up to 60 seconds and then retries. A non-zero StreamWait result, or
+// any other non-zero StreamWrite result, is returned as InternalError with the code.
+Status LoadStreamStub::_send_with_retry(butil::IOBuf buf) {
+    for (;;) {
+        int ret = brpc::StreamWrite(_stream_id, buf);
+        switch (ret) {
+        case 0:
+            return Status::OK();
+        case EAGAIN: {
+            const timespec time = butil::seconds_from_now(60);
+            int wait_ret = brpc::StreamWait(_stream_id, &time);
+            if (wait_ret != 0) {
+                // The message needs a "{}" replacement field; without it the error
+                // code argument was silently dropped from the status text.
+                return Status::InternalError("StreamWait failed, err = {}", wait_ret);
+            }
+            break;
+        }
+        default:
+            return Status::InternalError("StreamWrite failed, err = {}", ret);
+        }
+    }
+}
+
+} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
new file mode 100644
index 0000000000..268de6ca83
--- /dev/null
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <brpc/controller.h>
+#include <bthread/types.h>
+#include <butil/errno.h>
+#include <fmt/format.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/callback.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+// IWYU pragma: no_include <bits/chrono.h>
+#include <chrono> // IWYU pragma: keep
+#include <functional>
+#include <initializer_list>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <queue>
+#include <set>
+#include <span>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "exec/data_sink.h"
+#include "exec/tablet_info.h"
+#include "gutil/ref_counted.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "runtime/thread_context.h"
+#include "runtime/types.h"
+#include "util/countdown_latch.h"
+#include "util/runtime_profile.h"
+#include "util/stopwatch.hpp"
+#include "vec/columns/column.h"
+#include "vec/common/allocator.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+
+namespace doris {
+class TabletSchema;
+class LoadStreamStub;
+
+struct SegmentStatistics;
+
+// Per-index tables filled in by LoadStreamStub::open() from the open_stream_sink
+// response. They are held by shared_ptr so that copy-constructed stubs share them.
+using IndexToTabletSchema = std::unordered_map<int64_t, std::shared_ptr<TabletSchema>>;
+using IndexToEnableMoW = std::unordered_map<int64_t, bool>;
+
+// Sender side of one brpc load stream from backend `src_id` to backend `dst_id`.
+// Frames PStreamHeader messages (APPEND_DATA / ADD_SEGMENT / CLOSE_LOAD) onto the
+// stream and collects the success / failed tablet ids the remote reports back.
+class LoadStreamStub {
+private:
+    // brpc stream callback that forwards replies and stream closure to the owning stub.
+    class LoadStreamReplyHandler : public brpc::StreamInputHandler {
+    public:
+        LoadStreamReplyHandler(LoadStreamStub* stub) : _stub(stub) {}
+
+        int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
+                                 size_t size) override;
+
+        void on_idle_timeout(brpc::StreamId id) override {}
+
+        void on_closed(brpc::StreamId id) override;
+
+    private:
+        LoadStreamStub* _stub;
+    };
+
+public:
+    // construct new stub
+    LoadStreamStub(PUniqueId load_id, int64_t src_id);
+
+    // copy constructor: load id, src id and the shared_ptr per-index maps are shared;
+    // stream state (stream id, dst id, init/closed flags, tablet lists) is not copied.
+    LoadStreamStub(LoadStreamStub& stub);
+
+// for mock this class in UT
+#ifdef BE_TEST
+    virtual
+#endif
+            ~LoadStreamStub();
+
+    // open_stream_sink
+    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
+                int64_t txn_id, const OlapTableSchemaParam& schema,
+                const std::vector<PTabletID>& tablets_for_schema, bool enable_profile);
+
+// for mock this class in UT
+#ifdef BE_TEST
+    virtual
+#endif
+            // APPEND_DATA
+            Status
+            append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
+                        int64_t segment_id, std::span<const Slice> data, bool segment_eos = false);
+
+    // ADD_SEGMENT
+    Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
+                       int64_t segment_id, SegmentStatistics& segment_stat);
+
+    // CLOSE_LOAD
+    Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+
+    // wait remote to close stream,
+    // remote will close stream when it receives CLOSE_LOAD.
+    // timeout_ms <= 0 waits with no deadline. Returns OK once on_closed() has marked the
+    // stream closed (or if it was never opened / is already closed); otherwise returns
+    // an ETIMEDOUT error after timeout_ms.
+    Status close_wait(int64_t timeout_ms = 0) {
+        std::unique_lock<bthread::Mutex> lock(_mutex);
+        if (!_is_init || _is_closed) {
+            return Status::OK();
+        }
+        // Re-check _is_closed after every wakeup: condition variables may wake
+        // spuriously, and that must not be reported as "closed".
+        if (timeout_ms > 0) {
+            const auto deadline =
+                    std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+            while (!_is_closed) {
+                const auto remaining_us = std::chrono::duration_cast<std::chrono::microseconds>(
+                                                  deadline - std::chrono::steady_clock::now())
+                                                  .count();
+                if (remaining_us <= 0) {
+                    return Status::Error<true>(ETIMEDOUT, "stream close_wait timeout");
+                }
+                // bthread::ConditionVariable::wait_for takes its timeout in microseconds.
+                _close_cv.wait_for(lock, static_cast<long>(remaining_us));
+            }
+            return Status::OK();
+        }
+        while (!_is_closed) {
+            _close_cv.wait(lock);
+        }
+        return Status::OK();
+    }
+
+    // Throws std::out_of_range if open() recorded no schema for index_id.
+    std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
+        return _tablet_schema_for_index->at(index_id);
+    }
+
+    // Throws std::out_of_range if open() recorded no flag for index_id.
+    bool enable_unique_mow(int64_t index_id) const {
+        return _enable_unique_mow_for_index->at(index_id);
+    }
+
+    // Snapshot copy of the tablet ids the remote has reported as successful so far.
+    std::vector<int64_t> success_tablets() {
+        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
+        return _success_tablets;
+    }
+
+    // Snapshot copy of the tablet ids the remote has reported as failed so far.
+    std::vector<int64_t> failed_tablets() {
+        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
+        return _failed_tablets;
+    }
+
+    brpc::StreamId stream_id() const { return _stream_id; }
+
+    int64_t src_id() const { return _src_id; }
+
+    int64_t dst_id() const { return _dst_id; }
+
+private:
+    Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
+    Status _send_with_retry(butil::IOBuf buf);
+
+protected:
+    // _is_init / _is_closed are read and written under _mutex; _close_cv is signalled
+    // by the reply handler's on_closed().
+    bool _is_init = false;
+    bool _is_closed = false;
+    bthread::Mutex _mutex;
+    bthread::ConditionVariable _close_cv;
+
+    PUniqueId _load_id;
+    brpc::StreamId _stream_id;
+    int64_t _src_id = -1; // source backend_id
+    int64_t _dst_id = -1; // destination backend_id
+    LoadStreamReplyHandler _handler {this};
+
+    bthread::Mutex _success_tablets_mutex;
+    bthread::Mutex _failed_tablets_mutex;
+    std::vector<int64_t> _success_tablets;
+    std::vector<int64_t> _failed_tablets;
+
+    std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
+    std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
+};
+
+} // namespace doris
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp 
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 243507db71..eb66eac664 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -39,7 +39,6 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "exec/tablet_info.h"
-#include "io/fs/stream_sink_file_writer.h"
 #include "olap/delta_writer_v2.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
@@ -55,6 +54,7 @@
 #include "util/uid_util.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/sink/load_stream_stub.h"
 #include "vec/sink/vtablet_block_convertor.h"
 #include "vec/sink/vtablet_finder.h"
 
@@ -63,76 +63,6 @@ class TExpr;
 
 namespace stream_load {
 
-int StreamSinkHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* 
const messages[],
-                                            size_t size) {
-    int64_t backend_id = _sink->_node_id_for_stream->at(id);
-
-    for (size_t i = 0; i < size; i++) {
-        butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
-        PWriteStreamSinkResponse response;
-        response.ParseFromZeroCopyStream(&wrapper);
-
-        Status st = Status::create(response.status());
-
-        std::stringstream ss;
-        ss << "received response from backend " << backend_id << ", status: " 
<< st
-           << ", success tablet ids:";
-        for (auto tablet_id : response.success_tablet_ids()) {
-            ss << " " << tablet_id;
-        }
-        ss << ", failed tablet ids:";
-        for (auto tablet_id : response.failed_tablet_ids()) {
-            ss << " " << tablet_id;
-        }
-        LOG(INFO) << ss.str();
-
-        int replica = _sink->_num_replicas;
-
-        {
-            std::lock_guard<bthread::Mutex> 
l(_sink->_tablet_success_map_mutex);
-            for (auto tablet_id : response.success_tablet_ids()) {
-                if (_sink->_tablet_success_map.count(tablet_id) == 0) {
-                    _sink->_tablet_success_map.insert({tablet_id, {}});
-                }
-                _sink->_tablet_success_map[tablet_id].push_back(backend_id);
-            }
-        }
-        {
-            std::lock_guard<bthread::Mutex> 
l(_sink->_tablet_failure_map_mutex);
-            for (auto tablet_id : response.failed_tablet_ids()) {
-                if (_sink->_tablet_failure_map.count(tablet_id) == 0) {
-                    _sink->_tablet_failure_map.insert({tablet_id, {}});
-                }
-                _sink->_tablet_failure_map[tablet_id].push_back(backend_id);
-                if (_sink->_tablet_failure_map[tablet_id].size() * 2 >= 
replica) {
-                    _sink->_cancel(Status::Cancelled(
-                            "Failed to meet num replicas requirements for 
tablet {}", tablet_id));
-                    break;
-                }
-            }
-        }
-
-        if (response.has_load_stream_profile()) {
-            TRuntimeProfileTree tprofile;
-            const uint8_t* buf =
-                    reinterpret_cast<const 
uint8_t*>(response.load_stream_profile().data());
-            uint32_t len = response.load_stream_profile().size();
-            auto status = deserialize_thrift_msg(buf, &len, false, &tprofile);
-            if (status.ok()) {
-                _sink->_state->load_channel_profile()->update(tprofile);
-            } else {
-                LOG(WARNING) << "load channel TRuntimeProfileTree deserialize 
failed, errmsg="
-                             << status;
-            }
-        }
-    }
-    return 0;
-}
-
-void StreamSinkHandler::on_closed(brpc::StreamId id) {
-    _sink->_pending_streams.fetch_add(-1);
-}
-
 VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& 
row_desc,
                                    const std::vector<TExpr>& texprs, Status* 
status)
         : DataSink(row_desc), _pool(pool) {
@@ -210,7 +140,6 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
     _close_timer = ADD_TIMER(_profile, "CloseWaitTime");
     _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", 
"CloseWaitTime");
     _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", 
"CloseWaitTime");
-    _close_stream_timer = ADD_CHILD_TIMER(_profile, "CloseStreamTime", 
"CloseWaitTime");
 
     // Prepare the exprs to run.
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
@@ -224,8 +153,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
     SCOPED_TIMER(_open_timer);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
-    _stream_pool_for_node = std::make_shared<StreamPoolForNode>();
-    _node_id_for_stream = std::make_shared<NodeIdForStream>();
+    _stream_pool_for_node = std::make_shared<NodeToStreams>();
     _delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>();
     _build_tablet_node_mapping();
     RETURN_IF_ERROR(_init_stream_pools());
@@ -234,71 +162,32 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
 }
 
 Status VOlapTableSinkV2::_init_stream_pools() {
+    // stub template is for sharing internal schema map among all stubs
+    LoadStreamStub stub_template {_load_id, _sender_id};
     for (auto& [node_id, _] : _tablets_for_node) {
         auto node_info = _nodes_info->find_node(node_id);
         if (node_info == nullptr) {
             return Status::InternalError("Unknown node {} in tablet location", 
node_id);
         }
-        _stream_pool_for_node->insert({node_id, StreamPool {}});
-        StreamPool& stream_pool = _stream_pool_for_node->at(node_id);
-        RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool));
-        for (auto stream : stream_pool) {
-            _node_id_for_stream->insert({stream, node_id});
-        }
+        Streams& stream_pool = (*_stream_pool_for_node)[node_id];
+        RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool, 
stub_template));
     }
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, 
StreamPool& stream_pool) {
-    DCHECK_GT(config::num_streams_per_sink, 0);
+Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, Streams& 
stream_pool,
+                                           LoadStreamStub& stub_template) {
     stream_pool.reserve(config::num_streams_per_sink);
     for (int i = 0; i < config::num_streams_per_sink; ++i) {
-        brpc::StreamOptions opt;
-        opt.max_buf_size = 20 << 20; // 20MB
-        opt.idle_timeout_ms = 30000;
-        opt.messages_in_batch = 128;
-        opt.handler = new StreamSinkHandler(this);
-        brpc::StreamId stream;
-        brpc::Controller cntl;
-        if (int ret = StreamCreate(&stream, cntl, &opt)) {
-            return Status::RpcError("Failed to create stream, code = {}", ret);
-        }
-        LOG(INFO) << "Created stream " << stream << " for backend " << 
node_info.id << " ("
-                  << node_info.host << ":" << node_info.brpc_port << ")";
-        std::string host_port = get_host_port(node_info.host, 
node_info.brpc_port);
-        // use "pooled" connection to avoid conflicts between streaming rpc 
and regular rpc,
-        // see: https://github.com/apache/brpc/issues/392
-        const auto& stub =
-                
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(
-                        host_port, "baidu_std", "pooled");
-        POpenStreamSinkRequest request;
-        *request.mutable_load_id() = _load_id;
-        request.set_src_id(_sender_id);
-        request.set_txn_id(_txn_id);
-        request.set_enable_profile(_state->enable_profile());
-        _schema->to_protobuf(request.mutable_schema());
-        if (i == 0) {
-            // get tablet schema from each backend only in the 1st stream
-            for (auto& tablet : _indexes_from_node[node_info.id]) {
-                auto req = request.add_tablets();
-                *req = tablet;
-            }
-        }
-        POpenStreamSinkResponse response;
-        cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
-        stub->open_stream_sink(&cntl, &request, &response, nullptr);
-        for (const auto& resp : response.tablet_schemas()) {
-            auto tablet_schema = std::make_shared<TabletSchema>();
-            tablet_schema->init_from_pb(resp.tablet_schema());
-            _tablet_schema_for_index[resp.index_id()] = tablet_schema;
-            _enable_unique_mow_for_index[resp.index_id()] = 
resp.enable_unique_key_merge_on_write();
-        }
-        if (cntl.Failed()) {
-            return Status::InternalError("Failed to connect to backend {}: 
{}", node_info.id,
-                                         cntl.ErrorText());
-        }
-        stream_pool.push_back(stream);
-        _pending_streams.fetch_add(1);
+        // internal tablet schema map will be shared among all stubs
+        auto stream = std::make_unique<LoadStreamStub>(stub_template);
+        // get tablet schema from each backend only in the 1st stream
+        const std::vector<PTabletID>& tablets_for_schema =
+                i == 0 ? _indexes_from_node[node_info.id] : 
std::vector<PTabletID> {};
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
node_info,
+                                     _txn_id, *_schema, tablets_for_schema,
+                                     _state->enable_profile()));
+        stream_pool.emplace_back(std::move(stream));
     }
     return Status::OK();
 }
@@ -343,13 +232,13 @@ void VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
     }
 }
 
-Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, 
std::vector<brpc::StreamId>& streams) {
+Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) {
     auto location = _location->find_tablet(tablet_id);
     if (location == nullptr) {
         return Status::InternalError("unknown tablet location, tablet id = 
{}", tablet_id);
     }
     for (auto& node_id : location->node_ids) {
-        streams.push_back(_stream_pool_for_node->at(node_id)[_stream_index]);
+        
streams.emplace_back(_stream_pool_for_node->at(node_id)[_stream_index]);
     }
     _stream_index = (_stream_index + 1) % config::num_streams_per_sink;
     return Status::OK();
@@ -359,8 +248,6 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
 
-    LOG(INFO) << "upstream id = " << state->backend_id();
-
     auto input_rows = input_block->rows();
     auto input_bytes = input_block->bytes();
     if (UNLIKELY(input_rows == 0)) {
@@ -408,7 +295,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc
 
     // For each tablet, send its input_rows from block to delta writer
     for (const auto& [tablet_id, rows] : rows_for_tablet) {
-        std::vector<brpc::StreamId> streams;
+        Streams streams;
         RETURN_IF_ERROR(_select_streams(tablet_id, streams));
         RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
     }
@@ -418,7 +305,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc
 
 Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> 
block,
                                          int64_t tablet_id, const Rows& rows,
-                                         const std::vector<brpc::StreamId>& 
streams) {
+                                         const Streams& streams) {
     DeltaWriterV2* delta_writer = nullptr;
     {
         auto it = _delta_writer_for_tablet->find(tablet_id);
@@ -434,10 +321,6 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
             req.tuple_desc = _output_tuple_desc;
             req.is_high_priority = _is_high_priority;
             req.table_schema_param = _schema.get();
-            req.tablet_schema = _tablet_schema_for_index[rows.index_id];
-            req.enable_unique_key_merge_on_write = 
_enable_unique_mow_for_index[rows.index_id];
-            req.sender_id = _sender_id;
-            req.streams = streams;
             for (auto& index : _schema->indexes()) {
                 if (index->index_id == rows.index_id) {
                     req.slots = &index->slots;
@@ -445,9 +328,8 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
                     break;
                 }
             }
-            DeltaWriterV2::open(&req, &delta_writer, _profile);
-            _delta_writer_for_tablet->insert(
-                    {tablet_id, std::unique_ptr<DeltaWriterV2>(delta_writer)});
+            DeltaWriterV2::open(&req, streams, &delta_writer, _profile);
+            _delta_writer_for_tablet->emplace(tablet_id, delta_writer);
         } else {
             VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " << 
tablet_id
                        << ", index id: " << rows.index_id << ")";
@@ -472,11 +354,6 @@ Status VOlapTableSinkV2::_cancel(Status status) {
                       [&status](auto&& entry) { 
entry.second->cancel_with_status(status); });
     }
     _delta_writer_for_tablet.reset();
-    if (_stream_pool_for_node.use_count() == 1) {
-        std::for_each(std::begin(*_node_id_for_stream), 
std::end(*_node_id_for_stream),
-                      [](auto&& entry) { brpc::StreamClose(entry.first); });
-    }
-    _stream_pool_for_node.reset();
     return Status::OK();
 }
 
@@ -515,43 +392,35 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
 
         {
             // send CLOSE_LOAD to all streams, return ERROR if any
-            RETURN_IF_ERROR(std::transform_reduce(
-                    std::begin(*_node_id_for_stream), 
std::end(*_node_id_for_stream), Status::OK(),
-                    [](Status& left, Status&& right) { return left.ok() ? 
right : left; },
-                    [this](auto&& entry) { return _close_load(entry.first); 
}));
-        }
-
-        {
-            SCOPED_TIMER(_close_load_timer);
-            while (_pending_streams.load() > 0) {
-                // TODO: use a better wait
-                std::this_thread::sleep_for(std::chrono::milliseconds(1));
-                LOG(INFO) << "sinkv2 close_wait, pending streams: " << 
_pending_streams.load();
+            for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
+                RETURN_IF_ERROR(_close_load(stream_pool));
             }
         }
 
         {
-            SCOPED_TIMER(_close_stream_timer);
-            // close streams
-            if (_stream_pool_for_node.use_count() == 1) {
-                std::for_each(std::begin(*_node_id_for_stream), 
std::end(*_node_id_for_stream),
-                              [](auto&& entry) { 
brpc::StreamClose(entry.first); });
+            SCOPED_TIMER(_close_load_timer);
+            for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
+                for (const auto& stream : stream_pool) {
+                    stream->close_wait();
+                }
             }
-            _stream_pool_for_node.reset();
         }
 
         std::vector<TTabletCommitInfo> tablet_commit_infos;
-        for (auto& [tablet_id, backends] : _tablet_success_map) {
-            for (int64_t be_id : backends) {
-                TTabletCommitInfo commit_info;
-                commit_info.tabletId = tablet_id;
-                commit_info.backendId = be_id;
-                tablet_commit_infos.emplace_back(std::move(commit_info));
+        for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
+            for (const auto& stream : stream_pool) {
+                for (auto tablet_id : stream->success_tablets()) {
+                    TTabletCommitInfo commit_info;
+                    commit_info.tabletId = tablet_id;
+                    commit_info.backendId = node_id;
+                    tablet_commit_infos.emplace_back(std::move(commit_info));
+                }
             }
         }
         state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
                                             
std::make_move_iterator(tablet_commit_infos.begin()),
                                             
std::make_move_iterator(tablet_commit_infos.end()));
+        _stream_pool_for_node.reset();
 
         // _number_input_rows don't contain num_rows_load_filtered and 
num_rows_load_unselected in scan node
         int64_t num_rows_load_total = _number_input_rows + 
state->num_rows_load_filtered() +
@@ -573,22 +442,17 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
     return status;
 }
 
-Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) {
-    butil::IOBuf buf;
-    PStreamHeader header;
-    *header.mutable_load_id() = _load_id;
-    header.set_src_id(_sender_id);
-    header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
-    auto node_id = _node_id_for_stream.get()->at(stream);
+Status VOlapTableSinkV2::_close_load(const Streams& streams) {
+    auto node_id = streams[0]->dst_id();
+    std::vector<PTabletID> tablets_to_commit;
     for (auto tablet : _tablets_for_node[node_id]) {
         if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
-            *header.add_tablets_to_commit() = tablet;
+            tablets_to_commit.push_back(tablet);
         }
     }
-    size_t header_len = header.ByteSizeLong();
-    buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
-    buf.append(header.SerializeAsString());
-    io::StreamSinkFileWriter::send_with_retry(stream, buf);
+    for (const auto& stream : streams) {
+        RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h
index c2c24a26fb..5f50463268 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -66,6 +66,7 @@
 
 namespace doris {
 class DeltaWriterV2;
+class LoadStreamStub;
 class ObjectPool;
 class RowDescriptor;
 class RuntimeState;
@@ -81,8 +82,8 @@ class OlapTabletFinder;
 class VOlapTableSinkV2;
 
 using DeltaWriterForTablet = std::unordered_map<int64_t, 
std::unique_ptr<DeltaWriterV2>>;
-using StreamPool = std::vector<brpc::StreamId>;
-using StreamPoolForNode = std::unordered_map<int64_t, StreamPool>;
+using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
+using NodeToStreams = std::unordered_map<int64_t, Streams>;
 using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>;
 using NodePartitionTabletMapping =
         std::unordered_map<int64_t, std::unordered_map<int64_t, 
std::unordered_set<int64_t>>>;
@@ -135,7 +136,8 @@ public:
     RuntimeProfile* profile() override { return _profile; }
 
 private:
-    Status _init_stream_pool(const NodeInfo& node_info, StreamPool& 
stream_pool);
+    Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool,
+                             LoadStreamStub& stub_template);
 
     Status _init_stream_pools();
 
@@ -146,11 +148,11 @@ private:
                                    int row_idx);
 
     Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t 
tablet_id,
-                           const Rows& rows, const 
std::vector<brpc::StreamId>& streams);
+                           const Rows& rows, const Streams& streams);
 
-    Status _select_streams(int64_t tablet_id, std::vector<brpc::StreamId>& 
streams);
+    Status _select_streams(int64_t tablet_id, Streams& streams);
 
-    Status _close_load(brpc::StreamId stream);
+    Status _close_load(const Streams& streams);
 
     Status _cancel(Status status);
 
@@ -176,8 +178,6 @@ private:
 
     // TODO(zc): think about cache this data
     std::shared_ptr<OlapTableSchemaParam> _schema;
-    std::unordered_map<int64_t, std::shared_ptr<TabletSchema>> 
_tablet_schema_for_index;
-    std::unordered_map<int64_t, bool> _enable_unique_mow_for_index;
     OlapTableLocationParam* _location = nullptr;
     DorisNodesInfo* _nodes_info = nullptr;
 
@@ -206,7 +206,6 @@ private:
     RuntimeProfile::Counter* _close_timer = nullptr;
     RuntimeProfile::Counter* _close_writer_timer = nullptr;
     RuntimeProfile::Counter* _close_load_timer = nullptr;
-    RuntimeProfile::Counter* _close_stream_timer = nullptr;
 
     // Save the status of close() method
     Status _close_status;
@@ -221,8 +220,7 @@ private:
     std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
     std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
 
-    std::shared_ptr<StreamPoolForNode> _stream_pool_for_node;
-    std::shared_ptr<NodeIdForStream> _node_id_for_stream;
+    std::shared_ptr<NodeToStreams> _stream_pool_for_node;
     size_t _stream_index = 0;
     std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet;
 
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index e894cbaf91..c6ac4a3f50 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -24,6 +24,7 @@
 #include "olap/olap_common.h"
 #include "util/debug/leakcheck_disabler.h"
 #include "util/faststring.h"
+#include "vec/sink/load_stream_stub.h"
 
 namespace doris {
 
@@ -35,134 +36,74 @@ namespace doris {
     } while (false)
 #endif
 
-DEFINE_string(connection_type, "", "Connection type. Available values: single, 
pooled, short");
-DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
-DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
-DEFINE_int32(idle_timeout_s, -1,
-             "Connection will be closed if there is no "
-             "read/write operations during the last `idle_timeout_s'");
+constexpr int64_t LOAD_ID_LO = 1;
+constexpr int64_t LOAD_ID_HI = 2;
+constexpr int64_t NUM_STREAM = 3;
+constexpr int64_t PARTITION_ID = 1234;
+constexpr int64_t INDEX_ID = 2345;
+constexpr int64_t TABLET_ID = 3456;
+constexpr int32_t SEGMENT_ID = 4567;
+const std::string DATA0 = "segment data";
+const std::string DATA1 = "hello world";
+
+static std::atomic<int64_t> g_num_request;
 
 class StreamSinkFileWriterTest : public testing::Test {
-    class MockStreamSinkFileRecevier : public brpc::StreamInputHandler {
+    class MockStreamStub : public LoadStreamStub {
     public:
-        virtual int on_received_messages(brpc::StreamId id, butil::IOBuf* 
const messages[],
-                                         size_t size) {
-            std::stringstream str;
-            for (size_t i = 0; i < size; ++i) {
-                str << "msg[" << i << "]=" << *messages[i];
+        MockStreamStub(PUniqueId load_id, int64_t src_id) : 
LoadStreamStub(load_id, src_id) {};
+
+        virtual ~MockStreamStub() = default;
+
+        // APPEND_DATA
+        virtual Status append_data(int64_t partition_id, int64_t index_id, 
int64_t tablet_id,
+                                   int64_t segment_id, std::span<const Slice> 
data,
+                                   bool segment_eos = false) override {
+            EXPECT_EQ(PARTITION_ID, partition_id);
+            EXPECT_EQ(INDEX_ID, index_id);
+            EXPECT_EQ(TABLET_ID, tablet_id);
+            EXPECT_EQ(SEGMENT_ID, segment_id);
+            if (segment_eos) {
+                EXPECT_EQ(0, data.size());
+            } else {
+                EXPECT_EQ(2, data.size());
+                EXPECT_EQ(DATA0, data[0].to_string());
+                EXPECT_EQ(DATA1, data[1].to_string());
             }
-            LOG(INFO) << "Received from Stream=" << id << ": " << str.str();
-            return 0;
-        }
-        virtual void on_idle_timeout(brpc::StreamId id) {
-            LOG(INFO) << "Stream=" << id << " has no data transmission for a 
while";
-        }
-        virtual void on_closed(brpc::StreamId id) { LOG(INFO) << "Stream=" << 
id << " is closed"; }
-    };
-
-    class StreamingSinkFileService : public PBackendService {
-    public:
-        StreamingSinkFileService() : _sd(brpc::INVALID_STREAM_ID) {}
-        virtual ~StreamingSinkFileService() { brpc::StreamClose(_sd); };
-        virtual void open_stream_sink(google::protobuf::RpcController* 
controller,
-                                      const POpenStreamSinkRequest*,
-                                      POpenStreamSinkResponse* response,
-                                      google::protobuf::Closure* done) {
-            brpc::ClosureGuard done_guard(done);
-
-            brpc::Controller* cntl = 
static_cast<brpc::Controller*>(controller);
-            brpc::StreamOptions stream_options;
-            stream_options.handler = &_receiver;
-            CHECK_EQ(0, brpc::StreamAccept(&_sd, *cntl, &stream_options));
-            Status::OK().to_protobuf(response->mutable_status());
+            g_num_request++;
+            return Status::OK();
         }
-
-    private:
-        MockStreamSinkFileRecevier _receiver;
-        brpc::StreamId _sd;
     };
 
 public:
-    StreamSinkFileWriterTest() { srand(time(nullptr)); }
-    ~StreamSinkFileWriterTest() {}
+    StreamSinkFileWriterTest() = default;
+    ~StreamSinkFileWriterTest() = default;
 
 protected:
     virtual void SetUp() {
-        // init channel
-        brpc::Channel channel;
-        brpc::ChannelOptions options;
-        options.protocol = brpc::PROTOCOL_BAIDU_STD;
-        options.connection_type = FLAGS_connection_type;
-        options.timeout_ms = FLAGS_timeout_ms;
-        options.max_retry = FLAGS_max_retry;
-        std::stringstream port;
-        CHECK_EQ(0, channel.Init("127.0.0.1:18946", nullptr));
-
-        // init server
-        _stream_service = new StreamingSinkFileService();
-        CHECK_EQ(0, _server.AddService(_stream_service, 
brpc::SERVER_DOESNT_OWN_SERVICE));
-        brpc::ServerOptions server_options;
-        server_options.idle_timeout_sec = FLAGS_idle_timeout_s;
-        {
-            debug::ScopedLeakCheckDisabler disable_lsan;
-            CHECK_EQ(0, _server.Start("127.0.0.1:18946", &server_options));
+        _load_id.set_hi(LOAD_ID_HI);
+        _load_id.set_lo(LOAD_ID_LO);
+        for (int src_id = 0; src_id < NUM_STREAM; src_id++) {
+            _streams.emplace_back(new MockStreamStub(_load_id, src_id));
         }
-
-        // init stream connect
-        PBackendService_Stub stub(&channel);
-        brpc::Controller cntl;
-        brpc::StreamId stream;
-        CHECK_EQ(0, brpc::StreamCreate(&stream, cntl, NULL));
-
-        POpenStreamSinkRequest request;
-        POpenStreamSinkResponse response;
-        request.mutable_load_id()->set_hi(1);
-        request.mutable_load_id()->set_lo(1);
-        stub.open_stream_sink(&cntl, &request, &response, NULL);
-
-        brpc::Join(cntl.call_id());
-        _stream = stream;
     }
 
-    virtual void TearDown() {
-        CHECK_EQ(0, brpc::StreamClose(_stream));
-        CHECK_EQ(0, _server.Stop(1000));
-        CHECK_EQ(0, _server.Join());
-        delete _stream_service;
-    }
+    virtual void TearDown() {}
 
-    StreamingSinkFileService* _stream_service;
-    brpc::StreamId _stream;
-    brpc::Server _server;
+    PUniqueId _load_id;
+    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
 };
 
-TEST_F(StreamSinkFileWriterTest, TestInit) {
-    std::vector<brpc::StreamId> stream_ids {_stream};
-    io::StreamSinkFileWriter writer(0, stream_ids);
-    PUniqueId load_id;
-    load_id.set_hi(1);
-    load_id.set_lo(2);
-    writer.init(load_id, 3, 4, 5, 6);
-}
+TEST_F(StreamSinkFileWriterTest, Test) {
+    g_num_request = 0;
+    io::StreamSinkFileWriter writer(_streams);
+    writer.init(_load_id, PARTITION_ID, INDEX_ID, TABLET_ID, SEGMENT_ID);
+    std::vector<Slice> slices {DATA0, DATA1};
 
-TEST_F(StreamSinkFileWriterTest, TestAppend) {
-    std::vector<brpc::StreamId> stream_ids {_stream};
-    io::StreamSinkFileWriter writer(0, stream_ids);
-    PUniqueId load_id;
-    load_id.set_hi(1);
-    load_id.set_lo(2);
-    writer.init(load_id, 3, 4, 5, 6);
-    std::vector<Slice> slices {"hello"};
-    CHECK_STATUS_OK(writer.appendv(&slices[0], slices.size()));
-}
-
-TEST_F(StreamSinkFileWriterTest, TestFinalize) {
-    std::vector<brpc::StreamId> stream_ids {_stream};
-    io::StreamSinkFileWriter writer(0, stream_ids);
-    PUniqueId load_id;
-    load_id.set_hi(1);
-    load_id.set_lo(2);
-    writer.init(load_id, 3, 4, 5, 6);
+    CHECK_STATUS_OK(writer.appendv(&(*slices.begin()), slices.size()));
+    EXPECT_EQ(NUM_STREAM, g_num_request);
     CHECK_STATUS_OK(writer.finalize());
+    EXPECT_EQ(NUM_STREAM * 2, g_num_request);
 }
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to