This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 085897a6857 improve logs 085897a6857 is described below commit 085897a6857ac71fc6da7848409471c8119163a4 Author: jacktengg <tengjianp...@selectdb.com> AuthorDate: Tue Dec 17 11:34:02 2024 +0800 improve logs --- be/src/common/config.cpp | 2 - be/src/common/config.h | 2 - be/src/pipeline/exec/operator.h | 2 - .../pipeline/exec/partition_sort_sink_operator.cpp | 4 +- .../exec/partitioned_aggregation_sink_operator.cpp | 37 +++++--- .../exec/partitioned_aggregation_sink_operator.h | 6 +- .../exec/partitioned_hash_join_probe_operator.cpp | 14 +-- .../exec/partitioned_hash_join_sink_operator.cpp | 100 ++++++++++++--------- .../exec/partitioned_hash_join_sink_operator.h | 2 - be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 +- be/src/pipeline/exec/spill_sort_sink_operator.h | 1 - be/src/pipeline/pipeline_task.cpp | 1 - be/src/runtime/memory/global_memory_arbitrator.h | 2 +- be/src/runtime/memory/memory_profile.cpp | 9 +- be/src/runtime/query_context.h | 3 - be/src/runtime/runtime_state.h | 2 +- be/src/vec/common/allocator.cpp | 7 +- .../java/org/apache/doris/qe/SessionVariable.java | 2 +- gensrc/thrift/PaloInternalService.thrift | 2 +- 19 files changed, 102 insertions(+), 102 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f8b7b390ee9..f0184db7067 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1400,8 +1400,6 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false"); DEFINE_Bool(enable_table_size_correctness_check, "false"); DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false"); -DEFINE_mInt32(revocable_memory_bytes_high_watermark, "5"); - // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 1b9b8a3d531..d935f5278f2 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1483,8 +1483,6 @@ 
DECLARE_mBool(enable_delete_bitmap_merge_on_compaction); // Enable validation to check the correctness of table size. DECLARE_Bool(enable_table_size_correctness_check); -DECLARE_mInt32(revocable_memory_bytes_high_watermark); - #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 723e14f4bc7..af13ded196e 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -589,8 +589,6 @@ public: return state->minimum_operator_memory_required_bytes(); } - [[nodiscard]] virtual bool is_spilled(RuntimeState* state) const { return false; } - [[nodiscard]] bool is_spillable() const { return _spillable; } template <class TARGET> diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 759c7ea2bcc..b90af92f6b1 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -70,9 +70,7 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope _topn_phase(tnode.partition_sort_node.ptopn_phase), _has_global_limit(tnode.partition_sort_node.has_global_limit), _top_n_algorithm(tnode.partition_sort_node.top_n_algorithm), - _partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) { - _spillable = true; -} + _partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {} Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 6f008a3b1f2..58b272b3ac8 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -27,6 +27,7 @@ #include "pipeline/exec/spill_utils.h" #include 
"pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" +#include "util/pretty_printer.h" #include "util/runtime_profile.h" #include "vec/spill/spill_stream.h" #include "vec/spill/spill_stream_manager.h" @@ -180,7 +181,19 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: return Status::Error<INTERNAL_ERROR>("fault_inject partitioned_agg_sink sink failed"); }); RETURN_IF_ERROR(_agg_sink_operator->sink(runtime_state, in_block, false)); + + size_t revocable_size = 0; + int64_t query_mem_limit = 0; if (eos) { + revocable_size = revocable_mem_size(state); + query_mem_limit = state->get_query_ctx()->get_mem_limit(); + LOG(INFO) << fmt::format( + "Query: {}, task {}, agg sink {} eos, need spill: {}, query mem limit: {}, " + "revocable memory: {}", + print_id(state->query_id()), state->task_id(), node_id(), + local_state._shared_state->is_spilled, PrettyPrinter::print_bytes(query_mem_limit), + PrettyPrinter::print_bytes(revocable_size)); + if (local_state._shared_state->is_spilled) { if (revocable_mem_size(state) > 0) { RETURN_IF_ERROR(revoke_memory(state, nullptr)); @@ -256,10 +269,12 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bo Status PartitionedAggSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { const auto size_to_revoke = _parent->revocable_mem_size(state); - VLOG_DEBUG << "Query " << print_id(state->query_id()) << " agg node " - << Base::_parent->node_id() - << " revoke_memory, size: " << _parent->revocable_mem_size(state) - << ", eos: " << _eos; + LOG(INFO) << fmt::format( + "Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need spill: {}, revocable " + "memory: {}", + print_id(state->query_id()), state->task_id(), _parent->node_id(), _eos, + _shared_state->is_spilled, + PrettyPrinter::print_bytes(_parent->revocable_mem_size(state))); if (!_shared_state->is_spilled) { _shared_state->is_spilled = true; 
profile()->add_info_string("Spilled", "true"); @@ -309,9 +324,12 @@ Status PartitionedAggSinkLocalState::revoke_memory( } _shared_state->close(); } else { - VLOG_DEBUG << "Query " << print_id(query_id) << " agg node " - << Base::_parent->node_id() << " revoke_memory finish, size: " - << _parent->revocable_mem_size(state) << ", eos: " << _eos; + LOG(INFO) << fmt::format( + "Query: {}, task {}, agg sink {} revoke_memory finish, eos: {}, " + "revocable memory: {}", + print_id(state->query_id()), state->task_id(), _parent->node_id(), + _eos, + PrettyPrinter::print_bytes(_parent->revocable_mem_size(state))); } if (_eos) { @@ -341,9 +359,4 @@ Status PartitionedAggSinkLocalState::revoke_memory( std::move(spill_runnable)); } -bool PartitionedAggSinkOperatorX::is_spilled(RuntimeState* state) const { - auto& local_state = get_local_state(state); - return local_state._shared_state->is_spilled; -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 922798707d0..499db4919e7 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -22,6 +22,7 @@ #include "aggregation_sink_operator.h" #include "pipeline/dependency.h" #include "pipeline/exec/operator.h" +#include "util/pretty_printer.h" #include "vec/exprs/vectorized_agg_fn.h" #include "vec/exprs/vexpr.h" #include "vec/spill/spill_stream.h" @@ -83,8 +84,7 @@ public: total_rows / size_to_revoke_)); VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() - << ", spill_batch_rows: " << spill_batch_rows << ", total rows: " << total_rows - << ", size_to_revoke: " << size_to_revoke; + << ", spill_batch_rows: " << spill_batch_rows << ", total rows: " << total_rows; size_t row_count = 0; std::vector<TmpSpillInfo<typename HashTableType::key_type>> spill_infos( @@ -333,8 +333,6 @@ public: size_t 
get_reserve_mem_size(RuntimeState* state, bool eos) override; - bool is_spilled(RuntimeState* state) const override; - private: friend class PartitionedAggSinkLocalState; std::unique_ptr<AggSinkOperatorX> _agg_sink_operator; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index bbbcb9b9d5e..f6cea157cd5 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -339,9 +339,8 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim } } - auto block_bytes = _recovered_build_block->allocated_bytes(); - COUNTER_UPDATE(_memory_used_counter, block_bytes); - if (block_bytes >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) { + if (_recovered_build_block->allocated_bytes() >= + vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) { break; } } @@ -608,10 +607,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: } } - auto old_probe_blocks_bytes = local_state._probe_blocks_bytes->value(); COUNTER_SET(local_state._probe_blocks_bytes, bytes_of_blocks); - COUNTER_UPDATE(local_state._memory_used_counter, - local_state._probe_blocks_bytes->value() - old_probe_blocks_bytes); return Status::OK(); } @@ -933,13 +929,8 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori COUNTER_SET(local_state._memory_usage_reserved, int64_t(local_state.estimate_memory_usage())); }); - LOG(INFO) << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() - << ", task: " << state->task_id() - << " get_block, child eos: " << local_state._child_eos - << ", need spill: " << need_to_spill; if (need_more_input_data(state)) { - LOG(INFO) << "need more input data"; { SCOPED_TIMER(local_state._get_child_next_timer); RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(), @@ -969,7 +960,6 
@@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori } if (!need_more_input_data(state)) { - LOG(INFO) << "not need more input data"; SCOPED_TIMER(local_state.exec_time_counter()); if (need_to_spill) { RETURN_IF_ERROR(pull(state, block, eos)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index dad82b6cc8a..6cf9e658a60 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -270,7 +270,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( }); status = _finish_spilling(); VLOG_DEBUG << fmt::format( - "Query: {}, task {}, sink {} _revoke_unpartitioned_block set_ready_to_read", + "Query: {}, task {}, hash join sink {} _revoke_unpartitioned_block " + "set_ready_to_read", print_id(state->query_id()), state->task_id(), _parent->node_id()); _dependency->set_ready_to_read(); } @@ -303,7 +304,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { SCOPED_TIMER(_spill_total_timer); VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id() - << " sink " << _parent->node_id() << " revoke_memory" + << " hash join sink " << _parent->node_id() << " revoke_memory" << ", eos: " << _child_eos; CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr); @@ -321,9 +322,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( auto spill_fin_cb = [this, state, query_id, spill_context]() { Status status; if (_child_eos) { - VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << ", task " - << state->task_id() << " sink " << _parent->node_id() - << " set_ready_to_read"; + LOG(INFO) << "Query:" << print_id(this->state()->query_id()) << ", task " + << state->task_id() << " hash join sink " << _parent->node_id() + << " finish spilling, 
set_ready_to_read"; std::for_each(_shared_state->partitioned_build_blocks.begin(), _shared_state->partitioned_build_blocks.end(), [&](auto& block) { if (block) { @@ -542,43 +543,28 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B local_state._child_eos = eos; - Defer defer_dgb {[&]() { - if (local_state.revocable_mem_size(state) > 128 * 1024 * 1024) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id() - << " sink " << node_id() << " _child_eos: " << local_state._child_eos - << ", revocable memory: " - << PrettyPrinter::print_bytes(local_state.revocable_mem_size(state)); - } - }}; const auto rows = in_block->rows(); const auto need_to_spill = local_state._shared_state->need_to_spill; + size_t revocable_size = 0; + int64_t query_mem_limit = 0; + if (eos) { + revocable_size = revocable_mem_size(state); + query_mem_limit = state->get_query_ctx()->get_mem_limit(); + LOG(INFO) << fmt::format( + "Query: {}, task {}, hash join sink {} eos, need spill: {}, query mem limit: {}, " + "revocable " + "memory: {}", + print_id(state->query_id()), state->task_id(), node_id(), need_to_spill, + PrettyPrinter::print_bytes(query_mem_limit), + PrettyPrinter::print_bytes(revocable_size)); + } + if (rows == 0) { if (eos) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id() - << " sink " << node_id() << " eos, need spill: " << need_to_spill; - if (need_to_spill) { return revoke_memory(state, nullptr); } else { - const auto revocable_size = revocable_mem_size(state); - // TODO: consider parallel? - // After building hash table it will not be able to spill later - // even if memory is low, and will cause cancel of queries. - // So make a check here, if build blocks mem usage is too high, - // then trigger revoke memory. 
- auto query_mem_limit = state->get_query_ctx()->mem_limit(); - if (revocable_size >= (double)query_mem_limit / 100.0 * - state->revocable_memory_high_watermark_percent()) { - VLOG_DEBUG << fmt::format( - "Query: {}, task {}, sink {}, query mem limit: {}, revoke_memory " - "because revocable memory is high: {}", - print_id(state->query_id()), state->task_id(), node_id(), - PrettyPrinter::print_bytes(query_mem_limit), - PrettyPrinter::print_bytes(revocable_size)); - return revoke_memory(state, nullptr); - } - if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { RETURN_IF_ERROR(_setup_internal_operator(state)); } @@ -587,12 +573,31 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B "fault_inject partitioned_hash_join_sink " "sink_eos failed"); }); + + // TODO: consider parallel? + // After building hash table it will not be able to spill later + // even if memory is low, and will cause cancel of queries. + // So make a check here, if build blocks mem usage is too high, + // then trigger revoke memory. 
+ auto revocable_memory_high_watermark_percent = + state->revocable_memory_high_watermark_percent(); + if (revocable_memory_high_watermark_percent > 0 && + revocable_size >= (double)query_mem_limit / 100.0 * + revocable_memory_high_watermark_percent) { + LOG(INFO) << fmt::format( + "Query: {}, task {}, hash join sink {} eos, revoke_memory " + "because revocable memory is high", + print_id(state->query_id()), state->task_id(), node_id()); + return revoke_memory(state, nullptr); + } + Defer defer {[&]() { local_state.update_memory_usage(); }}; RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); - VLOG_DEBUG << fmt::format( - "Query: {}, task {}, sink {} eos, set_ready_to_read, nonspill memory " + LOG(INFO) << fmt::format( + "Query: {}, task {}, hash join sink {} eos, set_ready_to_read, nonspill " + "memory " "usage: {}", print_id(state->query_id()), state->task_id(), node_id(), _inner_sink_operator->get_memory_usage_debug_str( @@ -628,12 +633,26 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B "fault_inject partitioned_hash_join_sink " "sink failed"); }); + + if (eos) { + auto revocable_memory_high_watermark_percent = + state->revocable_memory_high_watermark_percent(); + if (revocable_memory_high_watermark_percent > 0 && + revocable_size >= + (double)query_mem_limit / 100.0 * revocable_memory_high_watermark_percent) { + LOG(INFO) << fmt::format( + "Query: {}, task {}, hash join sink {} eos, revoke_memory " + "because revocable memory is high", + print_id(state->query_id()), state->task_id(), node_id()); + return revoke_memory(state, nullptr); + } + } RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); local_state.update_memory_usage(); if (eos) { - VLOG_DEBUG << fmt::format( - "Query: {}, task {}, sink {} eos, set_ready_to_read, nonspill memory " + LOG(INFO) << fmt::format( + "Query: {}, task {}, hash join 
sink {} eos, set_ready_to_read, nonspill memory " "usage: {}", print_id(state->query_id()), state->task_id(), node_id(), _inner_sink_operator->get_memory_usage_debug_str( @@ -663,9 +682,4 @@ size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* stat return local_state.get_reserve_mem_size(state, eos); } -bool PartitionedHashJoinSinkOperatorX::is_spilled(RuntimeState* state) const { - auto& local_state = get_local_state(state); - return local_state._shared_state->need_to_spill; -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 9e253ce3fca..b5e28f8b244 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -134,8 +134,6 @@ public: return _inner_probe_operator->require_data_distribution(); } - bool is_spilled(RuntimeState* state) const override; - private: friend class PartitionedHashJoinSinkLocalState; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 6a472a09cfd..2fa0c0ce8e1 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -114,6 +114,7 @@ SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id const TPlanNode& tnode, const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id) { + _spillable = true; _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs, require_bucket_distribution); } @@ -299,9 +300,4 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, return status; } -bool SpillSortSinkOperatorX::is_spilled(RuntimeState* state) const { - auto& local_state = get_local_state(state); - return local_state._shared_state->is_spilled; -} - } // namespace doris::pipeline \ No 
newline at end of file diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 226fe61d386..3d6ccdcc4ce 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -91,7 +91,6 @@ public: Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) override; - bool is_spilled(RuntimeState* state) const override; using DataSinkOperatorX<LocalStateType>::node_id; using DataSinkOperatorX<LocalStateType>::operator_id; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index d4ed0790942..9d284b31861 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -43,7 +43,6 @@ #include "util/container_util.hpp" #include "util/defer_op.h" #include "util/mem_info.h" -#include "util/pretty_printer.h" #include "util/runtime_profile.h" #include "util/uid_util.h" #include "vec/core/block.h" diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index abf3a51c9f3..05963132cb1 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -76,7 +76,7 @@ public: static inline int64_t sys_mem_available() { return MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed) - refresh_interval_memory_growth.load(std::memory_order_relaxed) - - process_reserved_memory(); + process_reserved_memory() + MemInfo::allocator_cache_mem(); } static inline std::string sys_mem_available_str() { diff --git a/be/src/runtime/memory/memory_profile.cpp b/be/src/runtime/memory/memory_profile.cpp index 8dbdcbdd3af..c7421236c42 100644 --- a/be/src/runtime/memory/memory_profile.cpp +++ b/be/src/runtime/memory/memory_profile.cpp @@ -343,10 +343,11 @@ int64_t MemoryProfile::other_current_usage() { void MemoryProfile::print_log_process_usage() { if (_enable_print_log_process_usage) { 
_enable_print_log_process_usage = false; - LOG(WARNING) << "Process Memory Summary: " + GlobalMemoryArbitrator::process_mem_log_str(); - LOG(WARNING) << "\n" << print_memory_overview_profile(); - LOG(WARNING) << "\n" << print_global_memory_profile(); - LOG(WARNING) << "\n" << print_top_memory_tasks_profile(); + LOG(WARNING) << "Process Memory Summary: " + GlobalMemoryArbitrator::process_mem_log_str() + << "\n" + << print_memory_overview_profile() << "\n" + << print_global_memory_profile() << "\n" + << print_top_memory_tasks_profile(); } } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 55b51088c50..bee6ad549e9 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -213,8 +213,6 @@ public: ThreadPool* get_memtable_flush_pool(); - int64_t mem_limit() const { return _bytes_limit; } - void set_merge_controller_handler( std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) { _merge_controller_handler = handler; @@ -348,7 +346,6 @@ private: TUniqueId _query_id; ExecEnv* _exec_env = nullptr; MonotonicStopWatch _query_watcher; - int64_t _bytes_limit = 0; bool _is_nereids = false; std::atomic<int> _running_big_mem_op_num = 0; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 7318c93f15a..16f500b2fcc 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -601,7 +601,7 @@ public: if (_query_options.__isset.revocable_memory_high_watermark_percent) { return _query_options.revocable_memory_high_watermark_percent; } - return 10; + return -1; } size_t minimum_operator_memory_required_bytes() const { diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index c8f0a7397d7..c59ad83c98d 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -35,6 +35,7 @@ #include "runtime/process_profile.h" #include "runtime/thread_context.h" #include "util/mem_info.h" +#include "util/pretty_printer.h" #include 
"util/stack_util.h" #include "util/uid_util.h" @@ -86,8 +87,10 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem "Allocator sys memory check failed: Cannot alloc:{}, consuming " "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, {}.", size, doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker()->peak_consumption(), - doris::thread_context()->thread_mem_tracker()->consumption(), + doris::PrettyPrinter::print_bytes( + doris::thread_context()->thread_mem_tracker()->peak_consumption()), + doris::PrettyPrinter::print_bytes( + doris::thread_context()->thread_mem_tracker()->consumption()), doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(), doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 16b214b1536..f0380efd7b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -2245,7 +2245,7 @@ public class SessionVariable implements Serializable, Writable { public long dataQueueMaxBlocks = 1; @VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy = true) - public int revocableMemoryHighWatermarkPercent = 10; + public int revocableMemoryHighWatermarkPercent = -1; // If the memory consumption of sort node exceed this limit, will trigger spill to disk; // Set to 0 to disable; min: 128M diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 7ebe16583d8..b44196d3df2 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -367,7 +367,7 @@ struct TQueryOptions { 144: optional i32 query_slot_count = 0; 145: optional bool enable_spill = false 146: optional bool enable_reserve_memory = 
true - 147: optional i32 revocable_memory_high_watermark_percent = 10 + 147: optional i32 revocable_memory_high_watermark_percent = -1 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org