This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f730a048b1 [feature-wip](load) Support single replica load (#10298)
f730a048b1 is described below

commit f730a048b1e3baf3d78b27292698eecbae3fcef7
Author: weizuo93 <wei...@apache.org>
AuthorDate: Tue Aug 2 11:44:18 2022 +0800

    [feature-wip](load) Support single replica load (#10298)
    
    During the load process, the same operations, such as sorting and
    aggregation, are performed on every replica, and these operations are
    resource-intensive. Concurrent data loads can therefore consume a lot of
    CPU and memory. It is better to perform the write process (writing data
    into the MemTable and then flushing it) on a single replica, and to
    synchronize the resulting data files to the other replicas before the
    transaction finishes.
---
 be/src/common/config.h                             |  14 ++
 be/src/exec/tablet_sink.cpp                        |  59 ++++++
 be/src/exec/tablet_sink.h                          |   8 +
 be/src/http/action/download_action.cpp             |   4 +-
 be/src/olap/delta_writer.cpp                       | 105 ++++++++-
 be/src/olap/delta_writer.h                         |  16 +-
 be/src/olap/txn_manager.cpp                        |  50 +++++
 be/src/olap/txn_manager.h                          |  28 +++
 be/src/runtime/load_channel.h                      |   8 +-
 be/src/runtime/tablets_channel.cpp                 | 138 +++++++++++-
 be/src/runtime/tablets_channel.h                   |  89 ++------
 be/src/service/CMakeLists.txt                      |   1 +
 be/src/service/brpc_service.cpp                    |   6 +-
 be/src/service/brpc_service.h                      |   2 +-
 be/src/service/doris_main.cpp                      |  31 ++-
 be/src/service/internal_service.cpp                | 236 ++++++++++++++++++++-
 be/src/service/internal_service.h                  |  13 ++
 .../single_replica_load_download_service.cpp       |  56 +++++
 ...ce.h => single_replica_load_download_service.h} |  24 +--
 be/src/vec/sink/vtablet_sink.cpp                   |  42 ++++
 be/test/olap/delta_writer_test.cpp                 |  13 +-
 .../olap/engine_storage_migration_task_test.cpp    |   3 +-
 be/test/olap/remote_rowset_gc_test.cpp             |   3 +-
 be/test/olap/tablet_cooldown_test.cpp              |   3 +-
 docs/en/docs/admin-manual/config/be-config.md      |  36 ++++
 docs/en/docs/admin-manual/config/fe-config.md      |  30 +++
 docs/zh-CN/docs/admin-manual/config/be-config.md   |  36 ++++
 docs/zh-CN/docs/admin-manual/config/fe-config.md   |  30 +++
 .../java/org/apache/doris/analysis/InsertStmt.java |   3 +-
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   3 +-
 .../apache/doris/load/update/UpdatePlanner.java    |   2 +-
 .../org/apache/doris/planner/OlapTableSink.java    |  45 +++-
 .../apache/doris/planner/StreamLoadPlanner.java    |   3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  13 ++
 .../apache/doris/planner/OlapTableSinkTest.java    |   8 +-
 gensrc/proto/internal_service.proto                |  51 +++++
 gensrc/thrift/DataSinks.thrift                     |   2 +
 38 files changed, 1087 insertions(+), 133 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index a3f6f6d5bc..2bbee690af 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,6 +35,11 @@ CONF_Int32(brpc_port, "8060");
 // the number of bthreads for brpc, the default value is set to -1, which 
means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1");
 
+// port to brpc server for single replica load
+CONF_Int32(single_replica_load_brpc_port, "8070");
+// the number of bthreads to brpc server for single replica load
+CONF_Int32(single_replica_load_brpc_num_threads, "64");
+
 // Declare a selection strategy for those servers have many ips.
 // Note that there should at most one ip match this list.
 // this is a list in semicolon-delimited format, in CIDR notation, e.g. 
10.10.10.0/24
@@ -356,11 +361,19 @@ CONF_Int32(webserver_num_workers, "48");
 // Period to update rate counters and sampling counters in ms.
 CONF_mInt32(periodic_counter_update_period_ms, "500");
 
+CONF_Bool(enable_single_replica_load, "false");
+
+// Port to download server for single replica load
+CONF_Int32(single_replica_load_download_port, "8050");
+// Number of download workers for single replica load
+CONF_Int32(single_replica_load_download_num_workers, "64");
+
 // Used for mini Load. mini load data file will be removed after this time.
 CONF_Int64(load_data_reserve_hours, "4");
 // log error log will be removed after this time
 CONF_mInt64(load_error_log_reserve_hours, "48");
 CONF_Int32(number_tablet_writer_threads, "16");
+CONF_Int32(number_slave_replica_download_threads, "64");
 
 // The maximum amount of data that can be processed by a stream load
 CONF_mInt64(streaming_load_max_mb, "10240");
@@ -377,6 +390,7 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+CONF_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
 CONF_mBool(enable_stream_load_record, "false");
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index ef3d97c2bf..35d024f719 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -21,6 +21,7 @@
 
 #include <sstream>
 #include <string>
+#include <unordered_map>
 
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
@@ -239,6 +240,29 @@ Status NodeChannel::open_wait() {
                         commit_info.tabletId = tablet.tablet_id();
                         commit_info.backendId = _node_id;
                         
_tablet_commit_infos.emplace_back(std::move(commit_info));
+                        VLOG_CRITICAL
+                                << "master replica commit info: tabletId=" << 
tablet.tablet_id()
+                                << ", backendId=" << _node_id
+                                << ", master node id: " << this->node_id()
+                                << ", host: " << this->host() << ", txn_id=" 
<< _parent->_txn_id;
+                    }
+
+                    if (_parent->_write_single_replica) {
+                        for (auto& tablet_slave_node_ids : 
result.success_slave_tablet_node_ids()) {
+                            for (auto slave_node_id :
+                                 
tablet_slave_node_ids.second.slave_node_ids()) {
+                                TTabletCommitInfo commit_info;
+                                commit_info.tabletId = 
tablet_slave_node_ids.first;
+                                commit_info.backendId = slave_node_id;
+                                
_tablet_commit_infos.emplace_back(std::move(commit_info));
+                                VLOG_CRITICAL << "slave replica commit info: 
tabletId="
+                                              << tablet_slave_node_ids.first
+                                              << ", backendId=" << 
slave_node_id
+                                              << ", master node id: " << 
this->node_id()
+                                              << ", host: " << this->host()
+                                              << ", txn_id=" << 
_parent->_txn_id;
+                            }
+                        }
                     }
                     _add_batches_finished = true;
                 }
@@ -504,6 +528,28 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
             request.add_partition_ids(pid);
         }
 
+        request.set_write_single_replica(false);
+        if (_parent->_write_single_replica) {
+            request.set_write_single_replica(true);
+            for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator 
iter =
+                         _slave_tablet_nodes.begin();
+                 iter != _slave_tablet_nodes.end(); iter++) {
+                PSlaveTabletNodes slave_tablet_nodes;
+                for (auto node_id : iter->second) {
+                    auto node = _parent->_nodes_info->find_node(node_id);
+                    if (node == nullptr) {
+                        return;
+                    }
+                    PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
+                    pnode->set_id(node->id);
+                    pnode->set_option(node->option);
+                    pnode->set_host(node->host);
+                    
pnode->set_async_internal_port(config::single_replica_load_brpc_port);
+                }
+                request.mutable_slave_tablet_nodes()->insert({iter->first, 
slave_tablet_nodes});
+            }
+        }
+
         // eos request must be the last request
         _add_batch_closure->end_mark();
         _send_finished = true;
@@ -587,6 +633,12 @@ Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPart
                 channel = it->second;
             }
             channel->add_tablet(tablet);
+            if (_parent->_write_single_replica) {
+                auto slave_location = 
_parent->_slave_location->find_tablet(tablet.tablet_id);
+                if (slave_location != nullptr) {
+                    channel->add_slave_tablet_nodes(tablet.tablet_id, 
slave_location->node_ids);
+                }
+            }
             channels.push_back(channel);
             _tablets_by_channel[node_id].insert(tablet.tablet_id);
         }
@@ -685,6 +737,13 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
     RETURN_IF_ERROR(_partition->init());
     _location = _pool->add(new OlapTableLocationParam(table_sink.location));
     _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
+    if (table_sink.__isset.write_single_replica && 
table_sink.write_single_replica) {
+        _write_single_replica = true;
+        _slave_location = _pool->add(new 
OlapTableLocationParam(table_sink.slave_location));
+        if (!config::enable_single_replica_load) {
+            return Status::InternalError("single replica load is disabled on 
BE.");
+        }
+    }
 
     if (table_sink.__isset.load_channel_timeout_s) {
         _load_channel_timeout_s = table_sink.load_channel_timeout_s;
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 47431aec61..6b9b2f1c32 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -174,6 +174,10 @@ public:
 
     virtual Status init(RuntimeState* state);
 
+    void add_slave_tablet_nodes(int64_t tablet_id, const std::vector<int64_t>& 
slave_nodes) {
+        _slave_tablet_nodes[tablet_id] = slave_nodes;
+    }
+
     // we use open/open_wait to parallel
     void open();
     virtual Status open_wait();
@@ -287,6 +291,8 @@ protected:
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
 
     std::vector<TTabletWithPartition> _all_tablets;
+    // map from tablet_id to node_id where slave replicas locate in
+    std::unordered_map<int64_t, std::vector<int64_t>> _slave_tablet_nodes;
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
 
     AddBatchCounter _add_batch_counter;
@@ -477,6 +483,8 @@ protected:
     // TODO(zc): think about cache this data
     std::shared_ptr<OlapTableSchemaParam> _schema;
     OlapTableLocationParam* _location = nullptr;
+    bool _write_single_replica = false;
+    OlapTableLocationParam* _slave_location = nullptr;
     DorisNodesInfo* _nodes_info = nullptr;
 
     RuntimeProfile* _profile = nullptr;
diff --git a/be/src/http/action/download_action.cpp 
b/be/src/http/action/download_action.cpp
index 10cd6c1998..d75beb4cd1 100644
--- a/be/src/http/action/download_action.cpp
+++ b/be/src/http/action/download_action.cpp
@@ -104,7 +104,7 @@ void DownloadAction::handle_error_log(HttpRequest* req, 
const std::string& file_
 }
 
 void DownloadAction::handle(HttpRequest* req) {
-    LOG(INFO) << "accept one download request " << req->debug_string();
+    VLOG_CRITICAL << "accept one download request " << req->debug_string();
 
     // add tid to cgroup in order to limit read bandwidth
     CgroupsMgr::apply_system_cgroup();
@@ -124,7 +124,7 @@ void DownloadAction::handle(HttpRequest* req) {
         handle_normal(req, file_path);
     }
 
-    LOG(INFO) << "deal with download request finished! ";
+    VLOG_CRITICAL << "deal with download request finished! ";
 }
 
 Status DownloadAction::check_token(HttpRequest* req) {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index ae732c55b8..6762c3efd9 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -27,6 +27,9 @@
 #include "olap/storage_engine.h"
 #include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
+#include "service/backend_options.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
 
 namespace doris {
 
@@ -304,7 +307,8 @@ Status DeltaWriter::close() {
     return Status::OK();
 }
 
-Status DeltaWriter::close_wait() {
+Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
+                               const bool write_single_replica) {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() 
being called";
@@ -338,9 +342,31 @@ Status DeltaWriter::close_wait() {
     const FlushStatistic& stat = _flush_token->get_stats();
     VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id()
                   << ", load id: " << print_id(_req.load_id) << ", stats: " << 
stat;
+
+    if (write_single_replica) {
+        for (auto node_info : slave_tablet_nodes.slave_nodes()) {
+            _request_slave_tablet_pull_rowset(node_info);
+        }
+    }
     return Status::OK();
 }
 
+bool DeltaWriter::check_slave_replicas_done(
+        google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* 
success_slave_tablet_node_ids) {
+    std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
+    if (_unfinished_slave_node.empty()) {
+        success_slave_tablet_node_ids->insert({_tablet->tablet_id(), 
_success_slave_node_ids});
+        return true;
+    }
+    return false;
+}
+
+void DeltaWriter::add_finished_slave_replicas(
+        google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* 
success_slave_tablet_node_ids) {
+    std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
+    success_slave_tablet_node_ids->insert({_tablet->tablet_id(), 
_success_slave_node_ids});
+}
+
 Status DeltaWriter::cancel() {
     std::lock_guard<std::mutex> l(_lock);
     if (!_is_init || _is_cancelled) {
@@ -390,4 +416,81 @@ void DeltaWriter::_build_current_tablet_schema(int64_t 
index_id,
     }
 }
 
+void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
+    std::shared_ptr<PBackendService_Stub> stub =
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                    node_info.host(), node_info.async_internal_port());
+    if (stub == nullptr) {
+        LOG(WARNING) << "failed to send pull rowset request to slave replica. 
get rpc stub failed, "
+                        "slave host="
+                     << node_info.host() << ", port=" << 
node_info.async_internal_port()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" 
<< _req.txn_id;
+        return;
+    }
+
+    _storage_engine->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id, 
_tablet->tablet_id(),
+                                                                this);
+    {
+        std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
+        _unfinished_slave_node.insert(node_info.id());
+    }
+
+    PTabletWriteSlaveRequest request;
+    RowsetMetaPB rowset_meta_pb = _cur_rowset->rowset_meta()->get_rowset_pb();
+    request.set_allocated_rowset_meta(&rowset_meta_pb);
+    request.set_host(BackendOptions::get_localhost());
+    request.set_http_port(config::single_replica_load_download_port);
+    string tablet_path = _tablet->tablet_path();
+    request.set_rowset_path(tablet_path);
+    request.set_token(ExecEnv::GetInstance()->token());
+    request.set_brpc_port(config::single_replica_load_brpc_port);
+    request.set_node_id(node_info.id());
+    for (int segment_id = 0; segment_id < 
_cur_rowset->rowset_meta()->num_segments();
+         segment_id++) {
+        std::stringstream segment_name;
+        segment_name << _cur_rowset->rowset_id() << "_" << segment_id << 
".dat";
+        int64_t segment_size = std::filesystem::file_size(tablet_path + "/" + 
segment_name.str());
+        request.mutable_segments_size()->insert({segment_id, segment_size});
+    }
+    RefCountClosure<PTabletWriteSlaveResult>* closure =
+            new RefCountClosure<PTabletWriteSlaveResult>();
+    closure->ref();
+    closure->ref();
+    closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec 
* 1000);
+    closure->cntl.ignore_eovercrowded();
+    stub->request_slave_tablet_pull_rowset(&closure->cntl, &request, 
&closure->result, closure);
+    request.release_rowset_meta();
+
+    closure->join();
+    if (closure->cntl.Failed()) {
+        if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
+                    stub, node_info.host(), node_info.async_internal_port())) {
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                    closure->cntl.remote_side());
+        }
+        LOG(WARNING) << "failed to send pull rowset request to slave replica, 
error="
+                     << berror(closure->cntl.ErrorCode())
+                     << ", error_text=" << closure->cntl.ErrorText()
+                     << ". slave host: " << node_info.host()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" 
<< _req.txn_id;
+        std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
+        _unfinished_slave_node.erase(node_info.id());
+    }
+
+    if (closure->unref()) {
+        delete closure;
+    }
+    closure = nullptr;
+}
+
+void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool 
is_succeed) {
+    std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
+    if (is_succeed) {
+        _success_slave_node_ids.add_slave_node_ids(node_id);
+        VLOG_CRITICAL << "record successful slave replica for txn [" << 
_req.txn_id
+                      << "], tablet_id=" << _tablet->tablet_id() << ", 
node_id=" << node_id;
+    }
+    _unfinished_slave_node.erase(node_id);
+}
+
 } // namespace doris
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 2ede4e085a..f7194f8a4c 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -72,7 +72,13 @@ public:
     Status close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    Status close_wait();
+    Status close_wait(const PSlaveTabletNodes& slave_tablet_nodes, const bool 
write_single_replica);
+
+    bool check_slave_replicas_done(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
+                                           success_slave_tablet_node_ids);
+
+    void add_finished_slave_replicas(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
+                                             success_slave_tablet_node_ids);
 
     // abandon current memtable and wait for all pending-flushing memtables to 
be destructed.
     // mem_consumption() should be 0 after this function returns.
@@ -102,6 +108,8 @@ public:
 
     int64_t get_mem_consumption_snapshot() const;
 
+    void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
+
 private:
     DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
                 const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool 
is_vec);
@@ -117,6 +125,8 @@ private:
                                       const POlapTableSchemaParam& 
table_schema_param,
                                       const TabletSchema& ori_tablet_schema);
 
+    void _request_slave_tablet_pull_rowset(PNodeInfo node_info);
+
     bool _is_init = false;
     bool _is_cancelled = false;
     WriteRequest _req;
@@ -148,6 +158,10 @@ private:
 
     //only used for std::sort more detail see issue(#9237)
     int64_t _mem_consumption_snapshot = 0;
+
+    std::unordered_set<int64_t> _unfinished_slave_node;
+    PSuccessSlaveTabletNodeIds _success_slave_node_ids;
+    std::shared_mutex _slave_node_lock;
 };
 
 } // namespace doris
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 68d3c57a98..252998446e 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -35,6 +35,7 @@
 #include "olap/base_compaction.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/data_dir.h"
+#include "olap/delta_writer.h"
 #include "olap/lru_cache.h"
 #include "olap/push_handler.h"
 #include "olap/reader.h"
@@ -80,6 +81,8 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t 
txn_shard_size)
     _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size];
     _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size];
     _txn_mutex = new std::mutex[_txn_shard_size];
+    _txn_tablet_delta_writer_map = new 
txn_tablet_delta_writer_map_t[_txn_map_shard_size];
+    _txn_tablet_delta_writer_map_locks = new 
std::shared_mutex[_txn_map_shard_size];
 }
 
 Status TxnManager::prepare_txn(TPartitionId partition_id, const 
TabletSharedPtr& tablet,
@@ -662,4 +665,51 @@ void TxnManager::_clear_txn_partition_map_unlocked(int64_t 
transaction_id, int64
     }
 }
 
+void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t 
tablet_id,
+                                             DeltaWriter* delta_writer) {
+    std::lock_guard<std::shared_mutex> txn_wrlock(
+            _get_txn_tablet_delta_writer_map_lock(transaction_id));
+    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
+            _get_txn_tablet_delta_writer_map(transaction_id);
+    auto find = txn_tablet_delta_writer_map.find(transaction_id);
+    if (find == txn_tablet_delta_writer_map.end()) {
+        txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, 
DeltaWriter*>();
+    }
+    txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer;
+}
+
+void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, 
int64_t tablet_id,
+                                                 int64_t node_id, bool 
is_succeed) {
+    std::lock_guard<std::shared_mutex> txn_wrlock(
+            _get_txn_tablet_delta_writer_map_lock(transaction_id));
+    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
+            _get_txn_tablet_delta_writer_map(transaction_id);
+    auto find_txn = txn_tablet_delta_writer_map.find(transaction_id);
+    if (find_txn == txn_tablet_delta_writer_map.end()) {
+        LOG(WARNING) << "delta writer manager is not exist, txn_id=" << 
transaction_id
+                     << ", tablet_id=" << tablet_id;
+        return;
+    }
+    auto find_tablet = 
txn_tablet_delta_writer_map[transaction_id].find(tablet_id);
+    if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) {
+        LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id
+                     << ", tablet_id=" << tablet_id;
+        return;
+    }
+    DeltaWriter* delta_writer = 
txn_tablet_delta_writer_map[transaction_id][tablet_id];
+    delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed);
+}
+
+void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) {
+    std::lock_guard<std::shared_mutex> txn_wrlock(
+            _get_txn_tablet_delta_writer_map_lock(transaction_id));
+    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
+            _get_txn_tablet_delta_writer_map(transaction_id);
+    auto it = txn_tablet_delta_writer_map.find(transaction_id);
+    if (it != txn_tablet_delta_writer_map.end()) {
+        txn_tablet_delta_writer_map.erase(it);
+    }
+    VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id;
+}
+
 } // namespace doris
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index a755f5dc79..db6af360dc 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -47,6 +47,7 @@
 #include "util/time.h"
 
 namespace doris {
+class DeltaWriter;
 
 struct TabletTxnInfo {
     PUniqueId load_id;
@@ -69,6 +70,8 @@ public:
         delete[] _txn_partition_maps;
         delete[] _txn_map_locks;
         delete[] _txn_mutex;
+        delete[] _txn_tablet_delta_writer_map;
+        delete[] _txn_tablet_delta_writer_map_locks;
     }
 
     Status prepare_txn(TPartitionId partition_id, const TabletSharedPtr& 
tablet,
@@ -137,6 +140,12 @@ public:
     void get_partition_ids(const TTransactionId transaction_id,
                            std::vector<TPartitionId>* partition_ids);
 
+    void add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id,
+                                     DeltaWriter* delta_writer);
+    void clear_txn_tablet_delta_writer(int64_t transaction_id);
+    void finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t 
tablet_id, int64_t node_id,
+                                         bool is_succeed);
+
 private:
     using TxnKey = std::pair<int64_t, int64_t>; // partition_id, 
transaction_id;
 
@@ -159,6 +168,8 @@ private:
     typedef std::unordered_map<TxnKey, std::map<TabletInfo, TabletTxnInfo>, 
TxnKeyHash, TxnKeyEqual>
             txn_tablet_map_t;
     typedef std::unordered_map<int64_t, std::unordered_set<int64_t>> 
txn_partition_map_t;
+    typedef std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>
+            txn_tablet_delta_writer_map_t;
 
     std::shared_mutex& _get_txn_map_lock(TTransactionId transactionId);
 
@@ -168,6 +179,10 @@ private:
 
     inline std::mutex& _get_txn_lock(TTransactionId transactionId);
 
+    std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId 
transactionId);
+
+    txn_tablet_delta_writer_map_t& 
_get_txn_tablet_delta_writer_map(TTransactionId transactionId);
+
     // Insert or remove (transaction_id, partition_id) from _txn_partition_map
     // get _txn_map_lock before calling.
     void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t 
partition_id);
@@ -193,6 +208,9 @@ private:
     std::shared_mutex* _txn_map_locks;
 
     std::mutex* _txn_mutex;
+
+    txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map;
+    std::shared_mutex* _txn_tablet_delta_writer_map_locks;
     DISALLOW_COPY_AND_ASSIGN(TxnManager);
 }; // TxnManager
 
@@ -213,4 +231,14 @@ inline std::mutex& 
TxnManager::_get_txn_lock(TTransactionId transactionId) {
     return _txn_mutex[transactionId & (_txn_shard_size - 1)];
 }
 
+inline std::shared_mutex& TxnManager::_get_txn_tablet_delta_writer_map_lock(
+        TTransactionId transactionId) {
+    return _txn_tablet_delta_writer_map_locks[transactionId & 
(_txn_map_shard_size - 1)];
+}
+
+inline TxnManager::txn_tablet_delta_writer_map_t& 
TxnManager::_get_txn_tablet_delta_writer_map(
+        TTransactionId transactionId) {
+    return _txn_tablet_delta_writer_map[transactionId & (_txn_map_shard_size - 
1)];
+}
+
 } // namespace doris
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 9647528304..565cd67ebf 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -81,9 +81,11 @@ protected:
                        Response* response) {
         bool finished = false;
         auto index_id = request.index_id();
-        RETURN_IF_ERROR(channel->close(request.sender_id(), 
request.backend_id(), &finished,
-                                       request.partition_ids(), 
response->mutable_tablet_vec(),
-                                       response->mutable_tablet_errors()));
+        RETURN_IF_ERROR(channel->close(
+                this, request.sender_id(), request.backend_id(), &finished, 
request.partition_ids(),
+                response->mutable_tablet_vec(), 
response->mutable_tablet_errors(),
+                request.slave_tablet_nodes(), 
response->mutable_success_slave_tablet_node_ids(),
+                request.write_single_replica()));
         if (finished) {
             std::lock_guard<std::mutex> l(_lock);
             _tablets_channels.erase(index_id);
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 0934214f89..a33bd21db7 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -18,11 +18,15 @@
 #include "runtime/tablets_channel.h"
 
 #include "exec/tablet_info.h"
+#include "olap/delta_writer.h"
 #include "olap/memtable.h"
+#include "olap/storage_engine.h"
+#include "runtime/load_channel.h"
 #include "runtime/row_batch.h"
 #include "runtime/thread_context.h"
 #include "runtime/tuple_row.h"
 #include "util/doris_metrics.h"
+#include "util/time.h"
 
 namespace doris {
 
@@ -80,10 +84,14 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& 
request) {
     return Status::OK();
 }
 
-Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
-                             const google::protobuf::RepeatedField<int64_t>& 
partition_ids,
-                             google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec,
-                             google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_errors) {
+Status TabletsChannel::close(
+        LoadChannel* parent, int sender_id, int64_t backend_id, bool* finished,
+        const google::protobuf::RepeatedField<int64_t>& partition_ids,
+        google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
+        const google::protobuf::Map<int64_t, PSlaveTabletNodes>& 
slave_tablet_nodes,
+        google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* 
success_slave_tablet_node_ids,
+        const bool write_single_replica) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         return _close_status;
@@ -105,7 +113,7 @@ Status TabletsChannel::close(int sender_id, int64_t 
backend_id, bool* finished,
         _state = kFinished;
         // All senders are closed
         // 1. close all delta writers
-        std::vector<DeltaWriter*> need_wait_writers;
+        std::set<DeltaWriter*> need_wait_writers;
         for (auto& it : _tablet_writers) {
             if (_partition_ids.count(it.second->partition_id()) > 0) {
                 auto st = it.second->close();
@@ -115,7 +123,7 @@ Status TabletsChannel::close(int sender_id, int64_t 
backend_id, bool* finished,
                     // just skip this tablet(writer) and continue to close 
others
                     continue;
                 }
-                need_wait_writers.push_back(it.second);
+                need_wait_writers.insert(it.second);
             } else {
                 auto st = it.second->cancel();
                 if (!st.ok()) {
@@ -127,11 +135,41 @@ Status TabletsChannel::close(int sender_id, int64_t 
backend_id, bool* finished,
             }
         }
 
+        _write_single_replica = write_single_replica;
+
         // 2. wait delta writers and build the tablet vector
         for (auto writer : need_wait_writers) {
+            PSlaveTabletNodes slave_nodes;
+            if (write_single_replica) {
+                slave_nodes = slave_tablet_nodes.at(writer->tablet_id());
+            }
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE 
judge it.
-            _close_wait(writer, tablet_vec, tablet_errors);
+            _close_wait(writer, tablet_vec, tablet_errors, slave_nodes, 
write_single_replica);
+        }
+
+        if (write_single_replica) {
+            // The operation waiting for all slave replicas to complete must 
end before the timeout,
+            // so that there is enough time to collect completed replica. 
Otherwise, the task may
+            // timeout and fail even though most of the replicas are 
completed. Here we set 0.9
+            // times the timeout as the maximum waiting time.
+            while (need_wait_writers.size() > 0 &&
+                   (time(nullptr) - parent->last_updated_time()) < 
(parent->timeout() * 0.9)) {
+                std::set<DeltaWriter*>::iterator it;
+                for (it = need_wait_writers.begin(); it != 
need_wait_writers.end();) {
+                    bool is_done = 
(*it)->check_slave_replicas_done(success_slave_tablet_node_ids);
+                    if (is_done) {
+                        need_wait_writers.erase(it++);
+                    } else {
+                        it++;
+                    }
+                }
+                std::this_thread::sleep_for(std::chrono::milliseconds(100));
+            }
+            for (auto writer : need_wait_writers) {
+                
writer->add_finished_slave_replicas(success_slave_tablet_node_ids);
+            }
+            
StorageEngine::instance()->txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
         }
     }
     return Status::OK();
@@ -139,8 +177,10 @@ Status TabletsChannel::close(int sender_id, int64_t 
backend_id, bool* finished,
 
 void TabletsChannel::_close_wait(DeltaWriter* writer,
                                  
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-                                 
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
-    Status st = writer->close_wait();
+                                 
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
+                                 PSlaveTabletNodes slave_tablet_nodes,
+                                 const bool write_single_replica) {
+    Status st = writer->close_wait(slave_tablet_nodes, write_single_replica);
     if (st.ok()) {
         if (_broken_tablets.find(writer->tablet_id()) == 
_broken_tablets.end()) {
             PTabletInfo* tablet_info = tablet_vec->Add();
@@ -266,6 +306,9 @@ Status TabletsChannel::cancel() {
         it.second->cancel();
     }
     _state = kFinished;
+    if (_write_single_replica) {
+        
StorageEngine::instance()->txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
+    }
     return Status::OK();
 }
 
@@ -280,4 +323,81 @@ std::ostream& operator<<(std::ostream& os, const 
TabletsChannelKey& key) {
     return os;
 }
 
+// Appends one packet of rows sent by `request.sender_id()` to this channel's delta writers.
+// Row indexes are grouped by destination tablet; rows whose tablet is already in
+// _broken_tablets are dropped. A failed write on one tablet is recorded in
+// `response->tablet_errors`, marks the tablet broken and does not abort the other tablets.
+// A packet whose packet_seq is below the current expected seq is logged and ignored.
+template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
+Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
+                                 TabletWriterAddResult* response) {
+    int64_t expected_seq = 0;
+    if (Status seq_status = _get_current_seq(expected_seq, request); UNLIKELY(!seq_status.ok())) {
+        return seq_status;
+    }
+
+    if (request.packet_seq() < expected_seq) {
+        LOG(INFO) << "packet has already recept before, expect_seq=" << expected_seq
+                  << ", recept_seq=" << request.packet_seq();
+        return Status::OK();
+    }
+
+    // Row indexes of this packet, keyed by the tablet they are destined for.
+    std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> rows_by_tablet;
+    for (int row = 0; row < request.tablet_ids_size(); ++row) {
+        const int64_t tablet_id = request.tablet_ids(row);
+        if (_broken_tablets.find(tablet_id) == _broken_tablets.end()) {
+            rows_by_tablet[tablet_id].push_back(row);
+        }
+    }
+
+    // Decode the payload once: a RowBatch for the row-based request type, otherwise a
+    // vectorized Block.
+    auto send_data = [&]() {
+        if constexpr (std::is_same_v<TabletWriterAddRequest, PTabletWriterAddBatchRequest>) {
+            return RowBatch(*_row_desc, request.row_batch());
+        } else {
+            return vectorized::Block(request.block());
+        }
+    }();
+
+    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
+            response->mutable_tablet_errors();
+    for (const auto& [tablet_id, row_idxs] : rows_by_tablet) {
+        auto writer_it = _tablet_writers.find(tablet_id);
+        if (writer_it == _tablet_writers.end()) {
+            return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
+        }
+
+        Status write_status = writer_it->second->write(&send_data, row_idxs);
+        if (write_status.ok()) {
+            continue;
+        }
+        // Record the failure and keep writing the remaining tablets; the error is sent
+        // back to the sender through the response.
+        auto err_msg = strings::Substitute(
+                "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2", tablet_id,
+                _txn_id, write_status.code());
+        LOG(WARNING) << err_msg;
+        PTabletError* error = tablet_errors->Add();
+        error->set_tablet_id(tablet_id);
+        error->set_msg(err_msg);
+        _broken_tablets.insert(tablet_id);
+    }
+
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        _next_seqs[request.sender_id()] = expected_seq + 1;
+    }
+    return Status::OK();
+}
+
+// add_batch is defined in this .cpp instead of the header, so it is explicitly
+// instantiated here for the two request/response pairs it supports (row batch and block).
+template Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest&,
+                                          PTabletWriterAddBatchResult*);
+template Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest&,
+                                          PTabletWriterAddBlockResult*);
 } // namespace doris
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 78ca9ae076..d4d7076473 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -26,7 +26,6 @@
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "gutil/strings/substitute.h"
-#include "olap/delta_writer.h"
 #include "runtime/descriptors.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/thread_context.h"
@@ -56,6 +55,7 @@ std::ostream& operator<<(std::ostream& os, const 
TabletsChannelKey& key);
 
 class DeltaWriter;
 class OlapTableSchemaParam;
+class LoadChannel;
 
 // Write channel for a particular (load, index).
 class TabletsChannel {
@@ -76,10 +76,14 @@ public:
     // If all senders are closed, close this channel, set '*finished' to true, 
update 'tablet_vec'
     // to include all tablets written in this channel.
     // no-op when this channel has been closed or cancelled
-    Status close(int sender_id, int64_t backend_id, bool* finished,
-                 const google::protobuf::RepeatedField<int64_t>& partition_ids,
-                 google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-                 google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_error);
+    Status
+    close(LoadChannel* parent, int sender_id, int64_t backend_id, bool* 
finished,
+          const google::protobuf::RepeatedField<int64_t>& partition_ids,
+          google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+          google::protobuf::RepeatedPtrField<PTabletError>* tablet_error,
+          const google::protobuf::Map<int64_t, PSlaveTabletNodes>& 
slave_tablet_nodes,
+          google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* 
success_slave_tablet_node_ids,
+          const bool write_single_replica);
 
     // no-op when this channel has been closed or cancelled
     Status cancel();
@@ -102,7 +106,8 @@ private:
     // deal with DeltaWriter close_wait(), add tablet to list for return.
     void _close_wait(DeltaWriter* writer,
                      google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec,
-                     google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_error);
+                     google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_error,
+                     PSlaveTabletNodes slave_tablet_nodes, const bool 
write_single_replica);
 
     // id of this load channel
     TabletsChannelKey _key;
@@ -150,6 +155,8 @@ private:
     bool _is_high_priority = false;
 
     bool _is_vec = false;
+
+    bool _write_single_replica = false;
 };
 
 template <typename Request>
@@ -170,74 +177,4 @@ Status TabletsChannel::_get_current_seq(int64_t& cur_seq, 
const Request& request
     return Status::OK();
 }
 
-template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
-Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
-                                 TabletWriterAddResult* response) {
-    int64_t cur_seq = 0;
-
-    auto status = _get_current_seq(cur_seq, request);
-    if (UNLIKELY(!status.ok())) {
-        return status;
-    }
-
-    if (request.packet_seq() < cur_seq) {
-        LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
-                  << ", recept_seq=" << request.packet_seq();
-        return Status::OK();
-    }
-
-    std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index 
*/> tablet_to_rowidxs;
-    for (int i = 0; i < request.tablet_ids_size(); ++i) {
-        int64_t tablet_id = request.tablet_ids(i);
-        if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) {
-            // skip broken tablets
-            continue;
-        }
-        auto it = tablet_to_rowidxs.find(tablet_id);
-        if (it == tablet_to_rowidxs.end()) {
-            tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int> 
{i});
-        } else {
-            it->second.emplace_back(i);
-        }
-    }
-
-    auto get_send_data = [&]() {
-        if constexpr (std::is_same_v<TabletWriterAddRequest, 
PTabletWriterAddBatchRequest>) {
-            return RowBatch(*_row_desc, request.row_batch());
-        } else {
-            return vectorized::Block(request.block());
-        }
-    };
-
-    auto send_data = get_send_data();
-    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
-            response->mutable_tablet_errors();
-    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
-        auto tablet_writer_it = 
_tablet_writers.find(tablet_to_rowidxs_it.first);
-        if (tablet_writer_it == _tablet_writers.end()) {
-            return Status::InternalError("unknown tablet to append data, 
tablet={}",
-                                         tablet_to_rowidxs_it.first);
-        }
-
-        Status st = tablet_writer_it->second->write(&send_data, 
tablet_to_rowidxs_it.second);
-        if (!st.ok()) {
-            auto err_msg = strings::Substitute(
-                    "tablet writer write failed, tablet_id=$0, txn_id=$1, 
err=$2",
-                    tablet_to_rowidxs_it.first, _txn_id, st.code());
-            LOG(WARNING) << err_msg;
-            PTabletError* error = tablet_errors->Add();
-            error->set_tablet_id(tablet_to_rowidxs_it.first);
-            error->set_msg(err_msg);
-            _broken_tablets.insert(tablet_to_rowidxs_it.first);
-            // continue write to other tablet.
-            // the error will return back to sender.
-        }
-    }
-
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        _next_seqs[request.sender_id()] = cur_seq + 1;
-    }
-    return Status::OK();
-}
 } // namespace doris
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 31588b62ab..425b0afb85 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -26,6 +26,7 @@ add_library(Service
     backend_service.cpp
     brpc_service.cpp
     http_service.cpp
+    single_replica_load_download_service.cpp
     internal_service.cpp
 )
 
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 0cd79d4fbe..49249213ec 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -40,13 +40,13 @@ BRpcService::BRpcService(ExecEnv* exec_env) : 
_exec_env(exec_env), _server(new b
 
 BRpcService::~BRpcService() {}
 
-Status BRpcService::start(int port) {
+Status BRpcService::start(int port, int num_threads) {
     // Add service
     _server->AddService(new PInternalServiceImpl(_exec_env), 
brpc::SERVER_OWNS_SERVICE);
     // start service
     brpc::ServerOptions options;
-    if (config::brpc_num_threads != -1) {
-        options.num_threads = config::brpc_num_threads;
+    if (num_threads != -1) {
+        options.num_threads = num_threads;
     }
 
     if (_server->Start(port, &options) != 0) {
diff --git a/be/src/service/brpc_service.h b/be/src/service/brpc_service.h
index ec519cf71a..ea602169b4 100644
--- a/be/src/service/brpc_service.h
+++ b/be/src/service/brpc_service.h
@@ -35,7 +35,7 @@ public:
     BRpcService(ExecEnv* exec_env);
     ~BRpcService();
 
-    Status start(int port);
+    Status start(int port, int num_threads);
     void join();
 
 private:
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a87c17eef7..052e94da83 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -56,6 +56,7 @@
 #include "service/backend_service.h"
 #include "service/brpc_service.h"
 #include "service/http_service.h"
+#include "service/single_replica_load_download_service.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/logging.h"
@@ -415,13 +416,25 @@ int main(int argc, char** argv) {
 
     // 2. bprc service
     doris::BRpcService brpc_service(exec_env);
-    status = brpc_service.start(doris::config::brpc_port);
+    status = brpc_service.start(doris::config::brpc_port, 
doris::config::brpc_num_threads);
     if (!status.ok()) {
         LOG(ERROR) << "BRPC service did not start correctly, exiting";
         doris::shutdown_logging();
         exit(1);
     }
 
+    doris::BRpcService single_replica_load_brpc_service(exec_env);
+    if (doris::config::enable_single_replica_load) {
+        status = single_replica_load_brpc_service.start(
+                doris::config::single_replica_load_brpc_port,
+                doris::config::single_replica_load_brpc_num_threads);
+        if (!status.ok()) {
+            LOG(ERROR) << "single replica load BRPC service did not start 
correctly, exiting";
+            doris::shutdown_logging();
+            exit(1);
+        }
+    }
+
     // 3. http service
     doris::HttpService http_service(exec_env, doris::config::webserver_port,
                                     doris::config::webserver_num_workers);
@@ -432,6 +445,18 @@ int main(int argc, char** argv) {
         exit(1);
     }
 
+    doris::SingleReplicaLoadDownloadService download_service(
+            exec_env, doris::config::single_replica_load_download_port,
+            doris::config::single_replica_load_download_num_workers);
+    if (doris::config::enable_single_replica_load) {
+        status = download_service.start();
+        if (!status.ok()) {
+            LOG(ERROR) << "Doris Be download service did not start correctly, 
exiting";
+            doris::shutdown_logging();
+            exit(1);
+        }
+    }
+
     // 4. heart beat server
     doris::TMasterInfo* master_info = exec_env->master_info();
     doris::ThriftServer* heartbeat_thrift_server;
@@ -481,6 +506,10 @@ int main(int argc, char** argv) {
 
     http_service.stop();
     brpc_service.join();
+    if (doris::config::enable_single_replica_load) {
+        download_service.stop();
+        single_replica_load_brpc_service.join();
+    }
     daemon.stop();
     heartbeat_thrift_server->stop();
     heartbeat_thrift_server->join();
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 15eb95b4b3..02a52ce0e5 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -17,9 +17,15 @@
 
 #include "service/internal_service.h"
 
+#include <string>
+
 #include "common/config.h"
 #include "gen_cpp/BackendService.h"
 #include "gen_cpp/internal_service.pb.h"
+#include "http/http_client.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
 #include "runtime/buffer_control_block.h"
 #include "runtime/data_stream_mgr.h"
 #include "runtime/exec_env.h"
@@ -35,6 +41,7 @@
 #include "util/brpc_client_cache.h"
 #include "util/md5.h"
 #include "util/proto_util.h"
+#include "util/ref_count_closure.h"
 #include "util/string_util.h"
 #include "util/telemetry/brpc_carrier.h"
 #include "util/telemetry/telemetry.h"
@@ -44,6 +51,8 @@
 
 namespace doris {
 
+const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
+
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, 
MetricUnit::NOUNIT);
 
 bthread_key_t btls_key;
@@ -75,7 +84,9 @@ private:
 };
 
 PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
-        : _exec_env(exec_env), 
_tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
+        : _exec_env(exec_env),
+          _tablet_worker_pool(config::number_tablet_writer_threads, 10240),
+          
_slave_replica_worker_pool(config::number_slave_replica_download_threads, 
10240) {
     REGISTER_HOOK_METRIC(add_batch_task_queue_size,
                          [this]() { return 
_tablet_worker_pool.get_queue_size(); });
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
@@ -758,4 +769,227 @@ void 
PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base
     response->mutable_status()->set_status_code(0);
 }
 
+// Slave-side endpoint of single replica load: the master replica asks this backend to
+// download the segment files of a rowset it has written and commit that rowset locally.
+// The handler only copies the request and queues the work on _slave_replica_worker_pool;
+// the download, rowset creation and txn commit run asynchronously, and their outcome is
+// reported back to the master via _response_pull_slave_rowset(). The RPC response status
+// only says whether the work could be queued.
+void PInternalServiceImpl::request_slave_tablet_pull_rowset(
+        google::protobuf::RpcController* controller, const PTabletWriteSlaveRequest* request,
+        PTabletWriteSlaveResult* response, google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    RowsetMetaPB rowset_meta_pb = request->rowset_meta();
+    std::string rowset_path = request->rowset_path();
+    google::protobuf::Map<int64, int64> segments_size = request->segments_size();
+    std::string host = request->host();
+    int64_t http_port = request->http_port();
+    int64_t brpc_port = request->brpc_port();
+    std::string token = request->token();
+    int64_t node_id = request->node_id();
+    // Read from the protobuf so that every failure path (including a rowset meta that
+    // fails to parse) reports the correct txn/tablet back to the master.
+    const int64_t txn_id = rowset_meta_pb.txn_id();
+    const int64_t tablet_id = rowset_meta_pb.tablet_id();
+
+    // Everything the task needs is captured by value: `request` belongs to brpc and must
+    // not be touched after this handler returns.
+    bool offered = _slave_replica_worker_pool.offer([this, rowset_meta_pb, rowset_path,
+                                                     segments_size, host, http_port, brpc_port,
+                                                     token, node_id, txn_id, tablet_id]() {
+        auto respond = [&](bool is_succeed) {
+            _response_pull_slave_rowset(host, brpc_port, txn_id, tablet_id, node_id,
+                                        is_succeed);
+        };
+
+        TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
+                tablet_id, rowset_meta_pb.tablet_schema_hash());
+        if (tablet == nullptr) {
+            LOG(WARNING) << "failed to pull rowset for slave replica. tablet [" << tablet_id
+                         << "] is not exist. txn_id=" << txn_id;
+            respond(false);
+            return;
+        }
+
+        // Build a RowsetMeta from the master's protobuf via its serialized form.
+        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
+        std::string rowset_meta_str;
+        if (!rowset_meta_pb.SerializeToString(&rowset_meta_str)) {
+            LOG(WARNING) << "failed to pull rowset for slave replica. serialize rowset meta "
+                            "failed. rowset_id="
+                         << rowset_meta_pb.rowset_id() << ", tablet_id=" << tablet_id
+                         << ", txn_id=" << txn_id;
+            respond(false);
+            return;
+        }
+        if (!rowset_meta->init(rowset_meta_str)) {
+            LOG(WARNING) << "failed to pull rowset for slave replica. parse rowset meta string "
+                            "failed. rowset_id="
+                         << rowset_meta_pb.rowset_id() << ", tablet_id=" << tablet_id
+                         << ", txn_id=" << txn_id;
+            respond(false);
+            return;
+        }
+
+        // Only a committed master rowset can be committed here. Check this before any
+        // segment file is transferred, so a doomed request does not download anything.
+        if (rowset_meta->rowset_state() != RowsetStatePB::COMMITTED) {
+            LOG(WARNING) << "could not commit txn for slave replica because master rowset state is "
+                            "not committed, rowset_state="
+                         << rowset_meta->rowset_state() << ", tablet_id=" << tablet_id
+                         << ", txn_id=" << txn_id;
+            respond(false);
+            return;
+        }
+
+        RowsetId remote_rowset_id = rowset_meta->rowset_id();
+        // change rowset id because it maybe same as other local rowset
+        RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
+        rowset_meta->set_rowset_id(new_rowset_id);
+        rowset_meta->set_tablet_uid(tablet->tablet_uid());
+        VLOG_CRITICAL << "succeed to init rowset meta for slave replica. rowset_id="
+                      << rowset_meta->rowset_id() << ", tablet_id=" << tablet_id
+                      << ", txn_id=" << txn_id;
+
+        // Local segment files created so far (possibly partially written). They are deleted
+        // again if the pull fails, so a failed attempt leaves no orphans in the tablet dir.
+        std::vector<std::string> downloaded_files;
+        auto remove_downloaded_files = [&downloaded_files]() {
+            for (const auto& path : downloaded_files) {
+                std::error_code ec;
+                std::filesystem::remove(path, ec);
+            }
+        };
+
+        for (const auto& segment : segments_size) {
+            uint64_t file_size = segment.second;
+            // Allow at least download_low_speed_time seconds, or longer for big files at the
+            // configured minimum speed. A non-positive speed limit would otherwise divide by
+            // zero.
+            uint64_t estimate_timeout = 0;
+            if (config::download_low_speed_limit_kbps > 0) {
+                estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
+            }
+            if (estimate_timeout < config::download_low_speed_time) {
+                estimate_timeout = config::download_low_speed_time;
+            }
+
+            std::stringstream ss;
+            ss << "http://" << host << ":" << http_port << "/api/_tablet/_download?token=" << token
+               << "&file=" << rowset_path << "/" << remote_rowset_id << "_" << segment.first
+               << ".dat";
+            std::string remote_file_url = ss.str();
+            ss.str("");
+            ss << tablet->tablet_path() << "/" << rowset_meta->rowset_id() << "_" << segment.first
+               << ".dat";
+            std::string local_file_path = ss.str();
+            // Track the file before downloading: a failed attempt may leave a partial file.
+            downloaded_files.push_back(local_file_path);
+
+            auto download_cb = [remote_file_url, estimate_timeout, local_file_path,
+                                file_size](HttpClient* client) {
+                RETURN_IF_ERROR(client->init(remote_file_url));
+                client->set_timeout_ms(estimate_timeout * 1000);
+                RETURN_IF_ERROR(client->download(local_file_path));
+
+                // Check file length. Use the non-throwing overload so a missing file becomes a
+                // Status instead of an exception escaping the worker thread.
+                std::error_code ec;
+                uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+                if (ec) {
+                    LOG(WARNING) << "failed to pull rowset for slave replica. get size of "
+                                    "downloaded file failed, local_path="
+                                 << local_file_path << ", error=" << ec.message();
+                    return Status::InternalError("failed to get downloaded file size");
+                }
+                if (local_file_size != file_size) {
+                    LOG(WARNING)
+                            << "failed to pull rowset for slave replica. download file length error"
+                            << ", remote_path=" << remote_file_url << ", file_size=" << file_size
+                            << ", local_file_size=" << local_file_size;
+                    return Status::InternalError("downloaded file size is not equal");
+                }
+                chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR);
+                return Status::OK();
+            };
+            auto st = HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, download_cb);
+            if (!st.ok()) {
+                LOG(WARNING)
+                        << "failed to pull rowset for slave replica. failed to download file. url="
+                        << remote_file_url << ", local_path=" << local_file_path
+                        << ", txn_id=" << txn_id;
+                remove_downloaded_files();
+                respond(false);
+                return;
+            }
+            VLOG_CRITICAL << "succeed to download file for slave replica. url=" << remote_file_url
+                          << ", local_path=" << local_file_path << ", txn_id=" << txn_id;
+        }
+
+        RowsetSharedPtr rowset;
+        Status create_status = RowsetFactory::create_rowset(
+                tablet->tablet_schema(), tablet->tablet_path(), rowset_meta, &rowset);
+        if (!create_status.ok()) {
+            LOG(WARNING) << "failed to create rowset from rowset meta for slave replica"
+                         << ". rowset_id: " << rowset_meta->rowset_id()
+                         << ", rowset_type: " << rowset_meta->rowset_type()
+                         << ", rowset_state: " << rowset_meta->rowset_state()
+                         << ", tablet_id=" << tablet_id << ", txn_id=" << txn_id;
+            remove_downloaded_files();
+            respond(false);
+            return;
+        }
+        Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
+                tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(),
+                rowset_meta->tablet_id(), rowset_meta->tablet_schema_hash(), tablet->tablet_uid(),
+                rowset_meta->load_id(), rowset, true);
+        // An already-existing txn is treated as success (presumably a retried request).
+        if (!commit_txn_status.ok() &&
+            commit_txn_status !=
+                    Status::OLAPInternalError(OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST)) {
+            LOG(WARNING) << "failed to add committed rowset for slave replica. rowset_id="
+                         << rowset_meta->rowset_id() << ", tablet_id=" << tablet_id
+                         << ", txn_id=" << txn_id;
+            remove_downloaded_files();
+            respond(false);
+            return;
+        }
+        VLOG_CRITICAL << "succeed to pull rowset for slave replica. successfully to add committed "
+                         "rowset: "
+                      << rowset_meta->rowset_id() << " to tablet, tablet_id=" << tablet_id
+                      << ", schema_hash=" << rowset_meta->tablet_schema_hash()
+                      << ", txn_id=" << txn_id;
+        respond(true);
+    });
+    if (!offered) {
+        // The task was not queued: report it instead of pretending success, otherwise the
+        // master only finds out when its wait times out.
+        LOG(WARNING) << "failed to submit pull rowset task for slave replica. tablet_id="
+                     << tablet_id << ", txn_id=" << txn_id;
+        Status::InternalError("failed to submit pull rowset task for slave replica")
+                .to_protobuf(response->mutable_status());
+        return;
+    }
+    Status::OK().to_protobuf(response->mutable_status());
+}
+
+// Reports the outcome of this slave replica's rowset pull to the master replica at
+// remote_host:brpc_port through its response_slave_tablet_pull_rowset RPC. The call
+// blocks until the RPC completes (closure->join()). Failures are only logged, because
+// this runs in a background task with nobody to propagate them to.
+void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote_host,
+                                                       int64_t brpc_port, int64_t txn_id,
+                                                       int64_t tablet_id, int64_t node_id,
+                                                       bool is_succeed) {
+    std::shared_ptr<PBackendService_Stub> stub =
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(remote_host,
+                                                                             brpc_port);
+    if (stub == nullptr) {
+        LOG(WARNING) << "failed to response result of slave replica to master replica. get rpc "
+                        "stub failed, master host="
+                     << remote_host << ", port=" << brpc_port << ", tablet_id=" << tablet_id
+                     << ", txn_id=" << txn_id;
+        return;
+    }
+
+    PTabletWriteSlaveDoneRequest request;
+    request.set_txn_id(txn_id);
+    request.set_tablet_id(tablet_id);
+    request.set_node_id(node_id);
+    request.set_is_succeed(is_succeed);
+    RefCountClosure<PTabletWriteSlaveDoneResult>* closure =
+            new RefCountClosure<PTabletWriteSlaveDoneResult>();
+    // Two references: one presumably released by the closure's own Run() when brpc
+    // completes the call, the other released below after join().
+    closure->ref();
+    closure->ref();
+    closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
+    closure->cntl.ignore_eovercrowded();
+    stub->response_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure);
+
+    closure->join();
+    const bool rpc_failed = closure->cntl.Failed();
+    if (rpc_failed) {
+        // Drop the cached stub if the master is no longer reachable.
+        if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(stub, remote_host,
+                                                                             brpc_port)) {
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                    closure->cntl.remote_side());
+        }
+        LOG(WARNING) << "failed to response result of slave replica to master replica, error="
+                     << berror(closure->cntl.ErrorCode())
+                     << ", error_text=" << closure->cntl.ErrorText()
+                     << ", master host: " << remote_host << ", tablet_id=" << tablet_id
+                     << ", txn_id=" << txn_id;
+    }
+
+    if (closure->unref()) {
+        delete closure;
+    }
+    closure = nullptr;
+    // Only claim success when the RPC actually went through.
+    if (rpc_failed) {
+        return;
+    }
+    VLOG_CRITICAL << "succeed to response the result of slave replica pull rowset to master "
+                     "replica. master host: "
+                  << remote_host << ". is_succeed=" << is_succeed << ", tablet_id=" << tablet_id
+                  << ", slave server=" << node_id << ", txn_id=" << txn_id;
+}
+
+// Master-side endpoint of single replica load: a slave replica reports whether it pulled
+// and committed the rowset for (txn_id, tablet_id). The result is forwarded to the
+// TxnManager via finish_slave_tablet_pull_rowset, and the RPC always answers OK.
+void PInternalServiceImpl::response_slave_tablet_pull_rowset(
+        google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request,
+        PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    const auto txn_id = request->txn_id();
+    const auto tablet_id = request->tablet_id();
+    const auto slave_node_id = request->node_id();
+    const auto is_succeed = request->is_succeed();
+    VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. slave server="
+                  << slave_node_id << ", is_succeed=" << is_succeed << ", tablet_id=" << tablet_id
+                  << ", txn_id=" << txn_id;
+    auto* txn_manager = StorageEngine::instance()->txn_manager();
+    txn_manager->finish_slave_tablet_pull_rowset(txn_id, tablet_id, slave_node_id, is_succeed);
+    Status::OK().to_protobuf(response->mutable_status());
+}
+
 } // namespace doris
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index b23ba3c70a..f07d5c89f3 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -148,6 +148,14 @@ public:
                            google::protobuf::Closure* done) override;
     void hand_shake(google::protobuf::RpcController* controller, const 
PHandShakeRequest* request,
                     PHandShakeResponse* response, google::protobuf::Closure* 
done) override;
+    void request_slave_tablet_pull_rowset(google::protobuf::RpcController* 
controller,
+                                          const PTabletWriteSlaveRequest* 
request,
+                                          PTabletWriteSlaveResult* response,
+                                          google::protobuf::Closure* done) 
override;
+    void response_slave_tablet_pull_rowset(google::protobuf::RpcController* 
controller,
+                                           const PTabletWriteSlaveDoneRequest* 
request,
+                                           PTabletWriteSlaveDoneResult* 
response,
+                                           google::protobuf::Closure* done) 
override;
 
 private:
     Status _exec_plan_fragment(const std::string& s_request, 
PFragmentRequestVersion version,
@@ -175,9 +183,14 @@ private:
                                   PTabletWriterAddBlockResult* response,
                                   google::protobuf::Closure* done);
 
+    void _response_pull_slave_rowset(const std::string& remote_host, int64_t 
brpc_port,
+                                     int64_t txn_id, int64_t tablet_id, 
int64_t node_id,
+                                     bool is_succeed);
+
 private:
     ExecEnv* _exec_env;
     PriorityThreadPool _tablet_worker_pool;
+    PriorityThreadPool _slave_replica_worker_pool;
 };
 
 } // namespace doris
diff --git a/be/src/service/single_replica_load_download_service.cpp 
b/be/src/service/single_replica_load_download_service.cpp
new file mode 100644
index 0000000000..c557ce5b8d
--- /dev/null
+++ b/be/src/service/single_replica_load_download_service.cpp
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/single_replica_load_download_service.h"
+
+#include "http/action/download_action.h"
+#include "http/ev_http_server.h"
+#include "runtime/exec_env.h"
+
+namespace doris {
+
+SingleReplicaLoadDownloadService::SingleReplicaLoadDownloadService(ExecEnv* env, int port,
+                                                                   int num_threads)
+        : _env(env), _ev_http_server(new EvHttpServer(port, num_threads)) {}
+
+SingleReplicaLoadDownloadService::~SingleReplicaLoadDownloadService() {}
+
+Status SingleReplicaLoadDownloadService::start() {
+    // Serve tablet files from local store paths only; remote storage media are skipped.
+    std::vector<std::string> allow_paths;
+    for (const auto& path : _env->store_paths()) {
+        if (FilePathDesc::is_remote(path.storage_medium)) {
+            continue;
+        }
+        allow_paths.emplace_back(path.path);
+    }
+    // One DownloadAction (owned by _pool) answers both HEAD and GET on the download endpoint.
+    DownloadAction* tablet_download_action = _pool.add(new DownloadAction(_env, allow_paths));
+    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_tablet/_download",
+                                      tablet_download_action);
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_download",
+                                      tablet_download_action);
+
+    // Propagate a server startup failure instead of unconditionally reporting success.
+    RETURN_IF_ERROR(_ev_http_server->start());
+    return Status::OK();
+}
+
+void SingleReplicaLoadDownloadService::stop() {
+    _ev_http_server->stop();
+    // Release the handlers owned by _pool (the DownloadAction added in start()).
+    _pool.clear();
+}
+
+} // namespace doris
diff --git a/be/src/service/brpc_service.h 
b/be/src/service/single_replica_load_download_service.h
similarity index 71%
copy from be/src/service/brpc_service.h
copy to be/src/service/single_replica_load_download_service.h
index ec519cf71a..ff1537adc0 100644
--- a/be/src/service/brpc_service.h
+++ b/be/src/service/single_replica_load_download_service.h
@@ -19,28 +19,28 @@
 
 #include <memory>
 
+#include "common/object_pool.h"
 #include "common/status.h"
 
-namespace brpc {
-class Server;
-}
-
 namespace doris {
 
 class ExecEnv;
+class EvHttpServer;
 
-// Class enclose brpc service
-class BRpcService {
+// Dedicated HTTP server for single replica load; slave replicas download tablet files from it.
+class SingleReplicaLoadDownloadService {
 public:
-    BRpcService(ExecEnv* exec_env);
-    ~BRpcService();
+    SingleReplicaLoadDownloadService(ExecEnv* env, int port, int num_threads);
+    ~SingleReplicaLoadDownloadService();
 
-    Status start(int port);
-    void join();
+    Status start(); // registers the tablet download handlers and starts the server
+    void stop();    // stops the server and frees the registered handlers
 
 private:
-    ExecEnv* _exec_env;
-    std::unique_ptr<brpc::Server> _server;
+    ExecEnv* _env;     // not owned
+    ObjectPool _pool;  // owns the HTTP handlers registered in start()
+    // Declared after _pool, so it is destroyed before the handlers it references.
+    std::unique_ptr<EvHttpServer> _ev_http_server;
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 68abea3856..7052f73583 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -123,6 +123,26 @@ Status VNodeChannel::open_wait() {
                     commit_info.tabletId = tablet.tablet_id();
                     commit_info.backendId = _node_id;
                     _tablet_commit_infos.emplace_back(std::move(commit_info));
+                    VLOG_CRITICAL << "master replica commit info: tabletId=" 
<< tablet.tablet_id()
+                                  << ", backendId=" << _node_id
+                                  << ", master node id: " << this->node_id()
+                                  << ", host: " << this->host() << ", txn_id=" 
<< _parent->_txn_id;
+                }
+                if (_parent->_write_single_replica) {
+                    for (auto& tablet_slave_node_ids : 
result.success_slave_tablet_node_ids()) {
+                        for (auto slave_node_id : 
tablet_slave_node_ids.second.slave_node_ids()) {
+                            TTabletCommitInfo commit_info;
+                            commit_info.tabletId = tablet_slave_node_ids.first;
+                            commit_info.backendId = slave_node_id;
+                            
_tablet_commit_infos.emplace_back(std::move(commit_info));
+                            VLOG_CRITICAL << "slave replica commit info: 
tabletId="
+                                          << tablet_slave_node_ids.first
+                                          << ", backendId=" << slave_node_id
+                                          << ", master node id: " << 
this->node_id()
+                                          << ", host: " << this->host()
+                                          << ", txn_id=" << _parent->_txn_id;
+                        }
+                    }
                 }
                 _add_batches_finished = true;
             }
@@ -276,6 +296,28 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
             request.add_partition_ids(pid);
         }
 
+        request.set_write_single_replica(false);
+        if (_parent->_write_single_replica) {
+            request.set_write_single_replica(true);
+            for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator 
iter =
+                         _slave_tablet_nodes.begin();
+                 iter != _slave_tablet_nodes.end(); iter++) {
+                PSlaveTabletNodes slave_tablet_nodes;
+                for (auto node_id : iter->second) {
+                    auto node = _parent->_nodes_info->find_node(node_id);
+                    if (node == nullptr) {
+                        return;
+                    }
+                    PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
+                    pnode->set_id(node->id);
+                    pnode->set_option(node->option);
+                    pnode->set_host(node->host);
+                    pnode->set_async_internal_port(node->brpc_port);
+                }
+                request.mutable_slave_tablet_nodes()->insert({iter->first, 
slave_tablet_nodes});
+            }
+        }
+
         // eos request must be the last request
         _add_block_closure->end_mark();
         _send_finished = true;
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index eae41cb320..08a3aacc93 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -25,6 +25,7 @@
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/Types_types.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "olap/field.h"
 #include "olap/options.h"
 #include "olap/storage_engine.h"
@@ -393,7 +394,7 @@ TEST_F(TestDeltaWriter, open) {
     EXPECT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait();
+    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
     EXPECT_EQ(Status::OK(), res);
     SAFE_DELETE(delta_writer);
 
@@ -402,7 +403,7 @@ TEST_F(TestDeltaWriter, open) {
     EXPECT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait();
+    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
     EXPECT_EQ(Status::OK(), res);
     SAFE_DELETE(delta_writer);
 
@@ -504,7 +505,7 @@ TEST_F(TestDeltaWriter, write) {
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait();
+    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
@@ -643,7 +644,7 @@ TEST_F(TestDeltaWriter, vec_write) {
 
     res = delta_writer->close();
     ASSERT_TRUE(res.ok());
-    res = delta_writer->close_wait();
+    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
     ASSERT_TRUE(res.ok());
 
     // publish version success
@@ -717,7 +718,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait();
+    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
@@ -803,7 +804,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
 
     res = delta_writer->close();
     ASSERT_TRUE(res.ok());
-    res = delta_writer->close_wait();
+    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
     ASSERT_TRUE(res.ok());
 
     // publish version success
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index d1785b2640..c6edb83360 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -25,6 +25,7 @@
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/Types_types.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "olap/delta_writer.h"
 #include "olap/field.h"
 #include "olap/options.h"
@@ -191,7 +192,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait();
+    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
diff --git a/be/test/olap/remote_rowset_gc_test.cpp 
b/be/test/olap/remote_rowset_gc_test.cpp
index c02b15c1c4..e1ae7f4995 100644
--- a/be/test/olap/remote_rowset_gc_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -21,6 +21,7 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "io/fs/file_system_map.h"
 #include "io/fs/s3_file_system.h"
 #include "olap/delta_writer.h"
@@ -178,7 +179,7 @@ TEST_F(RemoteRowsetGcTest, normal) {
 
     st = delta_writer->close();
     ASSERT_EQ(Status::OK(), st);
-    st = delta_writer->close_wait();
+    st = delta_writer->close_wait(PSlaveTabletNodes(), false);
     ASSERT_EQ(Status::OK(), st);
 
     // publish version success
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index eae932e6f0..761ffa6893 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -21,6 +21,7 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "io/fs/file_system_map.h"
 #include "io/fs/s3_file_system.h"
 #include "olap/delta_writer.h"
@@ -177,7 +178,7 @@ TEST_F(TabletCooldownTest, normal) {
 
     st = delta_writer->close();
     ASSERT_EQ(Status::OK(), st);
-    st = delta_writer->close_wait();
+    st = delta_writer->close_wait(PSlaveTabletNodes(), false);
     ASSERT_EQ(Status::OK(), st);
 
     // publish version success
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index d7b654204f..038f8e88d1 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -925,6 +925,12 @@ Default: 0
 
 The maximum number of threads per disk is also the maximum queue depth of each 
disk
 
+### `number_slave_replica_download_threads`
+
+Default: 64
+
+Number of threads used by slave replicas to synchronize data from the master replica, used for single replica load.
+
 ### `number_tablet_writer_threads`
 
 Default: 16
@@ -1100,6 +1106,36 @@ This configuration is used for the context gc thread 
scheduling cycle. Note: The
 * Type: int32
 * Description: The queue length of the SendBatch thread pool. In NodeChannels' 
sending data tasks,  the SendBatch operation of each NodeChannel will be 
submitted as a thread task to the thread pool waiting to be scheduled, and 
after the number of submitted tasks exceeds the length of the thread pool 
queue, subsequent submitted tasks will be blocked until there is a empty slot 
in the queue.
 
+### `single_replica_load_brpc_port`
+
+* Type: int32
+* Description: The BRPC port on BE used for single replica load. An independent BRPC thread pool handles the communication between the Master replica and Slave replicas during single replica load, which prevents data synchronization between replicas from preempting the thread resources for data distribution and query tasks when load concurrency is high.
+* Default value: 9070
+
+### `single_replica_load_brpc_num_threads`
+
+* Type: int32
+* Description: The number of bthreads for the single replica load BRPC service. When load concurrency increases, you can increase this parameter to ensure that Slave replicas synchronize data files from the Master replica in a timely manner.
+* Default value: 64
+
+### `single_replica_load_download_port`
+
+* Type: int32
+* Description: The HTTP port on BE used by Slave replicas to download segments during single replica load. An independent HTTP thread pool serves these downloads, which prevents data synchronization between replicas from preempting the thread resources of other HTTP tasks when load concurrency is high.
+* Default value: 8050
+
+### `single_replica_load_download_num_workers`
+
+* Type: int32
+* Description: The number of HTTP threads for segment download, used for single replica load. When load concurrency increases, you can increase this parameter to ensure that Slave replicas synchronize data files from the Master replica in a timely manner.
+* Default value: 64
+
+### `slave_replica_writer_rpc_timeout_sec`
+
+* Type: int32
+* Description: The timeout (in seconds) of the BRPC between the Master replica and Slave replicas, used for single replica load.
+* Default value: 60
+
 ### `sleep_one_second`
 
 + Type: int32
diff --git a/docs/en/docs/admin-manual/config/fe-config.md 
b/docs/en/docs/admin-manual/config/fe-config.md
index 13c6e8374f..81809a9e4f 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -1321,6 +1321,36 @@ MasterOnly:true
 
 fetch stream load record interval.
 
+### `enable_single_replica_load`
+
+Default:false
+
+IsMutable:true
+
+MasterOnly:true
+
+Whether to enable the function of single replica load for stream load and broker load.
+
+### `enable_single_replica_insert`
+
+Default:false
+
+This is a session variable rather than an FE configuration item (`SET enable_single_replica_insert = true`).
+
+Whether to enable the function of single replica load for insert.
+
 ### desired_max_waiting_jobs
 
 Default:100
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index a3680f2439..5f9093f27a 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -926,6 +926,12 @@ BE进程的文件句柄limit要求的下限
 
 每个磁盘的最大线程数也是每个磁盘的最大队列深度
 
+### `number_slave_replica_download_threads`
+
+默认值: 64
+
+每个BE节点上slave副本同步Master副本数据的线程数,用于单副本数据导入功能。
+
 ### `number_tablet_writer_threads`
 
 默认值:16
@@ -1108,6 +1114,36 @@ routine load任务的线程池大小。 这应该大于 FE 配置 'max_concurren
 
 BE之间rpc通信是否序列化RowBatch,用于查询层之间的数据传输
 
+### `single_replica_load_brpc_port`
+
+* 类型: int32
+* 描述: 
单副本数据导入功能中,Master副本和Slave副本之间通信的RPC端口。Master副本flush完成之后通过RPC通知Slave副本同步数据,以及Slave副本同步数据完成后通过RPC通知Master副本。系统为单副本数据导入过程中Master副本和Slave副本之间通信开辟了独立的BRPC线程池,以避免导入并发较大时副本之间的数据同步抢占导入数据分发和查询任务的线程资源。
+* 默认值: 9070
+
+### `single_replica_load_brpc_num_threads`
+
+* 类型: int32
+* 描述: 
单副本数据导入功能中,Master副本和Slave副本之间通信的线程数量。导入并发增大时,可以适当调大该参数来保证Slave副本及时同步Master副本数据。
+* 默认值: 64
+
+### `single_replica_load_download_port`
+
+* 类型: int32
+* 描述: 
单副本数据导入功能中,Slave副本通过HTTP从Master副本下载数据文件的端口。系统为单副本数据导入过程中Slave副本从Master副本下载数据文件开辟了独立的HTTP线程池,以避免导入并发较大时Slave副本下载数据文件抢占其他http任务的线程资源。
+* 默认值: 8050
+
+### `single_replica_load_download_num_workers`
+
+* 类型: int32
+* 描述: 
单副本数据导入功能中,Slave副本通过HTTP从Master副本下载数据文件的线程数。导入并发增大时,可以适当调大该参数来保证Slave副本及时同步Master副本数据。
+* 默认值: 64
+
+### `slave_replica_writer_rpc_timeout_sec`
+
+* 类型: int32
+* 描述: 单副本数据导入功能中,Master副本和Slave副本之间通信的RPC超时时间。
+* 默认值: 60
+
 ### `sleep_one_second`
 + 类型:int32
 + 描述:全局变量,用于BE线程休眠1秒,不应该被修改
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md 
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index bd09d69e53..8fb306c591 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -1329,6 +1329,36 @@ BE副本数的平衡阈值。
 
 获取 stream load 记录间隔
 
+### `enable_single_replica_load`
+
+默认值:false
+
+是否可以动态配置:true
+
+是否为 Master FE 节点独有的配置项:true
+
+是否启用 stream load 和 broker load 的单副本数据导入功能。
+
+### `enable_single_replica_insert`
+
+默认值:false
+
+该项为会话变量而非 FE 配置项(`SET enable_single_replica_insert = true`)。
+
+是否启用 insert 的单副本数据写入功能。
+
 ### `desired_max_waiting_jobs`
 
 默认值:100
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 5fad203599..bb0eee0c6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -706,7 +706,8 @@ public class InsertStmt extends DdlStmt {
             return dataSink;
         }
         if (targetTable instanceof OlapTable) {
-            dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, 
targetPartitionIds);
+            dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, 
targetPartitionIds,
+                    
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
             dataPartition = dataSink.getOutputPartition();
         } else if (targetTable instanceof BrokerTable) {
             BrokerTable table = (BrokerTable) targetTable;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index db65ad2f19..bbf255247f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -726,6 +726,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_show_stream_load = false;
 
+    /**
+     * Whether to enable single replica load for stream load and broker load: each tablet is
+     * written by one replica and the other replicas synchronize the resulting data files.
+     * INSERT statements are controlled by the session variable enable_single_replica_insert.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_single_replica_load = false;
+
     /**
      * maximum concurrent running txn num including prepare, commit txns under 
a single db
      * txn manager will reject coming txns
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 3110f39284..f78476f583 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -131,7 +131,8 @@ public class LoadingTaskPlanner {
 
         // 2. Olap table sink
         List<Long> partitionIds = getAllPartitionIds();
-        OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, 
partitionIds);
+        OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, 
partitionIds,
+                Config.enable_single_replica_load);
         olapTableSink.init(loadId, txnId, dbId, timeoutS, 
sendBatchParallelism, false);
         olapTableSink.complete();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
index 51a28779dd..a0f187277c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
@@ -92,7 +92,7 @@ public class UpdatePlanner extends OriginalPlanner {
         }
         scanNodeList.add(olapScanNode);
         // 2. gen olap table sink
-        OlapTableSink olapTableSink = new OlapTableSink(targetTable, 
computeTargetTupleDesc(), null);
+        OlapTableSink olapTableSink = new OlapTableSink(targetTable, 
computeTargetTupleDesc(), null, false);
         olapTableSink.init(analyzer.getContext().queryId(), txnId, targetDBId,
                 analyzer.getContext().getSessionVariable().queryTimeoutS,
                 
analyzer.getContext().getSessionVariable().sendBatchParallelism, false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 642b33e0a6..9172d618fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -59,6 +59,7 @@ import org.apache.doris.thrift.TOlapTablePartitionParam;
 import org.apache.doris.thrift.TOlapTableSchemaParam;
 import org.apache.doris.thrift.TOlapTableSink;
 import org.apache.doris.thrift.TPaloNodesInfo;
+import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TTabletLocation;
 import org.apache.doris.thrift.TUniqueId;
 
@@ -67,13 +68,16 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.Collectors;
 
 public class OlapTableSink extends DataSink {
@@ -88,10 +92,14 @@ public class OlapTableSink extends DataSink {
     // set after init called
     private TDataSink tDataSink;
 
-    public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds) {
+    // When true, createLocation() picks one random replica per tablet as the master location and
+    // sends the remaining replicas as the slave location. init() resets it to false for tables
+    // using TStorageFormat.V1, which single replica load does not support.
+    private boolean singleReplicaLoad;
+
+    public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
+            boolean singleReplicaLoad) {
         this.dstTable = dstTable;
         this.tupleDescriptor = tupleDescriptor;
         this.partitionIds = partitionIds;
+        this.singleReplicaLoad = singleReplicaLoad;
     }
 
     public void init(TUniqueId loadId, long txnId, long dbId, long 
loadChannelTimeoutS, int sendBatchParallelism,
@@ -122,6 +130,12 @@ public class OlapTableSink extends DataSink {
                 
ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_PARTITION, 
partitionId, dstTable.getName());
             }
         }
+
+        if (singleReplicaLoad && dstTable.getStorageFormat() == 
TStorageFormat.V1) {
+            // Single replica load not supported by TStorageFormat.V1
+            singleReplicaLoad = false;
+            LOG.warn("Single replica load not supported by TStorageFormat.V1. 
table: {}", dstTable.getName());
+        }
     }
 
     public void updateLoadId(TUniqueId newLoadId) {
@@ -143,7 +157,12 @@ public class OlapTableSink extends DataSink {
         tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
         tSink.setSchema(createSchema(tSink.getDbId(), dstTable));
         tSink.setPartition(createPartition(tSink.getDbId(), dstTable));
-        tSink.setLocation(createLocation(dstTable));
+        List<TOlapTableLocationParam> locationParams = 
createLocation(dstTable);
+        tSink.setLocation(locationParams.get(0));
+        if (singleReplicaLoad) {
+            tSink.setSlaveLocation(locationParams.get(1));
+        }
+        tSink.setWriteSingleReplica(singleReplicaLoad);
         tSink.setNodesInfo(createPaloNodesInfo());
     }
 
@@ -321,8 +340,9 @@ public class OlapTableSink extends DataSink {
         }
     }
 
-    private TOlapTableLocationParam createLocation(OlapTable table) throws 
UserException {
+    private List<TOlapTableLocationParam> createLocation(OlapTable table) 
throws UserException {
         TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
+        TOlapTableLocationParam slaveLocationParam = new 
TOlapTableLocationParam();
         // BE id -> path hash
         Multimap<Long, Long> allBePathsMap = HashMultimap.create();
         for (Long partitionId : partitionIds) {
@@ -338,8 +358,21 @@ public class OlapTableSink extends DataSink {
                                 "tablet " + tablet.getId() + " has few 
replicas: " + bePathsMap.keySet().size()
                                         + ", alive backends: [" + 
StringUtils.join(bePathsMap.keySet(), ",") + "]");
                     }
-                    locationParam.addToTablets(
-                            new TTabletLocation(tablet.getId(), 
Lists.newArrayList(bePathsMap.keySet())));
+
+                    if (singleReplicaLoad) {
+                        Long[] nodes = bePathsMap.keySet().toArray(new 
Long[0]);
+                        Random random = new Random();
+                        Long masterNode = nodes[random.nextInt(nodes.length)];
+                        Multimap<Long, Long> slaveBePathsMap = bePathsMap;
+                        slaveBePathsMap.removeAll(masterNode);
+                        locationParam.addToTablets(new 
TTabletLocation(tablet.getId(),
+                                
Lists.newArrayList(Sets.newHashSet(masterNode))));
+                        slaveLocationParam.addToTablets(new 
TTabletLocation(tablet.getId(),
+                                Lists.newArrayList(slaveBePathsMap.keySet())));
+                    } else {
+                        locationParam.addToTablets(new 
TTabletLocation(tablet.getId(),
+                                Lists.newArrayList(bePathsMap.keySet())));
+                    }
                     allBePathsMap.putAll(bePathsMap);
                 }
             }
@@ -351,7 +384,7 @@ public class OlapTableSink extends DataSink {
         if (!st.ok()) {
             throw new DdlException(st.getErrorMsg());
         }
-        return locationParam;
+        return Arrays.asList(locationParam, slaveLocationParam);
     }
 
     private TPaloNodesInfo createPaloNodesInfo() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 669601756a..0398fa43e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -154,7 +154,8 @@ public class StreamLoadPlanner {
 
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
-        OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, 
partitionIds);
+        OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, 
partitionIds,
+                Config.enable_single_replica_load);
         olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), 
taskInfo.getTimeout(),
                 taskInfo.getSendBatchParallelism(), 
taskInfo.isLoadToSingleTablet());
         olapTableSink.complete();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 6ceb826332..f3281f56ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -205,6 +205,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     static final String SESSION_CONTEXT = "session_context";
 
+    public static final String ENABLE_SINGLE_REPLICA_INSERT = 
"enable_single_replica_insert";
+
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, 
String>();
     // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -511,6 +513,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = SESSION_CONTEXT, needForward = true)
     public String sessionContext = "";
 
+    @VariableMgr.VarAttr(name = ENABLE_SINGLE_REPLICA_INSERT, needForward = 
true)
+    public boolean enableSingleReplicaInsert = false;
+
     public String getBlockEncryptionMode() {
         return blockEncryptionMode;
     }
@@ -1039,6 +1044,14 @@ public class SessionVariable implements Serializable, 
Writable {
         enableNereidsReorderToEliminateCrossJoin = value;
     }
 
+    /** Returns the session variable enable_single_replica_insert (single replica load for INSERT). */
+    public boolean isEnableSingleReplicaInsert() {
+        return enableSingleReplicaInsert;
+    }
+
+    /** Sets the session variable enable_single_replica_insert. */
+    public void setEnableSingleReplicaInsert(boolean enableSingleReplicaInsert) {
+        this.enableSingleReplicaInsert = enableSingleReplicaInsert;
+    }
+
     /**
      * Serialize to thrift object.
      * Used for rest api.
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
index 181bd08bda..146a3b4d93 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
@@ -102,7 +102,7 @@ public class OlapTableSinkTest {
             }
         };
 
-        OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(2L));
+        OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(2L), false);
         sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false);
         sink.complete();
         LOG.info("sink is {}", sink.toThrift());
@@ -139,7 +139,7 @@ public class OlapTableSinkTest {
             }
         };
 
-        OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId()));
+        OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId()), false);
         sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false);
         try {
             sink.complete();
@@ -164,7 +164,7 @@ public class OlapTableSinkTest {
             }
         };
 
-        OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(unknownPartId));
+        OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(unknownPartId), false);
         sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false);
         sink.complete();
         LOG.info("sink is {}", sink.toThrift());
@@ -201,7 +201,7 @@ public class OlapTableSinkTest {
             }
         };
 
-        OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId()));
+        OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId()), false);
         sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false);
         try {
             sink.complete();
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index e4937fbc7d..2379a28646 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto";
 import "data.proto";
 import "descriptors.proto";
 import "types.proto";
+import "olap_file.proto";
 
 option cc_generic_services = true;
 
@@ -107,6 +108,8 @@ message PTabletWriterAddBatchRequest {
     // transfer the RowBatch to the Controller Attachment
     optional bool transfer_by_attachment = 10 [default = false];
     optional bool is_high_priority = 11 [default = false];
+    optional bool write_single_replica = 12 [default = false];
+    map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13;
 };
 
 message PTabletWriterAddBlockRequest {
@@ -129,8 +132,26 @@ message PTabletWriterAddBlockRequest {
     // transfer the vectorized::Block to the Controller Attachment
     optional bool transfer_by_attachment = 10 [default = false];
     optional bool is_high_priority = 11 [default = false];
+    optional bool write_single_replica = 12 [default = false];
+    map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13;
 };
 
+message PSlaveTabletNodes {
+    repeated PNodeInfo slave_nodes = 1;
+}
+
+message PNodeInfo {
+    optional int64 id = 1;
+    optional int64 option = 2;
+    optional string host = 3;
+    // used to transfer data between nodes
+    optional int32 async_internal_port = 4;
+}
+
+message PSuccessSlaveTabletNodeIds {
+    repeated int64 slave_node_ids = 1;
+}
+
 message PTabletError {
     optional int64 tablet_id = 1;
     optional string msg = 2;
@@ -143,6 +164,7 @@ message PTabletWriterAddBatchResult {
     optional int64 wait_lock_time_us = 4;
     optional int64 wait_execution_time_us = 5;
     repeated PTabletError tablet_errors = 6;
+    map<int64, PSuccessSlaveTabletNodeIds> success_slave_tablet_node_ids = 7;
 };
 
 message PTabletWriterAddBlockResult {
@@ -152,6 +174,7 @@ message PTabletWriterAddBlockResult {
     optional int64 wait_lock_time_us = 4;
     optional int64 wait_execution_time_us = 5;
     repeated PTabletError tablet_errors = 6;
+    map<int64, PSuccessSlaveTabletNodeIds> success_slave_tablet_node_ids = 7;
 };
 
 // tablet writer cancel
@@ -476,6 +499,32 @@ message PResetRPCChannelResponse {
 
 message PEmptyRequest {};
 
+message PTabletWriteSlaveRequest {
+    optional RowsetMetaPB rowset_meta = 1;
+    optional string rowset_path = 2;
+    map<int64, int64> segments_size = 3;
+    optional string host = 4;
+    optional int32 http_port = 5;
+    optional int32 brpc_port = 6;
+    optional string token = 7;
+    optional int32 node_id = 8;
+};
+
+message PTabletWriteSlaveResult {
+    optional PStatus status = 1;
+};
+
+message PTabletWriteSlaveDoneRequest {
+    optional int64 txn_id = 1;
+    optional int64 tablet_id = 2;
+    optional int64 node_id = 3;
+    optional bool is_succeed = 4 [default = false];
+};
+
+message PTabletWriteSlaveDoneResult {
+    optional PStatus status = 1;
+};
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -507,5 +556,7 @@ service PBackendService {
     rpc check_rpc_channel(PCheckRPCChannelRequest) returns 
(PCheckRPCChannelResponse);
     rpc reset_rpc_channel(PResetRPCChannelRequest) returns 
(PResetRPCChannelResponse);
     rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse);
+    rpc request_slave_tablet_pull_rowset(PTabletWriteSlaveRequest) returns 
(PTabletWriteSlaveResult);
+    rpc response_slave_tablet_pull_rowset(PTabletWriteSlaveDoneRequest) 
returns (PTabletWriteSlaveDoneResult);
 };
 
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 91716a4f91..1ca7ad45fc 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -131,6 +131,8 @@ struct TOlapTableSink {
     14: optional i64 load_channel_timeout_s // the timeout of load channels in 
second
     15: optional i32 send_batch_parallelism
     16: optional bool load_to_single_tablet
+    17: optional bool write_single_replica
+    18: optional Descriptors.TOlapTableLocationParam slave_location
 }
 
 struct TDataSink {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to