This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit ec4889d55117dbe7f25a89be8d0681e455d6325a Author: yiguolei <676222...@qq.com> AuthorDate: Fri Sep 13 15:02:19 2024 +0800 set min reserve bytes to 1MB (#40795) Issue Number: close #xxx <!--Describe your changes.--> --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/pipeline/exec/operator.h | 16 +++++++++++----- be/src/pipeline/pipeline_task.cpp | 2 ++ be/src/runtime/runtime_state.h | 9 +++++++++ .../main/java/org/apache/doris/qe/SessionVariable.java | 7 +++++++ gensrc/thrift/PaloInternalService.thrift | 5 +++++ 5 files changed, 34 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 2c84d4ac65f..6e0b332c0a2 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -108,6 +108,11 @@ public: virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; } + // If this method is not overwrite by child, its default value is 1MB + [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { + return state->minimum_operator_memory_required_bytes(); + } + virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); } [[nodiscard]] virtual bool require_data_distribution() const { return false; } OperatorPtr child() { return _child; } @@ -550,8 +555,6 @@ public: [[nodiscard]] std::string get_name() const override { return _name; } - [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { return 0; } - [[nodiscard]] virtual bool try_reserve_memory(RuntimeState* state, vectorized::Block* block, bool eos) { return true; @@ -801,8 +804,6 @@ public: void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; } int parallel_tasks() const { return _parallel_tasks; } - [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { return 0; } - virtual void reset_reserve_mem_size(RuntimeState* state) {} protected: @@ -866,8 +867,13 @@ public: size_t get_reserve_mem_size(RuntimeState* state) override { auto& local_state = get_local_state(state); auto estimated_size = local_state.estimate_memory_usage(); + if (estimated_size < state->minimum_operator_memory_required_bytes()) { + estimated_size = state->minimum_operator_memory_required_bytes(); + } if (!is_source() && _child_x) { - estimated_size += _child_x->get_reserve_mem_size(state); + auto child_reserve_size = _child_x->get_reserve_mem_size(state); + estimated_size += + std::max(state->minimum_operator_memory_required_bytes(), child_reserve_size); } return estimated_size; } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index e4e82bb2207..1a6f8548687 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -376,6 +376,8 @@ Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); size_t sink_reserve_size = _sink->get_reserve_mem_size(_state); + sink_reserve_size = + std::max(sink_reserve_size, _state->minimum_operator_memory_required_bytes()); reserve_size = _root->get_reserve_mem_size(_state) + sink_reserve_size; _root->reset_reserve_mem_size(_state); DCHECK_EQ(_root->get_reserve_mem_size(_state), 0); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 442268850bf..299d0204838 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -583,6 +583,15 @@ public: return 1; } + size_t minimum_operator_memory_required_bytes() const { + if (_query_options.__isset.minimum_operator_memory_required_kb) { + return _query_options.minimum_operator_memory_required_kb * 1024; + } else { + // refer other database + return 100 * 1024; + } + } + void set_max_operator_id(int max_operator_id) { _max_operator_id = max_operator_id; } int max_operator_id() const { return _max_operator_id; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index a97a098bdc1..14deb3b755b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -664,6 +664,7 @@ public class SessionVariable implements Serializable, Writable { "enable_adaptive_pipeline_task_serial_read_on_limit"; public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT = "adaptive_pipeline_task_serial_read_on_limit"; + public static final String MINIMUM_OPERATOR_MEMORY_REQUIRED_KB = "minimum_operator_memory_required_kb"; /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. @@ -1958,6 +1959,11 @@ public class SessionVariable implements Serializable, Writable { description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown optimization for external table"}) private boolean enableCountPushDownForExternalTable = true; + @VariableMgr.VarAttr(name = MINIMUM_OPERATOR_MEMORY_REQUIRED_KB, needForward = true, + description = {"一个算子运行需要的最小的内存大小", + "The minimum memory required to be used by an operator, if not meet, the operator will not run"}) + public int minimumOperatorMemoryRequiredKB = 1000; + public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids"; public Set<Integer> getIgnoredRuntimeFilterIds() { @@ -3784,6 +3790,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit); tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit); tResult.setInListValueCountThreshold(inListValueCountThreshold); + tResult.setMinimumOperatorMemoryRequiredKb(minimumOperatorMemoryRequiredKB); return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 871101c5c35..4dfe26be250 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -335,6 +335,7 @@ struct TQueryOptions { 127: optional i32 in_list_value_count_threshold = 10; + // We need this two fields to make sure thrift id on master is compatible with other branch. 128: optional bool enable_verbose_profile = false; 129: optional i32 rpc_verbose_profile_max_instance_count = 0; @@ -343,6 +344,10 @@ struct TQueryOptions { 131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000; 132: optional i32 parallel_prepare_threshold = 0; + + // The minimum memory that an operator required to run. + 133: optional i32 minimum_operator_memory_required_kb = 1024; + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org