This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8b3eefac0752f783e417c6efe2d137d24330945f
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Mon Apr 22 10:10:48 2024 +0800

    [improvement](spill) improve spill directory and fix bugs (#33900)
    
    * [improvement](spill) improve spill directory and fix bugs
    
    * fix
---
 be/src/common/config.cpp                           |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  | 10 ++++---
 .../exec/partitioned_hash_join_sink_operator.cpp   | 12 +++++++-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  4 ---
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  1 -
 be/src/runtime/query_context.cpp                   |  4 +++
 .../workload_group/workload_group_manager.cpp      |  2 +-
 be/src/service/doris_main.cpp                      |  3 ++
 be/src/vec/spill/spill_stream.cpp                  |  5 ++++
 be/src/vec/spill/spill_stream.h                    |  2 ++
 be/src/vec/spill/spill_stream_manager.cpp          | 33 ++++++++++++++++++----
 be/src/vec/spill/spill_stream_manager.h            |  2 ++
 12 files changed, 63 insertions(+), 17 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e791a902704..aa50787d5b5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1160,7 +1160,7 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
 // create tablet in partition random robin idx lru size, default 10000
 DEFINE_Int32(partition_disk_index_lru_size, "10000");
 // limit the storage space that query spill files can use
-DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
+DEFINE_String(spill_storage_root_path, "");
 DEFINE_String(spill_storage_limit, "20%");   // 20%
 DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
 DEFINE_mInt32(spill_gc_file_count, "2000");
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 78dcaf1e6c5..2f766511984 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -251,7 +251,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                     auto execution_context_lock = execution_context.lock();
                     if (!execution_context_lock) {
                         LOG(INFO) << "execution_context released, maybe query 
was cancelled.";
-                        _dependency->set_ready();
                         return;
                     }
                     
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
@@ -325,7 +324,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     }
 
     auto& mutable_block = 
_shared_state->partitioned_build_blocks[partition_index];
-    DCHECK(mutable_block != nullptr);
+    if (!mutable_block) {
+        
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
+        spilled_stream.reset();
+        return Status::OK();
+    }
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
@@ -340,11 +343,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
             return;
         }
+        SCOPED_ATTACH_TASK(state);
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_recovery_build_timer);
         Defer defer([this] { --_spilling_task_count; });
         (void)state; // avoid ut compile error
-        SCOPED_ATTACH_TASK(state);
         DCHECK_EQ(_spill_status_ok.load(), true);
 
         bool eos = false;
@@ -654,7 +657,6 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
     if (local_state._need_to_setup_internal_operators) {
         *eos = false;
         bool has_data = false;
-        CHECK_EQ(local_state._dependency->is_blocked_by(), nullptr);
         RETURN_IF_ERROR(local_state.recovery_build_blocks_from_disk(
                 state, local_state._partition_cursor, has_data));
         if (has_data) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index c9d61757461..b8454c19bf3 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -137,7 +137,17 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
     auto execution_context = state->get_task_execution_context();
     _dependency->block();
-    auto spill_func = [execution_context, build_block, state, this]() {
+    auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
+    auto spill_func = [execution_context, build_block, state, query_id, 
mem_tracker,
+                       this]() mutable {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        Defer defer {[&]() {
+            // need to reset build_block here, or else build_block will be 
destructed
+            // after SCOPED_ATTACH_TASK_WITH_ID and will trigger 
memory_orphan_check failure
+            build_block.reset();
+        }};
+
         auto execution_context_lock = execution_context.lock();
         if (!execution_context_lock) {
             LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index cc80fc205d7..f5e74826bb6 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -138,10 +138,6 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::open(state));
     return _sort_sink_operator->open(state);
 }
-Status SpillSortSinkOperatorX::close(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::close(state));
-    return _sort_sink_operator->close(state);
-}
 Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) {
     if (!_enable_spill) {
         return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index d66215411aa..af0d8686acc 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -80,7 +80,6 @@ public:
 
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
-    Status close(RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution() const override {
         return _sort_sink_operator->required_data_distribution();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 10f54255741..436d5547ee9 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -26,6 +26,8 @@
 #include "runtime/thread_context.h"
 #include "runtime/workload_group/workload_group_manager.h"
 #include "util/mem_info.h"
+#include "util/uid_util.h"
+#include "vec/spill/spill_stream_manager.h"
 
 namespace doris {
 
@@ -146,6 +148,8 @@ QueryContext::~QueryContext() {
     _runtime_predicates.clear();
     file_scan_range_params_map.clear();
     obj_pool.clear();
+
+    _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
 }
 
 void QueryContext::set_ready_to_execute(bool is_cancelled) {
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 2e75218e9cf..e336c9f80a8 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -183,7 +183,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
     // in process_mem_used.
     // we count these cache memories equally on workload groups.
     double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
-    if (ratio >= 1.25) {
+    if (ratio <= 1.25) {
         auto sys_mem_available = doris::MemInfo::sys_mem_available();
         std::string debug_msg = fmt::format(
                 "\nProcess Memory Summary: process_vm_rss: {}, process mem: 
{}, sys mem available: "
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a218c7be6f1..731e09c6be9 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -411,6 +411,9 @@ int main(int argc, char** argv) {
     }
 
     std::vector<doris::StorePath> spill_paths;
+    if (doris::config::spill_storage_root_path.empty()) {
+        doris::config::spill_storage_root_path = 
doris::config::storage_root_path;
+    }
     olap_res = 
doris::parse_conf_store_paths(doris::config::spill_storage_root_path, 
&spill_paths);
     if (!olap_res) {
         LOG(ERROR) << "parse config spill storage path failed, path="
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index f5b6fea096d..ed7be9a0b28 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -25,6 +25,7 @@
 
 #include "io/fs/local_file_system.h"
 #include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "vec/core/block.h"
 #include "vec/spill/spill_reader.h"
@@ -88,6 +89,10 @@ void SpillStream::close() {
     }
 }
 
+const TUniqueId& SpillStream::query_id() const {
+    return state_->query_id();
+}
+
 const std::string& SpillStream::get_spill_root_dir() const {
     return data_dir_->path();
 }
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 6b5166f2652..68abfa9aaf7 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -81,6 +81,8 @@ public:
         read_wait_io_timer_ = wait_io_timer;
     }
 
+    const TUniqueId& query_id() const;
+
 private:
     friend class SpillStreamManager;
 
diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp
index 0259bef33f3..05a2531c466 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -36,6 +36,7 @@
 #include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 #include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/spill/spill_stream.h"
 
 namespace doris::vectorized {
@@ -128,7 +129,7 @@ Status SpillStreamManager::_init_spill_store_map() {
 std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill(
         TStorageMedium::type storage_medium) {
     std::vector<SpillDataDir*> stores;
-    for (auto&& [_, store] : _spill_store_map) {
+    for (auto& [_, store] : _spill_store_map) {
         if (store->storage_medium() == storage_medium && 
!store->reach_capacity_limit(0)) {
             stores.push_back(store.get());
         }
@@ -188,7 +189,7 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
     for (auto& dir : data_dirs) {
         data_dir = dir;
         std::string spill_root_dir = fmt::format("{}/{}", data_dir->path(), 
SPILL_DIR_PREFIX);
-        spill_dir = fmt::format("{}/{}-{}-{}-{}-{}", spill_root_dir, query_id, 
operator_name,
+        spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id, 
operator_name,
                                 node_id, state->task_id(), id);
         auto st = io::global_local_filesystem()->create_directory(spill_dir);
         if (!st.ok()) {
@@ -207,9 +208,15 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
 }
 
 void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
-    auto gc_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), 
SPILL_GC_DIR_PREFIX,
-                              
std::filesystem::path(stream->get_spill_dir()).filename().string());
-    (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), 
gc_dir);
+    auto query_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), 
SPILL_GC_DIR_PREFIX,
+                                 print_id(stream->query_id()));
+    auto st = io::global_local_filesystem()->create_directory(query_dir);
+    if (st.ok()) {
+        auto gc_dir =
+                fmt::format("{}/{}", query_dir,
+                            
std::filesystem::path(stream->get_spill_dir()).filename().string());
+        (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), 
gc_dir);
+    }
 }
 
 void SpillStreamManager::gc(int64_t max_file_count) {
@@ -266,6 +273,22 @@ void SpillStreamManager::gc(int64_t max_file_count) {
     }
 }
 
+void SpillStreamManager::async_cleanup_query(TUniqueId query_id) {
+    (void)get_async_task_thread_pool()->submit_func([this, query_id] {
+        for (auto& [_, store] : _spill_store_map) {
+            std::string query_spill_dir =
+                    fmt::format("{}/{}/{}", store->path(), SPILL_DIR_PREFIX, 
print_id(query_id));
+            bool exists = false;
+            auto status = 
io::global_local_filesystem()->exists(query_spill_dir, &exists);
+            if (status.ok() && exists) {
+                auto gc_dir = fmt::format("{}/{}/{}-gc", store->path(), 
SPILL_GC_DIR_PREFIX,
+                                          print_id(query_id));
+                (void)io::global_local_filesystem()->rename(query_spill_dir, 
gc_dir);
+            }
+        }
+    });
+}
+
 SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes,
                            TStorageMedium::type storage_medium)
         : _path(std::move(path)),
diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h
index 2d7350f775f..36062ce0b46 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -108,6 +108,8 @@ public:
     // 标记SpillStream需要被删除,在GC线程中异步删除落盘文件
     void delete_spill_stream(SpillStreamSPtr spill_stream);
 
+    void async_cleanup_query(TUniqueId query_id);
+
     void gc(int64_t max_file_count);
 
     ThreadPool* get_spill_io_thread_pool(const std::string& path) const {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to