This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d49c412c59 [Feature](multi-catalog) Add hdfs benchmark tools. (#21074) d49c412c59 is described below commit d49c412c59aa94e3c0e5671e7c4f9cb676712542 Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Sun Jun 25 09:35:27 2023 +0800 [Feature](multi-catalog) Add hdfs benchmark tools. (#21074) --- be/src/io/fs/benchmark/base_benchmark.h | 49 ++-- be/src/io/fs/benchmark/benchmark_factory.hpp | 61 ++++- be/src/io/fs/benchmark/fs_benchmark_tool.cpp | 22 +- be/src/io/fs/benchmark/hdfs_benchmark.hpp | 299 ++++++++++++++++++++++++ be/src/io/fs/benchmark/s3_benchmark.hpp | 10 +- bin/run-fs-benchmark.sh | 331 +++++++++++++++++++++++++++ 6 files changed, 741 insertions(+), 31 deletions(-) diff --git a/be/src/io/fs/benchmark/base_benchmark.h b/be/src/io/fs/benchmark/base_benchmark.h index bb1c00233f..bcc8ed284c 100644 --- a/be/src/io/fs/benchmark/base_benchmark.h +++ b/be/src/io/fs/benchmark/base_benchmark.h @@ -43,44 +43,61 @@ void bm_log(const std::string& fmt, Args&&... args) { class BaseBenchmark { public: - BaseBenchmark(const std::string& name, int iterations, - const std::map<std::string, std::string>& conf_map) - : _name(name), _iterations(iterations), _conf_map(conf_map) {} + BaseBenchmark(const std::string& name, int threads, int iterations, size_t file_size, + int repetitions, const std::map<std::string, std::string>& conf_map) + : _name(name), + _threads(threads), + _iterations(iterations), + _file_size(file_size), + _repetitions(repetitions), + _conf_map(conf_map) {} virtual ~BaseBenchmark() = default; virtual Status init() { return Status::OK(); } - virtual Status run() { return Status::OK(); } + virtual Status run(benchmark::State& state) { return Status::OK(); } void register_bm() { auto bm = benchmark::RegisterBenchmark(_name.c_str(), [&](benchmark::State& state) { - // first turn will use more time Status st; - st = this->init(); - if (!st) { - std::cerr << "failed to init. bm: " << _name << ", err: " << st; - return; + if (state.thread_index() == 0) { + st = this->init(); } - st = this->run(); - if (!st) { - std::cerr << "failed to run at first time. bm: " << _name << ", err: " << st; + if (st != Status::OK()) { + bm_log("Benchmark {} init error: {}", _name, st.to_string()); return; } for (auto _ : state) { - state.PauseTiming(); - this->init(); - state.ResumeTiming(); - this->run(); + st = this->run(state); + if (st != Status::OK()) { + bm_log("Benchmark {} run error: {}", _name, st.to_string()); + return; + } } }); + if (_threads != 0) { + bm->Threads(_threads); + } if (_iterations != 0) { bm->Iterations(_iterations); } + bm->Repetitions(_repetitions); + bm->Unit(benchmark::kMillisecond); + bm->UseManualTime(); + bm->ComputeStatistics("max", [](const std::vector<double>& v) -> double { + return *(std::max_element(std::begin(v), std::end(v))); + }); + bm->ComputeStatistics("min", [](const std::vector<double>& v) -> double { + return *(std::min_element(std::begin(v), std::end(v))); + }); } protected: std::string _name; + int _threads; int _iterations; + size_t _file_size; + int _repetitions = 3; std::map<std::string, std::string> _conf_map; }; diff --git a/be/src/io/fs/benchmark/benchmark_factory.hpp b/be/src/io/fs/benchmark/benchmark_factory.hpp index 3f48bd16ce..bc73f904be 100644 --- a/be/src/io/fs/benchmark/benchmark_factory.hpp +++ b/be/src/io/fs/benchmark/benchmark_factory.hpp @@ -21,23 +21,43 @@ #include <string> #include <vector> +#include "io/fs/benchmark/hdfs_benchmark.hpp" #include "io/fs/benchmark/s3_benchmark.hpp" namespace doris::io { class BenchmarkFactory { public: - static Status getBm(const std::string fs_type, const std::string op_type, int64_t iterations, + static Status getBm(const std::string fs_type, const std::string op_type, int64_t threads, + int64_t iterations, size_t file_size, const std::map<std::string, std::string>& conf_map, BaseBenchmark** bm); }; Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_type, - int64_t iterations, + int64_t threads, int64_t iterations, size_t file_size, const std::map<std::string, std::string>& conf_map, BaseBenchmark** bm) { if (fs_type == "s3") { if (op_type == "read") { - *bm = new S3ReadBenchmark(iterations, conf_map); + *bm = new S3ReadBenchmark(threads, iterations, file_size, conf_map); + } else { + return Status::Error<ErrorCode::INVALID_ARGUMENT>( + "unknown params: fs_type: {}, op_type: {}, iterations: {}", fs_type, op_type, + iterations); + } + } else if (fs_type == "hdfs") { + if (op_type == "create_write") { + *bm = new HdfsCreateWriteBenchmark(threads, iterations, file_size, conf_map); + } else if (op_type == "open_read") { + *bm = new HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map); + } else if (op_type == "open") { + *bm = new HdfsOpenBenchmark(threads, iterations, file_size, conf_map); + } else if (op_type == "rename") { + *bm = new HdfsRenameBenchmark(threads, iterations, file_size, conf_map); + } else if (op_type == "delete") { + *bm = new HdfsDeleteBenchmark(threads, iterations, file_size, conf_map); + } else if (op_type == "exists") { + *bm = new HdfsExistsBenchmark(threads, iterations, file_size, conf_map); } else { return Status::Error<ErrorCode::INVALID_ARGUMENT>( "unknown params: fs_type: {}, op_type: {}, iterations: {}", fs_type, op_type, @@ -49,9 +69,15 @@ Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_t class MultiBenchmark { public: - MultiBenchmark(const std::string& type, const std::string& operation, int64_t iterations, + MultiBenchmark(const std::string& type, const std::string& operation, int64_t threads, + int64_t iterations, size_t file_size, const std::map<std::string, std::string>& conf_map) - : _type(type), _operation(operation), _iterations(iterations), _conf_map(conf_map) {} + : _type(type), + _operation(operation), + _threads(threads), + _iterations(iterations), + _file_size(file_size), + _conf_map(conf_map) {} ~MultiBenchmark() { for (auto bm : benchmarks) { @@ -59,11 +85,30 @@ public: } } - Status init_env() { return Status::OK(); } + Status init_env() { + std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; + if (!doris::config::init(conffile.c_str(), true, true, true)) { + fprintf(stderr, "error read config file. \n"); + return Status::Error<INTERNAL_ERROR>(); + } + doris::CpuInfo::init(); + Status status = Status::OK(); + if (doris::config::enable_java_support) { + // Init jni + status = doris::JniUtil::Init(); + if (!status.ok()) { + LOG(WARNING) << "Failed to initialize JNI: " << status; + exit(1); + } + } + + return Status::OK(); + } Status init_bms() { BaseBenchmark* bm; - Status st = BenchmarkFactory::getBm(_type, _operation, _iterations, _conf_map, &bm); + Status st = BenchmarkFactory::getBm(_type, _operation, _threads, _iterations, _file_size, + _conf_map, &bm); if (!st) { return st; } @@ -76,7 +121,9 @@ private: std::vector<BaseBenchmark*> benchmarks; std::string _type; std::string _operation; + int64_t _threads; int64_t _iterations; + size_t _file_size; std::map<std::string, std::string> _conf_map; }; diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp index ad8772bb0f..4002ea84ac 100644 --- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp +++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp @@ -22,8 +22,11 @@ #include "io/fs/benchmark/benchmark_factory.hpp" DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs, local"); -DEFINE_string(operation, "read", "Supported Operations: read, write, open, size, list, connect"); +DEFINE_string(operation, "create_write", + "Supported Operations: create_write, open_read, open, rename, delete, exists"); +DEFINE_string(threads, "10", "Number of threads"); DEFINE_string(iterations, "10", "Number of runs"); +DEFINE_string(file_size, "104857600", "File size"); DEFINE_string(conf, "", "config file"); std::string get_usage(const std::string& progname) { @@ -31,17 +34,25 @@ std::string get_usage(const std::string& progname) { ss << progname << " is the Doris BE benchmark tool for testing file system.\n"; ss << "Usage:\n"; - ss << progname << " --fs_type=[fs_type] --operation=[op_type] --iterations=10\n"; + ss << progname + << " --fs_type=[fs_type] --operation=[op_type] --threads=10 --iterations=10 " + "--file_size=104857600\n"; ss << "\nfs_type:\n"; ss << " hdfs\n"; ss << " s3\n"; ss << "\nop_type:\n"; ss << " read\n"; ss << " write\n"; + ss << "\nthreads:\n"; + ss << " num of threads\n"; ss << "\niterations:\n"; ss << " num of run\n"; + ss << "\nfile_size:\n"; + ss << " file size\n"; ss << "\nExample:\n"; - ss << progname << " --conf my.conf --fs_type=s3 --operation=read --iterations=100\n"; + ss << progname + << " --conf my.conf --fs_type=s3 --operation=read --threads=10 --iterations=100 " + "--file_size=104857600\n"; return ss.str(); } @@ -93,8 +104,9 @@ int main(int argc, char** argv) { } try { - doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, - std::stoi(FLAGS_iterations), conf_map); + doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads), + std::stoi(FLAGS_iterations), std::stol(FLAGS_file_size), + conf_map); doris::Status st = multi_bm.init_env(); if (!st) { std::cout << "init env failed: " << st << std::endl; diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp new file mode 100644 index 0000000000..637f7a614a --- /dev/null +++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "io/file_factory.h" +#include "io/fs/benchmark/base_benchmark.h" +#include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/file_writer.h" +#include "io/fs/hdfs_file_reader.h" +#include "io/fs/hdfs_file_system.h" +#include "io/hdfs_builder.h" +#include "util/jni-util.h" +#include "util/slice.h" + +namespace doris::io { + +class HdfsOpenReadBenchmark : public BaseBenchmark { +public: + HdfsOpenReadBenchmark(int threads, int iterations, size_t file_size, + const std::map<std::string, std::string>& conf_map) + : BaseBenchmark("HdfsReadBenchmark", threads, iterations, file_size, 3, conf_map) {} + virtual ~HdfsOpenReadBenchmark() = default; + + Status init() override { return Status::OK(); } + + Status run(benchmark::State& state) override { + std::shared_ptr<io::FileSystem> fs; + io::FileReaderSPtr reader; + bm_log("begin to init {}", _name); + std::string base_dir = _conf_map["baseDir"]; + size_t buffer_size = + _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L; + io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); + THdfsParams hdfs_params = parse_properties(_conf_map); + auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index()); + bm_log("file_path: {}", file_path); + RETURN_IF_ERROR( + FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts)); + bm_log("finish to init {}", _name); + + bm_log("begin to run {}", _name); + Status status; + std::vector<char> buffer; + buffer.resize(buffer_size); + doris::Slice data = {buffer.data(), buffer.size()}; + size_t offset = 0; + size_t bytes_read = 0; + + auto start = std::chrono::high_resolution_clock::now(); + size_t read_size = _file_size; + long remaining_size = read_size; + + while (remaining_size > 0) { + bytes_read = 0; + size_t size = std::min(buffer_size, (size_t)remaining_size); + data.size = size; + status = reader->read_at(offset, data, &bytes_read); + if (status != Status::OK() || bytes_read < 0) { + bm_log("reader read_at error: {}", status.to_string()); + break; + } + if (bytes_read == 0) { // EOF + break; + } + offset += bytes_read; + remaining_size -= bytes_read; + } + bm_log("finish to run {}", _name); + + auto end = std::chrono::high_resolution_clock::now(); + + auto elapsed_seconds = + std::chrono::duration_cast<std::chrono::duration<double>>(end - start); + + state.SetIterationTime(elapsed_seconds.count()); + + if (reader != nullptr) { + reader->close(); + } + return status; + } +}; + +class HdfsOpenBenchmark : public BaseBenchmark { +public: + HdfsOpenBenchmark(int threads, int iterations, size_t file_size, + const std::map<std::string, std::string>& conf_map) + : BaseBenchmark("HdfsOpenBenchmark", threads, iterations, file_size, 3, conf_map) {} + virtual ~HdfsOpenBenchmark() = default; + + Status init() override { return Status::OK(); } + + Status run(benchmark::State& state) override { + bm_log("begin to run {}", _name); + auto start = std::chrono::high_resolution_clock::now(); + std::string base_dir = _conf_map["baseDir"]; + io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); + THdfsParams hdfs_params = parse_properties(_conf_map); + auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index()); + bm_log("file_path: {}", file_path); + std::shared_ptr<io::HdfsFileSystem> fs; + io::FileReaderSPtr reader; + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(fs->open_file(file_path, reader_opts, &reader)); + auto end = std::chrono::high_resolution_clock::now(); + auto elapsed_seconds = + std::chrono::duration_cast<std::chrono::duration<double>>(end - start); + + state.SetIterationTime(elapsed_seconds.count()); + bm_log("finish to run {}", _name); + + if (reader != nullptr) { + reader->close(); + } + return Status::OK(); + } + +private: +}; + +class HdfsCreateWriteBenchmark : public BaseBenchmark { +public: + HdfsCreateWriteBenchmark(int threads, int iterations, size_t file_size, + const std::map<std::string, std::string>& conf_map) + : BaseBenchmark("HdfsCreateWriteBenchmark", threads, iterations, file_size, 3, + conf_map) {} + virtual ~HdfsCreateWriteBenchmark() = default; + + Status init() override { + std::string base_dir = _conf_map["baseDir"]; + io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); + THdfsParams hdfs_params = parse_properties(_conf_map); + std::shared_ptr<io::HdfsFileSystem> fs; + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(fs->delete_directory(base_dir)); + return Status::OK(); + } + + Status run(benchmark::State& state) override { + bm_log("begin to run {}", _name); + auto start = std::chrono::high_resolution_clock::now(); + std::string base_dir = _conf_map["baseDir"]; + io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); + THdfsParams hdfs_params = parse_properties(_conf_map); + auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index()); + bm_log("file_path: {}", file_path); + std::shared_ptr<io::HdfsFileSystem> fs; + io::FileWriterPtr writer; + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(fs->create_file(file_path, &writer)); + Status status; + size_t write_size = _file_size; + size_t buffer_size = + _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L; + long remaining_size = write_size; + std::vector<char> buffer; + buffer.resize(buffer_size); + doris::Slice data = {buffer.data(), buffer.size()}; + while (remaining_size > 0) { + size_t size = std::min(buffer_size, (size_t)remaining_size); + data.size = size; + status = writer->append(data); + if (status != Status::OK()) { + bm_log("writer append error: {}", status.to_string()); + break; + } + remaining_size -= size; + } + auto end = std::chrono::high_resolution_clock::now(); + auto elapsed_seconds = + std::chrono::duration_cast<std::chrono::duration<double>>(end - start); + + state.SetIterationTime(elapsed_seconds.count()); + bm_log("finish to run {}", _name); + + if (writer != nullptr) { + writer->close(); + } + return status; + } +}; + +class HdfsRenameBenchmark : public BaseBenchmark { +public: + HdfsRenameBenchmark(int threads, int iterations, size_t file_size, + const std::map<std::string, std::string>& conf_map) + : BaseBenchmark("HdfsRenameBenchmark", threads, 1, file_size, 1, conf_map) {} + virtual ~HdfsRenameBenchmark() = default; + + Status init() override { return Status::OK(); } + + Status run(benchmark::State& state) override { + bm_log("begin to run {}", _name); + auto start = std::chrono::high_resolution_clock::now(); + std::string base_dir = _conf_map["baseDir"]; + io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); + THdfsParams hdfs_params = parse_properties(_conf_map); + auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index()); + auto new_file_path = fmt::format("{}/test_{}_new", base_dir, state.thread_index()); + bm_log("file_path: {}", file_path); + std::shared_ptr<io::HdfsFileSystem> fs; + io::FileWriterPtr writer; + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(fs->rename(file_path, new_file_path)); + auto end = std::chrono::high_resolution_clock::now(); + auto elapsed_seconds = + std::chrono::duration_cast<std::chrono::duration<double>>(end - start); + + state.SetIterationTime(elapsed_seconds.count()); + bm_log("finish to run {}", _name); + + if (writer != nullptr) { + writer->close(); + } + return Status::OK(); + } + +private: +}; + +class HdfsDeleteBenchmark : public BaseBenchmark { +public: + HdfsDeleteBenchmark(int threads, int iterations, size_t file_size, + const std::map<std::string, std::string>& conf_map) + : BaseBenchmark("HdfsDeleteBenchmark", threads, 1, file_size, 1, conf_map) {} + virtual ~HdfsDeleteBenchmark() = default; + + Status init() override { return Status::OK(); } + + Status run(benchmark::State& state) override { + bm_log("begin to run {}", _name); + auto start = std::chrono::high_resolution_clock::now(); + std::string base_dir = _conf_map["baseDir"]; + io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); + THdfsParams hdfs_params = parse_properties(_conf_map); + auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index()); + bm_log("file_path: {}", file_path); + std::shared_ptr<io::HdfsFileSystem> fs; + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(fs->delete_file(file_path)); + auto end = std::chrono::high_resolution_clock::now(); + auto elapsed_seconds = + std::chrono::duration_cast<std::chrono::duration<double>>(end - start); + + state.SetIterationTime(elapsed_seconds.count()); + bm_log("finish to run {}", _name); + return Status::OK(); + } + +private: +}; + +class HdfsExistsBenchmark : public BaseBenchmark { +public: + HdfsExistsBenchmark(int threads, int iterations, size_t file_size, + const std::map<std::string, std::string>& conf_map) + : BaseBenchmark("HdfsExistsBenchmark", threads, iterations, file_size, 3, conf_map) {} + virtual ~HdfsExistsBenchmark() = default; + + Status init() override { return Status::OK(); } + + Status run(benchmark::State& state) override { + bm_log("begin to run {}", _name); + auto start = std::chrono::high_resolution_clock::now(); + std::string base_dir = _conf_map["baseDir"]; + io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); + THdfsParams hdfs_params = parse_properties(_conf_map); + auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index()); + bm_log("file_path: {}", file_path); + std::shared_ptr<io::HdfsFileSystem> fs; + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + bool res = false; + RETURN_IF_ERROR(fs->exists(file_path, &res)); + auto end = std::chrono::high_resolution_clock::now(); + auto elapsed_seconds = + std::chrono::duration_cast<std::chrono::duration<double>>(end - start); + + state.SetIterationTime(elapsed_seconds.count()); + bm_log("finish to run {}", _name); + return Status::OK(); + } +}; + +} // namespace doris::io diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp b/be/src/io/fs/benchmark/s3_benchmark.hpp index 5b9a81aaec..7e958cefdb 100644 --- a/be/src/io/fs/benchmark/s3_benchmark.hpp +++ b/be/src/io/fs/benchmark/s3_benchmark.hpp @@ -27,8 +27,10 @@ namespace doris::io { class S3ReadBenchmark : public BaseBenchmark { public: - S3ReadBenchmark(int iterations, const std::map<std::string, std::string>& conf_map) - : BaseBenchmark("S3ReadBenchmark", iterations, conf_map), _result(buffer, 128) {} + S3ReadBenchmark(int threads, int iterations, size_t file_size, + const std::map<std::string, std::string>& conf_map) + : BaseBenchmark("S3ReadBenchmark", threads, iterations, file_size, 3, conf_map), + _result(buffer, 128) {} virtual ~S3ReadBenchmark() = default; Status init() override { @@ -41,7 +43,9 @@ public: return Status::OK(); } - Status run() override { return _reader->read_at(0, _result, &_bytes_read); } + Status run(benchmark::State& state) override { + return _reader->read_at(0, _result, &_bytes_read); + } private: doris::S3Conf _s3_conf; diff --git a/bin/run-fs-benchmark.sh b/bin/run-fs-benchmark.sh new file mode 100755 index 0000000000..bf433ef178 --- /dev/null +++ b/bin/run-fs-benchmark.sh @@ -0,0 +1,331 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -eo pipefail + +curdir="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" + +MACHINE_OS=$(uname -s) +if [[ "$(uname -s)" == 'Darwin' ]] && command -v brew &>/dev/null; then + PATH="$(brew --prefix)/opt/gnu-getopt/bin:${PATH}" + export PATH +fi + +OPTS="$(getopt \ + -n "$0" \ + -o '' \ + -l 'conf:,fs_type:,operation:,threads:,iterations:,file_size:' \ + -- "$@")" + +eval set -- "${OPTS}" + +while true; do + case "$1" in + --conf) + CONF="$2" + shift 2 + ;; + --fs_type) + FS_TYPE="$2" + shift 2 + ;; + --operation) + OPERATION="$2" + shift 2 + ;; + --threads) + THREADS="$2" + shift 2 + ;; + --iterations) + ITERATIONS="$2" + shift 2 + ;; + --file_size) + FILE_SIZE="$2" + shift 2 + ;; + --) + shift + break + ;; + *) + echo "Internal error" + exit 1 + ;; + esac +done + +echo "CONF: ${CONF}" +echo "FS_TYPE: ${FS_TYPE}" +echo "OPERATION: ${OPERATION}" +echo "THREADS: ${THREADS}" +echo "ITERATIONS: ${ITERATIONS}" +echo "FILE_SIZE: ${FILE_SIZE}" + +DORIS_HOME="$( + cd "${curdir}/.." + pwd +)" +export DORIS_HOME + +if [[ "$(uname -s)" != 'Darwin' ]]; then + MAX_MAP_COUNT="$(cat /proc/sys/vm/max_map_count)" + if [[ "${MAX_MAP_COUNT}" -lt 2000000 ]]; then + echo "Please set vm.max_map_count to be 2000000 under root using 'sysctl -w vm.max_map_count=2000000'." + exit 1 + fi +fi + +MAX_FILE_COUNT="$(ulimit -n)" +if [[ "${MAX_FILE_COUNT}" -lt 65536 ]]; then + echo "Please set the maximum number of open file descriptors to be 65536 using 'ulimit -n 65536'." + exit 1 +fi + +# add java libs +for f in "${DORIS_HOME}/lib/java_extensions"/*.jar; do + if [[ -z "${DORIS_CLASSPATH}" ]]; then + export DORIS_CLASSPATH="${f}" + else + export DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + fi +done + +if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then + # add hadoop libs + for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do + DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + done + for f in "${DORIS_HOME}/lib/hadoop_hdfs/common/lib"/*.jar; do + DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + done + for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs"/*.jar; do + DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + done + for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs/lib"/*.jar; do + DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}" + done +fi + +# the CLASSPATH and LIBHDFS_OPTS is used for hadoop libhdfs +# and conf/ dir so that hadoop libhdfs can read .xml config file in conf/ +export CLASSPATH="${DORIS_HOME}/conf/:${DORIS_CLASSPATH}" +# DORIS_CLASSPATH is for self-managed jni +export DORIS_CLASSPATH="-Djava.class.path=${DORIS_CLASSPATH}" + +jdk_version() { + local java_cmd="${1}" + local result + local IFS=$'\n' + + if [[ -z "${java_cmd}" ]]; then + result=no_java + return 1 + else + local version + # remove \r for Cygwin + version="$("${java_cmd}" -Xms32M -Xmx32M -version 2>&1 | tr '\r' '\n' | grep version | awk '{print $3}')" + version="${version//\"/}" + if [[ "${version}" =~ ^1\. ]]; then + result="$(echo "${version}" | awk -F '.' '{print $2}')" + else + result="$(echo "${version}" | awk -F '.' '{print $1}')" + fi + fi + echo "${result}" + return 0 +} + +# export env variables from be.conf +# +# LOG_DIR +# PID_DIR +export LOG_DIR="${DORIS_HOME}/log" +PID_DIR="$( + cd "${curdir}" + pwd +)" +export PID_DIR + +# set odbc conf path +export ODBCSYSINI="${DORIS_HOME}/conf" + +# support utf8 for oracle database +export NLS_LANG='AMERICAN_AMERICA.AL32UTF8' + +# filter known leak. +export LSAN_OPTIONS="suppressions=${DORIS_HOME}/conf/lsan_suppr.conf" +export ASAN_OPTIONS="suppressions=${DORIS_HOME}/conf/asan_suppr.conf" + +while read -r line; do + envline="$(echo "${line}" | + sed 's/[[:blank:]]*=[[:blank:]]*/=/g' | + sed 's/^[[:blank:]]*//g' | + grep -E "^[[:upper:]]([[:upper:]]|_|[[:digit:]])*=" || + true)" + envline="$(eval "echo ${envline}")" + if [[ "${envline}" == *"="* ]]; then + eval 'export "${envline}"' + fi +done <"${DORIS_HOME}/conf/be.conf" + +if [[ -e "${DORIS_HOME}/bin/palo_env.sh" ]]; then + # shellcheck disable=1091 + source "${DORIS_HOME}/bin/palo_env.sh" +fi + +if [[ -z "${JAVA_HOME}" ]]; then + echo "The JAVA_HOME environment variable is not defined correctly" + echo "This environment variable is needed to run this program" + echo "NB: JAVA_HOME should point to a JDK not a JRE" + echo "You can set it in be.conf" + exit 1 +fi + +if [[ ! -d "${LOG_DIR}" ]]; then + mkdir -p "${LOG_DIR}" +fi + +pidfile="${PID_DIR}/fs_benchmark_tool.pid" + +if [[ -f "${pidfile}" ]]; then + if kill -0 "$(cat "${pidfile}")" >/dev/null 2>&1; then + echo "Backend running as process $(cat "${pidfile}"). Stop it first." + exit 1 + else + rm "${pidfile}" + fi +fi + +chmod 755 "${DORIS_HOME}/lib/fs_benchmark_tool" +echo "start time: $(date)" >>"${LOG_DIR}/fs_benchmark_tool.out" + +if [[ ! -f '/bin/limit3' ]]; then + LIMIT='' +else + LIMIT="/bin/limit3 -c 0 -n 65536" +fi + +## set asan and ubsan env to generate core file +export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0 +export UBSAN_OPTIONS=print_stacktrace=1 + +## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc +set_tcmalloc_heap_limit() { + local total_mem_mb + local mem_limit_str + + if [[ "$(uname -s)" != 'Darwin' ]]; then + total_mem_mb="$(free -m | grep Mem | awk '{print $2}')" + else + total_mem_mb="$(($(sysctl -a hw.memsize | awk '{print $NF}') / 1024))" + fi + mem_limit_str=$(grep ^mem_limit "${DORIS_HOME}"/conf/be.conf) + local digits_unit=${mem_limit_str##*=} + digits_unit="${digits_unit#"${digits_unit%%[![:space:]]*}"}" + digits_unit="${digits_unit%"${digits_unit##*[![:space:]]}"}" + local digits=${digits_unit%%[^[:digit:]]*} + local unit=${digits_unit##*[[:digit:] ]} + + mem_limit_mb=0 + case ${unit} in + t | T) mem_limit_mb=$((digits * 1024 * 1024)) ;; + g | G) mem_limit_mb=$((digits * 1024)) ;; + m | M) mem_limit_mb=$((digits)) ;; + k | K) mem_limit_mb=$((digits / 1024)) ;; + %) mem_limit_mb=$((total_mem_mb * digits / 100)) ;; + *) mem_limit_mb=$((digits / 1024 / 1024 / 1024)) ;; + esac + + if [[ "${mem_limit_mb}" -eq 0 ]]; then + mem_limit_mb=$((total_mem_mb * 90 / 100)) + fi + + if [[ "${mem_limit_mb}" -gt "${total_mem_mb}" ]]; then + echo "mem_limit is larger than whole memory of the server. ${mem_limit_mb} > ${total_mem_mb}." + return 1 + fi + export TCMALLOC_HEAP_LIMIT_MB=${mem_limit_mb} +} + +# set_tcmalloc_heap_limit || exit 1 + +## set hdfs3 conf +if [[ -f "${DORIS_HOME}/conf/hdfs-site.xml" ]]; then + export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml" +fi + +# check java version and choose correct JAVA_OPTS +java_version="$( + set -e + jdk_version "${JAVA_HOME}/bin/java" +)" + +CUR_DATE=$(date +%Y%m%d-%H%M%S) +LOG_PATH="-DlogPath=${DORIS_HOME}/log/jni.log" +COMMON_OPTS="-Dsun.java.command=DorisBE -XX:-CriticalJNINatives" +JDBC_OPTS="-DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 -DJDBC_MAX_IDEL_TIME=300000" + +if [[ "${java_version}" -gt 8 ]]; then + if [[ -z ${JAVA_OPTS_FOR_JDK_9} ]]; then + JAVA_OPTS_FOR_JDK_9="-Xmx1024m ${LOG_PATH} -Xlog:gc:${DORIS_HOME}/log/fs_benchmark_tool.gc.log.${CUR_DATE} ${COMMON_OPTS} ${JDBC_OPTS}" + fi + final_java_opt="${JAVA_OPTS_FOR_JDK_9}" +else + if [[ -z ${JAVA_OPTS} ]]; then + JAVA_OPTS="-Xmx1024m ${LOG_PATH} -Xloggc:${DORIS_HOME}/log/fs_benchmark_tool.gc.log.${CUR_DATE} ${COMMON_OPTS} ${JDBC_OPTS}" + fi + final_java_opt="${JAVA_OPTS}" +fi + +if [[ "${MACHINE_OS}" == "Darwin" ]]; then + max_fd_limit='-XX:-MaxFDLimit' + + if ! echo "${final_java_opt}" | grep "${max_fd_limit/-/\-}" >/dev/null; then + final_java_opt="${final_java_opt} ${max_fd_limit}" + fi + + if [[ -n "${JAVA_OPTS}" ]] && ! echo "${JAVA_OPTS}" | grep "${max_fd_limit/-/\-}" >/dev/null; then + JAVA_OPTS="${JAVA_OPTS} ${max_fd_limit}" + fi +fi + +# set LIBHDFS_OPTS for hadoop libhdfs +export LIBHDFS_OPTS="${final_java_opt}" + +#echo "CLASSPATH: ${CLASSPATH}" +#echo "LD_LIBRARY_PATH: ${LD_LIBRARY_PATH}" +#echo "LIBHDFS_OPTS: ${LIBHDFS_OPTS}" + +# see https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile +export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16,prof_prefix:jeprof.out" + +${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/fs_benchmark_tool" --conf "${CONF}" --fs_type="${FS_TYPE}" --operation="${OPERATION}" --threads="${THREADS}" --iterations="${ITERATIONS}" --file_size="${FILE_SIZE}" 2>&1 | tee "${LOG_DIR}/fs_benchmark_tool.log" + +qps="0MB/s" +latency="0ms" + +eval "$(grep "median" "${LOG_DIR}/fs_benchmark_tool.log" | awk '{printf("qps=%sMB/s latency=%sms", "'"${FILE_SIZE}"'" / 1024 / 1024 / ($2 * "'"${THREADS}"'" / 1000), $2 * "'"${THREADS}"'")}')" + +echo "------------------------------" +echo " Benchmark Result " +echo "------------------------------" +echo "thread_num: ${THREADS}." +echo "qps: ${qps}." +echo "latency: ${latency}." --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org