This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 47c7238c038 branch-3.0: [opt](s3io) Check data integrity after an upload for S3FileWriter #50168 (#50312) 47c7238c038 is described below commit 47c7238c03823c37be54ff4ea28852f72124a7a2 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Tue Apr 29 15:14:28 2025 +0800 branch-3.0: [opt](s3io) Check data integrity after an upload for S3FileWriter #50168 (#50312) Cherry-picked from #50168 Co-authored-by: Gavin Chou <ga...@selectdb.com> --- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/io/fs/s3_file_writer.cpp | 61 +++++++++++++++++++++++++++++++++++ be/test/io/fs/s3_file_writer_test.cpp | 6 ++-- 4 files changed, 66 insertions(+), 3 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c8dcf517139..f5a56c23ebd 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1334,6 +1334,7 @@ DEFINE_mBool(force_azure_blob_global_endpoint, "false"); DEFINE_mInt32(max_s3_client_retry, "10"); DEFINE_mInt32(s3_read_base_wait_time_ms, "100"); DEFINE_mInt32(s3_read_max_wait_time_ms, "800"); +DEFINE_mBool(enable_s3_object_check_after_upload, "true"); DEFINE_mBool(enable_s3_rate_limiter, "false"); DEFINE_mInt64(s3_get_bucket_tokens, "1000000000000000000"); diff --git a/be/src/common/config.h b/be/src/common/config.h index d9a93c1f7f6..b2799622d5f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1426,6 +1426,7 @@ DECLARE_mInt32(max_s3_client_retry); // and the max retry time is max_s3_client_retry DECLARE_mInt32(s3_read_base_wait_time_ms); DECLARE_mInt32(s3_read_max_wait_time_ms); +DECLARE_mBool(enable_s3_object_check_after_upload); // write as inverted index tmp directory DECLARE_String(tmp_file_dir); diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index d5a5cd00cea..5eabeea888d 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -313,6 +313,55 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) { _completed_parts.emplace_back(std::move(completed_part)); } +// if enabled check +// 1. issue a head object request for existence check +// 2. check the file size +Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res, + const ObjectStoragePathOptions& path_opt, int64_t bytes_appended, + const std::string& put_or_comp) { + if (!config::enable_s3_object_check_after_upload) return Status::OK(); + + auto head_res = client->head_object(path_opt); + + // clang-format off + auto err_msg = [&]() { + std::stringstream ss; + ss << "failed to check object after upload=" << put_or_comp + << " file_path=" << path_opt.path.native() + << fmt::format(" {}_err=", put_or_comp) << upload_res.status.msg + << fmt::format(" {}_code=", put_or_comp) << upload_res.status.code + << fmt::format(" {}_http_code=", put_or_comp) << upload_res.http_code + << fmt::format(" {}_request_id=", put_or_comp) << upload_res.request_id + << " head_err=" << head_res.resp.status.msg + << " head_code=" << head_res.resp.status.code + << " head_http_code=" << head_res.resp.http_code + << " head_request_id=" << head_res.resp.request_id; + return ss.str(); + }; + // clang-format on + + // TODO(gavin): make it fail by injection + TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res); + if (head_res.resp.status.code != ErrorCode::OK && head_res.resp.http_code != 200) { + LOG(WARNING) << "failed to issue head object after upload, " << err_msg(); + DCHECK(false) << "failed to issue head object after upload, " << err_msg(); + // FIXME(gavin): we should retry if this HEAD fails? + return Status::IOError( + "failed to issue head object after upload, status_code={}, http_code={}, err={}", + head_res.resp.status.code, head_res.resp.http_code, head_res.resp.status.msg); + } + if (head_res.file_size != bytes_appended) { + LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended + << " actual_size=" << head_res.file_size << err_msg(); + DCHECK_EQ(bytes_appended, head_res.file_size) + << "failed to check size after upload," << err_msg(); + return Status::IOError( + "failed to check object size after upload, expected_size={} actual_size={}", + bytes_appended, head_res.file_size); + } + return Status::OK(); +} + Status S3FileWriter::_complete() { const auto& client = _obj_client->get(); if (nullptr == client) { @@ -368,6 +417,10 @@ Status S3FileWriter::_complete() { _obj_storage_path_opts.path.native()); return {resp.status.code, std::move(resp.status.msg)}; } + + RETURN_IF_ERROR(check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended, + "complete_multipart")); + s3_file_created_total << 1; return Status::OK(); } @@ -414,6 +467,14 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) { buf.set_status({resp.status.code, std::move(resp.status.msg)}); return; } + + auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts, _bytes_appended, + "put_object"); + if (!st.ok()) { + buf.set_status(st); + return; + } + s3_file_created_total << 1; } diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp index 79af666f928..0662565ec6a 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -1150,8 +1150,8 @@ public: } ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override { - last_opts = opts; - return default_head_response; + return {.resp = ObjectStorageResponse::OK(), + .file_size = static_cast<int64_t>(objects[opts.path.native()].size())}; } ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer, @@ -1365,7 +1365,7 @@ std::string get_s3_path(std::string_view path) { // put object // create_multi_parts_upload + upload_part + complete_parts -TEST_F(S3FileWriterTest, write_bufer_boundary) { +TEST_F(S3FileWriterTest, write_buffer_boundary) { // diable file cache to avoid write to cache bool enable_file_cache = config::enable_file_cache; config::enable_file_cache = false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org