This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0c5c400b327 branch-3.0: [feature](cloud)Impl file cache microbench #47563 (#49440)
0c5c400b327 is described below

commit 0c5c400b3270053b2fa1da2762429436b70ffe6f
Author: deardeng <deng...@selectdb.com>
AuthorDate: Sun Mar 30 10:39:28 2025 +0800

    branch-3.0: [feature](cloud)Impl file cache microbench #47563 (#49440)
    
    cherry pick from #47563
---
 be/CMakeLists.txt                         |   10 +
 be/src/io/tools/CMakeLists.txt            |   68 +
 be/src/io/tools/Makefile                  |   36 +
 be/src/io/tools/file_cache_microbench.cpp | 2354 +++++++++++++++++++++++++++++
 be/src/io/tools/proto/Makefile            |   38 +
 be/src/io/tools/proto/microbench.proto    |   39 +
 be/src/io/tools/readme.md                 |  133 ++
 build.sh                                  |   96 +-
 run-be-ut.sh                              |    1 +
 9 files changed, 2735 insertions(+), 40 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 1124eb864e1..4e3ea02ef47 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -86,6 +86,11 @@ if (DISPLAY_BUILD_TIME)
     set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "time -f 'TimeUsage: real=%es, user=%Us, sys=%Ss'")
 endif()
 
+if (BUILD_FILE_CACHE_MICROBENCH_TOOL)
+    add_definitions(-DBE_TEST)
+    add_definitions(-DBE_BENCHMARK)
+endif()
+
 message(STATUS "GLIBC_COMPATIBILITY is ${GLIBC_COMPATIBILITY}")
 message(STATUS "USE_LIBCPP is ${USE_LIBCPP}")
 message(STATUS "USE_MEM_TRACKER is ${USE_MEM_TRACKER}")
@@ -765,6 +770,11 @@ if (BUILD_META_TOOL)
     add_subdirectory(${SRC_DIR}/tools)
 endif()
 
+option(BUILD_FILE_CACHE_MICROBENCH_TOOL "Build file cache microbench tool" OFF)
+if (BUILD_FILE_CACHE_MICROBENCH_TOOL)
+    add_subdirectory(${SRC_DIR}/io/tools)
+endif()
+
 option(BUILD_INDEX_TOOL "Build index tool" OFF)
 if (BUILD_INDEX_TOOL)
     add_subdirectory(${SRC_DIR}/index-tools)
diff --git a/be/src/io/tools/CMakeLists.txt b/be/src/io/tools/CMakeLists.txt
new file mode 100644
index 00000000000..40088892a97
--- /dev/null
+++ b/be/src/io/tools/CMakeLists.txt
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/tools")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/tools")
+
+# Compile the proto files
+execute_process(
+    COMMAND make clean -C ${CMAKE_CURRENT_SOURCE_DIR}/proto
+    RESULT_VARIABLE CLEAN_RESULT
+)
+
+execute_process(
+    COMMAND make -C ${CMAKE_CURRENT_SOURCE_DIR}/proto
+    RESULT_VARIABLE MAKE_RESULT
+)
+
+if(NOT ${MAKE_RESULT} EQUAL 0)
+    message(FATAL_ERROR "Failed to compile proto files")
+endif()
+
+# Print the current source directory
+message(STATUS "CMAKE_CURRENT_SOURCE_DIR: ${CMAKE_CURRENT_SOURCE_DIR}")
+
+# Locate the generated proto files
+file(GLOB PROTO_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/build/proto/*.pb.cc")
+file(GLOB PROTO_HDRS "${CMAKE_CURRENT_SOURCE_DIR}/build/proto/*.pb.h")
+
+# Print PROTO_SRCS and PROTO_HDRS
+message(STATUS "PROTO_SRCS: ${PROTO_SRCS}")
+message(STATUS "PROTO_HDRS: ${PROTO_HDRS}")
+
+# Add the file_cache_microbench executable
+add_executable(file_cache_microbench
+    file_cache_microbench.cpp
+    ${PROTO_SRCS}
+)
+
+# Add the include path for the generated proto files
+target_include_directories(file_cache_microbench PRIVATE
+    ${CMAKE_CURRENT_SOURCE_DIR}/build/proto
+)
+
+# Link the required libraries
+target_link_libraries(file_cache_microbench
+    ${DORIS_LINK_LIBS}
+    protobuf
+)
+
+# Install rule
+install(TARGETS file_cache_microbench DESTINATION ${OUTPUT_DIR}/lib/)
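+
+# Illustrative invocation (assuming the standard Doris BE build environment):
+#   cmake -DBUILD_FILE_CACHE_MICROBENCH_TOOL=ON <be_source_dir> && make file_cache_microbench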
diff --git a/be/src/io/tools/Makefile b/be/src/io/tools/Makefile
new file mode 100644
index 00000000000..d3be0e6905d
--- /dev/null
+++ b/be/src/io/tools/Makefile
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file builds all generated files needed by the microbench tool.
+
+BUILD_DIR = ${CURDIR}/build/
+
+all: subdirs
+.PHONY: all
+
+# build all subdirs
+SUBDIR = proto
+subdirs: ${SUBDIR}
+.PHONY: subdirs ${SUBDIR}
+${SUBDIR}:
+	$(MAKE) -C $@
+
+clean:
+	rm -rf ${BUILD_DIR}
+.PHONY: clean
diff --git a/be/src/io/tools/file_cache_microbench.cpp b/be/src/io/tools/file_cache_microbench.cpp
new file mode 100644
index 00000000000..0111ff9f68c
--- /dev/null
+++ b/be/src/io/tools/file_cache_microbench.cpp
@@ -0,0 +1,2354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#if defined(BE_TEST) && defined(BUILD_FILE_CACHE_MICROBENCH_TOOL)
+#include <brpc/controller.h>
+#include <brpc/http_status_code.h>
+#include <brpc/server.h>
+#include <brpc/uri.h>
+#include <bvar/bvar.h>
+#include <fmt/format.h>
+#include <glog/logging.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <cstdlib>
+#include <filesystem>
+#include <future>
+#include <iomanip>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <random>
+#include <string>
+#include <thread>
+#include <unordered_set>
+#include <vector>
+
+#include "build/proto/microbench.pb.h"
+#include "common/config.h"
+#include "common/status.h"
+#include "gflags/gflags.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/s3_file_system.h"
+#include "io/fs/s3_file_writer.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+#ifdef __clang__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wkeyword-macro"
+#elif defined(__GNUC__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wshadow"
+#endif
+
+#define private public
+#include "runtime/exec_env.h"
+#undef private
+
+#ifdef __clang__
+#pragma clang diagnostic pop
+#elif defined(__GNUC__)
+#pragma GCC diagnostic pop
+#endif
+
+#include <gen_cpp/cloud_version.h>
+
+#include "util/bvar_helper.h"
+#include "util/defer_op.h"
+#include "util/stopwatch.hpp"
+#include "util/string_util.h"
+#include "util/threadpool.h"
+
+using doris::io::FileCacheFactory;
+using doris::io::BlockFileCache;
+
+bvar::LatencyRecorder microbench_write_latency("file_cache_microbench_append");
+bvar::LatencyRecorder microbench_read_latency("file_cache_microbench_read_at");
+
+const std::string HIDDEN_PREFIX = "test_file_cache_microbench/";
+const char PAD_CHAR = 'x';
+const size_t BUFFER_SIZE = 1024 * 1024;
+// Just 10^9.
+static constexpr auto NS = 1000000000UL;
+
+DEFINE_int32(port, 8888, "Http Port of this server");
+
+static std::string build_info() {
+    std::stringstream ss;
+    ss << R"(
+    Version: {)";
+    ss << DORIS_CLOUD_BUILD_VERSION;
+
+#if defined(NDEBUG)
+    ss << R"(-release})";
+#else
+    ss << R"(-debug})";
+#endif
+
+    ss << R"(
+    Code_version: {commit=)" DORIS_CLOUD_BUILD_HASH R"( time=)" DORIS_CLOUD_BUILD_VERSION_TIME R"(
+    Build_info: {initiator=)" DORIS_CLOUD_BUILD_INITIATOR R"( build_at=)" DORIS_CLOUD_BUILD_TIME R"(
+    Build_on: )" DORIS_CLOUD_BUILD_OS_VERSION R"(})";
+    return ss.str();
+}
+
+// DataGenerator produces deterministic, verifiable data blocks
+class DataGenerator {
+public:
+    DataGenerator(size_t total_size) : _total_size(total_size), _generated_size(0) {
+        _buffer.resize(BUFFER_SIZE);
+    }
+
+    // Get the next chunk of data
+    doris::Slice next_chunk(const std::string& key) {
+        if (_generated_size >= _total_size) {
+            // Return an empty slice to indicate the end
+            return doris::Slice();
+        }
+
+        size_t remaining = _total_size - _generated_size;
+        size_t chunk_size = std::min(remaining, BUFFER_SIZE);
+
+        // Generate the tag for this block
+        std::string tag = fmt::format("key={},offset={}\n", key, _generated_size);
+        size_t tag_size = tag.size();
+
+        // Ensure chunk_size is not less than tag_size
+        if (chunk_size < tag_size) {
+            std::memcpy(_buffer.data(), tag.data(), chunk_size);
+        } else {
+            // Fill the buffer with key:offset
+            std::memcpy(_buffer.data(), tag.data(), tag_size);
+            // Fill the remaining part
+            std::fill(_buffer.data() + tag_size, _buffer.data() + chunk_size, PAD_CHAR);
+        }
+
+        _generated_size += chunk_size;
+        return doris::Slice(_buffer.data(), chunk_size);
+    }
+
+    bool has_more() const { return _generated_size < _total_size; }
+
+private:
+    const size_t _total_size;
+    size_t _generated_size;
+    std::vector<char> _buffer;
+};
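+
+// Block layout produced by DataGenerator (a sketch; the key below is illustrative):
+// every BUFFER_SIZE (1 MiB) block begins with a plain-text tag and is padded with
+// PAD_CHAR ('x'), e.g.
+//   key=test_file_cache_microbench/p/job_1_0,offset=0\n
+//   xxxxxxxx... (padding up to the 1 MiB boundary)
+// so any byte of the file can be recomputed from (key, offset) alone.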
+
+class DataVerifier {
+public:
+    static bool verify_data(const std::string& key, size_t file_size, size_t read_offset,
+                            const std::string& data, size_t data_size) {
+        size_t current_block_start = (read_offset / BUFFER_SIZE) * BUFFER_SIZE;
+        size_t data_pos = 0;
+
+        while (data_pos < data_size) {
+            // Calculate the offset in the current block
+            size_t block_offset = read_offset + data_pos - current_block_start;
+
+            // Check if it exceeds the total file size
+            if (current_block_start >= file_size) {
+                break;
+            }
+
+            // Generate the expected tag
+            std::string expected_tag = fmt::format("key={},offset={}\n", key, current_block_start);
+
+            // If within the tag range, need to verify the tag
+            if (block_offset < expected_tag.size()) {
+                // Calculate the length of the tag that can be read in the current data
+                size_t available_tag_len =
+                        std::min(expected_tag.size() - block_offset, data_size - data_pos);
+
+                // If already at the end of the file, only verify the actual existing data
+                if (read_offset + data_pos + available_tag_len > file_size) {
+                    available_tag_len = file_size - (read_offset + data_pos);
+                }
+
+                if (available_tag_len == 0) break;
+                std::string_view actual_tag(data.data() + data_pos, available_tag_len);
+                std::string_view expected_tag_part(expected_tag.data() + block_offset,
+                                                   available_tag_len);
+
+                if (actual_tag != expected_tag_part) {
+                    LOG(ERROR) << "Tag mismatch at offset " << (read_offset + 
data_pos)
+                               << "\nExpected: " << expected_tag_part << 
"\nGot: " << actual_tag;
+                    return false;
+                }
+                data_pos += available_tag_len;
+            } else {
+                char expected_byte = static_cast<char>(PAD_CHAR);
+                if (data[data_pos] != expected_byte) {
+                    LOG(ERROR) << "Data mismatch at offset " << (read_offset + 
data_pos)
+                               << "\nExpected byte: " << (char)expected_byte
+                               << "\nGot byte: " << (char)data[data_pos];
+                    return false;
+                }
+                data_pos++;
+            }
+
+            // If reaching the end of the block, move to the next block
+            if ((read_offset + data_pos) % BUFFER_SIZE == 0) {
+                current_block_start += BUFFER_SIZE;
+            }
+        }
+
+        return true;
+    }
+};
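+
+// Worked example (illustrative numbers): for a read at offset 1048580 with length 100,
+// the enclosing block starts at 1048576 (1 MiB), so block_offset is 4 and the first
+// bytes still fall inside the tag "key=...,offset=1048576\n"; the tag suffix is
+// compared first, then the remaining bytes are checked against PAD_CHAR.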
+
+// Define a struct to store file information
+struct FileInfo {
+    std::string filename; // File name
+    size_t data_size;     // Data size
+    std::string job_id;   // Associated job ID
+};
+
+class S3FileRecords {
+public:
+    void add_file_info(const std::string& job_id, const FileInfo& file_info) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        records_[job_id].emplace_back(file_info);
+    }
+
+    int64_t get_exist_job_perfile_size_by_prefix(const std::string& file_prefix) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        for (const auto& pair : records_) {
+            const std::vector<FileInfo>& file_infos = pair.second;
+            for (const auto& file_info : file_infos) {
+                if (file_info.filename.compare(0, file_prefix.length(), file_prefix) == 0) {
+                    return file_info.data_size;
+                }
+            }
+        }
+        return -1;
+    }
+
+    void get_exist_job_files_by_prefix(const std::string& file_prefix,
+                                       std::vector<std::string>& result, int file_number = -1) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        for (const auto& pair : records_) {
+            const std::vector<FileInfo>& file_infos = pair.second;
+            for (const auto& file_info : file_infos) {
+                if (file_info.filename.compare(0, file_prefix.length(), file_prefix) == 0) {
+                    if (file_number == -1 || result.size() < static_cast<size_t>(file_number)) {
+                        result.push_back(file_info.filename);
+                    }
+                    if (file_number != -1 && result.size() >= static_cast<size_t>(file_number)) {
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    std::string find_job_id_by_prefix(const std::string& file_prefix) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        for (const auto& pair : records_) {
+            const std::vector<FileInfo>& file_infos = pair.second;
+            for (const auto& file_info : file_infos) {
+                if (file_info.filename.compare(0, file_prefix.length(), file_prefix) == 0) {
+                    return pair.first;
+                }
+            }
+        }
+        return "";
+    }
+
+private:
+    std::mutex mutex_;
+    std::map<std::string, std::vector<FileInfo>> records_;
+};
+
+// Create a global S3FileRecords instance
+S3FileRecords s3_file_records;
+
+class MircobenchS3FileWriter {
+public:
+    MircobenchS3FileWriter(std::shared_ptr<doris::io::ObjClientHolder> client,
+                           const std::string& bucket, const std::string& key,
+                           const doris::io::FileWriterOptions* options,
+                           std::shared_ptr<doris::S3RateLimiterHolder> rate_limiter)
+            : _writer(client, bucket, key, options), _rate_limiter(rate_limiter) {}
+
+    doris::Status appendv(const doris::Slice* slices, size_t slices_size,
+                          const std::shared_ptr<bvar::LatencyRecorder>& write_bvar) {
+        if (_rate_limiter) {
+            _rate_limiter->add(1); // Consume a token
+        }
+        using namespace doris;
+        SCOPED_BVAR_LATENCY(microbench_write_latency);
+        if (write_bvar) {
+            // Keep the per-job RAII timer alive across the actual write
+            SCOPED_BVAR_LATENCY(*write_bvar)
+            return _writer.appendv(slices, slices_size);
+        }
+        return _writer.appendv(slices, slices_size);
+    }
+
+    doris::Status close() { return _writer.close(); }
+
+private:
+    doris::io::S3FileWriter _writer;
+    std::shared_ptr<doris::S3RateLimiterHolder> _rate_limiter;
+};
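+
+// Note on rate limiting: each appendv() call consumes one token, so write_iops
+// effectively caps appendv calls per second (each call flushes up to
+// write_batch_size bytes), not bytes per second.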
+
+class MicrobenchFileReader {
+public:
+    MicrobenchFileReader(std::shared_ptr<doris::io::FileReader> base_reader,
+                         std::shared_ptr<doris::S3RateLimiterHolder> rate_limiter)
+            : _base_reader(std::move(base_reader)), _rate_limiter(rate_limiter) {}
+
+    doris::Status read_at(size_t offset, const doris::Slice& result, size_t* bytes_read,
+                          const doris::io::IOContext* io_ctx,
+                          std::shared_ptr<bvar::LatencyRecorder> read_bvar) {
+        if (_rate_limiter) {
+            _rate_limiter->add(1); // Consume a token
+        }
+        using namespace doris;
+        SCOPED_BVAR_LATENCY(microbench_read_latency);
+        if (read_bvar) {
+            // Keep the per-job RAII timer alive across the actual read
+            SCOPED_BVAR_LATENCY(*read_bvar)
+            return _base_reader->read_at(offset, result, bytes_read, io_ctx);
+        }
+        return _base_reader->read_at(offset, result, bytes_read, io_ctx);
+    }
+
+    size_t size() const { return _base_reader->size(); }
+
+    doris::Status close() { return _base_reader->close(); }
+
+private:
+    std::shared_ptr<doris::io::FileReader> _base_reader;
+    std::shared_ptr<doris::S3RateLimiterHolder> _rate_limiter;
+};
+
+class ThreadPool {
+public:
+    ThreadPool(size_t num_threads) : stop(false) {
+        try {
+            for (size_t i = 0; i < num_threads; ++i) {
+                workers.emplace_back([this] {
+                    try {
+                        while (true) {
+                            std::function<void()> task;
+                            {
+                                std::unique_lock<std::mutex> lock(queue_mutex);
+                                condition.wait(lock, [this] { return stop || !tasks.empty(); });
+                                if (stop && tasks.empty()) {
+                                    return;
+                                }
+                                task = std::move(tasks.front());
+                                tasks.pop();
+                            }
+                            task();
+                        }
+                    } catch (const std::exception& e) {
+                        LOG(ERROR) << "Exception in thread pool worker: " << 
e.what();
+                    } catch (...) {
+                        LOG(ERROR) << "Unknown exception in thread pool 
worker";
+                    }
+                });
+            }
+        } catch (...) {
+            // Ensure proper cleanup in case of exception during construction
+            stop = true;
+            condition.notify_all();
+            throw;
+        }
+    }
+
+    template <class F>
+    std::future<void> enqueue(F&& f) {
+        auto task = std::make_shared<std::packaged_task<void()>>(std::forward<F>(f));
+        std::future<void> res = task->get_future();
+        {
+            std::unique_lock<std::mutex> lock(queue_mutex);
+            if (stop) {
+                throw std::runtime_error("enqueue on stopped ThreadPool");
+            }
+            tasks.emplace([task]() {
+                try {
+                    (*task)();
+                } catch (const std::exception& e) {
+                    LOG(ERROR) << "Exception in task: " << e.what();
+                } catch (...) {
+                    LOG(ERROR) << "Unknown exception in task";
+                }
+            });
+        }
+        condition.notify_one();
+        return res;
+    }
+
+    // Stop the pool and join all workers; safe to call more than once
+    void shutdown() {
+        {
+            std::unique_lock<std::mutex> lock(queue_mutex);
+            stop = true;
+        }
+        condition.notify_all();
+
+        // Safely wait for all threads to complete
+        for (auto& worker : workers) {
+            try {
+                if (worker.joinable()) {
+                    worker.join();
+                }
+            } catch (const std::system_error& e) {
+                LOG(WARNING) << "Failed to join thread: " << e.what();
+            }
+        }
+    }
+
+    ~ThreadPool() { shutdown(); }
+
+private:
+    std::vector<std::thread> workers;
+    std::queue<std::function<void()>> tasks;
+    std::mutex queue_mutex;
+    std::condition_variable condition;
+    bool stop;
+};
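+
+// Minimal usage sketch for the pool above:
+//   ThreadPool pool(4);
+//   auto fut = pool.enqueue([] { LOG(INFO) << "task"; });
+//   fut.get(); // exceptions thrown inside tasks are logged, not rethrown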
+
+class FileCompletionTracker {
+public:
+    void mark_completed(const std::string& key) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        _completed_files.insert(key);
+        _cv.notify_all(); // Notify all waiting threads
+    }
+
+    bool is_completed(const std::string& key) {
+        return _completed_files.find(key) != _completed_files.end();
+    }
+
+    void wait_for_completion(const std::string& key) {
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [&] { return is_completed(key); });
+    }
+
+private:
+    std::mutex _mutex;
+    std::condition_variable _cv;
+    std::unordered_set<std::string> _completed_files;
+};
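+
+// When a job sets both write_iops and read_iops, each read task blocks in
+// wait_for_completion() until the write task marks the same key completed,
+// so reads only ever observe fully written files.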
+
+std::string get_usage(const std::string& progname) {
+    std::string usage = R"(
+    )" + progname + R"( is the Doris microbench tool for testing file cache in 
cloud.
+
+    Usage:
+      Start the server:
+        )" + progname + R"( --port=<port_number>
+        
+    API Endpoints:
+      POST /submit_job
+        Submit a job with the following JSON body:
+        {
+          "size_bytes_perfile": <size>,        // Number of bytes to write per 
segment file
+          "write_iops": <limit>,               // IOPS limit for writing per 
segment files
+          "read_iops": <limit>,                // IOPS limit for reading per 
segment files
+          "num_threads": <count>,              // Number of threads in the 
thread pool, default 200
+          "num_files": <count>,                // Number of segments to 
write/read
+          "file_prefix": "<prefix>",           // Prefix for segment files, 
Notice: this tools hide prefix(test_file_cache_microbench/) before file_prefix
+          "write_batch_size": <size>,          // Size of data to write in 
each write operation
+          "cache_type": <type>,                // Write or Read data enter 
file cache queue type, support NORMAL | TTL | INDEX | DISPOSABLE, default NORMAL
+          "expiration": <timestamp>,           // File cache ttl expire time, 
value is a unix timestamp
+          "repeat": <count>,                   // Read repeat times, default 1
+          "read_offset": [<left>, <right>],    // Range for reading (left 
inclusive, right exclusive)
+          "read_length": [<left>, <right>]     // Range for reading length 
(left inclusive, right exclusive)
+        }
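+
+        Example (illustrative values):
+          curl -X POST http://127.0.0.1:8888/submit_job -H 'Content-Type: application/json' -d '
+          {
+            "file_prefix": "bench_1",
+            "num_files": 4,
+            "size_bytes_perfile": 10485760,
+            "write_iops": 100,
+            "read_iops": 200,
+            "read_offset": [0, 1048576],
+            "read_length": [4096, 65536]
+          }'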
+
+      GET /get_job_status/<job_id>
+        Retrieve the status of a submitted job.
+        Parameters:
+          - job_id: The ID of the job to retrieve status for.
+          - files (optional): If provided, returns the associated file records for the job.
+            Example: /get_job_status/job_id?files=10
+
+      GET /list_jobs
+        List all submitted jobs and their statuses.
+
+      GET /get_help
+        Get this help information.
+
+      GET /file_cache_clear
+        Clear the file cache with the following query parameters:
+        {
+          "sync": <true|false>,                // Whether to synchronize the 
cache clear operation
+          "segment_path": "<path>"             // Optional path of the segment 
to clear from the cache
+        }
+        If "segment_path" is not provided, all caches will be cleared based on 
the "sync" parameter.
+
+      GET /file_cache_reset
+        Reset the file cache with the following query parameters:
+        {
+          "capacity": <new_capacity>,          // New capacity for the 
specified path
+          "path": "<path>"                     // Path of the segment to reset
+        }
+
+      GET /file_cache_release
+        Release the file cache with the following query parameters:
+        {
+          "base_path": "<base_path>"           // Optional base path to 
release specific caches
+        }
+
+      GET /update_config
+        Update the configuration with the following JSON body:
+        {
+          "config_key": "<key>",               // The configuration key to 
update
+          "config_value": "<value>",            // The new value for the 
configuration key
+          "persist": <true|false>              // Whether to persist the 
configuration change
+        }
+
+      GET /show_config
+        Retrieve the current configuration settings.
+
+    Notes:
+      - Ensure that the S3 configuration is set correctly in the environment.
+      - The program will create and read files in the specified S3 bucket.
+      - Monitor the logs for detailed execution information and errors.
+    )" + build_info();
+
+    return usage;
+}
+
+// Job configuration structure
+struct JobConfig {
+    // Default value initialization
+    int64_t size_bytes_perfile = 1024 * 1024;
+    int32_t write_iops = 0;
+    int32_t read_iops = 0;
+    int32_t num_threads = 200;
+    int32_t num_files = 1;
+    std::string file_prefix;
+    std::string cache_type = "NORMAL";
+    int64_t expiration = 0;
+    int32_t repeat = 1;
+    int64_t write_batch_size = doris::config::s3_write_buffer_size;
+    int64_t read_offset_left = 0;
+    int64_t read_offset_right = 0;
+    int64_t read_length_left = 0;
+    int64_t read_length_right = 0;
+    bool write_file_cache = true;
+    bool bvar_enable = false;
+
+    // Parse configuration from JSON
+    static JobConfig from_json(const std::string& json_str) {
+        JobConfig config;
+        rapidjson::Document d;
+        d.Parse(json_str.c_str());
+
+        if (d.HasParseError()) {
+            throw std::runtime_error("JSON parse error json args=" + json_str);
+        }
+
+        // Basic validation
+        validate(d);
+
+        // Use helper functions to parse each field
+        parse_basic_fields(d, config);
+        parse_cache_settings(d, config);
+        parse_read_settings(d, config);
+
+        // Additional validation
+        validate_config(config);
+
+        return config;
+    }
+
+private:
+    // Validate the JSON document
+    static void validate(const rapidjson::Document& json_data) {
+        if (!json_data.HasMember("file_prefix") || !json_data["file_prefix"].IsString() ||
+            strlen(json_data["file_prefix"].GetString()) == 0) {
+            throw std::runtime_error("file_prefix is required and cannot be empty");
+        }
+    }
+
+    // Parse basic fields
+    static void parse_basic_fields(const rapidjson::Document& d, JobConfig& config) {
+        // Parse file_prefix (required field)
+        config.file_prefix = d["file_prefix"].GetString();
+
+        // Parse optional fields
+        if (d.HasMember("num_files") && d["num_files"].IsInt()) {
+            config.num_files = d["num_files"].GetInt();
+        }
+
+        if (d.HasMember("size_bytes_perfile") && 
d["size_bytes_perfile"].IsInt64()) {
+            config.size_bytes_perfile = d["size_bytes_perfile"].GetInt64();
+        }
+
+        if (d.HasMember("write_iops") && d["write_iops"].IsInt()) {
+            config.write_iops = d["write_iops"].GetInt();
+        }
+
+        if (d.HasMember("read_iops") && d["read_iops"].IsInt()) {
+            config.read_iops = d["read_iops"].GetInt();
+        }
+
+        if (d.HasMember("num_threads") && d["num_threads"].IsInt()) {
+            config.num_threads = d["num_threads"].GetInt();
+        }
+
+        if (d.HasMember("repeat") && d["repeat"].IsInt64()) {
+            config.repeat = d["repeat"].GetInt64();
+        }
+
+        if (d.HasMember("write_batch_size") && 
d["write_batch_size"].IsInt64()) {
+            config.write_batch_size = d["write_batch_size"].GetInt64();
+        }
+
+        if (d.HasMember("write_file_cache") && d["write_file_cache"].IsBool()) 
{
+            config.write_file_cache = d["write_file_cache"].GetBool();
+        }
+
+        if (d.HasMember("bvar_enable") && d["bvar_enable"].IsBool()) {
+            config.bvar_enable = d["bvar_enable"].GetBool();
+        }
+    }
+
+    // Parse cache-related settings
+    static void parse_cache_settings(const rapidjson::Document& d, JobConfig& config) {
+        if (d.HasMember("cache_type") && d["cache_type"].IsString()) {
+            config.cache_type = d["cache_type"].GetString();
+        }
+
+        // Check for TTL cache type
+        if (config.cache_type == "TTL") {
+            if (!d.HasMember("expiration") || !d["expiration"].IsInt64()) {
+                throw std::runtime_error(
+                        "expiration is required and must be an integer when 
cache type is TTL");
+            }
+            config.expiration = d["expiration"].GetInt64();
+        }
+    }
+
+    // Parse read-related settings
+    static void parse_read_settings(const rapidjson::Document& d, JobConfig& config) {
+        if (config.read_iops > 0) {
+            // Parse read_offset
+            if (d.HasMember("read_offset") && d["read_offset"].IsArray() &&
+                d["read_offset"].Size() == 2) {
+                const rapidjson::Value& read_offset_array = d["read_offset"];
+                config.read_offset_left = read_offset_array[0].GetInt64();
+                config.read_offset_right = read_offset_array[1].GetInt64();
+            } else {
+                throw std::runtime_error("Invalid read_offset format, expected 
array of size 2");
+            }
+
+            // Parse read_length
+            if (d.HasMember("read_length") && d["read_length"].IsArray() &&
+                d["read_length"].Size() == 2) {
+                const rapidjson::Value& read_length_array = d["read_length"];
+                config.read_length_left = read_length_array[0].GetInt64();
+                config.read_length_right = read_length_array[1].GetInt64();
+            } else {
+                throw std::runtime_error("Invalid read_length format, expected 
array of size 2");
+            }
+        }
+    }
+
+    // Validate the validity of the configuration
+    static void validate_config(const JobConfig& config) {
+        if (config.num_threads <= 0 || config.num_threads > 10000) {
+            throw std::runtime_error("num_threads must be between 1 and 
10000");
+        }
+
+        if (config.size_bytes_perfile <= 0) {
+            throw std::runtime_error("size_bytes_perfile must be positive");
+        }
+
+        if (config.read_iops > 0) {
+            if (config.read_offset_left >= config.read_offset_right) {
+                throw std::runtime_error("read_offset_left must be less than 
read_offset_right");
+            }
+
+            if (config.read_length_left >= config.read_length_right) {
+                throw std::runtime_error("read_length_left must be less than 
read_length_right");
+            }
+        }
+
+        if (config.cache_type == "TTL" && config.expiration <= 0) {
+            throw std::runtime_error("expiration must be positive when cache 
type is TTL");
+        }
+    }
+
+public:
+    std::string to_string() const {
+        return fmt::format(
+                "size_bytes_perfile: {}, write_iops: {}, read_iops: {}, num_threads: {}, "
+                "num_files: {}, file_prefix: {}, write_file_cache: {}, write_batch_size: {}, "
+                "repeat: {}, expiration: {}, cache_type: {}, read_offset: [{}, {}), "
+                "read_length: [{}, {})",
+                size_bytes_perfile, write_iops, read_iops, num_threads, num_files,
+                HIDDEN_PREFIX + file_prefix, write_file_cache, write_batch_size, repeat, expiration,
+                cache_type, read_offset_left, read_offset_right, read_length_left,
+                read_length_right);
+    }
+};
+
+// Job status
+enum class JobStatus { PENDING, RUNNING, COMPLETED, FAILED };
+
+// Job structure
+struct Job {
+    std::string job_id;
+    JobConfig config;
+    JobStatus status;
+    std::string error_message;
+    std::chrono::system_clock::time_point create_time;
+    std::chrono::system_clock::time_point start_time;
+    std::chrono::system_clock::time_point end_time;
+
+    std::shared_ptr<doris::S3RateLimiterHolder> write_limiter;
+    std::shared_ptr<doris::S3RateLimiterHolder> read_limiter;
+
+    // Job execution result statistics
+    struct Statistics {
+        std::string total_write_time;
+        std::string total_read_time;
+        // struct FileCacheStatistics
+        int64_t num_local_io_total = 0;
+        int64_t num_remote_io_total = 0;
+        int64_t num_inverted_index_remote_io_total = 0;
+        int64_t local_io_timer = 0;
+        int64_t bytes_read_from_local = 0;
+        int64_t bytes_read_from_remote = 0;
+        int64_t remote_io_timer = 0;
+        int64_t write_cache_io_timer = 0;
+        int64_t bytes_write_into_cache = 0;
+        int64_t num_skip_cache_io_total = 0;
+        int64_t read_cache_file_directly_timer = 0;
+        int64_t cache_get_or_set_timer = 0;
+        int64_t lock_wait_timer = 0;
+        int64_t get_timer = 0;
+        int64_t set_timer = 0;
+    } stats;
+
+    // Record associated file information for the job
+    std::vector<FileInfo> file_records;
+
+    // Add completion_tracker
+    std::shared_ptr<FileCompletionTracker> completion_tracker;
+
+    std::shared_ptr<bvar::LatencyRecorder> write_latency;
+    std::shared_ptr<bvar::Adder<int64_t>> write_rate_limit_s;
+    std::shared_ptr<bvar::LatencyRecorder> read_latency;
+    std::shared_ptr<bvar::Adder<int64_t>> read_rate_limit_s;
+
+    // Default constructor
+    Job() : job_id(""), status(JobStatus::PENDING), 
create_time(std::chrono::system_clock::now()) {
+        init_latency_recorders("");
+        completion_tracker = std::make_shared<FileCompletionTracker>();
+    }
+
+    // Constructor with parameters
+    Job(const std::string& id, const JobConfig& cfg)
+            : job_id(id),
+              config(cfg),
+              status(JobStatus::PENDING),
+              create_time(std::chrono::system_clock::now()) {
+        init_latency_recorders(id);
+        if (cfg.write_iops > 0 && cfg.read_iops > 0) {
+            completion_tracker = std::make_shared<FileCompletionTracker>();
+        }
+        init_limiters(cfg);
+    }
+
+private:
+    void init_latency_recorders(const std::string& id) {
+        if (config.write_iops > 0 && config.bvar_enable) {
+            write_latency =
+                    std::make_shared<bvar::LatencyRecorder>("file_cache_microbench_append_" + id);
+            write_rate_limit_s = std::make_shared<bvar::Adder<int64_t>>(
+                    "file_cache_microbench_append_rate_limit_ns_" + id);
+        }
+
+        if (config.read_iops > 0 && config.bvar_enable) {
+            read_latency =
+                    std::make_shared<bvar::LatencyRecorder>("file_cache_microbench_read_at_" + id);
+            read_rate_limit_s = std::make_shared<bvar::Adder<int64_t>>(
+                    "file_cache_microbench_read_rate_limit_ns_" + id);
+        }
+    }
+
+    void init_limiters(const JobConfig& cfg) {
+        if (cfg.write_iops > 0) {
+            write_limiter = std::make_shared<doris::S3RateLimiterHolder>(
+                    doris::S3RateLimitType::PUT,
+                    cfg.write_iops, // max_speed (IOPS)
+                    cfg.write_iops, // max_burst
+                    0,              // no limit
+                    [this](int64_t wait_time_ns) {
+                        if (wait_time_ns > 0 && write_rate_limit_s) {
+                            *write_rate_limit_s << wait_time_ns / NS;
+                        }
+                    });
+        }
+
+        if (cfg.read_iops > 0) {
+            read_limiter = std::make_shared<doris::S3RateLimiterHolder>(
+                    doris::S3RateLimitType::GET,
+                    cfg.read_iops, // max_speed (IOPS)
+                    cfg.read_iops, // max_burst
+                    0,             // no limit
+                    [this](int64_t wait_time_ns) {
+                        if (wait_time_ns > 0 && read_rate_limit_s) {
+                            *read_rate_limit_s << wait_time_ns / NS;
+                        }
+                    });
+        }
+    }
+};
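+
+// Reading of the limiter setup above: each S3RateLimiterHolder is configured with
+// max_speed == max_burst == the requested IOPS and no total limit; the callback
+// accumulates the time spent throttled (in whole seconds) into the per-job bvar.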
+
+// Job manager
+class JobManager {
+public:
+    JobManager() : _next_job_id(0), _job_executor_pool(std::thread::hardware_concurrency()) {
+        LOG(INFO) << "Initialized JobManager with " << std::thread::hardware_concurrency()
+                  << " executor threads";
+    }
+
+    ~JobManager() {
+        try {
+            stop();
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error stopping JobManager: " << e.what();
+        }
+    }
+
+    // Submit a new job
+    std::string submit_job(const JobConfig& config) {
+        try {
+            std::string job_id = generate_job_id();
+
+            {
+                std::lock_guard<std::mutex> lock(_mutex);
+                _jobs[job_id] = std::make_shared<Job>(job_id, config);
+            }
+
+            LOG(INFO) << "Submitting job " << job_id << " with config: " << 
config.to_string();
+
+            // Execute the job asynchronously
+            _job_executor_pool.enqueue(
+                    [this, job_id]() { execute_job_with_status_updates(job_id); });
+
+            return job_id;
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error submitting job: " << e.what();
+            throw std::runtime_error("Failed to submit job: " + 
std::string(e.what()));
+        }
+    }
+
+    // Get job status
+    const Job& get_job_status(const std::string& job_id) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        auto it = _jobs.find(job_id);
+        if (it != _jobs.end()) {
+            return *(it->second);
+        }
+        throw std::runtime_error("Job not found: " + job_id);
+    }
+
+    std::shared_ptr<Job> get_job_ptr(const std::string& job_id) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        auto it = _jobs.find(job_id);
+        if (it != _jobs.end()) {
+            return it->second;
+        }
+        return nullptr;
+    }
+
+    // List all jobs
+    std::vector<std::shared_ptr<Job>> list_jobs() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        std::vector<std::shared_ptr<Job>> job_list;
+        job_list.reserve(_jobs.size());
+        for (const auto& pair : _jobs) {
+            job_list.push_back(pair.second);
+        }
+        return job_list;
+    }
+
+    void start() { LOG(INFO) << "JobManager started"; }
+
+    void stop() {
+        LOG(INFO) << "Stopping JobManager and waiting for all jobs to 
complete";
+        _job_executor_pool.~ThreadPool();
+        LOG(INFO) << "JobManager stopped";
+    }
+
+    // Record file information
+    void record_file_info(const std::string& key, size_t data_size, const std::string& job_id) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        auto it = _jobs.find(job_id);
+        if (it != _jobs.end()) {
+            FileInfo file_info = {key, data_size, job_id};
+            it->second->file_records.push_back(file_info);
+            s3_file_records.add_file_info(job_id, file_info);
+        } else {
+            LOG(ERROR) << "Job ID not found when recording file info: " << 
job_id;
+        }
+    }
+
+    // Cancel job (not implemented yet)
+    bool cancel_job(const std::string& job_id) {
+        LOG(WARNING) << "Job cancellation not implemented yet: " << job_id;
+        return false;
+    }
+
+private:
+    // Generate a unique job ID
+    std::string generate_job_id() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        std::string job_id =
+                "job_" + std::to_string(std::time(nullptr)) + "_" + 
std::to_string(_next_job_id++);
+        return job_id;
+    }
+
+    // Execute job with status updates
+    void execute_job_with_status_updates(const std::string& job_id) {
+        std::shared_ptr<Job> job_ptr;
+
+        // Get job pointer and update status to RUNNING
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            auto it = _jobs.find(job_id);
+            if (it == _jobs.end()) {
+                LOG(ERROR) << "Job not found for execution: " << job_id;
+                return;
+            }
+            job_ptr = it->second;
+            job_ptr->status = JobStatus::RUNNING;
+            job_ptr->start_time = std::chrono::system_clock::now();
+        }
+
+        LOG(INFO) << "Starting execution of job " << job_id;
+
+        try {
+            // Execute job
+            execute_job(job_id);
+
+            // Update status to COMPLETED
+            {
+                std::lock_guard<std::mutex> lock(_mutex);
+                job_ptr->status = JobStatus::COMPLETED;
+                job_ptr->end_time = std::chrono::system_clock::now();
+            }
+
+            LOG(INFO) << "Job " << job_id << " completed successfully";
+        } catch (const std::exception& e) {
+            // Update status to FAILED
+            {
+                std::lock_guard<std::mutex> lock(_mutex);
+                job_ptr->status = JobStatus::FAILED;
+                job_ptr->error_message = e.what();
+                job_ptr->end_time = std::chrono::system_clock::now();
+            }
+
+            LOG(ERROR) << "Job " << job_id << " failed: " << e.what();
+        }
+    }
+
+    // Core logic for executing a job
+    void execute_job(const std::string& job_id) {
+        std::shared_ptr<Job> job_ptr = get_job_ptr(job_id);
+        if (!job_ptr) {
+            throw std::runtime_error("Job not found");
+        }
+
+        Job& job = *job_ptr;
+        JobConfig& config = job.config;
+        LOG(INFO) << "Executing job " << job_id << " with config: " << 
config.to_string();
+
+        // Generate multiple keys
+        std::vector<std::string> keys;
+        keys.reserve(config.num_files);
+
+        std::string rewrite_job_id = job_id;
+        // If it's a read-only job, find the previously written files
+        if (config.read_iops > 0 && config.write_iops == 0) {
+            std::string old_job_id =
+                    s3_file_records.find_job_id_by_prefix(HIDDEN_PREFIX + config.file_prefix);
+            if (old_job_id.empty()) {
+                throw std::runtime_error(
+                        "Can't find files uploaded by a previous job. Please make sure the "
+                        "files to read exist in object storage; it is also possible that you "
+                        "have restarted the file_cache_microbench program, job_id = " +
+                        job_id);
+            }
+            rewrite_job_id = old_job_id;
+        }
+
+        // Generate file keys
+        for (int i = 0; i < config.num_files; ++i) {
+            keys.push_back(HIDDEN_PREFIX + config.file_prefix + "/" + rewrite_job_id + "_" +
+                           std::to_string(i));
+        }
+
+        // Execute write tasks
+        if (config.write_iops > 0) {
+            execute_write_tasks(keys, job, config);
+        }
+
+        // Execute read tasks
+        if (config.read_iops > 0) {
+            execute_read_tasks(keys, job, config);
+        }
+
+        LOG(INFO) << "Job " << job_id << " execution completed";
+    }
+
+private:
+    doris::S3ClientConf create_s3_client_conf(const JobConfig& config) {
+        doris::S3ClientConf s3_conf;
+        s3_conf.max_connections = std::max(256, config.num_threads * 4);
+        s3_conf.request_timeout_ms = 60000;
+        s3_conf.connect_timeout_ms = 3000;
+        s3_conf.ak = doris::config::test_s3_ak;
+        s3_conf.sk = doris::config::test_s3_sk;
+        s3_conf.region = doris::config::test_s3_region;
+        s3_conf.endpoint = doris::config::test_s3_endpoint;
+        return s3_conf;
+    }
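+
+    // Note: the credentials and endpoint above come from the BE test config knobs
+    // (doris::config::test_s3_ak/sk/region/endpoint, plus test_s3_bucket for the
+    // bucket), which are expected to be set in be.conf before submitting jobs.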
+
+    // Execute write tasks
+    void execute_write_tasks(const std::vector<std::string>& keys, Job& job,
+                             const JobConfig& config) {
+        // Create S3 client configuration
+        doris::S3ClientConf s3_conf = create_s3_client_conf(config);
+
+        // Initialize S3 client
+        auto client = std::make_shared<doris::io::ObjClientHolder>(s3_conf);
+        doris::Status init_status = client->init();
+        if (!init_status.ok()) {
+            throw std::runtime_error("Failed to initialize S3 client: " + 
init_status.to_string());
+        }
+
+        std::atomic<int> completed_writes(0);
+        std::vector<std::future<void>> write_futures;
+        write_futures.reserve(keys.size());
+        ThreadPool write_pool(config.num_threads);
+
+        // Start write tasks
+        doris::MonotonicStopWatch write_stopwatch;
+        write_stopwatch.start();
+        for (size_t i = 0; i < keys.size(); ++i) {
+            const auto& key = keys[i];
+            write_futures.push_back(write_pool.enqueue([&, key]() {
+                try {
+                    DataGenerator data_generator(config.size_bytes_perfile);
+                    doris::io::FileWriterOptions options;
+                    if (config.cache_type == "TTL") {
+                        options.file_cache_expiration = config.expiration;
+                    }
+                    options.write_file_cache = config.write_file_cache;
+                    auto writer = std::make_unique<MircobenchS3FileWriter>(
+                            client, doris::config::test_s3_bucket, key, &options,
+                            job.write_limiter);
+                    doris::Defer defer {[&]() {
+                        if (auto status = writer->close(); !status.ok()) {
+                            LOG(ERROR) << "close file writer failed" << 
status.to_string();
+                        }
+                    }};
+
+                    std::vector<doris::Slice> slices;
+                    slices.reserve(4);
+                    size_t accumulated_size = 0;
+
+                    // Stream data writing
+                    while (data_generator.has_more()) {
+                        doris::Slice chunk = data_generator.next_chunk(key);
+                        slices.push_back(chunk);
+                        accumulated_size += chunk.size;
+
+                        if (accumulated_size >= config.write_batch_size ||
+                            !data_generator.has_more()) {
+                            doris::Status status = writer->appendv(slices.data(), slices.size(),
+                                                                   job.write_latency);
+                            if (!status.ok()) {
+                                throw std::runtime_error("Write error for key 
" + key + ": " +
+                                                         status.to_string());
+                            }
+                            slices.clear();
+                            accumulated_size = 0;
+                        }
+                    }
+                    if (job.completion_tracker) {
+                        job.completion_tracker->mark_completed(key);
+                    }
+
+                    // Record successful file information
+                    size_t data_size = config.size_bytes_perfile;
+                    record_file_info(key, data_size, job.job_id);
+                    completed_writes++;
+                } catch (const std::exception& e) {
+                    LOG(ERROR) << "Write task failed for segment " << key << 
": " << e.what();
+                }
+            }));
+        }
+
+        // Wait for all write tasks to complete
+        for (auto& future : write_futures) {
+            future.get();
+        }
+        write_stopwatch.stop();
+
+        // Convert write time from nanoseconds to seconds and format as string
+        double total_write_time_seconds =
+                write_stopwatch.elapsed_time() / 1e9; // nanoseconds to seconds
+        job.stats.total_write_time =
+                std::to_string(total_write_time_seconds) + " seconds"; // Save as string
+        LOG(INFO) << "Total write time: " << job.stats.total_write_time;
+    }
+
+    // Execute read tasks
+    void execute_read_tasks(const std::vector<std::string>& keys, Job& job, JobConfig& config) {
+        LOG(INFO) << "Starting read tasks for job " << job.job_id << ", num_keys=" << keys.size()
+                  << ", read_iops=" << config.read_iops;
+        auto start_time = std::chrono::steady_clock::now();
+
+        int64_t exist_job_perfile_size = s3_file_records.get_exist_job_perfile_size_by_prefix(
+                HIDDEN_PREFIX + config.file_prefix);
+        std::vector<std::future<void>> read_futures;
+        doris::io::IOContext io_ctx;
+        doris::io::FileCacheStatistics total_stats;
+        io_ctx.file_cache_stats = &total_stats;
+        if (config.cache_type == "DISPOSABLE") {
+            io_ctx.is_disposable = true;
+        } else if (config.cache_type == "TTL") {
+            io_ctx.expiration_time = config.expiration;
+        } else if (config.cache_type == "INDEX") {
+            io_ctx.is_index_data = true;
+        } else { // default NORMAL
+            // do nothing
+        }
+        ThreadPool read_pool(config.num_threads);
+        std::atomic<int> completed_reads(0);
+        doris::MonotonicStopWatch read_stopwatch; // Add read task timer
+
+        // Create S3 client configuration
+        doris::S3ClientConf s3_conf = create_s3_client_conf(config);
+        std::vector<std::string> read_files;
+        if (exist_job_perfile_size != -1) {
+            // read exist files
+            s3_file_records.get_exist_job_files_by_prefix(HIDDEN_PREFIX + config.file_prefix,
+                                                          read_files, config.num_files);
+        }
+
+        if (read_files.empty()) {
+            // not read exist files
+            read_files = keys;
+        }
+        LOG(INFO) << "job_id = " << job.job_id << " read_files size = " << 
read_files.size();
+
+        read_stopwatch.start();
+        for (size_t i = 0; i < read_files.size(); ++i) {
+            const auto& key = read_files[i];
+            read_futures.push_back(read_pool.enqueue([&, key]() {
+                try {
+                    if (job.completion_tracker) {
+                        job.completion_tracker->wait_for_completion(
+                                key); // Wait for file completion
+                    }
+                    doris::io::FileReaderOptions reader_opts;
+                    reader_opts.cache_type = doris::io::FileCachePolicy::FILE_BLOCK_CACHE;
+                    reader_opts.is_doris_table = true;
+
+                    doris::io::FileDescription fd;
+                    std::string obj_path = "s3://" + doris::config::test_s3_bucket + "/";
+                    fd.path = doris::io::Path(obj_path + key);
+                    fd.file_size = exist_job_perfile_size != -1 ? exist_job_perfile_size
+                                                                : config.size_bytes_perfile;
+                    doris::io::FileSystemProperties fs_props;
+                    fs_props.system_type = doris::TFileType::FILE_S3;
+
+                    std::map<std::string, std::string> props;
+                    props["AWS_ACCESS_KEY"] = s3_conf.ak;
+                    props["AWS_SECRET_KEY"] = s3_conf.sk;
+                    props["AWS_ENDPOINT"] = s3_conf.endpoint;
+                    props["AWS_REGION"] = s3_conf.region;
+                    props["AWS_MAX_CONNECTIONS"] = 
std::to_string(s3_conf.max_connections);
+                    props["AWS_REQUEST_TIMEOUT_MS"] = 
std::to_string(s3_conf.request_timeout_ms);
+                    props["AWS_CONNECT_TIMEOUT_MS"] = 
std::to_string(s3_conf.connect_timeout_ms);
+                    props["use_path_style"] = s3_conf.use_virtual_addressing ? 
"false" : "true";
+
+                    fs_props.properties = std::move(props);
+
+                    int read_retry_count = 0;
+                    const int max_read_retries = 50;
+                    while (read_retry_count < max_read_retries) {
+                        auto status_or_reader = doris::FileFactory::create_file_reader(
+                                fs_props, fd, reader_opts, nullptr);
+                        if (!status_or_reader.has_value()) {
+                            if (++read_retry_count >= max_read_retries) {
+                                LOG(ERROR) << "Failed to create reader for key " << key << ": "
+                                           << status_or_reader.error();
+                            }
+                            std::this_thread::sleep_for(std::chrono::seconds(1));
+                            continue;
+                        }
+
+                        for (int rep = 0; rep < config.repeat; rep++) {
+                            auto reader = std::make_unique<MicrobenchFileReader>(
+                                    status_or_reader.value(), job.read_limiter);
+                            doris::Defer defer {[&]() {
+                                if (auto status = reader->close(); !status.ok()) {
+                                    LOG(ERROR) << "close file reader failed " << status.to_string();
+                                }
+                            }};
+
+                            size_t read_offset = 0;
+                            size_t read_length = 0;
+
+                            bool use_random = true;
+                            if (config.read_offset_left + 1 == config.read_offset_right) {
+                                use_random = false;
+                            }
+                            if (exist_job_perfile_size != -1) {
+                                // read exist files
+                                if (config.read_offset_right > exist_job_perfile_size) {
+                                    config.read_offset_right = exist_job_perfile_size;
+                                }
+                                if (config.read_length_right > exist_job_perfile_size) {
+                                    config.read_length_right = exist_job_perfile_size;
+                                }
+
+                                if (use_random) {
+                                    std::random_device rd;
+                                    std::mt19937 gen(rd());
+                                    // Generate random read_offset between read_offset_left and read_offset_right - 1
+                                    std::uniform_int_distribution<size_t> dis_offset(
+                                            config.read_offset_left, config.read_offset_right - 1);
+                                    read_offset = dis_offset(gen); // Generate random read_offset
+                                    std::uniform_int_distribution<size_t> dis_length(
+                                            config.read_length_left, config.read_length_right - 1);
+                                    read_length = dis_length(gen); // Generate random read_length
+                                    if (read_offset + read_length > exist_job_perfile_size) {
+                                        read_length = exist_job_perfile_size - read_offset;
+                                    }
+                                } else { // not random
+                                    read_offset = config.read_offset_left;
+                                    read_length = config.read_length_left;
+                                }
+                            } else {
+                                // new files
+                                read_offset = config.read_offset_left;
+                                read_length = config.read_length_left;
+                                if (read_length == -1 ||
+                                    read_offset + read_length > config.size_bytes_perfile) {
+                                    read_length = config.size_bytes_perfile - read_offset;
+                                }
+                            }
+                            LOG(INFO) << "read_offset=" << read_offset
+                                      << " read_length=" << read_length;
+                            CHECK(read_offset >= 0)
+                                    << "Calculated read_offset is negative: " 
<< read_offset;
+                            CHECK(read_length >= 0)
+                                    << "Calculated read_length is negative: " 
<< read_length;
+
+                            std::string read_buffer;
+                            read_buffer.resize(read_length);
+
+                            size_t total_bytes_read = 0;
+                            while (total_bytes_read < read_length) {
+                                size_t bytes_to_read = std::min(
+                                        read_length - total_bytes_read,
+                                        static_cast<size_t>(4 * 1024 * 1024)); // 4MB chunks
+
+                                doris::Slice read_slice(read_buffer.data() + total_bytes_read,
+                                                        bytes_to_read);
+                                size_t bytes_read = 0;
+
+                                doris::Status read_status =
+                                        reader->read_at(read_offset + total_bytes_read, read_slice,
+                                                        &bytes_read, &io_ctx, job.read_latency);
+
+                                if (!read_status.ok()) {
+                                    throw std::runtime_error("Read error: " +
+                                                             read_status.to_string());
+                                }
+
+                                if (bytes_read != bytes_to_read) {
+                                    throw std::runtime_error("Incomplete read: 
expected " +
+                                                             
std::to_string(bytes_to_read) +
+                                                             " bytes, got " +
+                                                             
std::to_string(bytes_read));
+                                }
+
+                                total_bytes_read += bytes_read;
+                            }
+
+                            size_t file_size = config.size_bytes_perfile;
+                            if (exist_job_perfile_size != -1) {
+                                file_size = exist_job_perfile_size;
+                            }
+
+                            // Verify read data
+                            if (!DataVerifier::verify_data(key, file_size, read_offset, read_buffer,
+                                                           read_length)) {
+                                throw std::runtime_error("Data verification failed for key: " +
+                                                         key);
+                            }
+
+                            LOG(INFO)
+                                    << "read_offset=" << read_offset
+                                    << " read_length=" << read_length << " 
file_size=" << file_size;
+
+                            completed_reads++;
+                        }
+                        break;
+                    }
+                } catch (const std::exception& e) {
+                    LOG(ERROR) << "Read task failed for key " << key << ": " 
<< e.what();
+                }
+            }));
+        }
+
+        // Wait for all read tasks to complete
+        for (auto& future : read_futures) {
+            future.get();
+        }
+        read_stopwatch.stop(); // Stop timer
+
+        // Convert read time from nanoseconds to seconds and format as string
+        double total_read_time_seconds =
+                read_stopwatch.elapsed_time() / 1e9; // nanoseconds to seconds
+        job.stats.total_read_time =
+                std::to_string(total_read_time_seconds) + " seconds"; // Save as string
+        LOG(INFO) << "Total read time: " << job.stats.total_read_time;
+
+        // Update job statistics
+        job.stats.num_local_io_total = total_stats.num_local_io_total;
+        job.stats.num_remote_io_total = total_stats.num_remote_io_total;
+        job.stats.num_inverted_index_remote_io_total =
+                total_stats.num_inverted_index_remote_io_total;
+        job.stats.local_io_timer = total_stats.local_io_timer;
+        job.stats.bytes_read_from_local = total_stats.bytes_read_from_local;
+        job.stats.bytes_read_from_remote = total_stats.bytes_read_from_remote;
+        job.stats.remote_io_timer = total_stats.remote_io_timer;
+        job.stats.write_cache_io_timer = total_stats.write_cache_io_timer;
+        job.stats.bytes_write_into_cache = total_stats.bytes_write_into_cache;
+        job.stats.num_skip_cache_io_total = total_stats.num_skip_cache_io_total;
+        job.stats.read_cache_file_directly_timer = total_stats.read_cache_file_directly_timer;
+        job.stats.cache_get_or_set_timer = total_stats.cache_get_or_set_timer;
+        job.stats.lock_wait_timer = total_stats.lock_wait_timer;
+        job.stats.get_timer = total_stats.get_timer;
+        job.stats.set_timer = total_stats.set_timer;
+
+        auto end_time = std::chrono::steady_clock::now();
+        auto duration =
+                std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
+        LOG(INFO) << "Completed read tasks for job " << job.job_id
+                  << ", duration=" << duration.count() << "ms";
+    }
+
+    std::mutex _mutex;
+    std::atomic<int> _next_job_id;
+    std::map<std::string, std::shared_ptr<Job>> _jobs;
+    ThreadPool _job_executor_pool;
+};
+
+namespace microbenchService {
+
+class MicrobenchServiceImpl : public microbench::MicrobenchService {
+public:
+    MicrobenchServiceImpl(JobManager& job_manager) : _job_manager(job_manager) {}
+    virtual ~MicrobenchServiceImpl() {}
+
+    /**
+     * Submit a job
+     * 
+     * Receive JSON-formatted job configuration, create and submit the job
+     * Return a JSON response containing the job ID
+     */
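+    // Illustrative request (a sketch only; the field names below mirror the JobConfig
+    // fields serialized in add_config_info() and the values are arbitrary; the exact
+    // set accepted by JobConfig::from_json may differ):
+    //   curl "http://localhost:{port}/MicrobenchService/submit_job" -d '{
+    //       "size_bytes_perfile": 1048576, "write_iops": 100, "read_iops": 100,
+    //       "num_threads": 8, "num_files": 10, "file_prefix": "microbench_test",
+    //       "repeat": 1, "read_offset": [0, 1048576], "read_length": [0, 1048576]
+    //   }'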
+    void submit_job(google::protobuf::RpcController* cntl_base,
+                    const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                    google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        LOG(INFO) << "Received submit job request";
+
+        try {
+            // Parse request body JSON
+            std::string job_config = cntl->request_attachment().to_string();
+            JobConfig config = JobConfig::from_json(job_config);
+
+            LOG(INFO) << "Parsed JobConfig: " << config.to_string();
+
+            std::string job_id = _job_manager.submit_job(config);
+            LOG(INFO) << "Job submitted successfully with ID: " << job_id;
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Return job_id
+            rapidjson::Document response_doc;
+            response_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = response_doc.GetAllocator();
+            response_doc.AddMember("job_id", rapidjson::Value(job_id.c_str(), allocator),
+                                   allocator);
+            response_doc.AddMember("status", "success", allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            response_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error submitting job: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Get job status
+     * 
+     * Return detailed job status information based on job ID
+     * Optional parameter 'files' is used to limit the number of file records returned
+     */
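+    // Example (assumed URL mapping, matching the readme):
+    //   curl "http://localhost:{port}/MicrobenchService/get_job_status/<job_id>?files=10"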
+    void get_job_status(google::protobuf::RpcController* cntl_base,
+                        const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                        google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        std::string job_id = cntl->http_request().unresolved_path();
+        const std::string* files_value = cntl->http_request().uri().GetQuery("files");
+        size_t max_files = 1000; // Set maximum file record limit
+
+        if (files_value != nullptr) {
+            try {
+                max_files = std::stoi(*files_value);
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "Invalid files parameter: " << *files_value
+                             << ", using default, error: " << e.what();
+            }
+        }
+
+        LOG(INFO) << "Received get_job_status request for job " << job_id
+                  << ", max_files=" << max_files;
+
+        try {
+            const Job& job = _job_manager.get_job_status(job_id);
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build JSON response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+
+            d.AddMember("job_id", rapidjson::Value(job.job_id.c_str(), 
allocator), allocator);
+            d.AddMember("status",
+                        rapidjson::Value(get_status_string(job.status).c_str(), allocator),
+                        allocator);
+
+            // Add time information
+            add_time_info(d, allocator, job);
+
+            // Add error information (if any)
+            if (!job.error_message.empty()) {
+                d.AddMember("error_message", 
rapidjson::Value(job.error_message.c_str(), allocator),
+                            allocator);
+            }
+
+            // Add configuration information
+            add_config_info(d, allocator, job.config);
+
+            // Add statistics information
+            add_stats_info(d, allocator, job.stats);
+
+            // Add file records (if requested)
+            if (files_value) {
+                add_file_records(d, allocator, job.file_records, max_files);
+            }
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error getting job status: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_NOT_FOUND);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", "Job not found", allocator);
+            error_doc.AddMember("exception", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * List all jobs
+     * 
+     * Return a list of basic information for all jobs
+     */
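+    // Example (assumed URL mapping):
+    //   curl "http://localhost:{port}/MicrobenchService/list_jobs"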
+    void list_jobs(google::protobuf::RpcController* cntl_base,
+                   const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                   google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        LOG(INFO) << "Received list_jobs request";
+
+        try {
+            std::vector<std::shared_ptr<Job>> jobs = _job_manager.list_jobs();
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build JSON response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+
+            rapidjson::Value jobs_array(rapidjson::kArrayType);
+            for (const auto& job : jobs) {
+                rapidjson::Value job_obj(rapidjson::kObjectType);
+                job_obj.AddMember("job_id", 
rapidjson::Value(job->job_id.c_str(), allocator),
+                                  allocator);
+                job_obj.AddMember(
+                        "status",
+                        rapidjson::Value(get_status_string(job->status).c_str(), allocator),
+                        allocator);
+
+                // Add creation time
+                auto create_time_t = std::chrono::system_clock::to_time_t(job->create_time);
+                std::string create_time_str = std::ctime(&create_time_t);
+                if (!create_time_str.empty() && create_time_str.back() == '\n') {
+                    create_time_str.pop_back(); // Remove trailing newline character
+                }
+                job_obj.AddMember("create_time",
+                                  rapidjson::Value(create_time_str.c_str(), allocator), allocator);
+
+                // Add file prefix
+                job_obj.AddMember("file_prefix",
+                                  rapidjson::Value(job->config.file_prefix.c_str(), allocator),
+                                  allocator);
+
+                jobs_array.PushBack(job_obj, allocator);
+            }
+
+            d.AddMember("jobs", jobs_array, allocator);
+            d.AddMember("total", 
rapidjson::Value(static_cast<int>(jobs.size())), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error listing jobs: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_INTERNAL_SERVER_ERROR);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Cancel a job
+     * 
+     * Attempt to cancel the specified job (currently not implemented)
+     */
+    void cancel_job(google::protobuf::RpcController* cntl_base,
+                    const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                    google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        std::string job_id = cntl->http_request().unresolved_path();
+        LOG(INFO) << "Received cancel_job request for job " << job_id;
+
+        // Set response headers
+        cntl->http_response().set_content_type("application/json");
+        cntl->http_response().set_status_code(brpc::HTTP_STATUS_NOT_IMPLEMENTED);
+
+        // Build response
+        rapidjson::Document d;
+        d.SetObject();
+        rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+        d.AddMember("status", "error", allocator);
+        d.AddMember("message", "Job cancellation not implemented", allocator);
+
+        // Serialize to string
+        rapidjson::StringBuffer buffer;
+        rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+        d.Accept(writer);
+
+        cntl->response_attachment().append(buffer.GetString());
+    }
+
+    /**
+     * Get help information
+     * 
+     * Return usage instructions for the tool
+     */
+    void get_help(google::protobuf::RpcController* cntl_base,
+                  const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                  google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        LOG(INFO) << "Received get_help request";
+
+        // Get usage help information
+        std::string help_info = get_usage("Doris Microbench Tool");
+
+        // Return help information
+        cntl->response_attachment().append(help_info);
+    }
+
+    /**
+     * Clear file cache
+     * 
+     * Clear file cache for the specified path or all caches
+     */
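+    // Example (assumed URL mapping; both query parameters are optional):
+    //   curl "http://localhost:{port}/MicrobenchService/file_cache_clear?sync=true&segment_path=<path>"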
+    void file_cache_clear(google::protobuf::RpcController* cntl_base,
+                          const microbench::HttpRequest* request,
+                          microbench::HttpResponse* response, google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        const std::string* sync_str = cntl->http_request().uri().GetQuery("sync");
+        const std::string* segment_path = cntl->http_request().uri().GetQuery("segment_path");
+
+        LOG(INFO) << "Received file_cache_clear request, sync=" << (sync_str ? 
*sync_str : "")
+                  << ", segment_path=" << (segment_path ? *segment_path : "");
+
+        try {
+            bool sync = sync_str ? (doris::to_lower(*sync_str) == "true") : false;
+
+            if (segment_path == nullptr) {
+                // Clear all caches
+                FileCacheFactory::instance()->clear_file_caches(sync);
+                LOG(INFO) << "Cleared all file caches, sync=" << sync;
+            } else {
+                // Clear cache for specific path
+                doris::io::UInt128Wrapper hash = doris::io::BlockFileCache::hash(*segment_path);
+                doris::io::BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(hash);
+                if (cache) {
+                    cache->remove_if_cached(hash);
+                    LOG(INFO) << "Cleared cache for path: " << *segment_path;
+                } else {
+                    LOG(WARNING) << "No cache found for path: " << 
*segment_path;
+                }
+            }
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+            d.AddMember("status", "OK", allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error clearing file cache: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Reset file cache
+     * 
+     * Reset file cache for the specified path or all caches
+     */
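+    // Example (assumed URL mapping; capacity is in bytes):
+    //   curl "http://localhost:{port}/MicrobenchService/file_cache_reset?capacity=21474836480&path=<path>"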
+    void file_cache_reset(google::protobuf::RpcController* cntl_base,
+                          const microbench::HttpRequest* request,
+                          microbench::HttpResponse* response, google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received file_cache_reset request";
+
+        try {
+            const std::string* capacity_str = cntl->http_request().uri().GetQuery("capacity");
+            if (capacity_str == nullptr) {
+                LOG(ERROR) << "Capacity is empty";
+                throw std::runtime_error("Capacity is empty");
+            }
+            int64_t new_capacity = std::stoll(*capacity_str);
+            if (new_capacity <= 0) {
+                LOG(ERROR) << "Invalid capacity: " << *capacity_str;
+                throw std::runtime_error("Invalid capacity");
+            }
+            const std::string* path_str = cntl->http_request().uri().GetQuery("path");
+            if (path_str == nullptr) {
+                LOG(ERROR) << "Path is empty";
+                throw std::runtime_error("Path is empty");
+            }
+            std::string path = *path_str;
+            auto ret = FileCacheFactory::instance()->reset_capacity(path, new_capacity);
+            LOG(INFO) << "Reset capacity for path: " << path << ", new capacity: " << new_capacity
+                      << ", result: " << ret;
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+            d.AddMember("status", "OK", allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error resetting file cache: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Release file cache
+     * 
+     * Release file cache for the specified path or all caches
+     */
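+    // Example (assumed URL mapping; base_path is optional):
+    //   curl "http://localhost:{port}/MicrobenchService/file_cache_release?base_path=<base_path>"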
+    void file_cache_release(google::protobuf::RpcController* cntl_base,
+                            const microbench::HttpRequest* request,
+                            microbench::HttpResponse* response, google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received file_cache_release request";
+
+        try {
+            const std::string* base_path_str = cntl->http_request().uri().GetQuery("base_path");
+            size_t released = 0;
+            if (base_path_str == nullptr) {
+                released = FileCacheFactory::instance()->try_release();
+            } else {
+                released = FileCacheFactory::instance()->try_release(*base_path_str);
+            }
+            LOG(INFO) << "Released file caches: " << released
+                      << " path: " << (base_path_str ? *base_path_str : 
"null");
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+            d.AddMember("status", "OK", allocator);
+            d.AddMember("released_elements", released, allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error releasing file cache: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Update configuration
+     */
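+    // Example (assumed URL mapping; the config item is passed as a query parameter):
+    //   curl "http://localhost:{port}/MicrobenchService/update_config?<config_key>=<value>&persist=true"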
+    void update_config(google::protobuf::RpcController* cntl_base,
+                       const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                       google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received update_config request";
+
+        try {
+            bool need_persist = false;
+            const std::string* persist_str = cntl->http_request().uri().GetQuery("persist");
+            if (persist_str && *persist_str == "true") {
+                need_persist = true;
+            }
+            cntl->http_request().uri().RemoveQuery("persist");
+            std::string key = "";
+            std::string value = "";
+            for (brpc::URI::QueryIterator it = cntl->http_request().uri().QueryBegin();
+                 it != cntl->http_request().uri().QueryEnd(); ++it) {
+                key = it->first;
+                value = it->second;
+                auto s = doris::config::set_config(key, value, need_persist);
+                if (s.ok()) {
+                    LOG(INFO) << "set_config " << key << "=" << value
+                              << " success. persist: " << need_persist;
+                } else {
+                    LOG(WARNING) << "set_config " << key << "=" << value << " 
failed";
+                }
+                // just support update one config
+                break;
+            }
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+            d.AddMember("status", "OK", allocator);
+            d.AddMember(rapidjson::Value(key.c_str(), allocator),
+                        rapidjson::Value(value.c_str(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error updating config: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Show configuration
+     */
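+    // Example (assumed URL mapping; conf_item is optional and filters to one item):
+    //   curl "http://localhost:{port}/MicrobenchService/show_config?conf_item=<config_key>"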
+    void show_config(google::protobuf::RpcController* cntl_base,
+                     const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                     google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received show_config request";
+
+        try {
+            std::vector<std::vector<std::string>> config_info = doris::config::get_config_info();
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+
+            // Write config array
+            writer.StartArray();
+            const std::string* conf_item_str = cntl->http_request().uri().GetQuery("conf_item");
+            std::string conf_item = conf_item_str ? *conf_item_str : "";
+            for (const auto& _config : config_info) {
+                if (!conf_item.empty()) {
+                    if (_config[0] == conf_item) {
+                        writer.StartArray();
+                        for (const std::string& config_field : _config) {
+                            writer.String(config_field.c_str());
+                        }
+                        writer.EndArray();
+                        break;
+                    }
+                } else {
+                    writer.StartArray();
+                    for (const std::string& config_field : _config) {
+                        writer.String(config_field.c_str());
+                    }
+                    writer.EndArray();
+                }
+            }
+            writer.EndArray();
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response
+            d.AddMember("status", "OK", allocator);
+            d.AddMember("config", rapidjson::Value(buffer.GetString(), 
allocator), allocator);
+
+            buffer.Clear();
+            writer.Reset(buffer); // reset writer state before reusing it for the document
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error showing config: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+private:
+    // Get string representation of job status
+    std::string get_status_string(JobStatus status) {
+        switch (status) {
+        case JobStatus::PENDING:
+            return "PENDING";
+        case JobStatus::RUNNING:
+            return "RUNNING";
+        case JobStatus::COMPLETED:
+            return "COMPLETED";
+        case JobStatus::FAILED:
+            return "FAILED";
+        default:
+            return "UNKNOWN";
+        }
+    }
+
+    // Add time information to JSON response
+    void add_time_info(rapidjson::Document& doc, rapidjson::Document::AllocatorType& allocator,
+                       const Job& job) {
+        // Add creation time
+        auto create_time_t = std::chrono::system_clock::to_time_t(job.create_time);
+        std::string create_time_str = std::ctime(&create_time_t);
+        if (!create_time_str.empty() && create_time_str.back() == '\n') {
+            create_time_str.pop_back(); // Remove trailing newline character
+        }
+        doc.AddMember("create_time", rapidjson::Value(create_time_str.c_str(), 
allocator),
+                      allocator);
+
+        // Add start time (if available)
+        if (job.status != JobStatus::PENDING) {
+            auto start_time_t = std::chrono::system_clock::to_time_t(job.start_time);
+            std::string start_time_str = std::ctime(&start_time_t);
+            if (!start_time_str.empty() && start_time_str.back() == '\n') {
+                start_time_str.pop_back();
+            }
+            doc.AddMember("start_time", 
rapidjson::Value(start_time_str.c_str(), allocator),
+                          allocator);
+        }
+
+        // Add end time (if available)
+        if (job.status == JobStatus::COMPLETED || job.status == JobStatus::FAILED) {
+            auto end_time_t = std::chrono::system_clock::to_time_t(job.end_time);
+            std::string end_time_str = std::ctime(&end_time_t);
+            if (!end_time_str.empty() && end_time_str.back() == '\n') {
+                end_time_str.pop_back();
+            }
+            doc.AddMember("end_time", rapidjson::Value(end_time_str.c_str(), 
allocator), allocator);
+
+            // Calculate duration of the run
+            auto duration =
+                    std::chrono::duration_cast<std::chrono::seconds>(job.end_time - job.start_time)
+                            .count();
+            doc.AddMember("duration_seconds", duration, allocator);
+        }
+    }
+
+    // Add configuration information to JSON response
+    void add_config_info(rapidjson::Document& doc, rapidjson::Document::AllocatorType& allocator,
+                         const JobConfig& config) {
+        rapidjson::Value config_obj(rapidjson::kObjectType);
+
+        config_obj.AddMember("size_bytes_perfile", config.size_bytes_perfile, 
allocator);
+        config_obj.AddMember("write_iops", config.write_iops, allocator);
+        config_obj.AddMember("read_iops", config.read_iops, allocator);
+        config_obj.AddMember("num_threads", config.num_threads, allocator);
+        config_obj.AddMember("num_files", config.num_files, allocator);
+        config_obj.AddMember("file_prefix", 
rapidjson::Value(config.file_prefix.c_str(), allocator),
+                             allocator);
+        config_obj.AddMember("cache_type", 
rapidjson::Value(config.cache_type.c_str(), allocator),
+                             allocator);
+        config_obj.AddMember("expiration", config.expiration, allocator);
+        config_obj.AddMember("repeat", config.repeat, allocator);
+        config_obj.AddMember("write_batch_size", config.write_batch_size, 
allocator);
+        config_obj.AddMember("write_file_cache", config.write_file_cache, 
allocator);
+        config_obj.AddMember("bvar_enable", config.bvar_enable, allocator);
+
+        // Add read offset (if applicable)
+        if (config.read_iops > 0) {
+            rapidjson::Value read_offset_array(rapidjson::kArrayType);
+            read_offset_array.PushBack(config.read_offset_left, allocator);
+            read_offset_array.PushBack(config.read_offset_right, allocator);
+            config_obj.AddMember("read_offset", read_offset_array, allocator);
+
+            rapidjson::Value read_length_array(rapidjson::kArrayType);
+            read_length_array.PushBack(config.read_length_left, allocator);
+            read_length_array.PushBack(config.read_length_right, allocator);
+            config_obj.AddMember("read_length", read_length_array, allocator);
+        }
+
+        doc.AddMember("config", config_obj, allocator);
+    }
+
+    // Add statistics information to JSON response
+    void add_stats_info(rapidjson::Document& doc, rapidjson::Document::AllocatorType& allocator,
+                        const Job::Statistics& stats) {
+        rapidjson::Value stats_obj(rapidjson::kObjectType);
+
+        stats_obj.AddMember("total_write_time",
+                            rapidjson::Value(stats.total_write_time.c_str(), allocator), allocator);
+        stats_obj.AddMember("total_read_time",
+                            rapidjson::Value(stats.total_read_time.c_str(), allocator), allocator);
+
+        // struct FileCacheStatistics
+        stats_obj.AddMember("num_local_io_total", 
static_cast<uint64_t>(stats.num_local_io_total),
+                            allocator);
+        stats_obj.AddMember("num_remote_io_total", 
static_cast<uint64_t>(stats.num_remote_io_total),
+                            allocator);
+        stats_obj.AddMember("num_inverted_index_remote_io_total",
+                            
static_cast<uint64_t>(stats.num_inverted_index_remote_io_total),
+                            allocator);
+        stats_obj.AddMember("local_io_timer", 
static_cast<uint64_t>(stats.local_io_timer),
+                            allocator);
+        stats_obj.AddMember("bytes_read_from_local",
+                            
static_cast<uint64_t>(stats.bytes_read_from_local), allocator);
+        stats_obj.AddMember("bytes_read_from_remote",
+                            
static_cast<uint64_t>(stats.bytes_read_from_remote), allocator);
+        stats_obj.AddMember("remote_io_timer", 
static_cast<uint64_t>(stats.remote_io_timer),
+                            allocator);
+        stats_obj.AddMember("write_cache_io_timer",
+                            static_cast<uint64_t>(stats.write_cache_io_timer), 
allocator);
+        stats_obj.AddMember("bytes_write_into_cache",
+                            
static_cast<uint64_t>(stats.bytes_write_into_cache), allocator);
+        stats_obj.AddMember("num_skip_cache_io_total",
+                            
static_cast<uint64_t>(stats.num_skip_cache_io_total), allocator);
+        stats_obj.AddMember("read_cache_file_directly_timer",
+                            
static_cast<uint64_t>(stats.read_cache_file_directly_timer), allocator);
+        stats_obj.AddMember("cache_get_or_set_timer",
+                            
static_cast<uint64_t>(stats.cache_get_or_set_timer), allocator);
+        stats_obj.AddMember("lock_wait_timer", 
static_cast<uint64_t>(stats.lock_wait_timer),
+                            allocator);
+        stats_obj.AddMember("get_timer", 
static_cast<uint64_t>(stats.get_timer), allocator);
+        stats_obj.AddMember("set_timer", 
static_cast<uint64_t>(stats.set_timer), allocator);
+
+        doc.AddMember("statistics", stats_obj, allocator);
+    }
+
+    // Add file records to JSON response
+    void add_file_records(rapidjson::Document& doc, rapidjson::Document::AllocatorType& allocator,
+                          const std::vector<FileInfo>& file_records, size_t max_files) {
+        rapidjson::Value files_array(rapidjson::kArrayType);
+        size_t count = 0;
+
+        for (const auto& file_info : file_records) {
+            if (count >= max_files) {
+                break; // Stop adding if max limit is reached
+            }
+            rapidjson::Value file_obj(rapidjson::kObjectType);
+            file_obj.AddMember("filename", 
rapidjson::Value(file_info.filename.c_str(), allocator),
+                               allocator);
+            file_obj.AddMember("data_size", 
static_cast<uint64_t>(file_info.data_size), allocator);
+            file_obj.AddMember("job_id", 
rapidjson::Value(file_info.job_id.c_str(), allocator),
+                               allocator);
+            files_array.PushBack(file_obj, allocator);
+            count++;
+        }
+
+        doc.AddMember("file_records", files_array, allocator);
+        doc.AddMember("file_records_count", static_cast<uint64_t>(count), 
allocator);
+        doc.AddMember("file_records_total", 
static_cast<uint64_t>(file_records.size()), allocator);
+    }
+
+    JobManager& _job_manager;
+};
+} // namespace microbenchService
+
+// HTTP server handling
+class HttpServer {
+public:
+    HttpServer(JobManager& job_manager) : _job_manager(job_manager), _server(nullptr) {}
+
+    void start() {
+        _server = new brpc::Server();
+        microbenchService::MicrobenchServiceImpl http_svc(_job_manager);
+
+        LOG(INFO) << "Starting HTTP server on port " << FLAGS_port;
+
+        if (_server->AddService(&http_svc, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+            LOG(ERROR) << "Failed to add http service";
+            return;
+        }
+
+        brpc::ServerOptions options;
+        if (_server->Start(FLAGS_port, &options) != 0) {
+            LOG(ERROR) << "Failed to start HttpServer";
+            return;
+        }
+
+        LOG(INFO) << "HTTP server started successfully";
+        _server->RunUntilAskedToQuit(); // Wait for signals
+        _server->ClearServices();
+
+        LOG(INFO) << "HTTP server stopped";
+    }
+
+    ~HttpServer() {
+        if (_server) {
+            LOG(INFO) << "Cleaning up HTTP server in destructor";
+            delete _server;
+        }
+    }
+
+private:
+    JobManager& _job_manager;
+    brpc::Server* _server;
+};
+
+void init_exec_env() {
+    auto* exec_env = doris::ExecEnv::GetInstance();
+    static_cast<void>(doris::ThreadPoolBuilder("MicrobenchS3FileUploadThreadPool")
+                              .set_min_threads(256)
+                              .set_max_threads(512)
+                              .build(&(exec_env->_s3_file_upload_thread_pool)));
+    exec_env->_file_cache_factory = new FileCacheFactory();
+    std::vector<doris::CachePath> cache_paths;
+    exec_env->init_file_cache_factory(cache_paths);
+    exec_env->_file_cache_open_fd_cache = std::make_unique<doris::io::FDCache>();
+}
+
+int main(int argc, char* argv[]) {
+    google::ParseCommandLineFlags(&argc, &argv, true);
+    FLAGS_minloglevel = google::GLOG_INFO;
+    FLAGS_log_dir = "./logs";
+    FLAGS_logbufsecs = 0; // Disable buffering, write immediately
+    std::filesystem::path log_dir(FLAGS_log_dir);
+    if (!std::filesystem::exists(log_dir)) {
+        std::filesystem::create_directories(log_dir);
+        LOG(INFO) << "Log directory created successfully: " << 
log_dir.string();
+    } else {
+        LOG(INFO) << "Log directory already exists: " << log_dir.string();
+    }
+    google::InitGoogleLogging(argv[0]);
+
+    if (-1 == setenv("DORIS_HOME", ".", 0)) {
+        LOG(WARNING) << "set DORIS_HOME error";
+    }
+    const char* doris_home = getenv("DORIS_HOME");
+    if (doris_home == nullptr) {
+        LOG(INFO) << "DORIS_HOME environment variable not set";
+    }
+    LOG(INFO) << "env=" << doris_home;
+    std::string conffile = std::string(doris_home) + "/conf/be.conf";
+    if (!doris::config::init(conffile.c_str(), true, true, true)) {
+        LOG(ERROR) << "Error reading config file";
+        return -1;
+    }
+    std::string custom_conffile = doris::config::custom_config_dir + "/be_custom.conf";
+    if (!doris::config::init(custom_conffile.c_str(), true, false, false)) {
+        LOG(ERROR) << "Error reading custom config file";
+        return -1;
+    }
+
+    LOG(INFO) << "Obj config. ak=" << doris::config::test_s3_ak
+              << " sk=" << doris::config::test_s3_sk << " region=" << 
doris::config::test_s3_region
+              << " endpoint=" << doris::config::test_s3_endpoint
+              << " bucket=" << doris::config::test_s3_bucket;
+    LOG(INFO) << "File cache config. enable_file_cache=" << 
doris::config::enable_file_cache
+              << " file_cache_path=" << doris::config::file_cache_path
+              << " file_cache_each_block_size=" << 
doris::config::file_cache_each_block_size
+              << " clear_file_cache=" << doris::config::clear_file_cache
+              << " enable_file_cache_query_limit=" << 
doris::config::enable_file_cache_query_limit
+              << " file_cache_enter_disk_resource_limit_mode_percent="
+              << 
doris::config::file_cache_enter_disk_resource_limit_mode_percent
+              << " file_cache_exit_disk_resource_limit_mode_percent="
+              << 
doris::config::file_cache_exit_disk_resource_limit_mode_percent
+              << " enable_read_cache_file_directly="
+              << doris::config::enable_read_cache_file_directly
+              << " file_cache_enable_evict_from_other_queue_by_size="
+              << 
doris::config::file_cache_enable_evict_from_other_queue_by_size
+              << " file_cache_error_log_limit_bytes="
+              << doris::config::file_cache_error_log_limit_bytes
+              << " cache_lock_wait_long_tail_threshold_us="
+              << doris::config::cache_lock_wait_long_tail_threshold_us
+              << " cache_lock_held_long_tail_threshold_us="
+              << doris::config::cache_lock_held_long_tail_threshold_us
+              << " file_cache_remove_block_qps_limit="
+              << doris::config::file_cache_remove_block_qps_limit
+              << " enable_evict_file_cache_in_advance="
+              << doris::config::enable_evict_file_cache_in_advance
+              << " file_cache_enter_need_evict_cache_in_advance_percent="
+              << 
doris::config::file_cache_enter_need_evict_cache_in_advance_percent
+              << " file_cache_exit_need_evict_cache_in_advance_percent="
+              << 
doris::config::file_cache_exit_need_evict_cache_in_advance_percent
+              << " file_cache_evict_in_advance_interval_ms="
+              << doris::config::file_cache_evict_in_advance_interval_ms
+              << " file_cache_evict_in_advance_batch_bytes="
+              << doris::config::file_cache_evict_in_advance_batch_bytes;
+    LOG(INFO) << "S3 writer config. s3_file_writer_log_interval_second="
+              << doris::config::s3_file_writer_log_interval_second
+              << " s3_write_buffer_size=" << 
doris::config::s3_write_buffer_size
+              << " enable_flush_file_cache_async=" << 
doris::config::enable_flush_file_cache_async;
+
+    init_exec_env();
+    JobManager job_manager;
+
+    std::thread periodically_log_thread;
+    std::mutex periodically_log_thread_lock;
+    std::condition_variable periodically_log_thread_cv;
+    std::atomic_bool periodically_log_thread_run = true;
+    auto periodically_log = [&]() {
+        while (periodically_log_thread_run) {
+            std::unique_lock<std::mutex> lck {periodically_log_thread_lock};
+            periodically_log_thread_cv.wait_for(lck, std::chrono::milliseconds(5000));
+            LOG(INFO) << "Periodically log for file cache microbench";
+        }
+    };
+    periodically_log_thread = std::thread {periodically_log};
+
+    try {
+        HttpServer http_server(job_manager);
+        http_server.start();
+    } catch (const std::exception& e) {
+        LOG(ERROR) << "Error in HTTP server: " << e.what();
+    }
+
+    if (periodically_log_thread.joinable()) {
+        {
+            std::unique_lock<std::mutex> lck {periodically_log_thread_lock};
+            periodically_log_thread_run = false;
+            // notify the log thread immediately so it exits quickly instead of
+            // blocking the whole shutdown procedure
+            periodically_log_thread_cv.notify_all();
+        }
+        periodically_log_thread.join();
+    }
+    LOG(INFO) << "Program exiting normally";
+    return 0;
+}
+#endif
\ No newline at end of file
diff --git a/be/src/io/tools/proto/Makefile b/be/src/io/tools/proto/Makefile
new file mode 100644
index 00000000000..affed0ef22f
--- /dev/null
+++ b/be/src/io/tools/proto/Makefile
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file compile all protobuf files.
+
+BUILD_DIR = ${CURDIR}/../build/proto
+PROTOC = ${DORIS_THIRDPARTY}/installed/bin/protoc
+
+SOURCES = $(wildcard *.proto)
+OBJECTS = $(patsubst %.proto, ${BUILD_DIR}/%.pb.cc, ${SOURCES})
+HEADERS = $(patsubst %.proto, ${BUILD_DIR}/%.pb.h, ${SOURCES})
+
+all: prepare ${OBJECTS} ${HEADERS}
+.PHONY: all prepare
+
+prepare:
+       mkdir -p ${BUILD_DIR}
+
+${BUILD_DIR}/%.pb.h ${BUILD_DIR}/%.pb.cc: %.proto
+       ${PROTOC} --proto_path=. --cpp_out=${BUILD_DIR} $<
+
+clean:
+       rm -rf ${BUILD_DIR}
+.PHONY: clean
\ No newline at end of file
diff --git a/be/src/io/tools/proto/microbench.proto b/be/src/io/tools/proto/microbench.proto
new file mode 100644
index 00000000000..ecd69b4096d
--- /dev/null
+++ b/be/src/io/tools/proto/microbench.proto
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax="proto2";
+
+package microbench;
+ 
+option cc_generic_services = true;
+
+
+message HttpRequest {};
+message HttpResponse {};
+ 
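+// Note (assumption based on brpc's generic-service HTTP mapping and the curl
+// examples in readme.md): each rpc below is reachable over HTTP at
+// /MicrobenchService/<method_name>, e.g. GET http://host:port/MicrobenchService/list_jobs.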
+service MicrobenchService {
+      rpc get_job_status(HttpRequest) returns (HttpResponse);
+      rpc submit_job(HttpRequest) returns (HttpResponse);
+      rpc list_jobs(HttpRequest) returns (HttpResponse);
+      rpc cancel_job(HttpRequest) returns (HttpResponse);
+      rpc get_help(HttpRequest) returns (HttpResponse);
+      rpc file_cache_clear(HttpRequest) returns (HttpResponse);
+      rpc file_cache_reset(HttpRequest) returns (HttpResponse);
+      rpc file_cache_release(HttpRequest) returns (HttpResponse);
+      rpc update_config(HttpRequest) returns (HttpResponse);
+      rpc show_config(HttpRequest) returns (HttpResponse);
+};
diff --git a/be/src/io/tools/readme.md b/be/src/io/tools/readme.md
new file mode 100644
index 00000000000..a5804958da0
--- /dev/null
+++ b/be/src/io/tools/readme.md
@@ -0,0 +1,133 @@
+# File Cache Microbenchmark
+
+## Compilation
+
+To compile the project, run the following command:
+
+```bash
+./build.sh --clean --file-cache-microbench --be
+```
+
+This will generate the `file_cache_microbench` executable in the `apache_doris/output/be/lib` directory.
+
+## Usage
+
+1. Create a deployment directory:
+   ```bash
+   mkdir {deploy_dir}
+   ```
+
+2. Create a configuration directory:
+   ```bash
+   mkdir {deploy_dir}/conf
+   ```
+
+3. Copy the executable to the deployment directory:
+   ```bash
+   cp -r apache_doris/output/be/lib/file_cache_microbench {deploy_dir}
+   ```
+
+4. Copy the configuration file to the configuration directory:
+   ```bash
+   cp -r apache_doris/output/be/conf/be.conf {deploy_dir}/conf
+   ```
+
+5. Edit the configuration file `{deploy_dir}/conf/be.conf` and add the following configuration information:
+    ```ini
+    enable_file_cache=true
+    file_cache_path = [ {"path": "/mnt/disk2/file_cache", "total_size":53687091200, "query_limit": 10737418240}]
+    test_s3_resource = "resource"
+    test_s3_ak = "ak"
+    test_s3_sk = "sk"
+    test_s3_endpoint = "endpoint"
+    test_s3_region = "region"
+    test_s3_bucket = "bucket"
+    test_s3_prefix = "prefix"
+    ```
+
+6. Change to the deployment directory:
+   ```bash
+   cd {deploy_dir}
+   ```
+
+7. Run the microbenchmark:
+   ```bash
+   ./file_cache_microbench --port={test_port}
+   ```
+
+8. Access the bvar metrics exposed by the server:
+   ```bash
+   curl http://${ip}:${port}/vars/
+   ```
+
+9. Check the logs in `{deploy_dir}/log/`.
+
+## API
+
+### get_help
+```
+curl "http://localhost:{port}/MicrobenchService/get_help";
+```
+
+#### Endpoints (usage examples follow this list):
+- **GET /get_job_status/<job_id>**
+  - Retrieve the status of a submitted job.
+  - Parameters:
+    - `job_id`: The ID of the job to retrieve status for.
+    - `files` (optional): If provided, returns the associated file records for the job.
+      - Example: `/get_job_status/job_id?files=10`
+
+- **GET /list_jobs**
+  - List all submitted jobs and their statuses.
+
+- **GET /get_help**
+  - Get this help information.
+
+- **GET /file_cache_clear**
+  - Clear the file cache with the following query parameters:
+    ```json
+    {
+      "sync": <true|false>,                // Whether to synchronize the cache 
clear operation
+      "segment_path": "<path>"             // Optional path of the segment to 
clear from the cache
+    }
+    ```
+    If `segment_path` is not provided, all caches will be cleared based on the 
`sync` parameter.
+
+- **GET /file_cache_reset**
+  - Reset the file cache with the following query parameters:
+    ```json
+    {
+      "capacity": <new_capacity>,          // New capacity for the specified 
path
+      "path": "<path>"                     // Path of the segment to reset
+    }
+    ```
+
+- **GET /file_cache_release**
+  - Release the file cache with the following query parameters:
+    ```json
+    {
+      "base_path": "<base_path>"           // Optional base path to release 
specific caches
+    }
+    ```
+
+- **GET /update_config**
+  - Update the configuration with the following JSON body:
+    ```json
+    {
+      "config_key": "<key>",               // The configuration key to update
+      "config_value": "<value>",            // The new value for the 
configuration key
+      "persist": <true|false>              // Whether to persist the 
configuration change
+    }
+    ```
+
+- **GET /show_config**
+  - Retrieve the current configuration settings.
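+
+As a usage sketch for the endpoints above (assumptions: `localhost:{port}` is a placeholder as in the `get_help` example, `job_id` stands for a real job ID, and the parameters are passed as URL query strings as the descriptions suggest):
+
+```bash
+# List all jobs, then fetch one job's status with up to 10 file records
+curl "http://localhost:{port}/MicrobenchService/list_jobs"
+curl "http://localhost:{port}/MicrobenchService/get_job_status/job_id?files=10"
+
+# Clear the whole file cache synchronously
+curl "http://localhost:{port}/MicrobenchService/file_cache_clear?sync=true"
+```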
+
+### Notes:
+- Ensure that the S3 configuration is set correctly in the environment.
+- The program will create and read files in the specified S3 bucket.
+- Monitor the logs for detailed execution information and errors.
+
+### Version Information:
+The version information is included in the response of `get_help`.
+
diff --git a/build.sh b/build.sh
index 24ceed2baea..870311c109b 100755
--- a/build.sh
+++ b/build.sh
@@ -43,20 +43,21 @@ usage() {
     echo "
 Usage: $0 <options>
   Optional options:
-     [no option]            build all components
-     --fe                   build Frontend and Spark DPP application. Default ON.
-     --be                   build Backend. Default ON.
-     --meta-tool            build Backend meta tool. Default OFF.
-     --cloud                build Cloud. Default OFF.
-     --index-tool           build Backend inverted index tool. Default OFF.
-     --broker               build Broker. Default ON.
-     --spark-dpp            build Spark DPP application. Default ON.
-     --hive-udf             build Hive UDF library for Spark Load. Default ON.
-     --be-java-extensions   build Backend java extensions. Default ON.
-     --be-extension-ignore  build be-java-extensions package, choose which modules to ignore. Multiple modules separated by commas.
-     --clean                clean and build target
-     --output               specify the output directory
-     -j                     build Backend parallel
+     [no option]                build all components
+     --fe                       build Frontend and Spark DPP application. Default ON.
+     --be                       build Backend. Default ON.
+     --meta-tool                build Backend meta tool. Default OFF.
+     --file-cache-microbench    build Backend file cache microbench tool. Default OFF.
+     --cloud                    build Cloud. Default OFF.
+     --index-tool               build Backend inverted index tool. Default OFF.
+     --broker                   build Broker. Default ON.
+     --spark-dpp                build Spark DPP application. Default ON.
+     --hive-udf                 build Hive UDF library for Spark Load. Default ON.
+     --be-java-extensions       build Backend java extensions. Default ON.
+     --be-extension-ignore      build be-java-extensions package, choose which modules to ignore. Multiple modules separated by commas.
+     --clean                    clean and build target
+     --output                   specify the output directory
+     -j                         build Backend parallel
 
   Environment variables:
    USE_AVX2                    If the CPU does not support AVX2 instruction set, please set USE_AVX2=0. Default is ON.
@@ -68,6 +69,7 @@ Usage: $0 <options>
     $0                                      build all
     $0 --be                                 build Backend
     $0 --meta-tool                          build Backend meta tool
+    $0 --file-cache-microbench              build Backend file cache microbench tool
     $0 --cloud                              build Cloud
     $0 --index-tool                         build Backend inverted index tool
    $0 --fe --clean                         clean and build Frontend and Spark Dpp application
@@ -128,6 +130,7 @@ if ! OPTS="$(getopt \
     -l 'cloud' \
     -l 'broker' \
     -l 'meta-tool' \
+    -l 'file-cache-microbench' \
     -l 'index-tool' \
     -l 'spark-dpp' \
     -l 'hive-udf' \
@@ -150,6 +153,7 @@ BUILD_BE=0
 BUILD_CLOUD=0
 BUILD_BROKER=0
 BUILD_META_TOOL='OFF'
+BUILD_FILE_CACHE_MICROBENCH_TOOL='OFF'
 BUILD_INDEX_TOOL='OFF'
 BUILD_SPARK_DPP=0
 BUILD_BE_JAVA_EXTENSIONS=0
@@ -169,6 +173,7 @@ if [[ "$#" == 1 ]]; then
 
     BUILD_BROKER=1
     BUILD_META_TOOL='OFF'
+    BUILD_FILE_CACHE_MICROBENCH_TOOL='OFF'
     BUILD_INDEX_TOOL='OFF'
     BUILD_SPARK_DPP=1
     BUILD_HIVE_UDF=1
@@ -201,6 +206,10 @@ else
             BUILD_META_TOOL='ON'
             shift
             ;;
+        --file-cache-microbench)
+            BUILD_FILE_CACHE_MICROBENCH_TOOL='ON'
+            shift
+            ;;
         --index-tool)
             BUILD_INDEX_TOOL='ON'
             shift
@@ -263,6 +272,7 @@ else
         BUILD_CLOUD=1
         BUILD_BROKER=1
         BUILD_META_TOOL='ON'
+        BUILD_FILE_CACHE_MICROBENCH_TOOL='ON'
         BUILD_INDEX_TOOL='ON'
         BUILD_SPARK_DPP=1
         BUILD_HIVE_UDF=1
@@ -470,32 +480,33 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 && "$(uname -s)" == 'Darwin' ]]; then
 fi
 
 echo "Get params:
-    BUILD_FE                    -- ${BUILD_FE}
-    BUILD_BE                    -- ${BUILD_BE}
-    BUILD_CLOUD                 -- ${BUILD_CLOUD}
-    BUILD_BROKER                -- ${BUILD_BROKER}
-    BUILD_META_TOOL             -- ${BUILD_META_TOOL}
-    BUILD_INDEX_TOOL            -- ${BUILD_INDEX_TOOL}
-    BUILD_SPARK_DPP             -- ${BUILD_SPARK_DPP}
-    BUILD_BE_JAVA_EXTENSIONS    -- ${BUILD_BE_JAVA_EXTENSIONS}
-    BUILD_HIVE_UDF              -- ${BUILD_HIVE_UDF}
-    PARALLEL                    -- ${PARALLEL}
-    CLEAN                       -- ${CLEAN}
-    WITH_MYSQL                  -- ${WITH_MYSQL}
-    GLIBC_COMPATIBILITY         -- ${GLIBC_COMPATIBILITY}
-    USE_AVX2                    -- ${USE_AVX2}
-    USE_LIBCPP                  -- ${USE_LIBCPP}
-    USE_DWARF                   -- ${USE_DWARF}
-    USE_UNWIND                  -- ${USE_UNWIND}
-    STRIP_DEBUG_INFO            -- ${STRIP_DEBUG_INFO}
-    USE_MEM_TRACKER             -- ${USE_MEM_TRACKER}
-    USE_JEMALLOC                -- ${USE_JEMALLOC}
-    USE_BTHREAD_SCANNER         -- ${USE_BTHREAD_SCANNER}
-    ENABLE_INJECTION_POINT      -- ${ENABLE_INJECTION_POINT}
-    ENABLE_CACHE_LOCK_DEBUG     -- ${ENABLE_CACHE_LOCK_DEBUG}
-    DENABLE_CLANG_COVERAGE      -- ${DENABLE_CLANG_COVERAGE}
-    DISPLAY_BUILD_TIME          -- ${DISPLAY_BUILD_TIME}
-    ENABLE_PCH                  -- ${ENABLE_PCH}
+    BUILD_FE                            -- ${BUILD_FE}
+    BUILD_BE                            -- ${BUILD_BE}
+    BUILD_CLOUD                         -- ${BUILD_CLOUD}
+    BUILD_BROKER                        -- ${BUILD_BROKER}
+    BUILD_META_TOOL                     -- ${BUILD_META_TOOL}
+    BUILD_FILE_CACHE_MICROBENCH_TOOL    -- ${BUILD_FILE_CACHE_MICROBENCH_TOOL}
+    BUILD_INDEX_TOOL                    -- ${BUILD_INDEX_TOOL}
+    BUILD_SPARK_DPP                     -- ${BUILD_SPARK_DPP}
+    BUILD_BE_JAVA_EXTENSIONS            -- ${BUILD_BE_JAVA_EXTENSIONS}
+    BUILD_HIVE_UDF                      -- ${BUILD_HIVE_UDF}
+    PARALLEL                            -- ${PARALLEL}
+    CLEAN                               -- ${CLEAN}
+    WITH_MYSQL                          -- ${WITH_MYSQL}
+    GLIBC_COMPATIBILITY                 -- ${GLIBC_COMPATIBILITY}
+    USE_AVX2                            -- ${USE_AVX2}
+    USE_LIBCPP                          -- ${USE_LIBCPP}
+    USE_DWARF                           -- ${USE_DWARF}
+    USE_UNWIND                          -- ${USE_UNWIND}
+    STRIP_DEBUG_INFO                    -- ${STRIP_DEBUG_INFO}
+    USE_MEM_TRACKER                     -- ${USE_MEM_TRACKER}
+    USE_JEMALLOC                        -- ${USE_JEMALLOC}
+    USE_BTHREAD_SCANNER                 -- ${USE_BTHREAD_SCANNER}
+    ENABLE_INJECTION_POINT              -- ${ENABLE_INJECTION_POINT}
+    ENABLE_CACHE_LOCK_DEBUG             -- ${ENABLE_CACHE_LOCK_DEBUG}
+    DENABLE_CLANG_COVERAGE              -- ${DENABLE_CLANG_COVERAGE}
+    DISPLAY_BUILD_TIME                  -- ${DISPLAY_BUILD_TIME}
+    ENABLE_PCH                          -- ${ENABLE_PCH}
 "
 
 # Clean and build generated code
@@ -587,6 +598,7 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
         -DWITH_MYSQL="${WITH_MYSQL}" \
         -DUSE_LIBCPP="${USE_LIBCPP}" \
         -DBUILD_META_TOOL="${BUILD_META_TOOL}" \
+        -DBUILD_FILE_CACHE_MICROBENCH_TOOL="${BUILD_FILE_CACHE_MICROBENCH_TOOL}" \
         -DBUILD_INDEX_TOOL="${BUILD_INDEX_TOOL}" \
         -DSTRIP_DEBUG_INFO="${STRIP_DEBUG_INFO}" \
         -DUSE_DWARF="${USE_DWARF}" \
@@ -799,6 +811,10 @@ EOF
         cp -r -p "${DORIS_HOME}/be/output/lib/meta_tool" 
"${DORIS_OUTPUT}/be/lib"/
     fi
 
+    if [[ "${BUILD_FILE_CACHE_MICROBENCH_TOOL}" = "ON" ]]; then
+        cp -r -p "${DORIS_HOME}/be/output/lib/file_cache_microbench" "${DORIS_OUTPUT}/be/lib"/
+    fi
+
     if [[ "${BUILD_INDEX_TOOL}" = "ON" ]]; then
         cp -r -p "${DORIS_HOME}/be/output/lib/index_tool" 
"${DORIS_OUTPUT}/be/lib"/
     fi
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 5f73f6f0ee6..78b0dc4fcce 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -240,6 +240,7 @@ cd "${CMAKE_BUILD_DIR}"
     -DUSE_LIBCPP="${USE_LIBCPP}" \
     -DBUILD_META_TOOL=OFF \
     -DBUILD_BENCHMARK_TOOL="${BUILD_BENCHMARK_TOOL}" \
+    -DBUILD_FILE_CACHE_MICROBENCH_TOOL=OFF \
     -DWITH_MYSQL=ON \
     -DUSE_DWARF="${USE_DWARF}" \
     -DUSE_UNWIND="${USE_UNWIND}" \


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
