This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8b3eefac0752f783e417c6efe2d137d24330945f Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Mon Apr 22 10:10:48 2024 +0800 [improvement](spill) improve spill directory and fix bugs (#33900) * [improvement](spill) improve spill directory and fix bugs * fix --- be/src/common/config.cpp | 2 +- .../exec/partitioned_hash_join_probe_operator.cpp | 10 ++++--- .../exec/partitioned_hash_join_sink_operator.cpp | 12 +++++++- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 4 --- be/src/pipeline/exec/spill_sort_sink_operator.h | 1 - be/src/runtime/query_context.cpp | 4 +++ .../workload_group/workload_group_manager.cpp | 2 +- be/src/service/doris_main.cpp | 3 ++ be/src/vec/spill/spill_stream.cpp | 5 ++++ be/src/vec/spill/spill_stream.h | 2 ++ be/src/vec/spill/spill_stream_manager.cpp | 33 ++++++++++++++++++---- be/src/vec/spill/spill_stream_manager.h | 2 ++ 12 files changed, 63 insertions(+), 17 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index e791a902704..aa50787d5b5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1160,7 +1160,7 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15"); // create tablet in partition random robin idx lru size, default 10000 DEFINE_Int32(partition_disk_index_lru_size, "10000"); // limit the storage space that query spill files can use -DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage"); +DEFINE_String(spill_storage_root_path, ""); DEFINE_String(spill_storage_limit, "20%"); // 20% DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s DEFINE_mInt32(spill_gc_file_count, "2000"); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 78dcaf1e6c5..2f766511984 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -251,7 +251,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; - _dependency->set_ready(); return; } _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); @@ -325,7 +324,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti } auto& mutable_block = _shared_state->partitioned_build_blocks[partition_index]; - DCHECK(mutable_block != nullptr); + if (!mutable_block) { + ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); + spilled_stream.reset(); + return Status::OK(); + } auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); @@ -340,11 +343,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti LOG(INFO) << "execution_context released, maybe query was cancelled."; return; } + SCOPED_ATTACH_TASK(state); _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_recovery_build_timer); Defer defer([this] { --_spilling_task_count; }); (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); DCHECK_EQ(_spill_status_ok.load(), true); bool eos = false; @@ -654,7 +657,6 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, if (local_state._need_to_setup_internal_operators) { *eos = false; bool has_data = false; - CHECK_EQ(local_state._dependency->is_blocked_by(), nullptr); RETURN_IF_ERROR(local_state.recovery_build_blocks_from_disk( state, local_state._partition_cursor, has_data)); if (has_data) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index c9d61757461..b8454c19bf3 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -137,7 +137,17 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta auto execution_context = state->get_task_execution_context(); _dependency->block(); - auto spill_func = [execution_context, build_block, state, this]() { + auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; + auto spill_func = [execution_context, build_block, state, query_id, mem_tracker, + this]() mutable { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + Defer defer {[&]() { + // need to reset build_block here, or else build_block will be destructed + // after SCOPED_ATTACH_TASK_WITH_ID and will trigger memory_orphan_check failure + build_block.reset(); + }}; + auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { LOG(INFO) << "execution_context released, maybe query was cancelled."; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index cc80fc205d7..f5e74826bb6 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -138,10 +138,6 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::open(state)); return _sort_sink_operator->open(state); } -Status SpillSortSinkOperatorX::close(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::close(state)); - return _sort_sink_operator->close(state); -} Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) { if (!_enable_spill) { return Status::OK(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index d66215411aa..af0d8686acc 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -80,7 +80,6 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status close(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { return _sort_sink_operator->required_data_distribution(); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 10f54255741..436d5547ee9 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -26,6 +26,8 @@ #include "runtime/thread_context.h" #include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" +#include "util/uid_util.h" +#include "vec/spill/spill_stream_manager.h" namespace doris { @@ -146,6 +148,8 @@ QueryContext::~QueryContext() { _runtime_predicates.clear(); file_scan_range_params_map.clear(); obj_pool.clear(); + + _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id); } void QueryContext::set_ready_to_execute(bool is_cancelled) { diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 2e75218e9cf..e336c9f80a8 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -183,7 +183,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { // in process_mem_used. // we count these cache memories equally on workload groups. double ratio = (double)proc_vm_rss / (double)all_queries_mem_used; - if (ratio >= 1.25) { + if (ratio <= 1.25) { auto sys_mem_available = doris::MemInfo::sys_mem_available(); std::string debug_msg = fmt::format( "\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: " diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index a218c7be6f1..731e09c6be9 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -411,6 +411,9 @@ int main(int argc, char** argv) { } std::vector<doris::StorePath> spill_paths; + if (doris::config::spill_storage_root_path.empty()) { + doris::config::spill_storage_root_path = doris::config::storage_root_path; + } olap_res = doris::parse_conf_store_paths(doris::config::spill_storage_root_path, &spill_paths); if (!olap_res) { LOG(ERROR) << "parse config spill storage path failed, path=" diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index f5b6fea096d..ed7be9a0b28 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -25,6 +25,7 @@ #include "io/fs/local_file_system.h" #include "runtime/exec_env.h" +#include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "vec/core/block.h" #include "vec/spill/spill_reader.h" @@ -88,6 +89,10 @@ void SpillStream::close() { } } +const TUniqueId& SpillStream::query_id() const { + return state_->query_id(); +} + const std::string& SpillStream::get_spill_root_dir() const { return data_dir_->path(); } diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 6b5166f2652..68abfa9aaf7 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -81,6 +81,8 @@ public: read_wait_io_timer_ = wait_io_timer; } + const TUniqueId& query_id() const; + private: friend class SpillStreamManager; diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 0259bef33f3..05a2531c466 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -36,6 +36,7 @@ #include "util/pretty_printer.h" #include "util/runtime_profile.h" #include "util/time.h" +#include "util/uid_util.h" #include "vec/spill/spill_stream.h" namespace doris::vectorized { @@ -128,7 +129,7 @@ Status SpillStreamManager::_init_spill_store_map() { std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill( TStorageMedium::type storage_medium) { std::vector<SpillDataDir*> stores; - for (auto&& [_, store] : _spill_store_map) { + for (auto& [_, store] : _spill_store_map) { if (store->storage_medium() == storage_medium && !store->reach_capacity_limit(0)) { stores.push_back(store.get()); } @@ -188,7 +189,7 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea for (auto& dir : data_dirs) { data_dir = dir; std::string spill_root_dir = fmt::format("{}/{}", data_dir->path(), SPILL_DIR_PREFIX); - spill_dir = fmt::format("{}/{}-{}-{}-{}-{}", spill_root_dir, query_id, operator_name, + spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id, operator_name, node_id, state->task_id(), id); auto st = io::global_local_filesystem()->create_directory(spill_dir); if (!st.ok()) { @@ -207,9 +208,15 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea } void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) { - auto gc_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), SPILL_GC_DIR_PREFIX, - std::filesystem::path(stream->get_spill_dir()).filename().string()); - (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir); + auto query_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), SPILL_GC_DIR_PREFIX, + print_id(stream->query_id())); + auto st = io::global_local_filesystem()->create_directory(query_dir); + if (st.ok()) { + auto gc_dir = + fmt::format("{}/{}", query_dir, + std::filesystem::path(stream->get_spill_dir()).filename().string()); + (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir); + } } void SpillStreamManager::gc(int64_t max_file_count) { @@ -266,6 +273,22 @@ void SpillStreamManager::gc(int64_t max_file_count) { } } +void SpillStreamManager::async_cleanup_query(TUniqueId query_id) { + (void)get_async_task_thread_pool()->submit_func([this, query_id] { + for (auto& [_, store] : _spill_store_map) { + std::string query_spill_dir = + fmt::format("{}/{}/{}", store->path(), SPILL_DIR_PREFIX, print_id(query_id)); + bool exists = false; + auto status = io::global_local_filesystem()->exists(query_spill_dir, &exists); + if (status.ok() && exists) { + auto gc_dir = fmt::format("{}/{}/{}-gc", store->path(), SPILL_GC_DIR_PREFIX, + print_id(query_id)); + (void)io::global_local_filesystem()->rename(query_spill_dir, gc_dir); + } + } + }); +} + SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes, TStorageMedium::type storage_medium) : _path(std::move(path)), diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 2d7350f775f..36062ce0b46 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -108,6 +108,8 @@ public: // 标记SpillStream需要被删除,在GC线程中异步删除落盘文件 void delete_spill_stream(SpillStreamSPtr spill_stream); + void async_cleanup_query(TUniqueId query_id); + void gc(int64_t max_file_count); ThreadPool* get_spill_io_thread_pool(const std::string& path) const { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org