qidaye commented on code in PR #31743:
URL: https://github.com/apache/doris/pull/31743#discussion_r1512721756


##########
be/src/service/backend_service.cpp:
##########
@@ -286,8 +286,121 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
         }
     }
 
-    // Step 6: create rowset && calculate delete bitmap && commit
-    // Step 6.1: create rowset
+    // Step 6: get all segment index files
+    // Step 6.1: get all segment index files size
+    std::vector<std::string> segment_index_file_urls;
+    std::vector<uint64_t> segment_index_file_sizes;
+    std::vector<std::string> segment_index_file_names;
+    auto tablet_schema = rowset_meta->tablet_schema();
+    for (const auto& column : tablet_schema->columns()) {
+        if (tablet_schema->has_inverted_index(column)) {
+            const auto* index_info = tablet_schema->get_inverted_index(column);
+            auto index_id = index_info->index_id();
+            for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+                auto get_segment_index_file_size_url = fmt::format(
+                        
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_"
+                        "index={}",
+                        binlog_api_url, "get_segment_index_file", 
request.remote_tablet_id,
+                        remote_rowset_id, segment_index, index_id);
+                uint64_t segment_index_file_size;
+                auto get_segment_index_file_size_cb =
+                        [&get_segment_index_file_size_url,
+                         &segment_index_file_size](HttpClient* client) {
+                            
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+                            client->set_timeout_ms(kMaxTimeoutMs);
+                            RETURN_IF_ERROR(client->head());
+                            return 
client->get_content_length(&segment_index_file_size);
+                        };
+                auto index_file = 
InvertedIndexDescriptor::inverted_index_file_path(
+                        local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index,
+                        index_id, index_info->get_index_suffix());
+                segment_index_file_names.push_back(index_file);
+
+                status = HttpClient::execute_with_retry(max_retry, 1,
+                                                        
get_segment_index_file_size_cb);
+                if (!status.ok()) {
+                    LOG(WARNING) << "failed to get segment file size from "
+                                 << get_segment_index_file_size_url
+                                 << ", status=" << status.to_string();
+                    status.to_thrift(&tstatus);
+                    return;
+                }
+
+                segment_index_file_sizes.push_back(segment_index_file_size);
+                
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+            }
+        }
+    }
+
+    // Step 6.2: check data capacity
+    uint64_t total_index_size =
+            std::accumulate(segment_index_file_sizes.begin(), 
segment_index_file_sizes.end(),
+                            0); // NOLINT(bugprone-fold-init-type)
+    if (!local_tablet->can_add_binlog(total_index_size)) {
+        LOG(WARNING) << "failed to add binlog, no enough space, 
total_index_size="
+                     << total_index_size << ", tablet=" << 
local_tablet->tablet_id();
+        status = Status::InternalError("no enough space");
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 6.3: get all segment index files
+    LOG(INFO) << "segment_index_file_urls.size=" << 
segment_index_file_urls.size() << ", "
+              << "segment_index_file_names.size=" << 
segment_index_file_names.size() << ", "
+              << "segment_index_file_sizes.size=" << 
segment_index_file_sizes.size();
+    DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size());
+    DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
+    for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) {
+        auto segment_index_file_size = segment_index_file_sizes[i];
+        auto get_segment_index_file_url = segment_index_file_urls[i];
+
+        uint64_t estimate_timeout =
+                segment_index_file_size / 
config::download_low_speed_limit_kbps / 1024;
+        if (estimate_timeout < config::download_low_speed_time) {
+            estimate_timeout = config::download_low_speed_time;
+        }
+
+        auto local_segment_index_path = segment_index_file_names[i];
+        LOG(INFO) << fmt::format("download segment index file from {} to {}",
+                                 get_segment_index_file_url, 
local_segment_index_path);
+        auto get_segment_index_file_cb = [&get_segment_index_file_url, 
&local_segment_index_path,
+                                          segment_index_file_size,
+                                          estimate_timeout](HttpClient* 
client) {
+            RETURN_IF_ERROR(client->init(get_segment_index_file_url));
+            client->set_timeout_ms(estimate_timeout * 1000);
+            RETURN_IF_ERROR(client->download(local_segment_index_path));
+
+            std::error_code ec;
+            // Check file length
+            uint64_t local_index_file_size =
+                    std::filesystem::file_size(local_segment_index_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download index file error" << ec.message();
+                return Status::IOError("can't retrive file_size of {}, due to 
{}",
+                                       local_segment_index_path, ec.message());
+            }
+            if (local_index_file_size != segment_index_file_size) {
+                LOG(WARNING) << "download index file length error"
+                             << ", get_segment_index_file_url=" << 
get_segment_index_file_url
+                             << ", index_file_size=" << segment_index_file_size
+                             << ", local_index_file_size=" << 
local_index_file_size;
+                return Status::InternalError("downloaded index file size is 
not equal");
+            }
+            return 
io::global_local_filesystem()->permission(local_segment_index_path,
+                                                             
io::LocalFileSystem::PERMS_OWNER_RW);
+        };
+
+        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_index_file_cb);

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to