github-actions[bot] commented on code in PR #35307:
URL: https://github.com/apache/doris/pull/35307#discussion_r1617226153


##########
be/test/io/fs/s3_file_writer_test.cpp:
##########
@@ -15,464 +15,1069 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "io/fs/s3_file_writer.h"
+
+#include <aws/core/utils/HashingUtils.h>
 #include <aws/s3/S3Client.h>
+#include <aws/s3/model/AbortMultipartUploadRequest.h>
+#include <aws/s3/model/CompleteMultipartUploadRequest.h>
 #include <aws/s3/model/CompletedPart.h>
+#include <aws/s3/model/CreateMultipartUploadRequest.h>
+#include <aws/s3/model/HeadObjectRequest.h>
+#include <aws/s3/model/PutObjectRequest.h>
 #include <aws/s3/model/UploadPartRequest.h>
 #include <gtest/gtest.h>
 
+#include <any>
+#include <array>
 #include <atomic>
 #include <chrono>
+#include <cstddef>
 #include <cstdlib>
+#include <cstring>
+#include <functional>
 #include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <system_error>
 #include <thread>
+#include <type_traits>
+#include <unordered_map>
 
 #include "common/config.h"
+#include "common/status.h"
+#include "common/sync_point.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
+#include "io/fs/s3_file_bufferpool.h"
 #include "io/fs/s3_file_system.h"
+#include "io/fs/s3_obj_storage_client.h"
 #include "io/io_common.h"
 #include "runtime/exec_env.h"
-#include "testutil/http_utils.h"
-#include "util/debug_points.h"
-#include "util/s3_util.h"
 #include "util/slice.h"
 #include "util/threadpool.h"
+#include "util/uuid_generator.h"
 namespace doris {
 
 static std::shared_ptr<io::S3FileSystem> s3_fs {nullptr};
 
-class S3FileWriterTest : public testing::Test {
+// This MockS3Client is only responsible for handling normal situations,
+// while error injection is left to other macros to resolve
+class MockS3Client {
 public:
-    static void SetUpTestSuite() {
-        if (getenv(RUN_S3_TESTS) == nullptr || std::string 
{getenv(RUN_S3_TESTS)} != "true") {
-            GTEST_SKIP();
+    MockS3Client() = default;
+    ~MockS3Client() = default;
+
+    Aws::S3::Model::CreateMultipartUploadOutcome create_multi_part_upload(
+            const Aws::S3::Model::CreateMultipartUploadRequest request) {
+        auto uuid = UUIDGenerator::instance()->next_uuid();
+        std::stringstream ss;
+        ss << uuid;
+        upload_id = ss.str();
+        bucket = request.GetBucket();
+        key = request.GetKey();
+        auto result = Aws::S3::Model::CreateMultipartUploadResult();
+        result.SetUploadId(upload_id);
+        auto outcome = 
Aws::S3::Model::CreateMultipartUploadOutcome(std::move(result));
+        return outcome;
+    }
+
+    Aws::S3::Model::AbortMultipartUploadOutcome abort_multi_part_upload(
+            const Aws::S3::Model::AbortMultipartUploadRequest& request) {
+        if (request.GetKey() != key || request.GetBucket() != bucket ||
+            upload_id != request.GetUploadId()) {
+            return Aws::S3::Model::AbortMultipartUploadOutcome(
+                    
Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::NO_SUCH_UPLOAD,
+                                                             false));
         }
-        config::enable_debug_points = true;
-        DebugPoints::instance()->clear();
-
-        S3Conf s3_conf {.bucket = config::test_s3_bucket,
-                        .prefix = "s3_file_writer_test",
-                        .client_conf = S3ClientConf {
-                                .endpoint = config::test_s3_endpoint,
-                                .region = config::test_s3_region,
-                                .ak = config::test_s3_ak,
-                                .sk = config::test_s3_sk,
-                        }};
-        std::cout << "s3 conf: " << s3_conf.to_string() << std::endl;
-        auto res = io::S3FileSystem::create(std::move(s3_conf), 
io::FileSystem::TMP_FS_ID);
-        ASSERT_TRUE(res.has_value()) << res.error();
+        uploaded_parts.clear();
+        return Aws::S3::Model::AbortMultipartUploadOutcome(
+                Aws::S3::Model::AbortMultipartUploadResult());
+    }
 
-        std::unique_ptr<doris::ThreadPool> _s3_file_upload_thread_pool;
-        static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
-                                  .set_min_threads(16)
-                                  .set_max_threads(64)
-                                  .build(&_s3_file_upload_thread_pool));
-        ExecEnv::GetInstance()->_s3_file_upload_thread_pool =
-                std::move(_s3_file_upload_thread_pool);
+    Aws::S3::Model::UploadPartOutcome upload_part(const 
Aws::S3::Model::UploadPartRequest& request,
+                                                  std::string_view buf) {
+        if (request.GetKey() != key || request.GetBucket() != bucket ||
+            upload_id != request.GetUploadId()) {
+            return 
Aws::S3::Model::UploadPartOutcome(Aws::Client::AWSError<Aws::S3::S3Errors>(
+                    Aws::S3::S3Errors::NO_SUCH_UPLOAD, false));
+        }
+        if (request.ContentMD5HasBeenSet()) {
+            const auto& origin_md5 = request.GetContentMD5();
+            auto content = request.GetBody();
+            Aws::Utils::ByteBuffer 
part_md5(Aws::Utils::HashingUtils::CalculateMD5(*content));
+            const auto& md5 = Aws::Utils::HashingUtils::Base64Encode(part_md5);
+            if (origin_md5 != md5) {
+                return 
Aws::S3::Model::UploadPartOutcome(Aws::Client::AWSError<Aws::S3::S3Errors>(
+                        Aws::S3::S3Errors::INVALID_OBJECT_STATE, "wrong md5", 
"md5 not match",
+                        false));
+            }
+        }
+        {
+            Slice slice {buf.data(), buf.size()};
+            std::string str;
+            str.resize(slice.get_size());
+            std::memcpy(str.data(), slice.get_data(), slice.get_size());
+            std::unique_lock lck {latch};
+            uploaded_parts.insert({request.GetPartNumber(), std::move(str)});
+            file_size += request.GetContentLength();
+        }
+        LOG_INFO("upload part size is {}", request.GetContentLength());
+        return 
Aws::S3::Model::UploadPartOutcome(Aws::S3::Model::UploadPartResult());
     }
 
-    static void TearDownTestSuite() {
-        if (getenv(RUN_S3_TESTS) == nullptr || std::string 
{getenv(RUN_S3_TESTS)} != "true") {
-            GTEST_SKIP();
+    Aws::S3::Model::CompleteMultipartUploadOutcome complete_multi_part_upload(
+            const Aws::S3::Model::CompleteMultipartUploadRequest& request) {
+        if (request.GetKey() != key || request.GetBucket() != bucket ||
+            upload_id != request.GetUploadId()) {
+            return Aws::S3::Model::CompleteMultipartUploadOutcome(
+                    
Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::NO_SUCH_UPLOAD,
+                                                             false));
+        }
+        const auto& multi_part_upload = request.GetMultipartUpload();
+        if (multi_part_upload.GetParts().size() != uploaded_parts.size()) {
+            return Aws::S3::Model::CompleteMultipartUploadOutcome(
+                    Aws::Client::AWSError<Aws::S3::S3Errors>(
+                            Aws::S3::S3Errors::INVALID_OBJECT_STATE, "part num 
not match",
+                            "part num not match", false));
+        }
+        for (size_t i = 0; i < multi_part_upload.GetParts().size(); i++) {
+            if (i + 1 != multi_part_upload.GetParts().at(i).GetPartNumber()) {
+                return Aws::S3::Model::CompleteMultipartUploadOutcome(
+                        Aws::Client::AWSError<Aws::S3::S3Errors>(
+                                Aws::S3::S3Errors::INVALID_OBJECT_STATE, "part 
num not coutinous",
+                                "part num not coutinous", false));
+            }
         }
-        ExecEnv::GetInstance()->_s3_file_upload_thread_pool->shutdown();
-        ExecEnv::GetInstance()->_s3_file_upload_thread_pool = nullptr;
-        s3_fs.reset();
+        exists = true;
+        return Aws::S3::Model::CompleteMultipartUploadOutcome(
+                Aws::S3::Model::CompleteMultipartUploadResult());
     }
 
-    void SetUp() override {
-        if (getenv(RUN_S3_TESTS) == nullptr || std::string 
{getenv(RUN_S3_TESTS)} != "true") {
-            GTEST_SKIP();
+    Aws::S3::Model::PutObjectOutcome put_object(const 
Aws::S3::Model::PutObjectRequest& request,
+                                                std::string_view& buf) {
+        exists = true;
+        file_size = request.GetContentLength();
+        key = request.GetKey();
+        bucket = request.GetBucket();
+        Slice s {buf.data(), buf.size()};
+        std::string str;
+        str.resize(s.get_size());
+        std::memcpy(str.data(), s.get_data(), s.get_size());
+        uploaded_parts.insert({1, std::move(str)});
+        return 
Aws::S3::Model::PutObjectOutcome(Aws::S3::Model::PutObjectResult());
+    }
+
+    Aws::S3::Model::HeadObjectOutcome head_object(
+            const Aws::S3::Model::HeadObjectRequest& request) {
+        if (request.GetKey() != key || request.GetBucket() != bucket || 
!exists) {
+            auto error = Aws::Client::AWSError<Aws::S3::S3Errors>(
+                    Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false);
+            error.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND);
+            return Aws::S3::Model::HeadObjectOutcome(error);
         }
+        auto result = Aws::S3::Model::HeadObjectResult();
+        result.SetContentLength(file_size);
+        return Aws::S3::Model::HeadObjectOutcome(result);
     }
 
+    [[nodiscard]] const std::map<int64_t, std::string>& contents() const { 
return uploaded_parts; }
+
 private:
-    static constexpr char RUN_S3_TESTS[] = "RUN_S3_TESTS";
+    std::mutex latch;
+    std::string upload_id;
+    size_t file_size {0};
+    std::map<int64_t, std::string> uploaded_parts;
+    std::string key;
+    std::string bucket;
+    bool exists {false};
+};
+
+static std::shared_ptr<MockS3Client> mock_client = nullptr;
+
+struct MockCallback {
+    std::string point_name;
+    std::function<void(std::vector<std::any>&&)> callback;
+};
+
+static auto test_mock_callbacks = std::array {
+        MockCallback {"s3_file_writer::create_multi_part_upload",
+                      [](auto&& outcome) {
+                          const auto& req =
+                                  try_any_cast<const 
Aws::S3::Model::CreateMultipartUploadRequest&>(
+                                          outcome.at(0));
+                          auto pair =
+                                  
try_any_cast_ret<Aws::S3::Model::CreateMultipartUploadOutcome>(
+                                          outcome);
+                          pair->second = true;
+                          pair->first = 
mock_client->create_multi_part_upload(req);
+                      }},
+        MockCallback {"s3_file_writer::abort_multi_part",
+                      [](auto&& outcome) {
+                          const auto& req =
+                                  try_any_cast<const 
Aws::S3::Model::AbortMultipartUploadRequest&>(
+                                          outcome.at(0));
+                          auto pair = 
try_any_cast_ret<Aws::S3::Model::AbortMultipartUploadOutcome>(
+                                  outcome);
+                          pair->second = true;
+                          pair->first = 
mock_client->abort_multi_part_upload(req);
+                      }},
+        MockCallback {"s3_file_writer::upload_part",
+                      [](auto&& outcome) {
+                          const auto& req = try_any_cast<const 
Aws::S3::Model::UploadPartRequest&>(
+                                  outcome.at(0));
+                          const auto& buf = 
try_any_cast<std::string_view*>(outcome.at(1));
+                          auto pair = 
try_any_cast_ret<Aws::S3::Model::UploadPartOutcome>(outcome);
+                          pair->second = true;
+                          pair->first = mock_client->upload_part(req, *buf);
+                      }},
+        MockCallback {
+                "s3_file_writer::complete_multi_part",
+                [](auto&& outcome) {
+                    const auto& req =
+                            try_any_cast<const 
Aws::S3::Model::CompleteMultipartUploadRequest&>(
+                                    outcome.at(0));
+                    auto pair = 
try_any_cast_ret<Aws::S3::Model::CompleteMultipartUploadOutcome>(
+                            outcome);
+                    pair->second = true;
+                    pair->first = mock_client->complete_multi_part_upload(req);
+                }},
+        MockCallback {"s3_file_writer::put_object",
+                      [](auto&& outcome) {
+                          const auto& req = try_any_cast<const 
Aws::S3::Model::PutObjectRequest&>(
+                                  outcome.at(0));
+                          const auto& buf = 
try_any_cast<std::string_view*>(outcome.at(1));
+                          auto pair = 
try_any_cast_ret<Aws::S3::Model::PutObjectOutcome>(outcome);
+                          pair->second = true;
+                          pair->first = mock_client->put_object(req, *buf);
+                      }},
+        MockCallback {"s3_file_system::head_object",
+                      [](auto&& outcome) {
+                          const auto& req = try_any_cast<const 
Aws::S3::Model::HeadObjectRequest&>(
+                                  outcome.at(0));
+                          auto pair = 
try_any_cast_ret<Aws::S3::Model::HeadObjectOutcome>(outcome);
+                          pair->second = true;
+                          pair->first = mock_client->head_object(req);
+                      }},
+        MockCallback {"s3_client_factory::create", [](auto&& outcome) {
+                          auto pair = 
try_any_cast_ret<std::shared_ptr<Aws::S3::S3Client>>(outcome);
+                          pair->second = true;
+                      }}};
+
+class S3FileWriterTest : public testing::Test {
+public:
+    static void SetUpTestSuite() {
+        auto sp = SyncPoint::get_instance();
+        sp->enable_processing();
+        config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+        std::for_each(test_mock_callbacks.begin(), test_mock_callbacks.end(),
+                      [sp](const MockCallback& mockcallback) {
+                          sp->set_call_back(mockcallback.point_name, 
mockcallback.callback);
+                      });
+        std::string cur_path = std::filesystem::current_path();
+        S3Conf s3_conf;
+        s3_conf.client_conf.ak = "fake_ak";
+        s3_conf.client_conf.sk = "fake_sk";
+        s3_conf.client_conf.endpoint = "fake_s3_endpoint";
+        s3_conf.client_conf.region = "fake_s3_region";
+        s3_conf.bucket = "fake_s3_bucket";
+        s3_conf.prefix = "s3_file_writer_test";
+        LOG_INFO("s3 conf is {}", s3_conf.to_string());
+        auto res = io::S3FileSystem::create(std::move(s3_conf), 
io::FileSystem::TMP_FS_ID);
+        ASSERT_TRUE(res.has_value()) << res.error();
+        s3_fs = res.value();
+
+        std::unique_ptr<ThreadPool> _pool;
+        std::ignore = ThreadPoolBuilder("s3_upload_file_thread_pool")
+                              .set_min_threads(5)
+                              .set_max_threads(10)
+                              .build(&_pool);
+        ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_pool);
+    }
+
+    static void TearDownTestSuite() {
+        auto sp = SyncPoint::get_instance();
+        std::for_each(test_mock_callbacks.begin(), test_mock_callbacks.end(),
+                      [sp](const MockCallback& mockcallback) {
+                          sp->clear_call_back(mockcallback.point_name);
+                      });
+        sp->disable_processing();
+    }
 };
 
 TEST_F(S3FileWriterTest, multi_part_io_error) {
+    mock_client = std::make_shared<MockS3Client>();
     doris::io::FileWriterOptions state;
     auto fs = io::global_local_filesystem();
-    {
-        
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_upload_one_part");
-        Defer defer {[&]() {
-            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_upload_one_part");
-        }};
-        io::FileReaderSPtr local_file_reader;
-
-        auto st =
-                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader);
+
+    auto sp = SyncPoint::get_instance();
+    int largerThan5MB = 0;
+    sp->set_call_back("S3FileWriter::_upload_one_part", 
[&largerThan5MB](auto&& outcome) {
+        // Deliberately make one upload one part task fail to test if s3 file 
writer could
+        // handle io error
+        if (largerThan5MB > 0) {
+            LOG(INFO) << "set upload one part to error";
+            std::this_thread::sleep_for(std::chrono::milliseconds(500));
+            auto ptr = try_any_cast<
+                    Aws::Utils::Outcome<Aws::S3::Model::UploadPartResult, 
Aws::S3::S3Error>*>(
+                    outcome.back());
+            *ptr = Aws::Utils::Outcome<Aws::S3::Model::UploadPartResult, 
Aws::S3::S3Error>(
+                    Aws::Client::AWSError<Aws::S3::S3Errors>());
+        }
+        largerThan5MB++;
+    });
+    Defer defer {[&]() { 
sp->clear_call_back("S3FileWriter::_upload_one_part"); }};
+    auto client = s3_fs->client_holder();
+    io::FileReaderSPtr local_file_reader;
+
+    auto st = fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader);
+    ASSERT_TRUE(st.ok()) << st;
+
+    constexpr int buf_size = 8192;
+
+    io::FileWriterPtr s3_file_writer;
+    st = s3_fs->create_file("multi_part_io_error", &s3_file_writer, &state);
+    ASSERT_TRUE(st.ok()) << st;
+
+    char buf[buf_size];
+    doris::Slice slice(buf, buf_size);
+    size_t offset = 0;
+    size_t bytes_read = 0;
+    auto file_size = local_file_reader->size();
+    while (offset < file_size) {
+        st = local_file_reader->read_at(offset, slice, &bytes_read);
+        ASSERT_TRUE(st.ok()) << st;
+        st = s3_file_writer->append(Slice(buf, bytes_read));
         ASSERT_TRUE(st.ok()) << st;
+        offset += bytes_read;
+    }
+    ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+    st = s3_file_writer->close(true);
+    ASSERT_TRUE(st.ok()) << st;
+    // The second part would fail uploading itself to s3
+    // so the result of close should be not ok
+    st = s3_file_writer->close();
+    ASSERT_FALSE(st.ok()) << st;
+    bool exists = false;
+    st = s3_fs->exists("multi_part_io_error", &exists);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_FALSE(exists);
+}
 
-        constexpr int buf_size = 8192;
+TEST_F(S3FileWriterTest, offset_test) {

Review Comment:
   warning: function 'TEST_F' exceeds recommended size/complexity thresholds 
[readability-function-size]
   ```cpp
   TEST_F(S3FileWriterTest, offset_test) {
   ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/test/io/fs/s3_file_writer_test.cpp:369:** 84 lines including whitespace 
and comments (threshold 80)
   ```cpp
   TEST_F(S3FileWriterTest, offset_test) {
   ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to