This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 09b56d5d4d6 [improvement](binlog)Support inverted index in CCR (#31743) (#36203)
09b56d5d4d6 is described below

commit 09b56d5d4d6c46725b2beffe39d0764ff1f0cda4
Author: qiye <jianliang5...@gmail.com>
AuthorDate: Sun Jun 16 09:36:46 2024 +0800

    [improvement](binlog)Support inverted index in CCR (#31743) (#36203)
---
 be/src/http/action/download_binlog_action.cpp |  39 ++++++++
 be/src/olap/rowset/beta_rowset.cpp            |  43 ++++++++-
 be/src/olap/snapshot_manager.cpp              |  53 ++++++++++-
 be/src/olap/tablet.cpp                        |  26 ++++-
 be/src/olap/tablet.h                          |   5 +
 be/src/olap/tablet_manager.cpp                |  17 +++-
 be/src/olap/task/engine_clone_task.cpp        |  14 ++-
 be/src/service/backend_service.cpp            | 132 ++++++++++++++++++++++++--
 8 files changed, 307 insertions(+), 22 deletions(-)

diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp
index 697512b2a30..dbe2880d3b4 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -47,6 +47,7 @@ const std::string kTabletIdParameter = "tablet_id";
 const std::string kBinlogVersionParameter = "binlog_version";
 const std::string kRowsetIdParameter = "rowset_id";
 const std::string kSegmentIndexParameter = "segment_index";
+const std::string kSegmentIndexIdParameter = "segment_index_id";
 
 // get http param, if no value throw exception
 const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
@@ -130,6 +131,42 @@ void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rat
     do_file_response(segment_file_path, req, rate_limit_group);
 }
 
+/// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id
+void handle_get_segment_index_file(HttpRequest* req,
+                                   bufferevent_rate_limit_group* rate_limit_group) {
+    // Step 1: get download file path
+    std::string segment_index_file_path;
+    try {
+        const auto& tablet_id = get_http_param(req, kTabletIdParameter);
+        auto tablet = get_tablet(tablet_id);
+        const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
+        const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
+        const auto& segment_index_id = get_http_param(req, kSegmentIndexIdParameter);
+        segment_index_file_path =
+                tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
+    } catch (const std::exception& e) {
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
+        LOG(WARNING) << "get download file path failed, error: " << e.what();
+        return;
+    }
+
+    // Step 2: handle download
+    // check file exists
+    bool exists = false;
+    Status status = io::global_local_filesystem()->exists(segment_index_file_path, &exists);
+    if (!status.ok()) {
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
+        LOG(WARNING) << "check file exists failed, error: " << status.to_string();
+        return;
+    }
+    if (!exists) {
+        HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist.");
+        LOG(WARNING) << "file not exist, file path: " << segment_index_file_path;
+        return;
+    }
+    do_file_response(segment_index_file_path, req, rate_limit_group);
+}
+
 void handle_get_rowset_meta(HttpRequest* req) {
     try {
         const auto& tablet_id = get_http_param(req, kTabletIdParameter);
@@ -183,6 +220,8 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
         handle_get_binlog_info(req);
     } else if (method == "get_segment_file") {
         handle_get_segment_file(req, _rate_limit_group.get());
+    } else if (method == "get_segment_index_file") {
+        handle_get_segment_index_file(req, _rate_limit_group.get());
     } else if (method == "get_rowset_meta") {
         handle_get_rowset_meta(req);
     } else {
diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index 10d7f326a58..d07b0b2254c 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -454,7 +454,7 @@ Status BetaRowset::add_to_binlog() {
     if (fs->type() != io::FileSystemType::LOCAL) {
         return Status::InternalError("should be local file system");
     }
-    io::LocalFileSystem* local_fs = static_cast<io::LocalFileSystem*>(fs.get());
+    auto* local_fs = static_cast<io::LocalFileSystem*>(fs.get());
 
     // all segments are in the same directory, so cache binlog_dir without multi times check
     std::string binlog_dir;
@@ -462,6 +462,22 @@ Status BetaRowset::add_to_binlog() {
     auto segments_num = num_segments();
     VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={}, segments_num={}",
                               rowset_id().to_string(), segments_num);
+
+    Status status;
+    std::vector<std::string> linked_success_files;
+    Defer remove_linked_files {[&]() { // clear linked files if errors happen
+        if (!status.ok()) {
+            LOG(WARNING) << "will delete linked success files due to error " << status;
+            std::vector<io::Path> paths;
+            for (const auto& file : linked_success_files) {
+                paths.emplace_back(file);
+                LOG(WARNING) << "will delete linked success file " << file << " due to error";
+            }
+            static_cast<void>(local_fs->batch_delete(paths));
+            LOG(WARNING) << "done delete linked success files due to error " << status;
+        }
+    }};
+
     for (int i = 0; i < segments_num; ++i) {
         auto seg_file = segment_file_path(i);
 
@@ -480,8 +496,29 @@ Status BetaRowset::add_to_binlog() {
                         .string();
         VLOG_DEBUG << "link " << seg_file << " to " << binlog_file;
         if (!local_fs->link_file(seg_file, binlog_file).ok()) {
-            return Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}",
-                                           seg_file, binlog_file, Errno::no());
+            status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}",
+                                             seg_file, binlog_file, Errno::no());
+            return status;
+        }
+        linked_success_files.push_back(binlog_file);
+
+        for (const auto& index : _schema->indexes()) {
+            if (index.index_type() != IndexType::INVERTED) {
+                continue;
+            }
+            auto index_id = index.index_id();
+            auto index_file = InvertedIndexDescriptor::get_index_file_name(seg_file, index_id);
+            auto binlog_index_file = (std::filesystem::path(binlog_dir) /
+                                      std::filesystem::path(index_file).filename())
+                                             .string();
+            VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
+            if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
+                status = Status::Error<OS_ERROR>(
+                        "fail to create hard link. from={}, to={}, errno={}", index_file,
+                        binlog_index_file, Errno::no());
+                return status;
+            }
+            linked_success_files.push_back(binlog_index_file);
         }
     }
 
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 05e2c771aac..a43a1d187c0 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -647,11 +647,39 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
             break;
         }
 
-        for (auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) {
+        for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) {
             std::string segment_file_path;
             auto num_segments = rowset_binlog_meta.num_segments();
             std::string_view rowset_id = rowset_binlog_meta.rowset_id();
 
+            RowsetMetaPB rowset_meta_pb;
+            if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
+                auto err_msg = fmt::format("fail to parse binlog meta data value:{}",
+                                           rowset_binlog_meta.data());
+                res = Status::InternalError(err_msg);
+                LOG(WARNING) << err_msg;
+                return res;
+            }
+            const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
+            TabletSchema tablet_schema;
+            tablet_schema.init_from_pb(tablet_schema_pb);
+
+            std::vector<std::string> linked_success_files;
+            Defer remove_linked_files {[&]() { // clear linked files if errors happen
+                if (!res.ok()) {
+                    LOG(WARNING) << "will delete linked success files due to error " << res;
+                    std::vector<io::Path> paths;
+                    for (const auto& file : linked_success_files) {
+                        paths.emplace_back(file);
+                        LOG(WARNING)
+                                << "will delete linked success file " << file << " due to error";
+                    }
+                    static_cast<void>(io::global_local_filesystem()->batch_delete(paths));
+                    LOG(WARNING) << "done delete linked success files due to error " << res;
+                }
+            }};
+
+            // link segment files and index files
             for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
                 segment_file_path = ref_tablet->get_segment_filepath(rowset_id, segment_index);
                 auto snapshot_segment_file_path =
@@ -664,6 +692,34 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
                                  << ", dest=" << snapshot_segment_file_path << "]";
                     break;
                 }
+                linked_success_files.push_back(snapshot_segment_file_path);
+
+                for (const auto& index : tablet_schema.indexes()) {
+                    if (index.index_type() != IndexType::INVERTED) {
+                        continue;
+                    }
+                    auto index_id = index.index_id();
+                    auto index_file = ref_tablet->get_segment_index_filepath(
+                            rowset_id, segment_index, index_id);
+                    auto snapshot_segment_index_file_path =
+                            fmt::format("{}/{}_{}_{}.binlog-index", schema_full_path, rowset_id,
+                                        segment_index, index_id);
+                    VLOG_DEBUG << "link " << index_file << " to "
+                               << snapshot_segment_index_file_path;
+                    res = io::global_local_filesystem()->link_file(
+                            index_file, snapshot_segment_index_file_path);
+                    if (!res.ok()) {
+                        LOG(WARNING) << "fail to link binlog index file. [src=" << index_file
+                                     << ", dest=" << snapshot_segment_index_file_path << "]";
+                        break;
+                    }
+                    linked_success_files.push_back(snapshot_segment_index_file_path);
+                }
+                if (!res.ok()) {
+                    // the `break` above only leaves the index loop; stop the segment loop too,
+                    // otherwise linking the next segment would overwrite (and hide) this error
+                    break;
+                }
             }
 
             if (!res.ok()) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9eb6f218857..97cbfea9554 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3900,6 +3900,19 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
     return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index);
 }
 
+std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
+                                               std::string_view segment_index,
+                                               std::string_view index_id) const {
+    // TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
+    return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, index_id);
+}
+
+std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
+                                               int64_t index_id) const {
+    // TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
+    return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, index_id);
+}
+
 std::vector<std::string> Tablet::get_binlog_filepath(std::string_view binlog_version) const {
     const auto& [rowset_id, num_segments] = get_binlog_info(binlog_version);
     std::vector<std::string> binlog_filepath;
@@ -3929,7 +3942,6 @@ void Tablet::gc_binlogs(int64_t version) {
 
     const auto& tablet_uid = this->tablet_uid();
     const auto tablet_id = this->tablet_id();
-    const auto& tablet_path = this->tablet_path();
     std::string begin_key = make_binlog_meta_key_prefix(tablet_uid);
     std::string end_key = make_binlog_meta_key_prefix(tablet_uid, version + 1);
     LOG(INFO) << fmt::format("gc binlog meta, tablet_id:{}, begin_key:{}, end_key:{}", tablet_id,
@@ -3943,10 +3955,19 @@ void Tablet::gc_binlogs(int64_t version) {
         wait_for_deleted_binlog_keys.emplace_back(key);
         wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key));
 
+        // add binlog segment files and index files
+        // NOTE(review): index ids come from the tablet's schema at GC time, not the schema this
+        // rowset was written with; .idx binlogs of since-dropped indexes are presumably missed
+        const auto schema = this->tablet_schema();
         for (int64_t i = 0; i < num_segments; ++i) {
-            auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
-            wait_for_deleted_binlog_files.emplace_back(
-                    fmt::format("{}/_binlog/{}", tablet_path, segment_file));
+            wait_for_deleted_binlog_files.emplace_back(get_segment_filepath(rowset_id, i));
+            for (const auto& index : schema->indexes()) {
+                if (index.index_type() != IndexType::INVERTED) {
+                    continue;
+                }
+                wait_for_deleted_binlog_files.emplace_back(
+                        get_segment_index_filepath(rowset_id, i, index.index_id()));
+            }
         }
     };
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 73f3e2d140d..fd781722d1e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -549,6 +549,13 @@ public:
     std::string get_segment_filepath(std::string_view rowset_id,
                                      std::string_view segment_index) const;
     std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
+    // Path of the binlog copy of an inverted index file, i.e.
+    // {tablet_path}/_binlog/{rowset_id}_{segment_index}_{index_id}.idx
+    std::string get_segment_index_filepath(std::string_view rowset_id,
+                                           std::string_view segment_index,
+                                           std::string_view index_id) const;
+    std::string get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
+                                           int64_t index_id) const;
     bool can_add_binlog(uint64_t total_binlog_size) const;
     void gc_binlogs(int64_t version);
     Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index f31cd422caf..75dc5555e39 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -953,19 +953,29 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
                 io::global_local_filesystem()->list(schema_hash_path, true, &files, &exists));
         for (auto& file : files) {
             auto& filename = file.file_name;
-            if (!filename.ends_with(".binlog")) {
+            // move cloned binlog files into _binlog/: .binlog -> .dat, .binlog-index -> .idx
+            std::string new_suffix;
+            std::string old_suffix;
+
+            if (filename.ends_with(".binlog")) {
+                old_suffix = ".binlog";
+                new_suffix = ".dat";
+            } else if (filename.ends_with(".binlog-index")) {
+                old_suffix = ".binlog-index";
+                new_suffix = ".idx";
+            } else {
                 continue;
             }
 
-            // change clone_file suffix .binlog to .dat
             std::string new_filename = filename;
-            new_filename.replace(filename.size() - 7, 7, ".dat");
+            new_filename.replace(filename.size() - old_suffix.size(), old_suffix.size(),
+                                 new_suffix);
             auto from = fmt::format("{}/{}", schema_hash_path, filename);
             auto to = fmt::format("{}/_binlog/{}", schema_hash_path, new_filename);
             RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to));
         }
 
-        auto meta = store->get_meta();
+        auto* meta = store->get_meta();
         // if ingest binlog metas error, it will be gc in gc_unused_binlog_metas
         RETURN_IF_ERROR(
                 RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid, &rowset_binlog_metas_pb));
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 3cf9ef2c4c8..c71f245f58e 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -82,10 +82,19 @@ namespace {
 /// return value: if binlog file not exist, then return to binlog file path
 Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir,
                                             const std::string& clone_file, bool* skip_link_file) {
-    // change clone_file suffix .binlog to .dat
+    // change clone_file suffix: .binlog -> .dat (segment file),
+    // .binlog-index -> .idx (inverted index file)
+    constexpr std::string_view kBinlogSuffix = ".binlog";
+    constexpr std::string_view kBinlogIndexSuffix = ".binlog-index";
     std::string new_clone_file = clone_file;
-    new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
+    if (clone_file.ends_with(kBinlogSuffix)) {
+        new_clone_file.replace(clone_file.size() - kBinlogSuffix.size(), kBinlogSuffix.size(),
+                               ".dat");
+    } else if (clone_file.ends_with(kBinlogIndexSuffix)) {
+        new_clone_file.replace(clone_file.size() - kBinlogIndexSuffix.size(),
+                               kBinlogIndexSuffix.size(), ".idx");
+    }
     auto to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
 
     // check to to file exist
     bool exists = true;
@@ -674,7 +683,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
 
         auto from = fmt::format("{}/{}", clone_dir, clone_file);
         std::string to;
-        if (clone_file.ends_with(".binlog")) {
+        if (clone_file.ends_with(".binlog") || clone_file.ends_with(".binlog-index")) {
             if (!contain_binlog) {
                 LOG(WARNING) << "clone binlog file, but not contain binlog metas. "
                              << "tablet=" << tablet->full_name() << ", clone_file=" << clone_file;
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 339c53cf7dd..745a47d89c0 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -103,6 +103,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
     auto& request = arg->request;
 
     TStatus tstatus;
-    Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
+    std::vector<std::string> download_success_files;
+    // capture by reference: a `=` copy would be taken here, while the vector is still empty
+    Defer defer {[=, &tstatus, &download_success_files, ingest_binlog_tstatus = arg->tstatus]() {
         LOG(INFO) << "ingest binlog. result: " << apache::thrift::ThriftDebugString(tstatus);
         if (tstatus.status_code != TStatusCode::OK) {
@@ -110,6 +112,15 @@ void _ingest_binlog(IngestBinlogArg* arg) {
             StorageEngine::instance()->txn_manager()->abort_txn(
                     partition_id, txn_id, local_tablet_id, local_tablet->schema_hash(),
                     local_tablet_uid);
+            // delete all successfully downloaded files
+            LOG(WARNING) << "will delete downloaded success files due to error " << tstatus;
+            std::vector<io::Path> paths;
+            for (const auto& file : download_success_files) {
+                paths.emplace_back(file);
+                LOG(WARNING) << "will delete downloaded success file " << file << " due to error";
+            }
+            static_cast<void>(io::global_local_filesystem()->batch_delete(paths));
+            LOG(WARNING) << "done delete downloaded success files due to error " << tstatus;
         }
 
         if (ingest_binlog_tstatus) {
@@ -224,7 +235,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
     }
 
     // Step 5.2: check data capacity
-    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(), 0);
+    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(),
+                                          0ULL); // NOLINT(bugprone-fold-init-type)
     if (!local_tablet->can_add_binlog(total_size)) {
         LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size
                      << ", tablet=" << local_tablet->tablet_id();
@@ -249,10 +261,11 @@ void _ingest_binlog(IngestBinlogArg* arg) {
         LOG(INFO) << fmt::format("download segment file from {} to {}", get_segment_file_url,
                                  local_segment_path);
         auto get_segment_file_cb = [&get_segment_file_url, &local_segment_path, segment_file_size,
-                                    estimate_timeout](HttpClient* client) {
+                                    estimate_timeout, &download_success_files](HttpClient* client) {
             RETURN_IF_ERROR(client->init(get_segment_file_url));
             client->set_timeout_ms(estimate_timeout * 1000);
             RETURN_IF_ERROR(client->download(local_segment_path));
+            download_success_files.push_back(local_segment_path);
 
             std::error_code ec;
             // Check file length
@@ -282,8 +295,116 @@ void _ingest_binlog(IngestBinlogArg* arg) {
         }
     }
 
-    // Step 6: create rowset && calculate delete bitmap && commit
-    // Step 6.1: create rowset
+    // Step 6: get all segment index files
+    // Step 6.1: get all segment index files size
+    std::vector<std::string> segment_index_file_urls;
+    std::vector<uint64_t> segment_index_file_sizes;
+    std::vector<std::string> segment_index_file_names;
+    auto tablet_schema = rowset_meta->tablet_schema();
+    for (const auto& index : tablet_schema->indexes()) {
+        if (index.index_type() != IndexType::INVERTED) {
+            continue;
+        }
+        auto index_id = index.index_id();
+        for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
+            auto get_segment_index_file_size_url = fmt::format(
+                    "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
+                    "}",
+                    binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
+                    remote_rowset_id, segment_index, index_id);
+            uint64_t segment_index_file_size = 0;
+            auto get_segment_index_file_size_cb = [&get_segment_index_file_size_url,
+                                                   &segment_index_file_size](HttpClient* client) {
+                RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+                client->set_timeout_ms(kMaxTimeoutMs);
+                RETURN_IF_ERROR(client->head());
+                return client->get_content_length(&segment_index_file_size);
+            };
+            auto index_file = InvertedIndexDescriptor::inverted_index_file_path(
+                    local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index, index_id);
+            segment_index_file_names.push_back(index_file);
+
+            status = HttpClient::execute_with_retry(max_retry, 1, get_segment_index_file_size_cb);
+            if (!status.ok()) {
+                LOG(WARNING) << "failed to get segment index file size from "
+                             << get_segment_index_file_size_url
+                             << ", status=" << status.to_string();
+                status.to_thrift(&tstatus);
+                return;
+            }
+
+            segment_index_file_sizes.push_back(segment_index_file_size);
+            segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+        }
+    }
+
+    // Step 6.2: check data capacity
+    uint64_t total_index_size =
+            std::accumulate(segment_index_file_sizes.begin(), segment_index_file_sizes.end(),
+                            0ULL); // NOLINT(bugprone-fold-init-type)
+    if (!local_tablet->can_add_binlog(total_index_size)) {
+        LOG(WARNING) << "failed to add binlog, no enough space, total_index_size="
+                     << total_index_size << ", tablet=" << local_tablet->tablet_id();
+        status = Status::InternalError("no enough space");
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 6.3: get all segment index files
+    DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size());
+    DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
+    for (size_t i = 0; i < segment_index_file_urls.size(); ++i) {
+        auto segment_index_file_size = segment_index_file_sizes[i];
+        const auto& get_segment_index_file_url = segment_index_file_urls[i];
+
+        uint64_t estimate_timeout =
+                segment_index_file_size / config::download_low_speed_limit_kbps / 1024;
+        if (estimate_timeout < config::download_low_speed_time) {
+            estimate_timeout = config::download_low_speed_time;
+        }
+
+        const auto& local_segment_index_path = segment_index_file_names[i];
+        LOG(INFO) << fmt::format("download segment index file from {} to {}",
+                                 get_segment_index_file_url, local_segment_index_path);
+        auto get_segment_index_file_cb = [&get_segment_index_file_url, &local_segment_index_path,
+                                          segment_index_file_size, estimate_timeout,
+                                          &download_success_files](HttpClient* client) {
+            RETURN_IF_ERROR(client->init(get_segment_index_file_url));
+            client->set_timeout_ms(estimate_timeout * 1000);
+            RETURN_IF_ERROR(client->download(local_segment_index_path));
+            download_success_files.push_back(local_segment_index_path);
+
+            std::error_code ec;
+            // Check file length
+            uint64_t local_index_file_size =
+                    std::filesystem::file_size(local_segment_index_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download index file error: " << ec.message();
+                return Status::IOError("can't retrieve file_size of {}, due to {}",
+                                       local_segment_index_path, ec.message());
+            }
+            if (local_index_file_size != segment_index_file_size) {
+                LOG(WARNING) << "download index file length error"
+                             << ", get_segment_index_file_url=" << get_segment_index_file_url
+                             << ", index_file_size=" << segment_index_file_size
+                             << ", local_index_file_size=" << local_index_file_size;
+                return Status::InternalError("downloaded index file size is not equal");
+            }
+            return io::global_local_filesystem()->permission(local_segment_index_path,
+                                                             io::LocalFileSystem::PERMS_OWNER_RW);
+        };
+
+        status = HttpClient::execute_with_retry(max_retry, 1, get_segment_index_file_cb);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to get segment index file from " << get_segment_index_file_url
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+    }
+
+    // Step 7: create rowset && calculate delete bitmap && commit
+    // Step 7.1: create rowset
     RowsetSharedPtr rowset;
     status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
                                           local_tablet->tablet_path(), rowset_meta, &rowset);
@@ -298,7 +419,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
         return;
     }
 
-    // Step 6.2 calculate delete bitmap before commit
+    // Step 7.2 calculate delete bitmap before commit
     auto calc_delete_bitmap_token =
             StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id);
@@ -334,7 +455,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
         static_cast<void>(calc_delete_bitmap_token->wait());
     }
 
-    // Step 6.3: commit txn
+    // Step 7.3: commit txn
     Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
             local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
             rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->schema_hash(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to