This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 47c7238c038 branch-3.0: [opt](s3io) Check data integrity after an upload for S3FileWriter #50168 (#50312)
47c7238c038 is described below

commit 47c7238c03823c37be54ff4ea28852f72124a7a2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 29 15:14:28 2025 +0800

    branch-3.0: [opt](s3io) Check data integrity after an upload for S3FileWriter #50168 (#50312)
    
    Cherry-picked from #50168
    
    Co-authored-by: Gavin Chou <ga...@selectdb.com>
---
 be/src/common/config.cpp              |  1 +
 be/src/common/config.h                |  1 +
 be/src/io/fs/s3_file_writer.cpp       | 61 +++++++++++++++++++++++++++++++++++
 be/test/io/fs/s3_file_writer_test.cpp |  6 ++--
 4 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c8dcf517139..f5a56c23ebd 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1334,6 +1334,7 @@ DEFINE_mBool(force_azure_blob_global_endpoint, "false");
 DEFINE_mInt32(max_s3_client_retry, "10");
 DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
 DEFINE_mInt32(s3_read_max_wait_time_ms, "800");
+DEFINE_mBool(enable_s3_object_check_after_upload, "true"); // read by check_after_upload() in io/fs/s3_file_writer.cpp
 
 DEFINE_mBool(enable_s3_rate_limiter, "false");
 DEFINE_mInt64(s3_get_bucket_tokens, "1000000000000000000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d9a93c1f7f6..b2799622d5f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1426,6 +1426,7 @@ DECLARE_mInt32(max_s3_client_retry);
 // and the max retry time is max_s3_client_retry
 DECLARE_mInt32(s3_read_base_wait_time_ms);
 DECLARE_mInt32(s3_read_max_wait_time_ms);
+DECLARE_mBool(enable_s3_object_check_after_upload); // if true, HEAD each object after put_object/complete_multipart and verify its size
 
 // write as inverted index tmp directory
 DECLARE_String(tmp_file_dir);
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index d5a5cd00cea..5eabeea888d 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -313,6 +313,55 @@ void S3FileWriter::_upload_one_part(int64_t part_num, 
UploadFileBuffer& buf) {
     _completed_parts.emplace_back(std::move(completed_part));
 }
 
+// Post-upload integrity check; a no-op unless config::enable_s3_object_check_after_upload is true.
+// Issues a HEAD request for `path_opt`, then verifies the object exists and its size equals
+// `bytes_appended`. `upload_res`/`put_or_comp` only feed diagnostics. Returns IOError on failure.
+Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
+                          const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
+                          const std::string& put_or_comp) {
+    if (!config::enable_s3_object_check_after_upload) return Status::OK();
+
+    auto head_res = client->head_object(path_opt);
+    // clang-format off
+    auto err_msg = [&]() {
+        std::stringstream ss;
+        ss << "failed to check object after upload=" << put_or_comp
+            << " file_path=" << path_opt.path.native()
+            << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg
+            << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code
+            << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code
+            << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id
+            << " head_err=" << head_res.resp.status.msg
+            << " head_code=" << head_res.resp.status.code
+            << " head_http_code=" << head_res.resp.http_code
+            << " head_request_id=" << head_res.resp.request_id;
+        return ss.str();
+    };
+    // clang-format on
+
+    // TODO(gavin): make it fail by injection
+    TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);
+    // Any non-OK status fails, even when http_code reads 200 (it may be left unset on error).
+    if (head_res.resp.status.code != ErrorCode::OK) {
+        LOG(WARNING) << "failed to issue head object after upload, " << err_msg();
+        DCHECK(false) << "failed to issue head object after upload, " << err_msg();
+        // FIXME(gavin): we should retry if this HEAD fails?
+        return Status::IOError(
+                "failed to issue head object after upload, status_code={}, http_code={}, err={}",
+                head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg);
+    }
+    if (head_res.file_size != bytes_appended) {
+        LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
+                     << " actual_size=" << head_res.file_size << " " << err_msg();
+        DCHECK_EQ(bytes_appended, head_res.file_size)
+                << "failed to check size after upload, " << err_msg();
+        return Status::IOError(
+                "failed to check object size after upload, expected_size={} actual_size={}",
+                bytes_appended, head_res.file_size);
+    }
+    return Status::OK();
+}
+
 Status S3FileWriter::_complete() {
     const auto& client = _obj_client->get();
     if (nullptr == client) {
@@ -368,6 +417,10 @@ Status S3FileWriter::_complete() {
                     _obj_storage_path_opts.path.native());
         return {resp.status.code, std::move(resp.status.msg)};
     }
+
+    RETURN_IF_ERROR(check_after_upload(client.get(), resp, 
_obj_storage_path_opts, _bytes_appended,
+                                       "complete_multipart"));
+
     s3_file_created_total << 1;
     return Status::OK();
 }
@@ -414,6 +467,14 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
         buf.set_status({resp.status.code, std::move(resp.status.msg)});
         return;
     }
+
+    auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts, 
_bytes_appended,
+                                 "put_object");
+    if (!st.ok()) {
+        buf.set_status(st);
+        return;
+    }
+
     s3_file_created_total << 1;
 }
 
diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp
index 79af666f928..0662565ec6a 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -1150,8 +1150,8 @@ public:
     }
 
     ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& 
opts) override {
-        last_opts = opts;
-        return default_head_response;
+        return {.resp = ObjectStorageResponse::OK(), // NOTE(review): last_opts is no longer recorded here — confirm no test reads it after head_object
+                .file_size = 
static_cast<int64_t>(objects[opts.path.native()].size())}; // NOTE(review): if `objects` is a std::map-like container, operator[] inserts missing keys, so HEAD on a missing path reports OK with size 0
     }
 
     ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, 
void* buffer,
@@ -1365,7 +1365,7 @@ std::string get_s3_path(std::string_view path) {
 
 // put object
 // create_multi_parts_upload + upload_part + complete_parts
-TEST_F(S3FileWriterTest, write_bufer_boundary) {
+TEST_F(S3FileWriterTest, write_buffer_boundary) {
     // diable file cache to avoid write to cache
     bool enable_file_cache = config::enable_file_cache;
     config::enable_file_cache = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to