This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ec4889d55117dbe7f25a89be8d0681e455d6325a
Author: yiguolei <676222...@qq.com>
AuthorDate: Fri Sep 13 15:02:19 2024 +0800

    set min reserve bytes to 1MB (#40795)
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/pipeline/exec/operator.h                          | 16 +++++++++++-----
 be/src/pipeline/pipeline_task.cpp                        |  2 ++
 be/src/runtime/runtime_state.h                           |  9 +++++++++
 .../main/java/org/apache/doris/qe/SessionVariable.java   |  7 +++++++
 gensrc/thrift/PaloInternalService.thrift                 |  5 +++++
 5 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 2c84d4ac65f..6e0b332c0a2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -108,6 +108,11 @@ public:
 
     virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
 
+    // Bytes to reserve before running; unless overridden, the state's minimum operator memory.
+    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) {
+        return state->minimum_operator_memory_required_bytes();
+    }
+
     virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
     [[nodiscard]] virtual bool require_data_distribution() const { return 
false; }
     OperatorPtr child() { return _child; }
@@ -550,8 +555,6 @@ public:
 
     [[nodiscard]] std::string get_name() const override { return _name; }
 
-    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { 
return 0; }
-
     [[nodiscard]] virtual bool try_reserve_memory(RuntimeState* state, 
vectorized::Block* block,
                                                   bool eos) {
         return true;
@@ -801,8 +804,6 @@ public:
     void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = 
parallel_tasks; }
     int parallel_tasks() const { return _parallel_tasks; }
 
-    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { 
return 0; }
-
     virtual void reset_reserve_mem_size(RuntimeState* state) {}
 
 protected:
@@ -866,8 +867,13 @@ public:
     size_t get_reserve_mem_size(RuntimeState* state) override {
         auto& local_state = get_local_state(state);
         auto estimated_size = local_state.estimate_memory_usage();
+        if (estimated_size < state->minimum_operator_memory_required_bytes()) {
+            estimated_size = state->minimum_operator_memory_required_bytes();
+        }
         if (!is_source() && _child_x) {
-            estimated_size += _child_x->get_reserve_mem_size(state);
+            auto child_reserve_size = _child_x->get_reserve_mem_size(state);
+            estimated_size +=
+                    std::max(state->minimum_operator_memory_required_bytes(), 
child_reserve_size);
         }
         return estimated_size;
     }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index e4e82bb2207..1a6f8548687 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -376,6 +376,8 @@ Status PipelineTask::execute(bool* eos) {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
             size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
+            sink_reserve_size =
+                    std::max(sink_reserve_size, 
_state->minimum_operator_memory_required_bytes());
             reserve_size = _root->get_reserve_mem_size(_state) + 
sink_reserve_size;
             _root->reset_reserve_mem_size(_state);
             DCHECK_EQ(_root->get_reserve_mem_size(_state), 0);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 442268850bf..299d0204838 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -583,6 +583,15 @@ public:
         return 1;
     }
 
+    // Minimum bytes an operator must be able to reserve; 100KB (refer other database) if unset.
+    size_t minimum_operator_memory_required_bytes() const {
+        if (!_query_options.__isset.minimum_operator_memory_required_kb) {
+            return 100 * 1024;
+        }
+        // Widen before scaling: the option is an i32 in KB, and KB * 1024 overflows int from 2GB.
+        return static_cast<size_t>(_query_options.minimum_operator_memory_required_kb) * 1024;
+    }
+
     void set_max_operator_id(int max_operator_id) { _max_operator_id = 
max_operator_id; }
 
     int max_operator_id() const { return _max_operator_id; }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a97a098bdc1..14deb3b755b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -664,6 +664,7 @@ public class SessionVariable implements Serializable, 
Writable {
                                     
"enable_adaptive_pipeline_task_serial_read_on_limit";
     public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
                                     
"adaptive_pipeline_task_serial_read_on_limit";
+    public static final String MINIMUM_OPERATOR_MEMORY_REQUIRED_KB = 
"minimum_operator_memory_required_kb";
 
     /**
      * If set false, user couldn't submit analyze SQL and FE won't allocate 
any related resources.
@@ -1958,6 +1959,11 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown 
optimization for external table"})
     private boolean enableCountPushDownForExternalTable = true;
 
+    @VariableMgr.VarAttr(name = MINIMUM_OPERATOR_MEMORY_REQUIRED_KB, 
needForward = true,
+            description = {"一个算子运行需要的最小的内存大小",
+                    "The minimum memory required to be used by an operator, if 
not meet, the operator will not run"})
+    public int minimumOperatorMemoryRequiredKB = 1000;
+
     public static final String IGNORE_RUNTIME_FILTER_IDS = 
"ignore_runtime_filter_ids";
 
     public Set<Integer> getIgnoredRuntimeFilterIds() {
@@ -3784,6 +3790,7 @@ public class SessionVariable implements Serializable, 
Writable {
         
tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
         
tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
         tResult.setInListValueCountThreshold(inListValueCountThreshold);
+        
tResult.setMinimumOperatorMemoryRequiredKb(minimumOperatorMemoryRequiredKB);
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 871101c5c35..4dfe26be250 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -335,6 +335,7 @@ struct TQueryOptions {
 
   127: optional i32 in_list_value_count_threshold = 10;
 
+
   // We need this two fields to make sure thrift id on master is compatible 
with other branch.
   128: optional bool enable_verbose_profile = false;
   129: optional i32 rpc_verbose_profile_max_instance_count = 0;
@@ -343,6 +344,10 @@ struct TQueryOptions {
   131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;
 
   132: optional i32 parallel_prepare_threshold = 0;
+
+  // The minimum memory, in KB, that an operator requires in order to run.
+  133: optional i32 minimum_operator_memory_required_kb = 1024;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to