This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new ae129e773da branch-3.0: [fix](s3filewriter) Fix s3_write_buffer_size boundary issue #47333 (#47341) ae129e773da is described below commit ae129e773da4b46e2abd0a73bfafb7fd1b1413db Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Jan 23 13:19:39 2025 +0800 branch-3.0: [fix](s3filewriter) Fix s3_write_buffer_size boundary issue #47333 (#47341) Cherry-picked from #47333 Co-authored-by: Gavin Chou <ga...@selectdb.com> --- be/src/io/fs/s3_file_writer.cpp | 37 ++-- be/test/io/fs/s3_file_writer_test.cpp | 392 ++++++++++++++++++++++++++++++++++ 2 files changed, 417 insertions(+), 12 deletions(-) diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 7a06ce22074..edec1e17aef 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -90,6 +90,7 @@ S3FileWriter::~S3FileWriter() { } Status S3FileWriter::_create_multi_upload_request() { + LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native(); const auto& client = _obj_client->get(); if (nullptr == client) { return Status::InternalError<false>("invalid obj storage client"); @@ -224,11 +225,6 @@ Status S3FileWriter::_close_impl() { _countdown_event.add_count(); RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf))); _pending_buf = nullptr; - } else if (_bytes_appended != 0) { // Non-empty file and has nothing to be uploaded - // NOTE: When the data size is a multiple of config::s3_write_buffer_size, - // _cur_part_num may exceed the actual number of parts that need to be uploaded. - // This is because it is incremented by 1 in advance within the S3FileWriter::appendv method. - _cur_part_num--; } RETURN_IF_ERROR(_complete()); @@ -265,12 +261,13 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { Slice {data[i].get_data() + pos, data_size_to_append})); TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num); - // if it's the last part, it could be less than 5MB, or it must - // satisfy that the size is larger than or euqal to 5MB - // _complete() would handle the first situation + // If this is the last part and the data size is less than s3_write_buffer_size, + // the pending_buf will be handled by _close_impl() and _complete() + // If this is the last part and the data size is equal to s3_write_buffer_size, + // the pending_buf is handled here and submitted. it will be waited by _complete() if (_pending_buf->get_size() == buffer_size) { - // only create multiple upload request when the data is more - // than one memory buffer + // only create multiple upload request when the data size is + // larger or equal to s3_write_buffer_size than one memory buffer if (_cur_part_num == 1) { RETURN_IF_ERROR(_create_multi_upload_request()); } @@ -286,6 +283,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { } void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) { + VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native() + << " part=" << part_num; if (buf.is_cancelled()) { LOG_INFO("file {} skip part {} because previous failure {}", _obj_storage_path_opts.path.native(), part_num, _st); @@ -337,11 +336,20 @@ Status S3FileWriter::_complete() { return Status::OK(); } - if (_failed || _completed_parts.size() != _cur_part_num) { + // check number of parts + int expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) + + !!(_bytes_appended % config::s3_write_buffer_size); + int expected_num_parts2 = + (_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1; + DCHECK_EQ(expected_num_parts1, expected_num_parts2) + << " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num + << " s3_write_buffer_size=" << config::s3_write_buffer_size; + if (_failed || _completed_parts.size() != expected_num_parts1 || + expected_num_parts1 != expected_num_parts2) { _st = Status::InternalError( "error status={} failed={} #complete_parts={} #expected_parts={} " "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}", - _st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(), + _st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(), _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr); LOG(WARNING) << _st; return _st; @@ -350,6 +358,9 @@ Status S3FileWriter::_complete() { std::sort(_completed_parts.begin(), _completed_parts.end(), [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; }); TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts); + LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native() + << " size=" << _bytes_appended << " number_parts=" << _completed_parts.size() + << " s3_write_buffer_size=" << config::s3_write_buffer_size; auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts); if (resp.status.code != ErrorCode::OK) { LOG_WARNING("Compltet multi part upload failed because {}, file path {}", resp.status.msg, @@ -379,6 +390,8 @@ Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() { } void S3FileWriter::_put_object(UploadFileBuffer& buf) { + LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native() + << " size=" << _bytes_appended; if (state() == State::CLOSED) { DCHECK(state() != State::CLOSED) << "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native(); diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp index 6469559d0d8..010ef7e1c1e 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -38,6 +38,7 @@ #include <functional> #include <memory> #include <mutex> +#include <random> #include <sstream> #include <string> #include <system_error> @@ -60,6 +61,9 @@ #include "util/slice.h" #include "util/threadpool.h" #include "util/uuid_generator.h" + +using namespace doris::io; + namespace doris { static std::shared_ptr<io::S3FileSystem> s3_fs {nullptr}; @@ -1078,4 +1082,392 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_3) { ASSERT_FALSE(st.ok()) << st; } +namespace io { +/** + * This class is for boundary test + */ +class SimpleMockObjStorageClient : public io::ObjStorageClient { +public: + SimpleMockObjStorageClient() = default; + ~SimpleMockObjStorageClient() override = default; + + ObjectStorageResponse default_response {ObjectStorageResponse::OK()}; + ObjectStorageUploadResponse default_upload_response {.resp = ObjectStorageResponse::OK(), + .upload_id = "mock-upload-id", + .etag = "mock-etag"}; + ObjectStorageHeadResponse default_head_response {.resp = ObjectStorageResponse::OK(), + .file_size = 1024}; + std::string default_presigned_url = "https://mock-presigned-url.com"; + + ObjectStorageUploadResponse create_multipart_upload( + const ObjectStoragePathOptions& opts) override { + create_multipart_count++; + create_multipart_params.push_back(opts); + last_opts = opts; + return default_upload_response; + } + + ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts, + std::string_view stream) override { + put_object_count++; + put_object_params.emplace_back(opts, std::string(stream)); + last_opts = opts; + last_stream = std::string(stream); + objects.emplace(opts.path.native(), std::string(stream)); + uploaded_bytes += stream.size(); + return default_response; + } + + ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts, + std::string_view stream, int part_num) override { + upload_part_count++; + upload_part_params.push_back({opts, std::string(stream), part_num}); + last_opts = opts; + last_stream = std::string(stream); + last_part_num = part_num; + std::stringstream ss; + ss << std::setfill('0') << std::setw(3) << part_num; + parts[opts.path.native() + "_" + ss.str()] = std::string(stream); + uploaded_bytes += stream.size(); + return default_upload_response; + } + + ObjectStorageResponse complete_multipart_upload( + const ObjectStoragePathOptions& opts, + const std::vector<ObjectCompleteMultiPart>& completed_parts) override { + complete_multipart_count++; + complete_multipart_params.push_back({opts, completed_parts}); + last_opts = opts; + last_completed_parts = completed_parts; + std::string final_obj; + final_obj.reserve(uploaded_bytes); + for (auto& i : parts) { + final_obj.append(i.second); + } + complete[opts.path.native()] = final_obj; + objects[opts.path.native()] = final_obj; + return default_response; + } + + ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override { + last_opts = opts; + return default_head_response; + } + + ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer, + size_t offset, size_t bytes_read, + size_t* size_return) override { + last_opts = opts; + last_offset = offset; + last_bytes_read = bytes_read; + if (size_return) { + *size_return = bytes_read; // return default value + } + return default_response; + } + + ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts, + std::vector<FileInfo>* files) override { + last_opts = opts; + if (files) { + *files = default_file_list; + } + return default_response; + } + + ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts, + std::vector<std::string> objs) override { + last_opts = opts; + last_deleted_objects = std::move(objs); + return default_response; + } + + ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) override { + last_opts = opts; + return default_response; + } + + ObjectStorageResponse delete_objects_recursively( + const ObjectStoragePathOptions& opts) override { + last_opts = opts; + return default_response; + } + + std::string generate_presigned_url(const ObjectStoragePathOptions& opts, + int64_t expiration_secs, const S3ClientConf& conf) override { + last_opts = opts; + last_expiration_secs = expiration_secs; + return default_presigned_url; + } + + // Variables to store the last call + ObjectStoragePathOptions last_opts; + std::string last_stream; + int last_part_num = 0; + std::vector<ObjectCompleteMultiPart> last_completed_parts; + size_t last_offset = 0; + size_t last_bytes_read = 0; + std::vector<std::string> last_deleted_objects; + int64_t last_expiration_secs = 0; + std::vector<FileInfo> default_file_list; + + // Add counters for each function + int create_multipart_count = 0; + int put_object_count = 0; + int upload_part_count = 0; + int complete_multipart_count = 0; + + // Structures to store input parameters for each call + struct UploadPartParams { + ObjectStoragePathOptions opts; + std::string stream; + int part_num; + }; + + struct CompleteMultipartParams { + ObjectStoragePathOptions opts; + std::vector<ObjectCompleteMultiPart> parts; + }; + + // Vectors to store parameters from each call + std::vector<ObjectStoragePathOptions> create_multipart_params; + std::vector<std::pair<ObjectStoragePathOptions, std::string>> put_object_params; + std::vector<UploadPartParams> upload_part_params; + std::vector<CompleteMultipartParams> complete_multipart_params; + std::map<std::string, std::string> objects; + std::map<std::string, std::string> complete; + std::map<std::string, std::string> parts; + int64_t uploaded_bytes = 0; + + void reset() { + last_opts = ObjectStoragePathOptions {}; + last_stream.clear(); + last_part_num = 0; + last_completed_parts.clear(); + last_offset = 0; + last_bytes_read = 0; + last_deleted_objects.clear(); + last_expiration_secs = 0; + + create_multipart_count = 0; + put_object_count = 0; + upload_part_count = 0; + complete_multipart_count = 0; + + create_multipart_params.clear(); + put_object_params.clear(); + upload_part_params.clear(); + complete_multipart_params.clear(); + objects.clear(); + complete.clear(); + parts.clear(); + } +}; + +} // namespace io + +/** + * Create a mock S3 client and a S3FileWriter. + * @return A tuple containing the mock S3 client and the S3FileWriter. + */ +std::tuple<std::shared_ptr<SimpleMockObjStorageClient>, std::shared_ptr<S3FileWriter>> +create_s3_client(const std::string& path) { + doris::io::FileWriterOptions opts; + io::FileWriterPtr file_writer; + auto st = s3_fs->create_file(path, &file_writer, &opts); + EXPECT_TRUE(st.ok()) << st; + std::shared_ptr<S3FileWriter> s3_file_writer(static_cast<S3FileWriter*>(file_writer.release())); + auto holder = std::make_shared<ObjClientHolder>(S3ClientConf {}); + auto mock_client = std::make_shared<SimpleMockObjStorageClient>(); + holder->_client = mock_client; + s3_file_writer->_obj_client = holder; + return {mock_client, s3_file_writer}; +} + +/** + * Generate test data for S3FileWriter boundary tests. + * Returns a vector of sizes that we'll use to generate data on demand. + * This way we don't need to hold all the data in memory at once. + */ +std::vector<size_t> generate_test_sizes(size_t num_samples = 20) { + std::vector<size_t> sizes; + const size_t MB = 1024 * 1024; + const size_t MAX_SIZE = 256 * MB; + + // Add boundary cases + sizes.push_back(0); // Empty file + sizes.push_back(1); // Single byte + sizes.push_back(MB - 1); // Just under 1MB + sizes.push_back(MB); // Exactly 1MB + sizes.push_back(MB + 1); // Just over 1MB + + for (size_t i = 1; i <= 10; i++) { // Add buffer boundary cases + sizes.push_back(i * config::s3_write_buffer_size - 1); + sizes.push_back(i * config::s3_write_buffer_size); + sizes.push_back(i * config::s3_write_buffer_size + 1); + } + + // Add MB boundary cases up to 10MB + for (size_t i = 2; i <= 20; i++) { + sizes.push_back(i * MB - 1); + sizes.push_back(i * MB); + sizes.push_back(i * MB + 1); + } + + // Add some larger MB boundaries + for (size_t mb : {1, 2, 4, 8, 16, 32, 64, 128, 256}) { + sizes.push_back(mb * MB - 1); + sizes.push_back(mb * MB); + sizes.push_back(mb * MB + 1); + } + + // Add some random sizes + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<size_t> small_dist( + 2, config::s3_write_buffer_size); // Random sizes under s3_write_buffer_size + std::uniform_int_distribution<size_t> large_dist(config::s3_write_buffer_size, + MAX_SIZE); // Random sizes up to 256MB + for (int i = 0; i < 5; i++) { + sizes.push_back(small_dist(gen)); + } + for (int i = 0; i < 5; i++) { // sparse test + sizes.push_back(large_dist(gen)); + } + + // Sort and remove duplicates + std::sort(sizes.begin(), sizes.end()); + sizes.erase(std::unique(sizes.begin(), sizes.end()), sizes.end()); + std::shuffle(sizes.begin(), sizes.end(), gen); + sizes.resize(std::min(sizes.size(), num_samples)); + return sizes; +} + +/** + * Generate a string of specified size efficiently. + * The string will start and end with the magic character, + * and have random content in between. + */ +std::string generate_test_string(char magic_char, size_t size) { + if (size == 0) return ""; + std::string result; + result.reserve(size); + result.resize(size); + result.front() = magic_char; + result.back() = magic_char; + return result; +} + +// the internal implementation of s3_file_writer and s3_fs +std::string get_s3_path(std::string_view path) { + return std::string("s3://") + s3_fs->bucket() + "/" + s3_fs->prefix() + "/" + std::string(path); +}; + +// put object +// create_multi_parts_upload + upload_part + complete_parts +TEST_F(S3FileWriterTest, write_bufer_boundary) { + // diable file cache to avoid write to cache + bool enable_file_cache = config::enable_file_cache; + config::enable_file_cache = false; + Defer defer {[&]() { config::enable_file_cache = enable_file_cache; }}; + + auto sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->clear_all_call_backs(); + + // s3_file_writer is the interface to write to s3 + // mock_client is a SimpleMockObjStorageClient for testing, it holds the data in memory + // we check the data in mock_client to make sure s3_file_writer is working as expected + auto test = [](char magic_char, size_t data_size, const std::string& filename) { + std::string content = generate_test_string(magic_char, data_size); + auto [mock_client, s3_file_writer] = create_s3_client(filename); + std::string expected_path = get_s3_path(filename); + std::stringstream ss; + ss << "filename: " << filename << ", data_size: " << data_size + << ", magic_char: " << magic_char; + std::string msg = ss.str(); + EXPECT_EQ(s3_file_writer->append(content), Status::OK()) << msg; + EXPECT_EQ(s3_file_writer->close(), Status::OK()) << msg; + if (content.size() < config::s3_write_buffer_size) { + EXPECT_EQ(mock_client->put_object_count, 1) << msg; + EXPECT_EQ(mock_client->create_multipart_count, 0) << msg; + EXPECT_EQ(mock_client->complete_multipart_count, 0) << msg; + EXPECT_EQ(mock_client->upload_part_count, 0) << msg; + } else { // >= s3_write_buffer_size, use multipart + int expected_num_parts = (content.size() / config::s3_write_buffer_size) + + !!(content.size() % config::s3_write_buffer_size); + EXPECT_EQ(mock_client->put_object_count, 0) << msg; + EXPECT_EQ(mock_client->create_multipart_count, 1) << msg; + EXPECT_EQ(mock_client->complete_multipart_count, 1) << msg; + EXPECT_EQ(mock_client->upload_part_count, expected_num_parts) << msg; + } + EXPECT_EQ(mock_client->last_opts.path.native(), expected_path) << msg; + EXPECT_EQ(mock_client->objects[expected_path].size(), content.size()) << msg; + // EXPECT_EQ(mock_client->last_stream, content); // Will print too many if compare all content if failed + if (content.size() > 0 && mock_client->objects[expected_path].size() > 0) { + EXPECT_EQ(mock_client->objects[expected_path].front(), content.front()) << msg; + EXPECT_EQ(mock_client->objects[expected_path].back(), content.back()) << msg; + } + }; + // fpath is a function to generate a file path for debug to locate line number if some tests failed + auto fpath = [](const char* file, int line, std::string suffix) { + std::stringstream ss; + ss << file << ":" << line << "_" << suffix; + std::string ret = ss.str(); + // return ret.substr(ret.rfind('/') + 1); // keep file name only + return ret.substr(ret[0] == '/'); // remove the first '/' + }; + + // test all sizes in generate_test_sizes() + for (auto& i : generate_test_sizes(20)) { // reduce number of cases if it spends too much time + test(char('a' + (i % 26)), i, fpath(__FILE__, __LINE__, std::to_string(i) + ".dat")); + } + + // clang-format off + // some verbose tests + test('a', 0, fpath(__FILE__, __LINE__, "0.dat")); + test('b', 1, fpath(__FILE__, __LINE__, "1.dat")); + test('c', 2, fpath(__FILE__, __LINE__, "2.dat")); + test('d', 1024L, fpath(__FILE__, __LINE__, "1024.dat")); + test('e', 4 * 1024L, fpath(__FILE__, __LINE__, "512K.dat")); + test('f', 64 * 1024L, fpath(__FILE__, __LINE__, "1M.dat")); + test('g', 512 * 1024L, fpath(__FILE__, __LINE__, "2M.dat")); + test('h', 1 * 1024L * 1024L, fpath(__FILE__, __LINE__, "1M.dat")); + test('i', 2 * 1024L * 1024L, fpath(__FILE__, __LINE__, "2M.dat")); + test('j', 4 * 1024L * 1024L, fpath(__FILE__, __LINE__, "4M.dat")); + test('k', 8 * 1024L * 1024L, fpath(__FILE__, __LINE__, "8M.dat")); + test('l', 16 * 1024L * 1024L, fpath(__FILE__, __LINE__, "16M.dat")); + test('m', 32 * 1024L * 1024L, fpath(__FILE__, __LINE__, "32M.dat")); + test('n', 64 * 1024L * 1024L, fpath(__FILE__, __LINE__, "64M.dat")); + test('o', 128 * 1024L * 1024L, fpath(__FILE__, __LINE__, "128M.dat")); + test('p', 256 * 1024L * 1024L, fpath(__FILE__, __LINE__, "256M.dat")); + // test('q', 512 * 1024L * 1024L, fpath(__FILE__, __LINE__, "512M.dat")); + test('r', config::s3_write_buffer_size - 1, fpath(__FILE__, __LINE__, ".dat")); + test('s', config::s3_write_buffer_size, fpath(__FILE__, __LINE__, ".dat")); + test('t', config::s3_write_buffer_size + 1, fpath(__FILE__, __LINE__, ".dat")); + test('u', 2 * config::s3_write_buffer_size - 1, fpath(__FILE__, __LINE__, ".dat")); + test('v', 2 * config::s3_write_buffer_size, fpath(__FILE__, __LINE__, ".dat")); + test('w', 2 * config::s3_write_buffer_size + 1, fpath(__FILE__, __LINE__, ".dat")); + test('x', 3 * config::s3_write_buffer_size - 1, fpath(__FILE__, __LINE__, ".dat")); + test('y', 3 * config::s3_write_buffer_size, fpath(__FILE__, __LINE__, ".dat")); + test('z', 3 * config::s3_write_buffer_size + 1, fpath(__FILE__, __LINE__, ".dat")); + // test with large buffer size + config::s3_write_buffer_size = 8 * 1024L * 1024L; + test('0', config::s3_write_buffer_size - 1, fpath(__FILE__, __LINE__, ".dat")); + test('1', config::s3_write_buffer_size, fpath(__FILE__, __LINE__, ".dat")); + test('2', config::s3_write_buffer_size + 1, fpath(__FILE__, __LINE__, ".dat")); + test('0', 2 * config::s3_write_buffer_size - 1, fpath(__FILE__, __LINE__, ".dat")); + test('1', 2 * config::s3_write_buffer_size, fpath(__FILE__, __LINE__, ".dat")); + test('2', 2 * config::s3_write_buffer_size + 1, fpath(__FILE__, __LINE__, ".dat")); + // test with small buffer size + config::s3_write_buffer_size = 4 * 1024L * 1024L; + test('0', config::s3_write_buffer_size - 1, fpath(__FILE__, __LINE__, ".dat")); + test('1', config::s3_write_buffer_size, fpath(__FILE__, __LINE__, ".dat")); + test('2', config::s3_write_buffer_size + 1, fpath(__FILE__, __LINE__, ".dat")); + test('0', 2 * config::s3_write_buffer_size - 1, fpath(__FILE__, __LINE__, ".dat")); + test('1', 2 * config::s3_write_buffer_size, fpath(__FILE__, __LINE__, ".dat")); + test('2', 2 * config::s3_write_buffer_size + 1, fpath(__FILE__, __LINE__, ".dat")); + // clang-format on +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org