This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 41ccf77c7d [feature][fix](fs)(s3)add fs_s3 benchmark tool and fix s3 file writer bug (#20926)
41ccf77c7d is described below

commit 41ccf77c7d0409fe3053829c66f7c205e4c03c6e
Author: zhangdong <493738...@qq.com>
AuthorDate: Thu Jun 29 19:03:49 2023 +0800

    [feature][fix](fs)(s3)add fs_s3 benchmark tool and fix s3 file writer bug (#20926)

    1. Fix a bug where a field of s3_file_write_bufferpool was not initialized,
       causing undefined behavior.
    2. Add the fs_s3 benchmark tool; for usage, see
       https://github.com/apache/doris/pull/20770. Also improve the output:

    `sh bin/run-fs-benchmark.sh --conf=conf/s3.conf --fs_type=s3 --operation=single_read --threads=1 --iterations=1`

    ```
    ------------------------------------------------------------------------------------------------------------------------------
    Benchmark                                                                     Time             CPU   Iterations UserCounters...
    ------------------------------------------------------------------------------------------------------------------------------
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1               7366 ms          123 ms            1 ReadRate(B/S)=12.1823M/s ReadTime(S)=7.36572 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1               6163 ms          116 ms            1 ReadRate(B/S)=14.5597M/s ReadTime(S)=6.16299 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1               6048 ms          110 ms            1 ReadRate(B/S)=14.8366M/s ReadTime(S)=6.04796 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_mean          6526 ms          116 ms            3 ReadRate(B/S)=13.8596M/s ReadTime(S)=6.52556 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_median        6163 ms          116 ms            3 ReadRate(B/S)=14.5597M/s ReadTime(S)=6.16299 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_stddev         730 ms         6.68 ms            3 ReadRate(B/S)=1.45914M/s ReadTime(S)=0.729876 ReadTotal(B)=0
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_cv           11.18 %          5.75 %             3 ReadRate(B/S)=10.53% ReadTime(S)=11.18% ReadTotal(B)=0.00%
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_max           7366 ms          123 ms            3 ReadRate(B/S)=14.8366M/s ReadTime(S)=7.36572 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_min           6048 ms          110 ms            3 ReadRate(B/S)=12.1823M/s ReadTime(S)=6.04796 ReadTotal(B)=89.7314M
    ```
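A note on reading this table: the large gap between the `Time` and `CPU` columns is expected. The tool registers its benchmarks in Google Benchmark's manual-time mode, measures the I/O with its own clock, and reports that wall time through `SetIterationTime()`, while `kIsRate` counters such as `ReadRate(B/S)` divide the byte count by that manual time. A minimal, self-contained sketch of the same reporting pattern (the sleep stands in for real I/O; all names here are illustrative, not from the patch):

```
#include <chrono>
#include <thread>

#include <benchmark/benchmark.h>

// Stand-in for a real S3/HDFS read; only the timing pattern matters here.
static void fake_io() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

static void BM_ManualTimeRead(benchmark::State& state) {
    const size_t read_size = 8 * 1024 * 1024; // pretend 8 MiB were read
    for (auto _ : state) {
        auto start = std::chrono::high_resolution_clock::now();
        fake_io();
        auto end = std::chrono::high_resolution_clock::now();
        // The manually measured wall time fills the "Time" column; the small
        // "CPU" column is whatever the process itself burned meanwhile.
        state.SetIterationTime(
                std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count());
    }
    // kIsRate divides the value by the elapsed time, yielding the "...M/s" output.
    state.counters["ReadRate(B/S)"] = benchmark::Counter(read_size, benchmark::Counter::kIsRate);
    state.counters["ReadTotal(B)"] = read_size;
}
BENCHMARK(BM_ManualTimeRead)->UseManualTime()->Iterations(1)->Repetitions(3);

BENCHMARK_MAIN();
```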
---
 be/src/io/fs/benchmark/base_benchmark.h      | 117 +++++++++++++++-
 be/src/io/fs/benchmark/benchmark_factory.hpp |  14 +-
 be/src/io/fs/benchmark/fs_benchmark_tool.cpp |  11 ++
 be/src/io/fs/benchmark/hdfs_benchmark.hpp    | 166 +++++------------------
 be/src/io/fs/benchmark/s3_benchmark.hpp      | 192 ++++++++++++++++++++++++---
 be/src/io/fs/s3_file_write_bufferpool.cpp    |  22 +--
 be/src/io/fs/s3_file_write_bufferpool.h      |  14 +-
 be/src/service/doris_main.cpp                |   7 +
 8 files changed, 371 insertions(+), 172 deletions(-)

diff --git a/be/src/io/fs/benchmark/base_benchmark.h b/be/src/io/fs/benchmark/base_benchmark.h
index c28ad02de5..41dae7cea2 100644
--- a/be/src/io/fs/benchmark/base_benchmark.h
+++ b/be/src/io/fs/benchmark/base_benchmark.h
@@ -27,6 +27,9 @@
 #include <vector>
 
 #include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_writer.h"
+#include "util/slice.h"
 
 namespace doris::io {
 
@@ -44,24 +47,22 @@ void bm_log(const std::string& fmt, Args&&... args) {
 class BaseBenchmark {
 public:
     BaseBenchmark(const std::string& name, int threads, int iterations, size_t file_size,
-                  int repetitions, const std::map<std::string, std::string>& conf_map)
+                  const std::map<std::string, std::string>& conf_map)
             : _name(name),
               _threads(threads),
               _iterations(iterations),
               _file_size(file_size),
-              _repetitions(repetitions),
               _conf_map(conf_map) {}
     virtual ~BaseBenchmark() = default;
 
     virtual Status init() { return Status::OK(); }
     virtual Status run(benchmark::State& state) { return Status::OK(); }
 
+    void set_repetition(int rep) { _repetitions = rep; }
+
     void register_bm() {
         auto bm = benchmark::RegisterBenchmark(_name.c_str(), [&](benchmark::State& state) {
-            Status st;
-            if (state.thread_index() == 0) {
-                st = this->init();
-            }
+            Status st = this->init();
             if (st != Status::OK()) {
                 bm_log("Benchmark {} init error: {}", _name, st.to_string());
                 return;
@@ -92,12 +93,114 @@ public:
         });
     }
 
+    virtual std::string get_file_path(benchmark::State& state) {
+        std::string base_dir = _conf_map["base_dir"];
+        std::string file_path;
+        if (base_dir.ends_with("/")) {
+            file_path = fmt::format("{}test_{}", base_dir, state.thread_index());
+        } else {
+            file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+        }
+        bm_log("file_path: {}", file_path);
+        return file_path;
+    }
+
+    Status read(benchmark::State& state, FileReaderSPtr reader) {
+        bm_log("begin to read {}", _name);
+        size_t buffer_size =
+                _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
+        std::vector<char> buffer;
+        buffer.resize(buffer_size);
+        doris::Slice data = {buffer.data(), buffer.size()};
+        size_t offset = 0;
+        size_t bytes_read = 0;
+
+        size_t read_size = reader->size();
+        if (_file_size > 0) {
+            read_size = std::min(read_size, _file_size);
+        }
+        long remaining_size = read_size;
+
+        Status status;
+        auto start = std::chrono::high_resolution_clock::now();
+        while (remaining_size > 0) {
+            bytes_read = 0;
+            size_t size = std::min(buffer_size, (size_t)remaining_size);
+            data.size = size;
+            status = reader->read_at(offset, data, &bytes_read);
+            if (status != Status::OK() || bytes_read < 0) {
+                bm_log("reader read_at error: {}", status.to_string());
+                break;
+            }
+            if (bytes_read == 0) { // EOF
+                break;
+            }
+            offset += bytes_read;
+            remaining_size -= bytes_read;
+        }
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        state.counters["ReadRate(B/S)"] =
+                benchmark::Counter(read_size, benchmark::Counter::kIsRate);
+        state.counters["ReadTotal(B)"] = read_size;
+        state.counters["ReadTime(S)"] = elapsed_seconds.count();
+
+        if (status.ok() && reader != nullptr) {
+            status = reader->close();
+        }
+        bm_log("finish to read {}, size {}, seconds: {}, status: {}", _name, read_size,
+               elapsed_seconds.count(), status);
+        return status;
+    }
+
+    Status write(benchmark::State& state, FileWriter* writer) {
+        bm_log("begin to write {}, size: {}", _name, _file_size);
+        size_t write_size = _file_size;
+        size_t buffer_size =
+                _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
+        long remaining_size = write_size;
+        std::vector<char> buffer;
+        buffer.resize(buffer_size);
+        doris::Slice data = {buffer.data(), buffer.size()};
+
+        Status status;
+        auto start = std::chrono::high_resolution_clock::now();
+        while (remaining_size > 0) {
+            size_t size = std::min(buffer_size, (size_t)remaining_size);
+            data.size = size;
+            status = writer->append(data);
+            if (status != Status::OK()) {
+                bm_log("writer append error: {}", status.to_string());
+                break;
+            }
+            remaining_size -= size;
+        }
+        if (status.ok() && writer != nullptr) {
+            status = writer->close();
+        }
+
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        state.counters["WriteRate(B/S)"] =
+                benchmark::Counter(write_size, benchmark::Counter::kIsRate);
+        state.counters["WriteTotal(B)"] = write_size;
+        state.counters["WriteTime(S)"] = elapsed_seconds.count();
+
+        bm_log("finish to write {}, size: {}, seconds: {}, status: {}", _name, write_size,
+               elapsed_seconds.count(), status);
+        return status;
+    }
+
 protected:
     std::string _name;
     int _threads;
     int _iterations;
     size_t _file_size;
-    int _repetitions = 1;
+    int _repetitions = 3;
     std::map<std::string, std::string> _conf_map;
 };
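The point of this refactor is that the buffered read/write loops, timing, and counter reporting now live once in `BaseBenchmark`, so a concrete benchmark only has to wire up its reader or writer. A hypothetical subclass illustrating the intended usage (the class name and `open_reader()` are placeholders, not code from this patch):

```
#include "io/fs/benchmark/base_benchmark.h"

namespace doris::io {

// Hypothetical subclass: timing, buffering and the ReadRate/ReadTime counters
// are inherited from BaseBenchmark::read(); only reader creation is local.
class MyFsOpenReadBenchmark : public BaseBenchmark {
public:
    MyFsOpenReadBenchmark(int threads, int iterations, size_t file_size,
                          const std::map<std::string, std::string>& conf_map)
            : BaseBenchmark("MyFsOpenReadBenchmark", threads, iterations, file_size, conf_map) {}

    Status run(benchmark::State& state) override {
        auto file_path = get_file_path(state); // <base_dir>/test_<thread_index>
        io::FileReaderSPtr reader;
        // open_reader() is a placeholder for the file system's factory call,
        // e.g. FileFactory::create_s3_reader(...) in the S3 benchmarks below.
        RETURN_IF_ERROR(open_reader(file_path, &reader));
        return read(state, reader); // shared read loop + counters
    }
};

} // namespace doris::io
```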
diff --git a/be/src/io/fs/benchmark/benchmark_factory.hpp b/be/src/io/fs/benchmark/benchmark_factory.hpp
index 3e8c9314ca..0b8af3b96b 100644
--- a/be/src/io/fs/benchmark/benchmark_factory.hpp
+++ b/be/src/io/fs/benchmark/benchmark_factory.hpp
@@ -38,8 +38,18 @@ Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_t
                                const std::map<std::string, std::string>& conf_map,
                                BaseBenchmark** bm) {
     if (fs_type == "s3") {
-        if (op_type == "read") {
-            *bm = new S3ReadBenchmark(threads, iterations, file_size, conf_map);
+        if (op_type == "create_write") {
+            *bm = new S3CreateWriteBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "open_read") {
+            *bm = new S3OpenReadBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "single_read") {
+            *bm = new S3SingleReadBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "rename") {
+            *bm = new S3RenameBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "exists") {
+            *bm = new S3ExistsBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "list") {
+            *bm = new S3ListBenchmark(threads, iterations, file_size, conf_map);
         } else {
             return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                     "unknown params: fs_type: {}, op_type: {}, iterations: {}", fs_type, op_type,
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index a5be5db80a..50085ae1e7 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -20,6 +20,8 @@
 #include <fstream>
 
 #include "io/fs/benchmark/benchmark_factory.hpp"
+#include "io/fs/s3_file_write_bufferpool.h"
+#include "util/threadpool.h"
 
 DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs");
 DEFINE_string(operation, "create_write",
@@ -107,6 +109,15 @@ int main(int argc, char** argv) {
         return 1;
     }
 
+    // init s3 write buffer pool
+    std::unique_ptr<doris::ThreadPool> buffered_reader_prefetch_thread_pool;
+    doris::ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+            .set_min_threads(16)
+            .set_max_threads(64)
+            .build(&buffered_reader_prefetch_thread_pool);
+    doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
+    s3_buffer_pool->init(524288000, 5242880, buffered_reader_prefetch_thread_pool.get());
+
     try {
         doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
                                            std::stoi(FLAGS_iterations), std::stol(FLAGS_file_size),
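With the factory wired up, each `--operation` value maps one-to-one onto a benchmark class. Assuming the same flag style as the command in the commit message (and a conf file that provides `base_dir`, or `file_path` for `single_read`), the other operations would be invoked like:

```
sh bin/run-fs-benchmark.sh --conf=conf/s3.conf --fs_type=s3 --operation=create_write --threads=1 --iterations=1
sh bin/run-fs-benchmark.sh --conf=conf/s3.conf --fs_type=s3 --operation=rename --threads=1 --iterations=1
sh bin/run-fs-benchmark.sh --conf=conf/s3.conf --fs_type=s3 --operation=list --threads=1 --iterations=1
```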
diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
index 1307ddc95a..b508e14a24 100644
--- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp
+++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
@@ -33,75 +33,30 @@ class HdfsOpenReadBenchmark : public BaseBenchmark {
 public:
     HdfsOpenReadBenchmark(int threads, int iterations, size_t file_size,
                           const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsReadBenchmark", threads, iterations, file_size, 3, conf_map) {}
+            : BaseBenchmark("HdfsReadBenchmark", threads, iterations, file_size, conf_map) {}
     virtual ~HdfsOpenReadBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
-    virtual std::string get_file_path(benchmark::State& state) {
-        std::string base_dir = _conf_map["base_dir"];
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
-        return file_path;
+    virtual void set_default_file_size() {
+        if (_file_size <= 0) {
+            _file_size = 10 * 1024 * 1024; // default 10MB
+        }
     }
 
     Status run(benchmark::State& state) override {
+        auto file_path = get_file_path(state);
+
+        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::FileSystem> fs;
         io::FileReaderSPtr reader;
-        bm_log("begin to init {}", _name);
-        size_t buffer_size =
-                _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
         io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
         THdfsParams hdfs_params = parse_properties(_conf_map);
-
-        auto file_path = get_file_path(state);
         RETURN_IF_ERROR(
                 FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
-        bm_log("finish to init {}", _name);
-
-        bm_log("begin to run {}", _name);
-        Status status;
-        std::vector<char> buffer;
-        buffer.resize(buffer_size);
-        doris::Slice data = {buffer.data(), buffer.size()};
-        size_t offset = 0;
-        size_t bytes_read = 0;
-
-        size_t read_size = reader->size();
-        if (_file_size > 0) {
-            read_size = std::min(read_size, _file_size);
-        }
-        long remaining_size = read_size;
-
-        auto start = std::chrono::high_resolution_clock::now();
-        while (remaining_size > 0) {
-            bytes_read = 0;
-            size_t size = std::min(buffer_size, (size_t)remaining_size);
-            data.size = size;
-            status = reader->read_at(offset, data, &bytes_read);
-            if (status != Status::OK() || bytes_read < 0) {
-                bm_log("reader read_at error: {}", status.to_string());
-                break;
-            }
-            if (bytes_read == 0) { // EOF
-                break;
-            }
-            offset += bytes_read;
-            remaining_size -= bytes_read;
-        }
-        bm_log("finish to run {}", _name);
         auto end = std::chrono::high_resolution_clock::now();
         auto elapsed_seconds =
                 std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
-        state.SetIterationTime(elapsed_seconds.count());
-        state.counters["ReadRate"] = benchmark::Counter(read_size, benchmark::Counter::kIsRate);
-
-        if (reader != nullptr) {
-            reader->close();
-        }
-        return status;
+        state.counters["OpenReaderTime(S)"] = elapsed_seconds.count();
+        return read(state, reader);
     }
 };
 
@@ -113,6 +68,10 @@ public:
             : HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map) {}
     virtual ~HdfsSingleReadBenchmark() = default;
 
+    virtual void set_default_file_size() override {
+        // do nothing, default is 0, which means it will read the whole file
+    }
+
     virtual std::string get_file_path(benchmark::State& state) override {
         std::string file_path = _conf_map["file_path"];
         bm_log("file_path: {}", file_path);
         return file_path;
@@ -124,56 +83,20 @@ class HdfsCreateWriteBenchmark : public BaseBenchmark {
 public:
     HdfsCreateWriteBenchmark(int threads, int iterations, size_t file_size,
                              const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsCreateWriteBenchmark", threads, iterations, file_size, 3,
-                            conf_map) {}
+            : BaseBenchmark("HdfsCreateWriteBenchmark", threads, iterations, file_size, conf_map) {}
     virtual ~HdfsCreateWriteBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
     Status run(benchmark::State& state) override {
-        bm_log("begin to run {}", _name);
-        std::string base_dir = _conf_map["base_dir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
-        THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
-
-        auto start = std::chrono::high_resolution_clock::now();
+        auto file_path = get_file_path(state);
+        if (_file_size <= 0) {
+            _file_size = 10 * 1024 * 1024; // default 10MB
+        }
         std::shared_ptr<io::HdfsFileSystem> fs;
         io::FileWriterPtr writer;
+        THdfsParams hdfs_params = parse_properties(_conf_map);
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
         RETURN_IF_ERROR(fs->create_file(file_path, &writer));
-        Status status;
-        size_t write_size = _file_size;
-        size_t buffer_size =
-                _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
-        long remaining_size = write_size;
-        std::vector<char> buffer;
-        buffer.resize(buffer_size);
-        doris::Slice data = {buffer.data(), buffer.size()};
-        while (remaining_size > 0) {
-            size_t size = std::min(buffer_size, (size_t)remaining_size);
-            data.size = size;
-            status = writer->append(data);
-            if (status != Status::OK()) {
-                bm_log("writer append error: {}", status.to_string());
-                break;
-            }
-            remaining_size -= size;
-        }
-        auto end = std::chrono::high_resolution_clock::now();
-        auto elapsed_seconds =
-                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
-        state.SetIterationTime(elapsed_seconds.count());
-        bm_log("finish to run {}", _name);
-
-        state.counters["WriteRate"] = benchmark::Counter(write_size, benchmark::Counter::kIsRate);
-
-        if (writer != nullptr) {
-            writer->close();
-        }
-        return status;
+        return write(state, writer.get());
     }
 };
 
@@ -181,75 +104,56 @@ class HdfsRenameBenchmark : public BaseBenchmark {
 public:
     HdfsRenameBenchmark(int threads, int iterations, size_t file_size,
                         const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsRenameBenchmark", threads, 1, file_size, 1, conf_map) {}
+            : BaseBenchmark("HdfsRenameBenchmark", threads, iterations, file_size, conf_map) {
+        // rename can only do once
+        set_repetition(1);
+    }
     virtual ~HdfsRenameBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
     Status run(benchmark::State& state) override {
-        bm_log("begin to run {}", _name);
-        std::string base_dir = _conf_map["base_dir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+        auto file_path = get_file_path(state);
+        auto new_file_path = file_path + "_new";
         THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        auto new_file_path = fmt::format("{}/test_{}_new", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
-
-        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::HdfsFileSystem> fs;
-        io::FileWriterPtr writer;
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+
+        auto start = std::chrono::high_resolution_clock::now();
         RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
         auto end = std::chrono::high_resolution_clock::now();
         auto elapsed_seconds =
                 std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
         state.SetIterationTime(elapsed_seconds.count());
-        bm_log("finish to run {}", _name);
-
         state.counters["RenameCost"] =
                 benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
 
-        if (writer != nullptr) {
-            writer->close();
-        }
         return Status::OK();
     }
-
-private:
 };
 
 class HdfsExistsBenchmark : public BaseBenchmark {
 public:
     HdfsExistsBenchmark(int threads, int iterations, size_t file_size,
                         const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsExistsBenchmark", threads, iterations, file_size, 3, conf_map) {}
+            : BaseBenchmark("HdfsExistsBenchmark", threads, iterations, file_size, conf_map) {}
     virtual ~HdfsExistsBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
     Status run(benchmark::State& state) override {
-        bm_log("begin to run {}", _name);
-        std::string base_dir = _conf_map["base_dir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
-        THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
+        auto file_path = get_file_path(state);
 
-        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::HdfsFileSystem> fs;
+        THdfsParams hdfs_params = parse_properties(_conf_map);
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+
+        auto start = std::chrono::high_resolution_clock::now();
         bool res = false;
         RETURN_IF_ERROR(fs->exists(file_path, &res));
         auto end = std::chrono::high_resolution_clock::now();
         auto elapsed_seconds =
                 std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
         state.SetIterationTime(elapsed_seconds.count());
-        bm_log("finish to run {}", _name);
-
         state.counters["ExistsCost"] =
                 benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
         return Status::OK();
     }
 };
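The metadata benchmarks (rename and exists here, plus list on the S3 side) report their cost through an inverted rate counter: with `kIsRate | kInvert`, Google Benchmark divides the manual iteration time by the counter value, so `RenameCost`/`ExistsCost` read as seconds per operation rather than operations per second. A standalone sketch of that pattern (the no-op stands in for the actual filesystem call):

```
#include <chrono>

#include <benchmark/benchmark.h>

static void BM_MetaOpCost(benchmark::State& state) {
    for (auto _ : state) {
        auto start = std::chrono::high_resolution_clock::now();
        benchmark::ClobberMemory(); // stand-in for fs->rename() / fs->exists()
        auto end = std::chrono::high_resolution_clock::now();
        state.SetIterationTime(
                std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count());
    }
    // kIsRate alone would report operations/second; kInvert flips the ratio,
    // so the counter prints as seconds per operation.
    state.counters["OpCost"] =
            benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_MetaOpCost)->UseManualTime();

BENCHMARK_MAIN();
```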
diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp b/be/src/io/fs/benchmark/s3_benchmark.hpp
index 7e958cefdb..c2ee8ddd99 100644
--- a/be/src/io/fs/benchmark/s3_benchmark.hpp
+++ b/be/src/io/fs/benchmark/s3_benchmark.hpp
@@ -19,41 +19,193 @@
 
 #include "io/file_factory.h"
 #include "io/fs/benchmark/base_benchmark.h"
+#include "io/fs/file_writer.h"
 #include "io/fs/s3_file_reader.h"
 #include "io/fs/s3_file_system.h"
+#include "runtime/exec_env.h"
+#include "util/s3_uri.h"
 #include "util/slice.h"
 
 namespace doris::io {
 
-class S3ReadBenchmark : public BaseBenchmark {
+class S3Benchmark : public BaseBenchmark {
 public:
-    S3ReadBenchmark(int threads, int iterations, size_t file_size,
-                    const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("S3ReadBenchmark", threads, iterations, file_size, 3, conf_map),
-              _result(buffer, 128) {}
-    virtual ~S3ReadBenchmark() = default;
+    S3Benchmark(const std::string& name, int threads, int iterations, size_t file_size,
+                const std::map<std::string, std::string>& conf_map)
+            : BaseBenchmark(name, threads, iterations, file_size, conf_map) {}
+    virtual ~S3Benchmark() = default;
 
-    Status init() override {
-        bm_log("begin to init {}", _name);
-        std::string file_path = _conf_map["file"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+    Status get_fs(const std::string& path) {
+        S3URI s3_uri(path);
+        RETURN_IF_ERROR(s3_uri.parse());
         RETURN_IF_ERROR(
-                FileFactory::create_s3_reader(_conf_map, file_path, &_fs, &_reader, reader_opts));
-        bm_log("finish to init {}", _name);
+                S3ClientFactory::convert_properties_to_s3_conf(_conf_map, s3_uri, &_s3_conf));
+        return io::S3FileSystem::create(std::move(_s3_conf), "", &_fs);
+    }
+
+protected:
+    doris::S3Conf _s3_conf;
+    std::shared_ptr<io::S3FileSystem> _fs;
+};
+
+class S3OpenReadBenchmark : public S3Benchmark {
+public:
+    S3OpenReadBenchmark(int threads, int iterations, size_t file_size,
+                        const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3ReadBenchmark", threads, iterations, file_size, conf_map) {}
+    virtual ~S3OpenReadBenchmark() = default;
+
+    virtual void set_default_file_size() {
+        if (_file_size <= 0) {
+            _file_size = 10 * 1024 * 1024; // default 10MB
+        }
+    }
+
+    Status run(benchmark::State& state) override {
+        auto file_path = get_file_path(state);
+        RETURN_IF_ERROR(get_fs(file_path));
+
+        io::FileReaderSPtr reader;
+        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+        RETURN_IF_ERROR(FileFactory::create_s3_reader(
+                _conf_map, file_path, reinterpret_cast<std::shared_ptr<io::FileSystem>*>(&_fs),
+                &reader, reader_opts));
+
+        return read(state, reader);
+    }
+};
+
+// Read a single specified file
+class S3SingleReadBenchmark : public S3OpenReadBenchmark {
+public:
+    S3SingleReadBenchmark(int threads, int iterations, size_t file_size,
+                          const std::map<std::string, std::string>& conf_map)
+            : S3OpenReadBenchmark(threads, iterations, file_size, conf_map) {}
+    virtual ~S3SingleReadBenchmark() = default;
+
+    virtual void set_default_file_size() override {}
+
+    virtual std::string get_file_path(benchmark::State& state) override {
+        std::string file_path = _conf_map["file_path"];
+        bm_log("file_path: {}", file_path);
+        return file_path;
+    }
+};
+
+class S3CreateWriteBenchmark : public S3Benchmark {
+public:
+    S3CreateWriteBenchmark(int threads, int iterations, size_t file_size,
+                           const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3CreateWriteBenchmark", threads, iterations, file_size, conf_map) {}
+    virtual ~S3CreateWriteBenchmark() = default;
+
+    Status run(benchmark::State& state) override {
+        auto file_path = get_file_path(state);
+        if (_file_size <= 0) {
+            _file_size = 10 * 1024 * 1024; // default 10MB
+        }
+        RETURN_IF_ERROR(get_fs(file_path));
+
+        io::FileWriterPtr writer;
+        RETURN_IF_ERROR(_fs->create_file(file_path, &writer));
+        return write(state, writer.get());
+    }
+};
+
+class S3ListBenchmark : public S3Benchmark {
+public:
+    S3ListBenchmark(int threads, int iterations, size_t file_size,
+                    const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3ListBenchmark", threads, iterations, file_size, conf_map) {}
+    virtual ~S3ListBenchmark() = default;
+
+    virtual std::string get_file_path(benchmark::State& state) override {
+        return _conf_map["base_dir"];
+    }
+
+    Status run(benchmark::State& state) override {
+        auto file_path = get_file_path(state);
+        RETURN_IF_ERROR(get_fs(file_path));
+
+        auto start = std::chrono::high_resolution_clock::now();
+        std::vector<FileInfo> files;
+        bool exists = true;
+        RETURN_IF_ERROR(_fs->list(file_path, true, &files, &exists));
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        state.counters["ListCost"] =
+                benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+        std::stringstream ss;
+        int i = 0;
+        for (auto& file_info : files) {
+            if (i > 2) {
+                break;
+            }
+            ++i;
+            ss << "[" << file_info.file_name << ", " << file_info.file_size << ", "
+               << file_info.is_file << "] ";
+        }
+        bm_log("list files: {}", ss.str());
+
+        return Status::OK();
+    }
+};
+
+class S3RenameBenchmark : public S3Benchmark {
+public:
+    S3RenameBenchmark(int threads, int iterations, size_t file_size,
+                      const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3RenameBenchmark", threads, iterations, file_size, conf_map) {
+        // rename can only do once
+        set_repetition(1);
+    }
+
+    virtual ~S3RenameBenchmark() = default;
 
     Status run(benchmark::State& state) override {
-        return _reader->read_at(0, _result, &_bytes_read);
+        auto file_path = get_file_path(state);
+        auto new_file_path = file_path + "_new";
+        RETURN_IF_ERROR(get_fs(file_path));
+
+        auto start = std::chrono::high_resolution_clock::now();
+        RETURN_IF_ERROR(_fs->rename(file_path, new_file_path));
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        state.counters["RenameCost"] =
+                benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+        return Status::OK();
     }
+};
 
-private:
-    doris::S3Conf _s3_conf;
-    std::shared_ptr<io::FileSystem> _fs;
-    io::FileReaderSPtr _reader;
-    char buffer[128];
-    doris::Slice _result;
-    size_t _bytes_read = 0;
+class S3ExistsBenchmark : public S3Benchmark {
+public:
+    S3ExistsBenchmark(int threads, int iterations, size_t file_size,
+                      const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3ExistsBenchmark", threads, iterations, file_size, conf_map) {}
+    virtual ~S3ExistsBenchmark() = default;
+
+    Status run(benchmark::State& state) override {
+        auto file_path = get_file_path(state);
+        RETURN_IF_ERROR(get_fs(file_path));
+
+        auto start = std::chrono::high_resolution_clock::now();
+        bool res = false;
+        RETURN_IF_ERROR(_fs->exists(file_path, &res));
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        state.counters["ExistsCost"] =
+                benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+        return Status::OK();
+    }
 };
 
 } // namespace doris::io
diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp
index c6ec1a8b60..48887f9c6e 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -24,6 +24,7 @@
 #include "io/fs/s3_common.h"
 #include "runtime/exec_env.h"
 #include "util/defer_op.h"
+#include "util/threadpool.h"
 
 namespace doris {
 namespace io {
@@ -59,26 +60,27 @@ void S3FileBuffer::submit() {
         _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
     }
 
-    ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
-            [buf = this->shared_from_this()]() { buf->_on_upload(); });
+    _thread_pool->submit_func([buf = this->shared_from_this()]() { buf->_on_upload(); });
 }
 
-S3FileBufferPool::S3FileBufferPool() {
+void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
+                            doris::ThreadPool* thread_pool) {
     // the nums could be one configuration
-    size_t buf_num = config::s3_write_buffer_whole_size / config::s3_write_buffer_size;
-    DCHECK((config::s3_write_buffer_size >= 5 * 1024 * 1024) &&
-           (config::s3_write_buffer_whole_size > config::s3_write_buffer_size));
+    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
+    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
+           (s3_write_buffer_whole_size > s3_write_buffer_size));
     LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
-    _whole_mem_buffer = std::make_unique<char[]>(config::s3_write_buffer_whole_size);
+    _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
     for (size_t i = 0; i < buf_num; i++) {
-        Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
-                 static_cast<size_t>(config::s3_write_buffer_size)};
+        Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
+                 static_cast<size_t>(s3_write_buffer_size)};
         _free_raw_buffers.emplace_back(s);
     }
+    _thread_pool = thread_pool;
 }
 
 std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
-    std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>();
+    std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool);
     // if need reserve then we must ensure return buf with memory preserved
     if (reserve) {
         {
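The writer bug fixed here is visible in this hunk: the buffer pool previously sized itself from `config::` inside its constructor and submitted uploads through `ExecEnv::GetInstance()`, neither of which is set up in the standalone tool. The fix is plain dependency injection: both the sizes and the thread pool are now passed in through `init()` at startup, before any S3 write. A condensed sketch of the new contract (the wrapper function name is illustrative, not from the patch):

```
#include "io/fs/s3_file_write_bufferpool.h"
#include "util/threadpool.h"

// Sketch: hand an explicitly-built thread pool to the singleton exactly once
// at startup. The pool must outlive all uploads; in the tool it lives in
// main(), in the BE it is ExecEnv's buffered_reader_prefetch_thread_pool().
void init_s3_buffer_pool(doris::ThreadPool* thread_pool) {
    // 500 MiB whole buffer carved into 5 MiB slices (the values hard-coded in
    // fs_benchmark_tool.cpp; doris_main.cpp takes them from config:: instead).
    doris::io::S3FileBufferPool::GetInstance()->init(524288000, 5242880, thread_pool);
}
```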
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index 660cbc8e8a..55fa53df42 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -31,13 +31,14 @@
 #include "util/slice.h"
 
 namespace doris {
+class ThreadPool;
 namespace io {
 
 // TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer
 struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     using Callback = std::function<void()>;
 
-    S3FileBuffer() = default;
+    S3FileBuffer(ThreadPool* pool) { _thread_pool = pool; }
     ~S3FileBuffer() = default;
 
     void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
@@ -110,13 +111,20 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     // only served as one reserved buffer
     Slice _buf;
     size_t _append_offset {0};
+    // not owned
+    ThreadPool* _thread_pool = nullptr;
 };
 
 class S3FileBufferPool {
 public:
-    S3FileBufferPool();
+    S3FileBufferPool() = default;
     ~S3FileBufferPool() = default;
 
+    // should be called one and only once
+    // at startup
+    void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
+              doris::ThreadPool* thread_pool);
+
     static S3FileBufferPool* GetInstance() {
         static S3FileBufferPool _pool;
         return &_pool;
@@ -135,6 +143,8 @@ private:
    std::condition_variable _cv;
    std::unique_ptr<char[]> _whole_mem_buffer;
    std::list<Slice> _free_raw_buffers;
+    // not owned
+    ThreadPool* _thread_pool = nullptr;
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 699f598967..246032ef9a 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -54,6 +54,7 @@
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "io/cache/block/block_file_cache_factory.h"
+#include "io/fs/s3_file_write_bufferpool.h"
 #include "olap/options.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
@@ -432,6 +433,12 @@ int main(int argc, char** argv) {
     doris::ExecEnv::init(exec_env, paths);
     doris::TabletSchemaCache::create_global_schema_cache();
 
+    // init s3 write buffer pool
+    doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
+    s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size,
+                         doris::config::s3_write_buffer_size,
+                         exec_env->buffered_reader_prefetch_thread_pool());
+
     // init and open storage engine
     doris::EngineOptions options;
     options.store_paths = paths;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org