This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b41b17ad0a6bd542767698937ff712c4d4e542b0
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Mon Mar 11 22:48:16 2024 +0800

    [fix](spill) fix storage engine type cast error (#32071)
---
 be/src/vec/spill/spill_stream.cpp         |  5 +++-
 be/src/vec/spill/spill_stream.h           | 11 ++++----
 be/src/vec/spill/spill_stream_manager.cpp | 46 +++++++++++++++++++++++++------
 be/src/vec/spill/spill_stream_manager.h   | 43 +++++++++++++++++++++++++++--
 be/src/vec/spill/spill_writer.cpp         |  1 -
 be/src/vec/spill/spill_writer.h           |  6 ++--
 6 files changed, 90 insertions(+), 22 deletions(-)

diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index d770e44a147..d08b63df40b 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -32,7 +32,7 @@
 #include "vec/spill/spill_writer.h"
 
 namespace doris::vectorized {
-SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, doris::DataDir* data_dir,
+SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
                          std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                          RuntimeProfile* profile)
         : state_(state),
@@ -72,6 +72,9 @@ void SpillStream::close() {
     (void)reader_->close();
 }
 
+const std::string& SpillStream::get_spill_root_dir() const {
+    return data_dir_->path();
+}
 Status SpillStream::prepare_spill() {
     DCHECK(!spill_promise_);
     RETURN_IF_ERROR(writer_->open());
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 579449e503c..9f328240d75 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -20,29 +20,28 @@
 #include <future>
 #include <memory>
 
-#include "olap/data_dir.h"
 #include "vec/spill/spill_reader.h"
 #include "vec/spill/spill_writer.h"
 
 namespace doris {
 class RuntimeProfile;
-class DataDir;
 class ThreadPool;
 
 namespace vectorized {
 
 class Block;
+class SpillDataDir;
 
 class SpillStream {
 public:
-    SpillStream(RuntimeState* state, int64_t stream_id, doris::DataDir* data_dir,
+    SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
                 std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                 RuntimeProfile* profile);
 
     int64_t id() const { return stream_id_; }
 
-    DataDir* get_data_dir() const { return data_dir_; }
-    const std::string& get_spill_root_dir() const { return data_dir_->path(); }
+    SpillDataDir* get_data_dir() const { return data_dir_; }
+    const std::string& get_spill_root_dir() const;
 
     const std::string& get_spill_dir() const { return spill_dir_; }
 
@@ -85,7 +84,7 @@ private:
     ThreadPool* io_thread_pool_;
     int64_t stream_id_;
     std::atomic_bool closed_ = false;
-    doris::DataDir* data_dir_ = nullptr;
+    SpillDataDir* data_dir_ = nullptr;
     std::string spill_dir_;
     size_t batch_rows_;
     size_t batch_bytes_;
diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp
index 4dfd847482d..ecbee7fb858 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -29,9 +29,8 @@
 
 #include "io/fs/file_system.h"
 #include "io/fs/local_file_system.h"
-#include "olap/data_dir.h"
 #include "olap/olap_define.h"
-#include "olap/storage_engine.h"
+#include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/time.h"
 #include "vec/spill/spill_stream.h"
@@ -112,9 +111,9 @@ void SpillStreamManager::_spill_gc_thread_callback() {
 
 Status SpillStreamManager::_init_spill_store_map() {
     for (const auto& path : _spill_store_paths) {
-        auto store = std::make_unique<DataDir>(ExecEnv::GetInstance()->storage_engine().to_local(),
-                                               path.path, path.capacity_bytes, path.storage_medium);
-        auto st = store->init(false);
+        auto store =
+                std::make_unique<SpillDataDir>(path.path, path.capacity_bytes, path.storage_medium);
+        auto st = store->init();
         if (!st.ok()) {
             LOG(WARNING) << "Store load failed, status=" << st.to_string()
                          << ", path=" << store->path();
@@ -126,9 +125,9 @@ Status SpillStreamManager::_init_spill_store_map() {
     return Status::OK();
 }
 
-std::vector<DataDir*> SpillStreamManager::_get_stores_for_spill(
+std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill(
         TStorageMedium::type storage_medium) {
-    std::vector<DataDir*> stores;
+    std::vector<SpillDataDir*> stores;
     for (auto&& [_, store] : _spill_store_map) {
         if (store->storage_medium() == storage_medium && !store->reach_capacity_limit(0)) {
             stores.push_back(store.get());
@@ -136,7 +135,7 @@ std::vector<DataDir*> SpillStreamManager::_get_stores_for_spill(
     }
 
     std::sort(stores.begin(), stores.end(),
-              [](DataDir* a, DataDir* b) { return a->get_usage(0) < b->get_usage(0); });
+              [](SpillDataDir* a, SpillDataDir* b) { return a->get_usage(0) < b->get_usage(0); });
 
     size_t seventy_percent_index = stores.size();
     size_t eighty_five_percent_index = stores.size();
@@ -176,7 +175,7 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
 
     int64_t id = id_++;
     std::string spill_dir;
-    doris::DataDir* data_dir = nullptr;
+    SpillDataDir* data_dir = nullptr;
     for (auto& dir : data_dirs) {
         data_dir = dir;
         std::string spill_root_dir = fmt::format("{}/{}", data_dir->path(), SPILL_DIR_PREFIX);
@@ -259,4 +258,33 @@ void SpillStreamManager::gc(int64_t max_file_count) {
         }
     }
 }
+
+SpillDataDir::SpillDataDir(const std::string& path, int64_t capacity_bytes,
+                           TStorageMedium::type storage_medium)
+        : _path(path),
+          _available_bytes(0),
+          _disk_capacity_bytes(0),
+          _storage_medium(storage_medium) {}
+
+Status SpillDataDir::init() {
+    bool exists = false;
+    RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists));
+    if (!exists) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path),
+                                       "check file exist failed");
+    }
+
+    return Status::OK();
+}
+bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
+    double used_pct = get_usage(incoming_data_size);
+    int64_t left_bytes = _available_bytes - incoming_data_size;
+    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
+        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
+        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
+                     << ", left bytes: " << left_bytes << ", path: " << _path;
+        return true;
+    }
+    return false;
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h
index 2b412072038..f73f840458e 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -31,6 +31,45 @@ class RuntimeProfile;
 
 namespace vectorized {
 
+class SpillDataDir {
+public:
+    SpillDataDir(const std::string& path, int64_t capacity_bytes = -1,
+                 TStorageMedium::type storage_medium = TStorageMedium::HDD);
+
+    Status init();
+
+    const std::string& path() const { return _path; }
+
+    bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; }
+
+    TStorageMedium::type storage_medium() const { return _storage_medium; }
+
+    // check if the capacity reach the limit after adding the incoming data
+    // return true if limit reached, otherwise, return false.
+    // TODO(cmy): for now we can not precisely calculate the capacity Doris used,
+    // so in order to avoid running out of disk capacity, we currently use the actual
+    // disk available capacity and total capacity to do the calculation.
+    // So that the capacity Doris actually used may exceeds the user specified capacity.
+    bool reach_capacity_limit(int64_t incoming_data_size);
+
+    Status update_capacity();
+
+    double get_usage(int64_t incoming_data_size) const {
+        return _disk_capacity_bytes == 0
+                       ? 0
+                       : (_disk_capacity_bytes - _available_bytes + incoming_data_size) /
+                                 (double)_disk_capacity_bytes;
+    }
+
+private:
+    std::string _path;
+
+    // the actual available capacity of the disk of this data dir
+    size_t _available_bytes;
+    // the actual capacity of the disk of this data dir
+    size_t _disk_capacity_bytes;
+    TStorageMedium::type _storage_medium;
+};
 class SpillStreamManager {
 public:
     SpillStreamManager(const std::vector<StorePath>& paths);
@@ -74,10 +113,10 @@ public:
 private:
     Status _init_spill_store_map();
     void _spill_gc_thread_callback();
-    std::vector<DataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium);
+    std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium);
 
     std::vector<StorePath> _spill_store_paths;
-    std::unordered_map<std::string, std::unique_ptr<DataDir>> _spill_store_map;
+    std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map;
 
     CountDownLatch _stop_background_threads_latch;
     std::unique_ptr<ThreadPool> async_task_thread_pool_;
diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp
index 6e048ecd0dd..d51dcf8f1ec 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -21,7 +21,6 @@
 #include "io/file_factory.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/local_file_writer.h"
-#include "olap/data_dir.h"
 #include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "vec/spill/spill_stream_manager.h"
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index 1ecda1aff9c..14e8120f775 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -25,12 +25,12 @@
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
 namespace doris {
-class DataDir;
 
 namespace vectorized {
+class SpillDataDir;
 class SpillWriter {
 public:
-    SpillWriter(int64_t id, size_t batch_size, doris::DataDir* data_dir, const std::string& dir)
+    SpillWriter(int64_t id, size_t batch_size, SpillDataDir* data_dir, const std::string& dir)
             : data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) {
         file_path_ = dir + "/" + std::to_string(file_index_);
     }
@@ -66,7 +66,7 @@ private:
 
     // not owned, point to the data dir of this rowset
     // for checking disk capacity when write data to disk.
-    doris::DataDir* data_dir_ = nullptr;
+    SpillDataDir* data_dir_ = nullptr;
     std::atomic_bool closed_ = false;
     int64_t stream_id_;
     size_t batch_size_;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to