This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cc68381ec2 [feature](binlog) Add ingest_binlog/http_get_snapshot 
limit download speed && Add async ingest_binlog (#26323)
2cc68381ec2 is described below

commit 2cc68381ec2eb3415ea8588b47c31a8c36d8a2cc
Author: Jack Drogon <jack.xsuper...@gmail.com>
AuthorDate: Mon Nov 6 11:14:44 2023 +0800

    [feature](binlog) Add ingest_binlog/http_get_snapshot limit download speed 
&& Add async ingest_binlog (#26323)
---
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   6 +
 be/src/http/action/download_action.cpp             |  29 +-
 be/src/http/action/download_action.h               |   9 +-
 be/src/http/action/download_binlog_action.cpp      |  12 +-
 be/src/http/action/download_binlog_action.h        |   7 +-
 be/src/http/ev_http_server.cpp                     |  58 +-
 be/src/http/ev_http_server.h                       |   5 +
 be/src/http/http_channel.cpp                       |  13 +-
 be/src/http/http_channel.h                         |   4 +-
 be/src/http/utils.cpp                              |   5 +-
 be/src/http/utils.h                                |   5 +-
 be/src/olap/txn_manager.cpp                        | 156 ++++--
 be/src/olap/txn_manager.h                          |  32 +-
 be/src/runtime/snapshot_loader.cpp                 |   2 +-
 be/src/service/backend_service.cpp                 | 599 +++++++++++++--------
 be/src/service/backend_service.h                   |   5 +
 be/src/service/http_service.cpp                    |  40 +-
 be/src/service/http_service.h                      |   4 +
 .../org/apache/doris/common/GenericPoolTest.java   |   8 +
 .../apache/doris/utframe/MockedBackendFactory.java |   8 +
 gensrc/thrift/BackendService.thrift                |  23 +
 .../apache/doris/regression/suite/Syncer.groovy    |   3 +-
 .../suites/ccr_syncer_p0/test_ingest_binlog.groovy |   4 +-
 24 files changed, 710 insertions(+), 333 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index deda8d3d6ff..8c57e9c8b71 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1121,6 +1121,12 @@ DEFINE_String(default_tzfiles_path, 
"${DORIS_HOME}/zoneinfo");
 // Max size(bytes) of group commit queues, used for mem back pressure.
 DEFINE_Int32(group_commit_max_queue_size, "65536");
 
+// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
+DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
+
+// Download binlog rate limit, unit is KB/s, 0 means no limit
+DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4c18f5e001d..e23fa5c357b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1191,6 +1191,12 @@ DECLARE_String(default_tzfiles_path);
 // Max size(bytes) of group commit queues, used for mem back pressure.
 DECLARE_Int32(group_commit_max_queue_size);
 
+// Ingest binlog work pool size
+DECLARE_Int32(ingest_binlog_work_pool_size);
+
+// Download binlog rate limit, unit is KB/s
+DECLARE_Int32(download_binlog_rate_limit_kbs);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/http/action/download_action.cpp 
b/be/src/http/action/download_action.cpp
index f2068d83c7b..f271b4f1916 100644
--- a/be/src/http/action/download_action.cpp
+++ b/be/src/http/action/download_action.cpp
@@ -33,13 +33,20 @@
 #include "runtime/exec_env.h"
 
 namespace doris {
-
-const std::string FILE_PARAMETER = "file";
-const std::string TOKEN_PARAMETER = "token";
-
-DownloadAction::DownloadAction(ExecEnv* exec_env, const 
std::vector<std::string>& allow_dirs,
-                               int32_t num_workers)
-        : _exec_env(exec_env), _download_type(NORMAL), 
_num_workers(num_workers) {
+namespace {
+static const std::string FILE_PARAMETER = "file";
+static const std::string TOKEN_PARAMETER = "token";
+static const std::string CHANNEL_PARAMETER = "channel";
+static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+} // namespace
+
+DownloadAction::DownloadAction(ExecEnv* exec_env,
+                               std::shared_ptr<bufferevent_rate_limit_group> 
rate_limit_group,
+                               const std::vector<std::string>& allow_dirs, 
int32_t num_workers)
+        : _exec_env(exec_env),
+          _download_type(NORMAL),
+          _num_workers(num_workers),
+          _rate_limit_group(std::move(rate_limit_group)) {
     for (auto& dir : allow_dirs) {
         std::string p;
         Status st = io::global_local_filesystem()->canonicalize(dir, &p);
@@ -107,7 +114,13 @@ void DownloadAction::handle_normal(HttpRequest* req, const 
std::string& file_par
     if (is_dir) {
         do_dir_response(file_param, req);
     } else {
-        do_file_response(file_param, req);
+        const auto& channel = req->param(CHANNEL_PARAMETER);
+        bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
+        if (ingest_binlog) {
+            do_file_response(file_param, req, _rate_limit_group.get());
+        } else {
+            do_file_response(file_param, req);
+        }
     }
 }
 
diff --git a/be/src/http/action/download_action.h 
b/be/src/http/action/download_action.h
index d8e468d9585..3aab1a0d314 100644
--- a/be/src/http/action/download_action.h
+++ b/be/src/http/action/download_action.h
@@ -24,6 +24,8 @@
 #include "http/http_handler.h"
 #include "util/threadpool.h"
 
+struct bufferevent_rate_limit_group;
+
 namespace doris {
 
 class ExecEnv;
@@ -36,8 +38,9 @@ class HttpRequest;
 // We use parameter named 'file' to specify the static resource path, it is an 
absolute path.
 class DownloadAction : public HttpHandler {
 public:
-    DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& 
allow_dirs,
-                   int32_t num_workers = 0);
+    DownloadAction(ExecEnv* exec_env,
+                   std::shared_ptr<bufferevent_rate_limit_group> 
rate_limit_group,
+                   const std::vector<std::string>& allow_dirs, int32_t 
num_workers = 0);
 
     // for load error
     DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir);
@@ -67,6 +70,8 @@ private:
     std::string _error_log_root_dir;
     int32_t _num_workers;
     std::unique_ptr<ThreadPool> _download_workers;
+
+    std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
 }; // end class DownloadAction
 
 } // end namespace doris
diff --git a/be/src/http/action/download_binlog_action.cpp 
b/be/src/http/action/download_binlog_action.cpp
index a23d5ec109f..697512b2a30 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -21,8 +21,10 @@
 #include <fmt/ranges.h>
 
 #include <cstdint>
+#include <limits>
 #include <stdexcept>
 #include <string_view>
+#include <utility>
 #include <vector>
 
 #include "common/config.h"
@@ -96,7 +98,7 @@ void handle_get_binlog_info(HttpRequest* req) {
 }
 
 /// handle get segment file, need tablet_id, rowset_id && index
-void handle_get_segment_file(HttpRequest* req) {
+void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* 
rate_limit_group) {
     // Step 1: get download file path
     std::string segment_file_path;
     try {
@@ -125,7 +127,7 @@ void handle_get_segment_file(HttpRequest* req) {
         LOG(WARNING) << "file not exist, file path: " << segment_file_path;
         return;
     }
-    do_file_response(segment_file_path, req);
+    do_file_response(segment_file_path, req, rate_limit_group);
 }
 
 void handle_get_rowset_meta(HttpRequest* req) {
@@ -149,7 +151,9 @@ void handle_get_rowset_meta(HttpRequest* req) {
 
 } // namespace
 
-DownloadBinlogAction::DownloadBinlogAction(ExecEnv* exec_env) : 
_exec_env(exec_env) {}
+DownloadBinlogAction::DownloadBinlogAction(
+        ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group> 
rate_limit_group)
+        : _exec_env(exec_env), _rate_limit_group(std::move(rate_limit_group)) 
{}
 
 void DownloadBinlogAction::handle(HttpRequest* req) {
     VLOG_CRITICAL << "accept one download binlog request " << 
req->debug_string();
@@ -178,7 +182,7 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
     if (method == "get_binlog_info") {
         handle_get_binlog_info(req);
     } else if (method == "get_segment_file") {
-        handle_get_segment_file(req);
+        handle_get_segment_file(req, _rate_limit_group.get());
     } else if (method == "get_rowset_meta") {
         handle_get_rowset_meta(req);
     } else {
diff --git a/be/src/http/action/download_binlog_action.h 
b/be/src/http/action/download_binlog_action.h
index 3cbd9b9e5b0..77a2ed08780 100644
--- a/be/src/http/action/download_binlog_action.h
+++ b/be/src/http/action/download_binlog_action.h
@@ -17,12 +17,15 @@
 
 #pragma once
 
+#include <memory>
 #include <string>
 #include <vector>
 
 #include "common/status.h"
 #include "http/http_handler.h"
 
+struct bufferevent_rate_limit_group;
+
 namespace doris {
 
 class ExecEnv;
@@ -30,7 +33,8 @@ class HttpRequest;
 
 class DownloadBinlogAction : public HttpHandler {
 public:
-    DownloadBinlogAction(ExecEnv* exec_env);
+    DownloadBinlogAction(ExecEnv* exec_env,
+                         std::shared_ptr<bufferevent_rate_limit_group> 
rate_limit_group);
     virtual ~DownloadBinlogAction() = default;
 
     void handle(HttpRequest* req) override;
@@ -40,6 +44,7 @@ private:
 
 private:
     ExecEnv* _exec_env;
+    std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
 };
 
 } // namespace doris
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index 5d231cb9087..1bbd2c0e178 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -84,7 +84,17 @@ static int on_connection(struct evhttp_request* req, void* 
param) {
 EvHttpServer::EvHttpServer(int port, int num_workers)
         : _port(port), _num_workers(num_workers), _real_port(0) {
     _host = BackendOptions::get_service_bind_address();
+
+    evthread_use_pthreads();
     DCHECK_GT(_num_workers, 0);
+    _event_bases.resize(_num_workers);
+    for (int i = 0; i < _num_workers; ++i) {
+        std::shared_ptr<event_base> base(event_base_new(),
+                                         [](event_base* base) { 
event_base_free(base); });
+        CHECK(base != nullptr) << "Couldn't create an event_base.";
+        std::lock_guard lock(_event_bases_lock);
+        _event_bases[i] = base;
+    }
 }
 
 EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
@@ -107,34 +117,28 @@ void EvHttpServer::start() {
                               .set_min_threads(_num_workers)
                               .set_max_threads(_num_workers)
                               .build(&_workers));
-
-    evthread_use_pthreads();
-    _event_bases.resize(_num_workers);
     for (int i = 0; i < _num_workers; ++i) {
-        CHECK(_workers->submit_func([this, i]() {
-                          std::shared_ptr<event_base> base(event_base_new(), 
[](event_base* base) {
-                              event_base_free(base);
-                          });
-                          CHECK(base != nullptr) << "Couldn't create an 
event_base.";
-                          {
-                              std::lock_guard<std::mutex> 
lock(_event_bases_lock);
-                              _event_bases[i] = base;
-                          }
-
-                          /* Create a new evhttp object to handle requests. */
-                          std::shared_ptr<evhttp> http(evhttp_new(base.get()),
-                                                       [](evhttp* http) { 
evhttp_free(http); });
-                          CHECK(http != nullptr) << "Couldn't create an 
evhttp.";
-
-                          auto res = evhttp_accept_socket(http.get(), 
_server_fd);
-                          CHECK(res >= 0) << "evhttp accept socket failed, 
res=" << res;
-
-                          evhttp_set_newreqcb(http.get(), on_connection, this);
-                          evhttp_set_gencb(http.get(), on_request, this);
-
-                          event_base_dispatch(base.get());
-                      })
-                      .ok());
+        auto status = _workers->submit_func([this, i]() {
+            std::shared_ptr<event_base> base;
+            {
+                std::lock_guard lock(_event_bases_lock);
+                base = _event_bases[i];
+            }
+
+            /* Create a new evhttp object to handle requests. */
+            std::shared_ptr<evhttp> http(evhttp_new(base.get()),
+                                         [](evhttp* http) { evhttp_free(http); 
});
+            CHECK(http != nullptr) << "Couldn't create an evhttp.";
+
+            auto res = evhttp_accept_socket(http.get(), _server_fd);
+            CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
+
+            evhttp_set_newreqcb(http.get(), on_connection, this);
+            evhttp_set_gencb(http.get(), on_request, this);
+
+            event_base_dispatch(base.get());
+        });
+        CHECK(status.ok());
     }
 }
 
diff --git a/be/src/http/ev_http_server.h b/be/src/http/ev_http_server.h
index e7ad1c052ab..d74a8cb4efd 100644
--- a/be/src/http/ev_http_server.h
+++ b/be/src/http/ev_http_server.h
@@ -55,6 +55,11 @@ public:
     // get real port
     int get_real_port() const { return _real_port; }
 
+    std::vector<std::shared_ptr<event_base>> get_event_bases() {
+        std::lock_guard lock(_event_bases_lock);
+        return _event_bases;
+    }
+
 private:
     Status _bind();
     HttpHandler* _find_handler(HttpRequest* req);
diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp
index 5727ba3902e..96679195316 100644
--- a/be/src/http/http_channel.cpp
+++ b/be/src/http/http_channel.cpp
@@ -18,6 +18,7 @@
 #include "http/http_channel.h"
 
 #include <event2/buffer.h>
+#include <event2/bufferevent.h>
 #include <event2/http.h>
 
 #include <algorithm>
@@ -69,11 +70,17 @@ void HttpChannel::send_reply(HttpRequest* request, 
HttpStatus status, const std:
     evbuffer_free(evb);
 }
 
-void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t 
size) {
+void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t 
size,
+                            bufferevent_rate_limit_group* rate_limit_group) {
     auto evb = evbuffer_new();
     evbuffer_add_file(evb, fd, off, size);
-    evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK,
-                      default_reason(HttpStatus::OK).c_str(), evb);
+    auto* evhttp_request = request->get_evhttp_request();
+    if (rate_limit_group) {
+        auto* evhttp_connection = 
evhttp_request_get_connection(evhttp_request);
+        auto* buffer_event = 
evhttp_connection_get_bufferevent(evhttp_connection);
+        bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
+    }
+    evhttp_send_reply(evhttp_request, HttpStatus::OK, 
default_reason(HttpStatus::OK).c_str(), evb);
     evbuffer_free(evb);
 }
 
diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h
index 478f013af82..ee1e6c0888f 100644
--- a/be/src/http/http_channel.h
+++ b/be/src/http/http_channel.h
@@ -23,6 +23,7 @@
 
 #include "http/http_status.h"
 
+struct bufferevent_rate_limit_group;
 namespace doris {
 
 class HttpRequest;
@@ -43,7 +44,8 @@ public:
 
     static void send_reply(HttpRequest* request, HttpStatus status, const 
std::string& content);
 
-    static void send_file(HttpRequest* request, int fd, size_t off, size_t 
size);
+    static void send_file(HttpRequest* request, int fd, size_t off, size_t 
size,
+                          bufferevent_rate_limit_group* rate_limit_group = 
nullptr);
 
     static bool compress_content(const std::string& accept_encoding, const 
std::string& input,
                                  std::string* output);
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index f55b5d47696..31550456c55 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -124,7 +124,8 @@ std::string get_content_type(const std::string& file_name) {
     return "";
 }
 
-void do_file_response(const std::string& file_path, HttpRequest* req) {
+void do_file_response(const std::string& file_path, HttpRequest* req,
+                      bufferevent_rate_limit_group* rate_limit_group) {
     if (file_path.find("..") != std::string::npos) {
         LOG(WARNING) << "Not allowed to read relative path: " << file_path;
         HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
@@ -165,7 +166,7 @@ void do_file_response(const std::string& file_path, 
HttpRequest* req) {
         return;
     }
 
-    HttpChannel::send_file(req, fd, 0, file_size);
+    HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
 }
 
 void do_dir_response(const std::string& dir_path, HttpRequest* req) {
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 5928039c492..2d1e13fbe4e 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -22,6 +22,8 @@
 #include "common/utils.h"
 #include "http/http_request.h"
 
+struct bufferevent_rate_limit_group;
+
 namespace doris {
 
 struct AuthInfo;
@@ -34,7 +36,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string* 
user, std::string* pa
 
 bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth);
 
-void do_file_response(const std::string& dir_path, HttpRequest* req);
+void do_file_response(const std::string& dir_path, HttpRequest* req,
+                      bufferevent_rate_limit_group* rate_limit_group = 
nullptr);
 
 void do_dir_response(const std::string& dir_path, HttpRequest* req);
 
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 1ec7899b460..af6b0d67235 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -140,6 +140,7 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, 
TTransactionId transac
     // case 1: user start a new txn, rowset = null
     // case 2: loading txn from meta env
     TabletTxnInfo load_info(load_id, nullptr, ingest);
+    load_info.prepare();
     txn_tablet_map[key][tablet_info] = load_info;
     _insert_txn_partition_map_unlocked(transaction_id, partition_id);
     VLOG_NOTICE << "add transaction to engine successfully."
@@ -162,6 +163,29 @@ Status TxnManager::publish_txn(TPartitionId partition_id, 
const TabletSharedPtr&
                        tablet->tablet_id(), tablet->tablet_uid(), version, 
stats);
 }
 
+void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
+                           TTabletId tablet_id, TabletUid tablet_uid) {
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TabletInfo tablet_info(tablet_id, tablet_uid);
+
+    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
+
+    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it == txn_tablet_map.end()) {
+        return;
+    }
+
+    auto& tablet_txn_info_map = it->second;
+    if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
+        tablet_txn_info_iter == tablet_txn_info_map.end()) {
+        return;
+    } else {
+        auto& txn_info = tablet_txn_info_iter->second;
+        txn_info.abort();
+    }
+}
+
 // delete the txn from manager if it is not committed(not have a valid rowset)
 Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& 
tablet,
                                 TTransactionId transaction_id) {
@@ -217,6 +241,7 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId 
partition_id,
                    << " partition_id=" << partition_id << " transaction_id=" 
<< transaction_id
                    << " tablet_id=" << tablet_id;
     }
+
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, tablet_uid);
     if (rowset_ptr == nullptr) {
@@ -252,28 +277,30 @@ Status TxnManager::commit_txn(OlapMeta* meta, 
TPartitionId partition_id,
         // case 1: user commit rowset, then the load id must be equal
         TabletTxnInfo& load_info = load_itr->second;
         // check if load id is equal
-        if (load_info.load_id.hi() == load_id.hi() && load_info.load_id.lo() 
== load_id.lo() &&
-            load_info.rowset != nullptr &&
-            load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) {
-            // find a rowset with same rowset id, then it means a duplicate 
call
+        if (load_info.rowset == nullptr) {
+            break;
+        }
+
+        if (load_info.load_id.hi() != load_id.hi() || load_info.load_id.lo() 
!= load_id.lo()) {
+            break;
+        }
+
+        // find a rowset with same rowset id, then it means a duplicate call
+        if (load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) {
             LOG(INFO) << "find rowset exists when commit transaction to 
engine."
                       << "partition_id: " << key.first << ", transaction_id: " 
<< key.second
                       << ", tablet: " << tablet_info.to_string()
                       << ", rowset_id: " << load_info.rowset->rowset_id();
             return Status::OK();
-        } else if (load_info.load_id.hi() == load_id.hi() &&
-                   load_info.load_id.lo() == load_id.lo() && load_info.rowset 
!= nullptr &&
-                   load_info.rowset->rowset_id() != rowset_ptr->rowset_id()) {
-            // find a rowset with different rowset id, then it should not 
happen, just return errors
-            return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
-                    "find rowset exists when commit transaction to engine. but 
rowset ids are not "
-                    "same. partition_id: {}, transaction_id: {}, tablet: {}, 
exist rowset_id: {}, "
-                    "new rowset_id: {}",
-                    key.first, key.second, tablet_info.to_string(),
-                    load_info.rowset->rowset_id().to_string(), 
rowset_ptr->rowset_id().to_string());
-        } else {
-            break;
         }
+
+        // find a rowset with different rowset id, then it should not happen, 
just return errors
+        return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
+                "find rowset exists when commit transaction to engine. but 
rowset ids are not "
+                "same. partition_id: {}, transaction_id: {}, tablet: {}, exist 
rowset_id: {}, new "
+                "rowset_id: {}",
+                key.first, key.second, tablet_info.to_string(),
+                load_info.rowset->rowset_id().to_string(), 
rowset_ptr->rowset_id().to_string());
     } while (false);
 
     // if not in recovery mode, then should persist the meta to meta env
@@ -301,6 +328,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId 
partition_id,
                 load_info.delete_bitmap.reset(new 
DeleteBitmap(tablet->tablet_id()));
             }
         }
+        load_info.commit();
+
         txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
         txn_tablet_map[key][tablet_info] = load_info;
         _insert_txn_partition_map_unlocked(transaction_id, partition_id);
@@ -453,30 +482,36 @@ Status TxnManager::rollback_txn(TPartitionId 
partition_id, TTransactionId transa
                                 TTabletId tablet_id, TabletUid tablet_uid) {
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, tablet_uid);
+
     std::lock_guard<std::shared_mutex> 
wrlock(_get_txn_map_lock(transaction_id));
     txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+
     auto it = txn_tablet_map.find(key);
-    if (it != txn_tablet_map.end()) {
-        auto load_itr = it->second.find(tablet_info);
-        if (load_itr != it->second.end()) {
-            // found load for txn,tablet
-            // case 1: user commit rowset, then the load id must be equal
-            TabletTxnInfo& load_info = load_itr->second;
-            if (load_info.rowset != nullptr) {
-                return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
-                        "if rowset is not null, it means other thread may 
commit the rowset should "
-                        "not delete txn any more");
-            }
-        }
-        it->second.erase(tablet_info);
-        LOG(INFO) << "rollback transaction from engine successfully."
-                  << " partition_id: " << key.first << ", transaction_id: " << 
key.second
-                  << ", tablet: " << tablet_info.to_string();
-        if (it->second.empty()) {
-            txn_tablet_map.erase(it);
-            _clear_txn_partition_map_unlocked(transaction_id, partition_id);
+    if (it == txn_tablet_map.end()) {
+        return Status::OK();
+    }
+
+    auto& tablet_txn_info_map = it->second;
+    if (auto load_itr = tablet_txn_info_map.find(tablet_info);
+        load_itr != tablet_txn_info_map.end()) {
+        // found load for txn,tablet
+        // case 1: user commit rowset, then the load id must be equal
+        TabletTxnInfo& load_info = load_itr->second;
+        if (load_info.rowset != nullptr) {
+            return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
+                    "if rowset is not null, it means other thread may commit 
the rowset should "
+                    "not delete txn any more");
         }
     }
+
+    tablet_txn_info_map.erase(tablet_info);
+    LOG(INFO) << "rollback transaction from engine successfully."
+              << " partition_id: " << key.first << ", transaction_id: " << 
key.second
+              << ", tablet: " << tablet_info.to_string();
+    if (tablet_txn_info_map.empty()) {
+        txn_tablet_map.erase(it);
+        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
+    }
     return Status::OK();
 }
 
@@ -650,18 +685,6 @@ void TxnManager::get_all_commit_tablet_txn_info_by_tablet(
     }
 }
 
-bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
-                         TTabletId tablet_id, TabletUid tablet_uid) {
-    pair<int64_t, int64_t> key(partition_id, transaction_id);
-    TabletInfo tablet_info(tablet_id, tablet_uid);
-    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
-    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
-    auto it = txn_tablet_map.find(key);
-    bool found = it != txn_tablet_map.end() && it->second.find(tablet_info) != 
it->second.end();
-
-    return found;
-}
-
 void TxnManager::build_expire_txn_map(std::map<TabletInfo, 
std::vector<int64_t>>* expire_txn_map) {
     int64_t now = UnixSeconds();
     // traverse the txn map, and get all expired txns
@@ -671,13 +694,15 @@ void 
TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>
             auto txn_id = it.first.second;
             for (auto& t_map : it.second) {
                 double diff = difftime(now, t_map.second.creation_time);
-                if (diff >= config::pending_data_expire_time_sec) {
-                    (*expire_txn_map)[t_map.first].push_back(txn_id);
-                    if (VLOG_IS_ON(3)) {
-                        VLOG_NOTICE << "find expired txn."
-                                    << " tablet=" << t_map.first.to_string()
-                                    << " transaction_id=" << txn_id << " 
exist_sec=" << diff;
-                    }
+                if (diff < config::pending_data_expire_time_sec) {
+                    continue;
+                }
+
+                (*expire_txn_map)[t_map.first].push_back(txn_id);
+                if (VLOG_IS_ON(3)) {
+                    VLOG_NOTICE << "find expired txn."
+                                << " tablet=" << t_map.first.to_string()
+                                << " transaction_id=" << txn_id << " 
exist_sec=" << diff;
                 }
             }
         }
@@ -796,4 +821,27 @@ void TxnManager::update_tablet_version_txn(int64_t 
tablet_id, int64_t version, i
     _tablet_version_cache->release(handle);
 }
 
+TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId 
transaction_id,
+                                   TTabletId tablet_id, TabletUid tablet_uid) {
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TabletInfo tablet_info(tablet_id, tablet_uid);
+
+    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
+
+    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it == txn_tablet_map.end()) {
+        return TxnState::NOT_FOUND;
+    }
+
+    auto& tablet_txn_info_map = it->second;
+    if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
+        tablet_txn_info_iter == tablet_txn_info_map.end()) {
+        return TxnState::NOT_FOUND;
+    } else {
+        const auto& txn_info = tablet_txn_info_iter->second;
+        return txn_info.state;
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 6d2222a3ec5..b5a1db0b46e 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -51,6 +51,15 @@ class DeltaWriter;
 class OlapMeta;
 struct TabletPublishStatistics;
 
+enum class TxnState {
+    NOT_FOUND = 0,
+    PREPARED = 1,
+    COMMITTED = 2,
+    ROLLEDBACK = 3,
+    ABORTED = 4,
+    DELETED = 5,
+};
+
 struct TabletTxnInfo {
     PUniqueId load_id;
     RowsetSharedPtr rowset;
@@ -61,6 +70,9 @@ struct TabletTxnInfo {
     int64_t creation_time;
     bool ingest {false};
     std::shared_ptr<PartialUpdateInfo> partial_update_info;
+    TxnState state {TxnState::PREPARED};
+
+    TabletTxnInfo() = default;
 
     TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
             : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
@@ -77,7 +89,14 @@ struct TabletTxnInfo {
               rowset_ids(ids),
               creation_time(UnixSeconds()) {}
 
-    TabletTxnInfo() {}
+    void prepare() { state = TxnState::PREPARED; }
+    void commit() { state = TxnState::COMMITTED; }
+    void rollback() { state = TxnState::ROLLEDBACK; }
+    void abort() {
+        if (state == TxnState::PREPARED) {
+            state = TxnState::ABORTED;
+        }
+    }
 };
 
 struct CommitTabletTxnInfo {
@@ -145,6 +164,10 @@ public:
                        TTabletId tablet_id, TabletUid tablet_uid, const 
Version& version,
                        TabletPublishStatistics* stats);
 
+    // only abort not committed txn
+    void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, 
TTabletId tablet_id,
+                   TabletUid tablet_uid);
+
     // delete the txn from manager if it is not committed(not have a valid 
rowset)
     Status rollback_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
                         TTabletId tablet_id, TabletUid tablet_uid);
@@ -163,10 +186,6 @@ public:
 
     void get_all_related_tablets(std::set<TabletInfo>* tablet_infos);
 
-    // Just check if the txn exists.
-    bool has_txn(TPartitionId partition_id, TTransactionId transaction_id, 
TTabletId tablet_id,
-                 TabletUid tablet_uid);
-
     // Get all expired txns and save them in expire_txn_map.
     // This is currently called before reporting all tablet info, to avoid 
iterating txn map for every tablets.
     void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* 
expire_txn_map);
@@ -195,6 +214,9 @@ public:
     int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version);
     void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t 
txn_id);
 
+    TxnState get_txn_state(TPartitionId partition_id, TTransactionId 
transaction_id,
+                           TTabletId tablet_id, TabletUid tablet_uid);
+
 private:
     using TxnKey = std::pair<int64_t, int64_t>; // partition_id, 
transaction_id;
 
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index b5752710ae8..26103e50427 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -467,7 +467,7 @@ Status SnapshotLoader::remote_http_download(
 
         for (const auto& filename : filename_list) {
             std::string remote_file_url = fmt::format(
-                    "http://{}:{}/api/_tablet/_download?token={}&file={}/{}",
+                    
"http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog",
                     remote_tablet_snapshot.remote_be_addr.hostname,
                     remote_tablet_snapshot.remote_be_addr.port, 
remote_tablet_snapshot.remote_token,
                     remote_tablet_snapshot.remote_snapshot_path, filename);
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index a312996162c..ebf553b14f0 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -36,6 +36,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <utility>
 #include <vector>
 
@@ -63,6 +64,7 @@
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/arrow/row_batch.h"
 #include "util/defer_op.h"
+#include "util/threadpool.h"
 #include "util/thrift_server.h"
 #include "util/uid_util.h"
 
@@ -79,6 +81,283 @@ class TTransportException;
 
 namespace doris {
 
+namespace {
+constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+struct IngestBinlogArg {
+    int64_t txn_id;
+    int64_t partition_id;
+    int64_t local_tablet_id;
+    TabletSharedPtr local_tablet;
+    TIngestBinlogRequest request;
+    TStatus* tstatus;
+};
+
+void _ingest_binlog(IngestBinlogArg* arg) {
+    auto txn_id = arg->txn_id;
+    auto partition_id = arg->partition_id;
+    auto local_tablet_id = arg->local_tablet_id;
+    const auto& local_tablet = arg->local_tablet;
+    const auto& local_tablet_uid = local_tablet->tablet_uid();
+
+    auto& request = arg->request;
+
+    TStatus tstatus;
+    Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
+        LOG(INFO) << "ingest binlog. result: " << 
apache::thrift::ThriftDebugString(tstatus);
+        if (tstatus.status_code != TStatusCode::OK) {
+            // abort txn
+            StorageEngine::instance()->txn_manager()->abort_txn(partition_id, 
txn_id,
+                                                                
local_tablet_id, local_tablet_uid);
+        }
+
+        if (ingest_binlog_tstatus) {
+            *ingest_binlog_tstatus = std::move(tstatus);
+        }
+    }};
+
+    auto set_tstatus = [&tstatus](TStatusCode::type code, std::string 
error_msg) {
+        tstatus.__set_status_code(code);
+        tstatus.__isset.error_msgs = true;
+        tstatus.error_msgs.push_back(std::move(error_msg));
+    };
+
+    // Step 3: get binlog info
+    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", 
request.remote_host,
request.remote_host,
+                                      request.remote_port);
+    constexpr int max_retry = 3;
+
+    auto get_binlog_info_url =
+            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
+                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
+    std::string binlog_info;
+    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
+        RETURN_IF_ERROR(client->init(get_binlog_info_url));
+        client->set_timeout_ms(kMaxTimeoutMs);
+        return client->execute(&binlog_info);
+    };
+    auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_binlog_info_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
+    // TODO(Drogon): check binlog info content is right
+    DCHECK(binlog_info_parts.size() == 2);
+    const std::string& remote_rowset_id = binlog_info_parts[0];
+    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+
+    // Step 4: get rowset meta
+    auto get_rowset_meta_url = fmt::format(
+            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
+            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
+    std::string rowset_meta_str;
+    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
+        client->set_timeout_ms(kMaxTimeoutMs);
+        return client->execute(&rowset_meta_str);
+    };
+    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
+        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
+        status = Status::InternalError("failed to parse rowset meta");
+        status.to_thrift(&tstatus);
+        return;
+    }
+    // rewrite rowset meta
+    rowset_meta_pb.set_tablet_id(local_tablet_id);
+    rowset_meta_pb.set_partition_id(partition_id);
+    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
+    rowset_meta_pb.set_txn_id(txn_id);
+    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
+    auto rowset_meta = std::make_shared<RowsetMeta>();
+    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
+        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
+        status = Status::InternalError("failed to init rowset meta");
+        status.to_thrift(&tstatus);
+        return;
+    }
+    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
+    rowset_meta->set_rowset_id(new_rowset_id);
+    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
+
+    // Step 5: get all segment files
+    // Step 5.1: get all segment files size
+    std::vector<std::string> segment_file_urls;
+    segment_file_urls.reserve(num_segments);
+    std::vector<uint64_t> segment_file_sizes;
+    segment_file_sizes.reserve(num_segments);
+    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+        auto get_segment_file_size_url = fmt::format(
+                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
+                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
+        uint64_t segment_file_size;
+        auto get_segment_file_size_cb = [&get_segment_file_size_url,
+                                         &segment_file_size](HttpClient* 
client) {
+            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
+            client->set_timeout_ms(kMaxTimeoutMs);
+            RETURN_IF_ERROR(client->head());
+            return client->get_content_length(&segment_file_size);
+        };
+
+        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+
+        segment_file_sizes.push_back(segment_file_size);
+        segment_file_urls.push_back(std::move(get_segment_file_size_url));
+    }
+
+    // Step 5.2: check data capacity
+    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
+    if (!local_tablet->can_add_binlog(total_size)) {
+        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
+                     << ", tablet=" << local_tablet->tablet_id();
+        status = Status::InternalError("no enough space");
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 5.3: get all segment files
+    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+        auto segment_file_size = segment_file_sizes[segment_index];
+        auto get_segment_file_url = segment_file_urls[segment_index];
+
+        uint64_t estimate_timeout =
+                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
+        if (estimate_timeout < config::download_low_speed_time) {
+            estimate_timeout = config::download_low_speed_time;
+        }
+
+        auto local_segment_path = BetaRowset::segment_file_path(
+                local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
+        LOG(INFO) << fmt::format("download segment file from {} to {}", 
get_segment_file_url,
+                                 local_segment_path);
+        auto get_segment_file_cb = [&get_segment_file_url, 
&local_segment_path, segment_file_size,
+                                    estimate_timeout](HttpClient* client) {
+            RETURN_IF_ERROR(client->init(get_segment_file_url));
+            client->set_timeout_ms(estimate_timeout * 1000);
+            RETURN_IF_ERROR(client->download(local_segment_path));
+
+            std::error_code ec;
+            // Check file length
+            uint64_t local_file_size = 
std::filesystem::file_size(local_segment_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download file error" << ec.message();
+                return Status::IOError("can't retrive file_size of {}, due to 
{}",
+                                       local_segment_path, ec.message());
+            }
+            if (local_file_size != segment_file_size) {
+                LOG(WARNING) << "download file length error"
+                             << ", get_segment_file_url=" << 
get_segment_file_url
+                             << ", file_size=" << segment_file_size
+                             << ", local_file_size=" << local_file_size;
+                return Status::InternalError("downloaded file size is not 
equal");
+            }
+            chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
+            return Status::OK();
+        };
+
+        auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_cb);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to get segment file from " << 
get_segment_file_url
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+    }
+
+    // Step 6: create rowset && calculate delete bitmap && commit
+    // Step 6.1: create rowset
+    RowsetSharedPtr rowset;
+    status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
+                                          local_tablet->tablet_path(), 
rowset_meta, &rowset);
+
+    if (!status) {
+        LOG(WARNING) << "failed to create rowset from rowset meta for remote 
tablet"
+                     << ". rowset_id: " << rowset_meta_pb.rowset_id()
+                     << ", rowset_type: " << rowset_meta_pb.rowset_type()
+                     << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << 
", txn_id=" << txn_id
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 6.2 calculate delete bitmap before commit
+    auto calc_delete_bitmap_token =
+            
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
+    DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(local_tablet_id);
+    RowsetIdUnorderedSet pre_rowset_ids;
+    if (local_tablet->enable_unique_key_merge_on_write()) {
+        auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        status = beta_rowset->load_segments(&segments);
+        if (!status) {
+            LOG(WARNING) << "failed to load segments from rowset"
+                         << ". rowset_id: " << beta_rowset->rowset_id() << ", 
txn_id=" << txn_id
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+        if (segments.size() > 1) {
+            // calculate delete bitmap between segments
+            status = local_tablet->calc_delete_bitmap_between_segments(rowset, 
segments,
+                                                                       
delete_bitmap);
+            if (!status) {
+                LOG(WARNING) << "failed to calculate delete bitmap"
+                             << ". tablet_id: " << local_tablet->tablet_id()
+                             << ". rowset_id: " << rowset->rowset_id() << ", 
txn_id=" << txn_id
+                             << ", status=" << status.to_string();
+                status.to_thrift(&tstatus);
+                return;
+            }
+        }
+
+        static_cast<void>(local_tablet->commit_phase_update_delete_bitmap(
+                rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
+                calc_delete_bitmap_token.get(), nullptr));
+        static_cast<void>(calc_delete_bitmap_token->wait());
+    }
+
+    // Step 6.3: commit txn
+    Status commit_txn_status = 
StorageEngine::instance()->txn_manager()->commit_txn(
+            local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
+            rowset_meta->txn_id(), rowset_meta->tablet_id(), 
local_tablet->tablet_uid(),
+            rowset_meta->load_id(), rowset, false);
+    if (!commit_txn_status && 
!commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
+        auto err_msg = fmt::format(
+                "failed to commit txn for remote tablet. rowset_id: {}, 
remote_tablet_id={}, "
+                "txn_id={}, status={}",
+                rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
+                rowset_meta->txn_id(), commit_txn_status.to_string());
+        LOG(WARNING) << err_msg;
+        set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
+        return;
+    }
+
+    if (local_tablet->enable_unique_key_merge_on_write()) {
+        
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
+                partition_id, txn_id, local_tablet_id, 
local_tablet->tablet_uid(), true,
+                delete_bitmap, pre_rowset_ids, nullptr);
+    }
+
+    tstatus.__set_status_code(TStatusCode::OK);
+}
+} // namespace
+
 using apache::thrift::TException;
 using apache::thrift::TProcessor;
 using apache::thrift::TMultiplexedProcessor;
@@ -90,19 +369,33 @@ BackendService::BackendService(ExecEnv* exec_env)
 
 Status BackendService::create_service(ExecEnv* exec_env, int port,
                                       std::unique_ptr<ThriftServer>* server) {
-    std::shared_ptr<BackendService> handler(new BackendService(exec_env));
+    auto service = std::make_shared<BackendService>(exec_env);
     // TODO: do we want a BoostThreadFactory?
     // TODO: we want separate thread factories here, so that fe requests can't 
starve
     // be requests
-    std::shared_ptr<ThreadFactory> thread_factory(new ThreadFactory());
-
-    std::shared_ptr<TProcessor> be_processor(new 
BackendServiceProcessor(handler));
+    // std::shared_ptr<TProcessor> be_processor = 
std::make_shared<BackendServiceProcessor>(service);
+    auto be_processor = std::make_shared<BackendServiceProcessor>(service);
 
     *server = std::make_unique<ThriftServer>("backend", be_processor, port,
                                              config::be_service_threads);
 
     LOG(INFO) << "Doris BackendService listening on " << port;
 
+    auto thread_num = config::ingest_binlog_work_pool_size;
+    if (thread_num < 0) {
+        LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, so we 
will in sync mode",
+                                 thread_num);
+        return Status::OK();
+    }
+
+    if (thread_num == 0) {
+        thread_num = std::thread::hardware_concurrency();
+    }
+    static_cast<void>(doris::ThreadPoolBuilder("IngestBinlog")
+                              .set_min_threads(thread_num)
+                              .set_max_threads(thread_num * 2)
+                              .build(&(service->_ingest_binlog_workers)));
+    LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, in async 
mode", thread_num);
     return Status::OK();
 }
 
@@ -396,8 +689,6 @@ void BackendService::ingest_binlog(TIngestBinlogResult& 
result,
                                    const TIngestBinlogRequest& request) {
     LOG(INFO) << "ingest binlog. request: " << 
apache::thrift::ThriftDebugString(request);
 
-    constexpr uint64_t kMaxTimeoutMs = 1000;
-
     TStatus tstatus;
     Defer defer {[&result, &tstatus]() {
         result.__set_status(tstatus);
@@ -452,6 +743,12 @@ void BackendService::ingest_binlog(TIngestBinlogResult& 
result,
         set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
         return;
     }
+    if (!request.__isset.local_tablet_id) {
+        auto error_msg = "local_tablet_id is empty";
+        LOG(WARNING) << error_msg;
+        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
+        return;
+    }
     if (!request.__isset.load_id) {
         auto error_msg = "load_id is empty";
         LOG(WARNING) << error_msg;
@@ -486,239 +783,105 @@ void BackendService::ingest_binlog(TIngestBinlogResult& 
result,
         return;
     }
 
-    // Step 3: get binlog info
-    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download";, 
request.remote_host,
-                                      request.remote_port);
-    constexpr int max_retry = 3;
+    bool is_async = (_ingest_binlog_workers != nullptr);
+    result.__set_is_async(is_async);
 
-    auto get_binlog_info_url =
-            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
-                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
-    std::string binlog_info;
-    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
-        RETURN_IF_ERROR(client->init(get_binlog_info_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&binlog_info);
-    };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
+    auto ingest_binlog_func = [=, tstatus = &tstatus]() {
+        IngestBinlogArg ingest_binlog_arg = {
+                .txn_id = txn_id,
+                .partition_id = partition_id,
+                .local_tablet_id = local_tablet_id,
+                .local_tablet = local_tablet,
 
-    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
-    // TODO(Drogon): check binlog info content is right
-    DCHECK(binlog_info_parts.size() == 2);
-    const std::string& remote_rowset_id = binlog_info_parts[0];
-    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+                .request = std::move(request),
+                .tstatus = is_async ? nullptr : tstatus,
+        };
 
-    // Step 4: get rowset meta
-    auto get_rowset_meta_url = fmt::format(
-            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
-            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
-    std::string rowset_meta_str;
-    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
-        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&rowset_meta_str);
+        _ingest_binlog(&ingest_binlog_arg);
     };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetMetaPB rowset_meta_pb;
-    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
-        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to parse rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    // rewrite rowset meta
-    rowset_meta_pb.set_tablet_id(local_tablet_id);
-    rowset_meta_pb.set_partition_id(partition_id);
-    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
-    rowset_meta_pb.set_txn_id(txn_id);
-    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
-    auto rowset_meta = std::make_shared<RowsetMeta>();
-    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
-        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to init rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
-    rowset_meta->set_rowset_id(new_rowset_id);
-    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
 
-    // Step 5: get all segment files
-    // Step 5.1: get all segment files size
-    std::vector<std::string> segment_file_urls;
-    segment_file_urls.reserve(num_segments);
-    std::vector<uint64_t> segment_file_sizes;
-    segment_file_sizes.reserve(num_segments);
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto get_segment_file_size_url = fmt::format(
-                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
-                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
-        uint64_t segment_file_size;
-        auto get_segment_file_size_cb = [&get_segment_file_size_url,
-                                         &segment_file_size](HttpClient* 
client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
-            client->set_timeout_ms(kMaxTimeoutMs);
-            RETURN_IF_ERROR(client->head());
-            return client->get_content_length(&segment_file_size);
-        };
-
-        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+    if (is_async) {
+        status = 
_ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
         if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
-                         << ", status=" << status.to_string();
             status.to_thrift(&tstatus);
             return;
         }
-
-        segment_file_sizes.push_back(segment_file_size);
-        segment_file_urls.push_back(std::move(get_segment_file_size_url));
-    }
-
-    // Step 5.2: check data capacity
-    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
-    if (!local_tablet->can_add_binlog(total_size)) {
-        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
-                     << ", tablet=" << local_tablet->tablet_id();
-        status = Status::InternalError("no enough space");
-        status.to_thrift(&tstatus);
-        return;
+    } else {
+        ingest_binlog_func();
     }
+}
 
-    // Step 5.3: get all segment files
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto segment_file_size = segment_file_sizes[segment_index];
-        auto get_segment_file_url = segment_file_urls[segment_index];
-
-        uint64_t estimate_timeout =
-                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
-        if (estimate_timeout < config::download_low_speed_time) {
-            estimate_timeout = config::download_low_speed_time;
-        }
-
-        auto local_segment_path = BetaRowset::segment_file_path(
-                local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
-        LOG(INFO) << fmt::format("download segment file from {} to {}", 
get_segment_file_url,
-                                 local_segment_path);
-        auto get_segment_file_cb = [&get_segment_file_url, 
&local_segment_path, segment_file_size,
-                                    estimate_timeout](HttpClient* client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_url));
-            client->set_timeout_ms(estimate_timeout * 1000);
-            RETURN_IF_ERROR(client->download(local_segment_path));
+void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
+                                         const TQueryIngestBinlogRequest& 
request) {
+    LOG(INFO) << "query ingest binlog. request: " << 
apache::thrift::ThriftDebugString(request);
 
-            std::error_code ec;
-            // Check file length
-            uint64_t local_file_size = 
std::filesystem::file_size(local_segment_path, ec);
-            if (ec) {
-                LOG(WARNING) << "download file error" << ec.message();
-                return Status::IOError("can't retrive file_size of {}, due to 
{}",
-                                       local_segment_path, ec.message());
-            }
-            if (local_file_size != segment_file_size) {
-                LOG(WARNING) << "download file length error"
-                             << ", get_segment_file_url=" << 
get_segment_file_url
-                             << ", file_size=" << segment_file_size
-                             << ", local_file_size=" << local_file_size;
-                return Status::InternalError("downloaded file size is not 
equal");
-            }
-            chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
-            return Status::OK();
-        };
+    auto set_result = [&](TIngestBinlogStatus::type status, std::string 
error_msg) {
+        result.__set_status(status);
+        result.__set_err_msg(std::move(error_msg));
+    };
 
-        auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_cb);
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file from " << 
get_segment_file_url
-                         << ", status=" << status.to_string();
-            status.to_thrift(&tstatus);
-            return;
-        }
+    /// Check args: txn_id, partition_id, tablet_id, load_id
+    if (!request.__isset.txn_id) {
+        auto error_msg = "txn_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
+        return;
     }
-
-    // Step 6: create rowset && calculate delete bitmap && commit
-    // Step 6.1: create rowset
-    RowsetSharedPtr rowset;
-    status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
-                                          local_tablet->tablet_path(), 
rowset_meta, &rowset);
-
-    if (!status) {
-        LOG(WARNING) << "failed to create rowset from rowset meta for remote 
tablet"
-                     << ". rowset_id: " << rowset_meta_pb.rowset_id()
-                     << ", rowset_type: " << rowset_meta_pb.rowset_type()
-                     << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << 
", txn_id=" << txn_id
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
+    if (!request.__isset.partition_id) {
+        auto error_msg = "partition_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
         return;
     }
-
-    // Step 6.2 calculate delete bitmap before commit
-    auto calc_delete_bitmap_token =
-            
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
-    DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(local_tablet_id);
-    RowsetIdUnorderedSet pre_rowset_ids;
-    if (local_tablet->enable_unique_key_merge_on_write()) {
-        auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
-        std::vector<segment_v2::SegmentSharedPtr> segments;
-        status = beta_rowset->load_segments(&segments);
-        if (!status) {
-            LOG(WARNING) << "failed to load segments from rowset"
-                         << ". rowset_id: " << beta_rowset->rowset_id() << ", 
txn_id=" << txn_id
-                         << ", status=" << status.to_string();
-            status.to_thrift(&tstatus);
-            return;
-        }
-        if (segments.size() > 1) {
-            // calculate delete bitmap between segments
-            status = local_tablet->calc_delete_bitmap_between_segments(rowset, 
segments,
-                                                                       
delete_bitmap);
-            if (!status) {
-                LOG(WARNING) << "failed to calculate delete bitmap"
-                             << ". tablet_id: " << local_tablet->tablet_id()
-                             << ". rowset_id: " << rowset->rowset_id() << ", 
txn_id=" << txn_id
-                             << ", status=" << status.to_string();
-                status.to_thrift(&tstatus);
-                return;
-            }
-        }
-
-        static_cast<void>(local_tablet->commit_phase_update_delete_bitmap(
-                rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
-                calc_delete_bitmap_token.get(), nullptr));
-        static_cast<void>(calc_delete_bitmap_token->wait());
+    if (!request.__isset.tablet_id) {
+        auto error_msg = "tablet_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
+        return;
     }
-
-    // Step 6.3: commit txn
-    Status commit_txn_status = 
StorageEngine::instance()->txn_manager()->commit_txn(
-            local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
-            rowset_meta->txn_id(), rowset_meta->tablet_id(), 
local_tablet->tablet_uid(),
-            rowset_meta->load_id(), rowset, false);
-    if (!commit_txn_status && 
!commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
-        auto err_msg = fmt::format(
-                "failed to commit txn for remote tablet. rowset_id: {}, 
remote_tablet_id={}, "
-                "txn_id={}, status={}",
-                rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
-                rowset_meta->txn_id(), commit_txn_status.to_string());
-        LOG(WARNING) << err_msg;
-        set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
+    if (!request.__isset.load_id) {
+        auto error_msg = "load_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
         return;
     }
 
-    if (local_tablet->enable_unique_key_merge_on_write()) {
-        
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
-                partition_id, txn_id, local_tablet_id, 
local_tablet->tablet_uid(), true,
-                delete_bitmap, pre_rowset_ids, nullptr);
+    auto partition_id = request.partition_id;
+    auto txn_id = request.txn_id;
+    auto tablet_id = request.tablet_id;
+
+    // Step 1: get local tablet
+    auto local_tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+    if (local_tablet == nullptr) {
+        auto error_msg = fmt::format("tablet {} not found", tablet_id);
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::NOT_FOUND, std::move(error_msg));
+        return;
     }
 
-    tstatus.__set_status_code(TStatusCode::OK);
+    // Step 2: get txn state
+    auto tablet_uid = local_tablet->tablet_uid();
+    auto txn_state = 
StorageEngine::instance()->txn_manager()->get_txn_state(partition_id, txn_id,
+                                                                             
tablet_id, tablet_uid);
+    switch (txn_state) {
+    case TxnState::NOT_FOUND:
+        result.__set_status(TIngestBinlogStatus::NOT_FOUND);
+        break;
+    case TxnState::PREPARED:
+        result.__set_status(TIngestBinlogStatus::DOING);
+        break;
+    case TxnState::COMMITTED:
+        result.__set_status(TIngestBinlogStatus::OK);
+        break;
+    case TxnState::ROLLEDBACK:
+        result.__set_status(TIngestBinlogStatus::FAILED);
+        break;
+    case TxnState::ABORTED:
+        result.__set_status(TIngestBinlogStatus::FAILED);
+        break;
+    case TxnState::DELETED:
+        result.__set_status(TIngestBinlogStatus::FAILED);
+        break;
+    }
 }
 } // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 8ad55e43e6e..09a79a68bf0 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -58,6 +58,7 @@ class TTransmitDataParams;
 class TUniqueId;
 class TIngestBinlogRequest;
 class TIngestBinlogResult;
+class ThreadPool;
 
 // This class just forward rpc for actual handler
 // make this class because we can bind multiple service on single point
@@ -137,10 +138,14 @@ public:
 
     void ingest_binlog(TIngestBinlogResult& result, const 
TIngestBinlogRequest& request) override;
 
+    void query_ingest_binlog(TQueryIngestBinlogResult& result,
+                             const TQueryIngestBinlogRequest& request) 
override;
+
 private:
     Status start_plan_fragment_execution(const TExecPlanFragmentParams& 
exec_params);
     ExecEnv* _exec_env;
     std::unique_ptr<AgentServer> _agent_server;
+    std::unique_ptr<ThreadPool> _ingest_binlog_workers;
 };
 
 } // namespace doris
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 41578d976dd..c4f24cfdf8f 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -17,6 +17,9 @@
 
 #include "service/http_service.h"
 
+#include <event2/bufferevent.h>
+#include <event2/http.h>
+
 #include <algorithm>
 #include <string>
 #include <vector>
@@ -59,6 +62,30 @@
 #include "util/doris_metrics.h"
 
 namespace doris {
+namespace {
+std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* 
event_base) {
+    auto rate_limit = config::download_binlog_rate_limit_kbs;
+    if (rate_limit <= 0) {
+        return nullptr;
+    }
+
+    auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
+    if (rate_limit > max_value) {
+        LOG(WARNING) << "rate limit is too large, set to max value.";
+        rate_limit = max_value;
+    }
+    struct timeval cfg_tick = {0, 100 * 1000}; // 100ms
+    rate_limit = rate_limit / 10 * 1024;       // convert KB/s to bytes per 100ms tick
+
+    auto token_bucket = std::unique_ptr<ev_token_bucket_cfg, 
decltype(&ev_token_bucket_cfg_free)>(
+            ev_token_bucket_cfg_new(rate_limit, rate_limit * 2, rate_limit, 
rate_limit * 2,
+                                    &cfg_tick),
+            ev_token_bucket_cfg_free);
+    return std::shared_ptr<bufferevent_rate_limit_group>(
+            bufferevent_rate_limit_group_new(event_base, token_bucket.get()),
+            bufferevent_rate_limit_group_free);
+}
+} // namespace
 
 HttpService::HttpService(ExecEnv* env, int port, int num_threads)
         : _env(env),
@@ -72,6 +99,9 @@ HttpService::~HttpService() {
 Status HttpService::start() {
     add_default_path_handlers(_web_page_handler.get());
 
+    auto event_base = _ev_http_server->get_event_bases()[0];
+    _rate_limit_group = get_rate_limit_group(event_base.get());
+
     // register load
     StreamLoadAction* streamload_action = _pool.add(new 
StreamLoadAction(_env));
     _ev_http_server->register_handler(HttpMethod::PUT, 
"/api/{db}/{table}/_load",
@@ -93,18 +123,19 @@ Status HttpService::start() {
     for (auto& path : _env->store_paths()) {
         allow_paths.emplace_back(path.path);
     }
-    DownloadAction* download_action = _pool.add(new DownloadAction(_env, 
allow_paths));
+    DownloadAction* download_action = _pool.add(new DownloadAction(_env, 
nullptr, allow_paths));
     _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", 
download_action);
     _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", 
download_action);
 
-    DownloadAction* tablet_download_action = _pool.add(new 
DownloadAction(_env, allow_paths));
+    DownloadAction* tablet_download_action =
+            _pool.add(new DownloadAction(_env, _rate_limit_group, 
allow_paths));
     _ev_http_server->register_handler(HttpMethod::HEAD, 
"/api/_tablet/_download",
                                       tablet_download_action);
     _ev_http_server->register_handler(HttpMethod::GET, 
"/api/_tablet/_download",
                                       tablet_download_action);
     if (config::enable_single_replica_load) {
         DownloadAction* single_replica_download_action = _pool.add(new 
DownloadAction(
-                _env, allow_paths, 
config::single_replica_load_download_num_workers));
+                _env, nullptr, allow_paths, 
config::single_replica_load_download_num_workers));
         _ev_http_server->register_handler(HttpMethod::HEAD, 
"/api/_single_replica/_download",
                                           single_replica_download_action);
         _ev_http_server->register_handler(HttpMethod::GET, 
"/api/_single_replica/_download",
@@ -118,7 +149,8 @@ Status HttpService::start() {
     _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_load_error_log",
                                       error_log_download_action);
 
-    DownloadBinlogAction* download_binlog_action = _pool.add(new 
DownloadBinlogAction(_env));
+    DownloadBinlogAction* download_binlog_action =
+            _pool.add(new DownloadBinlogAction(_env, _rate_limit_group));
     _ev_http_server->register_handler(HttpMethod::GET, 
"/api/_binlog/_download",
                                       download_binlog_action);
     _ev_http_server->register_handler(HttpMethod::HEAD, 
"/api/_binlog/_download",
diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h
index 46b5b3c5b3c..eeaf9516594 100644
--- a/be/src/service/http_service.h
+++ b/be/src/service/http_service.h
@@ -22,6 +22,8 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 
+struct bufferevent_rate_limit_group;
+
 namespace doris {
 
 class ExecEnv;
@@ -47,6 +49,8 @@ private:
     std::unique_ptr<EvHttpServer> _ev_http_server;
     std::unique_ptr<WebPageHandler> _web_page_handler;
 
+    std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
+
     bool stopped = false;
 };
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index a2f3867de2b..ba66d07ec6b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -34,6 +34,8 @@ import org.apache.doris.thrift.TIngestBinlogResult;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishTopicRequest;
 import org.apache.doris.thrift.TPublishTopicResult;
+import org.apache.doris.thrift.TQueryIngestBinlogRequest;
+import org.apache.doris.thrift.TQueryIngestBinlogResult;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TScanCloseParams;
@@ -229,6 +231,12 @@ public class GenericPoolTest {
         public TIngestBinlogResult ingestBinlog(TIngestBinlogRequest 
ingestBinlogRequest) throws TException {
             return null;
         }
+
+        @Override
+        public TQueryIngestBinlogResult 
queryIngestBinlog(TQueryIngestBinlogRequest queryIngestBinlogRequest)
+                throws TException {
+            return null;
+        }
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 058e356a325..860b58a47a7 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -52,6 +52,8 @@ import org.apache.doris.thrift.TMasterInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishTopicRequest;
 import org.apache.doris.thrift.TPublishTopicResult;
+import org.apache.doris.thrift.TQueryIngestBinlogRequest;
+import org.apache.doris.thrift.TQueryIngestBinlogResult;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TScanCloseParams;
@@ -375,6 +377,12 @@ public class MockedBackendFactory {
         public TIngestBinlogResult ingestBinlog(TIngestBinlogRequest 
ingestBinlogRequest) throws TException {
             return null;
         }
+
+        @Override
+        public TQueryIngestBinlogResult 
queryIngestBinlog(TQueryIngestBinlogRequest queryIngestBinlogRequest)
+                throws TException {
+            return null;
+        }
     }
 
     // The default Brpc service.
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index a7a9c50aed2..d35d6166d3b 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -136,6 +136,28 @@ struct TIngestBinlogRequest {
 
 struct TIngestBinlogResult {
     1: optional Status.TStatus status;
+    2: optional bool is_async;
+}
+
+struct TQueryIngestBinlogRequest {
+    1: optional i64 txn_id;
+    2: optional i64 partition_id;
+    3: optional i64 tablet_id;
+    4: optional Types.TUniqueId load_id;
+}
+
+enum TIngestBinlogStatus {
+    ANALYSIS_ERROR,
+    UNKNOWN,
+    NOT_FOUND,
+    OK,
+    FAILED,
+    DOING
+}
+
+struct TQueryIngestBinlogResult {
+    1: optional TIngestBinlogStatus status;
+    2: optional string err_msg;
 }
 
 enum TTopicInfoType {
@@ -211,6 +233,7 @@ service BackendService {
     TCheckStorageFormatResult check_storage_format();
 
     TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest 
ingest_binlog_request);
+    TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest 
query_ingest_binlog_request);
 
     TPublishTopicResult publish_topic_info(1:TPublishTopicRequest 
topic_request);
 }
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 493c4daa493..62b257b1bc8 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -72,7 +72,6 @@ class Syncer {
     }
 
     private Boolean checkBinlog(TBinlog binlog, String table, Boolean update) {
-
         // step 1: check binlog availability
         if (binlog == null) {
             return false
@@ -735,6 +734,7 @@ class Syncer {
                 if (!binlogRecords.contains(srcPartition.key)) {
                     continue
                 }
+
                 Iterator srcTabletIter = 
srcPartition.value.tabletMeta.iterator()
                 Iterator tarTabletIter = 
tarPartition.value.tabletMeta.iterator()
 
@@ -771,6 +771,7 @@ class Syncer {
                     logger.info("request -> ${request}")
                     TIngestBinlogResult result = 
tarClient.client.ingestBinlog(request)
                     if (!checkIngestBinlog(result)) {
+                        logger.error("Ingest binlog error! result: ${result}")
                         return false
                     }
 
diff --git a/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy 
b/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
index cef48715aeb..3004e344cc9 100644
--- a/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
+++ b/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
@@ -22,6 +22,7 @@ suite("test_ingest_binlog") {
         logger.info("fe enable_feature_binlog is false, skip case 
test_ingest_binlog")
         return
     }
+
     def tableName = "tbl_ingest_binlog"
     def insert_num = 5
     def test_num = 0
@@ -102,6 +103,7 @@ suite("test_ingest_binlog") {
     logger.info("=== Test 2.2: Wrong binlog version case ===")
     // -1 means use the number of syncer.context
     // Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = -1)
+    // use fakeVersion = 1; version 1 is the first version of a doris BE tablet, so it has no binlog, only an http error
     assertTrue(syncer.ingestBinlog(-1, 1) == false)
 
 
@@ -120,4 +122,4 @@ suite("test_ingest_binlog") {
 
     // End Test 2
     syncer.closeBackendClients()
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to