This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit b41b17ad0a6bd542767698937ff712c4d4e542b0 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Mon Mar 11 22:48:16 2024 +0800 [fix](spill) fix storage engine type cast error (#32071) --- be/src/vec/spill/spill_stream.cpp | 5 +++- be/src/vec/spill/spill_stream.h | 11 ++++---- be/src/vec/spill/spill_stream_manager.cpp | 46 +++++++++++++++++++++++++------ be/src/vec/spill/spill_stream_manager.h | 43 +++++++++++++++++++++++++++-- be/src/vec/spill/spill_writer.cpp | 1 - be/src/vec/spill/spill_writer.h | 6 ++-- 6 files changed, 90 insertions(+), 22 deletions(-) diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index d770e44a147..d08b63df40b 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -32,7 +32,7 @@ #include "vec/spill/spill_writer.h" namespace doris::vectorized { -SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, doris::DataDir* data_dir, +SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir, std::string spill_dir, size_t batch_rows, size_t batch_bytes, RuntimeProfile* profile) : state_(state), @@ -72,6 +72,9 @@ void SpillStream::close() { (void)reader_->close(); } +const std::string& SpillStream::get_spill_root_dir() const { + return data_dir_->path(); +} Status SpillStream::prepare_spill() { DCHECK(!spill_promise_); RETURN_IF_ERROR(writer_->open()); diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 579449e503c..9f328240d75 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -20,29 +20,28 @@ #include <future> #include <memory> -#include "olap/data_dir.h" #include "vec/spill/spill_reader.h" #include "vec/spill/spill_writer.h" namespace doris { class RuntimeProfile; -class DataDir; class ThreadPool; namespace vectorized { class Block; +class SpillDataDir; class SpillStream { public: - SpillStream(RuntimeState* state, int64_t stream_id, 
doris::DataDir* data_dir, + SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir, std::string spill_dir, size_t batch_rows, size_t batch_bytes, RuntimeProfile* profile); int64_t id() const { return stream_id_; } - DataDir* get_data_dir() const { return data_dir_; } - const std::string& get_spill_root_dir() const { return data_dir_->path(); } + SpillDataDir* get_data_dir() const { return data_dir_; } + const std::string& get_spill_root_dir() const; const std::string& get_spill_dir() const { return spill_dir_; } @@ -85,7 +84,7 @@ private: ThreadPool* io_thread_pool_; int64_t stream_id_; std::atomic_bool closed_ = false; - doris::DataDir* data_dir_ = nullptr; + SpillDataDir* data_dir_ = nullptr; std::string spill_dir_; size_t batch_rows_; size_t batch_bytes_; diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 4dfd847482d..ecbee7fb858 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -29,9 +29,8 @@ #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" -#include "olap/data_dir.h" #include "olap/olap_define.h" -#include "olap/storage_engine.h" +#include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "util/time.h" #include "vec/spill/spill_stream.h" @@ -112,9 +111,9 @@ void SpillStreamManager::_spill_gc_thread_callback() { Status SpillStreamManager::_init_spill_store_map() { for (const auto& path : _spill_store_paths) { - auto store = std::make_unique<DataDir>(ExecEnv::GetInstance()->storage_engine().to_local(), - path.path, path.capacity_bytes, path.storage_medium); - auto st = store->init(false); + auto store = + std::make_unique<SpillDataDir>(path.path, path.capacity_bytes, path.storage_medium); + auto st = store->init(); if (!st.ok()) { LOG(WARNING) << "Store load failed, status=" << st.to_string() << ", path=" << store->path(); @@ -126,9 +125,9 @@ Status SpillStreamManager::_init_spill_store_map() { 
return Status::OK(); } -std::vector<DataDir*> SpillStreamManager::_get_stores_for_spill( +std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill( TStorageMedium::type storage_medium) { - std::vector<DataDir*> stores; + std::vector<SpillDataDir*> stores; for (auto&& [_, store] : _spill_store_map) { if (store->storage_medium() == storage_medium && !store->reach_capacity_limit(0)) { stores.push_back(store.get()); @@ -136,7 +135,7 @@ std::vector<DataDir*> SpillStreamManager::_get_stores_for_spill( } std::sort(stores.begin(), stores.end(), - [](DataDir* a, DataDir* b) { return a->get_usage(0) < b->get_usage(0); }); + [](SpillDataDir* a, SpillDataDir* b) { return a->get_usage(0) < b->get_usage(0); }); size_t seventy_percent_index = stores.size(); size_t eighty_five_percent_index = stores.size(); @@ -176,7 +175,7 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea int64_t id = id_++; std::string spill_dir; - doris::DataDir* data_dir = nullptr; + SpillDataDir* data_dir = nullptr; for (auto& dir : data_dirs) { data_dir = dir; std::string spill_root_dir = fmt::format("{}/{}", data_dir->path(), SPILL_DIR_PREFIX); @@ -259,4 +258,33 @@ void SpillStreamManager::gc(int64_t max_file_count) { } } } + +SpillDataDir::SpillDataDir(const std::string& path, int64_t capacity_bytes, + TStorageMedium::type storage_medium) + : _path(path), + _available_bytes(0), + _disk_capacity_bytes(0), + _storage_medium(storage_medium) {} + +Status SpillDataDir::init() { + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists)); + if (!exists) { + RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path), + "check file exist failed"); + } + + return Status::OK(); +} +bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) { + double used_pct = get_usage(incoming_data_size); + int64_t left_bytes = _available_bytes - incoming_data_size; + if (used_pct >= 
config::storage_flood_stage_usage_percent / 100.0 && + left_bytes <= config::storage_flood_stage_left_capacity_bytes) { + LOG(WARNING) << "reach capacity limit. used pct: " << used_pct + << ", left bytes: " << left_bytes << ", path: " << _path; + return true; + } + return false; +} } // namespace doris::vectorized diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 2b412072038..f73f840458e 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -31,6 +31,45 @@ class RuntimeProfile; namespace vectorized { +class SpillDataDir { +public: + SpillDataDir(const std::string& path, int64_t capacity_bytes = -1, + TStorageMedium::type storage_medium = TStorageMedium::HDD); + + Status init(); + + const std::string& path() const { return _path; } + + bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; } + + TStorageMedium::type storage_medium() const { return _storage_medium; } + + // check if the capacity reach the limit after adding the incoming data + // return true if limit reached, otherwise, return false. + // TODO(cmy): for now we can not precisely calculate the capacity Doris used, + // so in order to avoid running out of disk capacity, we currently use the actual + // disk available capacity and total capacity to do the calculation. + // So that the capacity Doris actually used may exceeds the user specified capacity. + bool reach_capacity_limit(int64_t incoming_data_size); + + Status update_capacity(); + + double get_usage(int64_t incoming_data_size) const { + return _disk_capacity_bytes == 0 + ? 
0 + : (_disk_capacity_bytes - _available_bytes + incoming_data_size) / + (double)_disk_capacity_bytes; + } + +private: + std::string _path; + + // the actual available capacity of the disk of this data dir + size_t _available_bytes; + // the actual capacity of the disk of this data dir + size_t _disk_capacity_bytes; + TStorageMedium::type _storage_medium; +}; class SpillStreamManager { public: SpillStreamManager(const std::vector<StorePath>& paths); @@ -74,10 +113,10 @@ public: private: Status _init_spill_store_map(); void _spill_gc_thread_callback(); - std::vector<DataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium); + std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium); std::vector<StorePath> _spill_store_paths; - std::unordered_map<std::string, std::unique_ptr<DataDir>> _spill_store_map; + std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map; CountDownLatch _stop_background_threads_latch; std::unique_ptr<ThreadPool> async_task_thread_pool_; diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index 6e048ecd0dd..d51dcf8f1ec 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -21,7 +21,6 @@ #include "io/file_factory.h" #include "io/fs/local_file_system.h" #include "io/fs/local_file_writer.h" -#include "olap/data_dir.h" #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "vec/spill/spill_stream_manager.h" diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h index 1ecda1aff9c..14e8120f775 100644 --- a/be/src/vec/spill/spill_writer.h +++ b/be/src/vec/spill/spill_writer.h @@ -25,12 +25,12 @@ #include "util/runtime_profile.h" #include "vec/core/block.h" namespace doris { -class DataDir; namespace vectorized { +class SpillDataDir; class SpillWriter { public: - SpillWriter(int64_t id, size_t batch_size, doris::DataDir* data_dir, const std::string& dir) + SpillWriter(int64_t id, 
size_t batch_size, SpillDataDir* data_dir, const std::string& dir) : data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) { file_path_ = dir + "/" + std::to_string(file_index_); } @@ -66,7 +66,7 @@ private: // not owned, point to the data dir of this rowset // for checking disk capacity when write data to disk. - doris::DataDir* data_dir_ = nullptr; + SpillDataDir* data_dir_ = nullptr; std::atomic_bool closed_ = false; int64_t stream_id_; size_t batch_size_;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org