This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5986b3cfc6a7c537e58ed08831a926b5619b79cd
Author: yiguolei <676222...@qq.com>
AuthorDate: Thu Sep 12 14:07:15 2024 +0800

    add reserve memory logic in scan operator (#40719)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
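    Add ScanOperatorX::get_reserve_mem_size() so the scan operator reports how much memory it
    should reserve. Split QueryContext::low_memory_mode() into update_low_memory_mode(), which
    PipelineTask::execute() now calls on every loop iteration, and a plain getter. In low memory
    mode, each scanner caches at most 1MB and each scan task returns at most 2 blocks.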
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/pipeline/exec/scan_operator.cpp     | 25 +++++++++++++++++++++++++
 be/src/pipeline/exec/scan_operator.h       |  2 ++
 be/src/pipeline/pipeline_task.cpp          |  2 ++
 be/src/runtime/query_context.h             | 14 ++++++++------
 be/src/vec/exec/scan/scanner_context.h     |  9 ++++-----
 be/src/vec/exec/scan/scanner_scheduler.cpp |  9 ++++++++-
 6 files changed, 49 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 0c0cfb18c77..b69b10461d6 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1305,6 +1305,31 @@ Status 
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
     return Status::OK();
 }
 
+// Returns the estimated number of bytes this scan operator should reserve.
+template <typename LocalStateType>
+size_t ScanOperatorX<LocalStateType>::get_reserve_mem_size(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    if (local_state.low_memory_mode()) {
+        // Multiply in size_t so a larger per-scanner limit cannot overflow int32_t.
+        return static_cast<size_t>(
+                       local_state._scanner_ctx->low_memory_mode_scan_bytes_per_scanner()) *
+               local_state._scanner_ctx->low_memory_mode_scanners();
+    }
+
+    // Read each counter exactly once: the scanner scheduler updates block memory usage while
+    // this runs, so re-reading it after the comparison could make the subtraction negative,
+    // which would wrap around to a huge size_t.
+    const int64_t peak_usage = local_state._scanner_peak_memory_usage->value();
+    const int64_t block_usage = local_state._scanner_ctx->block_memory_usage();
+    if (peak_usage > 0 && peak_usage > block_usage) {
+        return static_cast<size_t>(peak_usage - block_usage);
+    }
+    // Either the scan operator runs for the first time (no peak recorded yet) or the counters
+    // are inconsistent (peak <= block usage). Assume it will occupy doris_scanner_row_bytes;
+    // this may be a little smaller than the actual usage.
+    return config::doris_scanner_row_bytes;
+}
+
 template class ScanOperatorX<OlapScanLocalState>;
 template class ScanLocalState<OlapScanLocalState>;
 template class ScanOperatorX<JDBCScanLocalState>;
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index fed1e4015d8..e228b76caa9 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -366,6 +366,8 @@ public:
 
     [[nodiscard]] virtual bool is_file_scan_operator() const { return false; }
 
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;
+
     const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
         return _runtime_filter_descs;
     }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index ed8c04a5e70..e4e82bb2207 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -362,6 +362,8 @@ Status PipelineTask::execute(bool* eos) {
         });
 
         DEFER_RELEASE_RESERVED();
+        // Re-check on every loop iteration whether the query should enter low memory mode.
+        _state->get_query_ctx()->update_low_memory_mode();
 
         // `_dry_run` means sink operator need no more data
         // `_sink->is_finished(_state)` means sink operator should be finished
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index fdbfc7c7217..0a19cd61f94 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -270,9 +270,9 @@ public:
     // 3. the process reached soft mem_limit, if not release these, if not 
release buffer, the query will be cancelled.
     // 4. If the query reserve memory failed.
     // Under low memory mode, the query should release some buffers such as 
scan operator block queue, union operator queue, exchange buffer size, 
streaming agg
-    bool low_memory_mode() {
+    void update_low_memory_mode() {
         if (_low_memory_mode) {
-            return true;
+            return;
         }
 
         // If less than 100MB left, then it is low memory mode
@@ -280,7 +280,7 @@ public:
             _low_memory_mode = true;
             LOG(INFO) << "Query " << print_id(_query_id)
                       << " goes to low memory mode due to exceed process soft 
memory limit";
-            return true;
+            return;
         }
 
         if (_workload_group) {
@@ -292,7 +292,7 @@ public:
                         << "Query " << print_id(_query_id)
                         << " goes to low memory mode due to workload group 
high water mark reached";
                 _low_memory_mode = true;
-                return true;
+                return;
             }
 
             if (is_low_wartermark &&
@@ -303,15 +303,17 @@ public:
                           << " goes to low memory mode due to workload group 
low water mark "
                              "reached and the query enable spill";
                 _low_memory_mode = true;
-                return true;
+                return;
             }
         }
 
-        return _low_memory_mode;
+        return;
     }
 
     void set_low_memory_mode() { _low_memory_mode = true; }
 
+    bool low_memory_mode() const { return _low_memory_mode; }
+
 private:
     int _timeout_second;
     TUniqueId _query_id;
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index cb9bd8e71b0..200c36cdc98 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -124,6 +124,8 @@ public:
     void return_free_block(vectorized::BlockUPtr block);
     inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
 
+    int64_t block_memory_usage() const { return _block_memory_usage; }
+
     // Caller should make sure the pipeline task is still running when calling 
this function
     void update_peak_running_scanner(int num);
     // Caller should make sure the pipeline task is still running when calling 
this function
@@ -172,12 +174,12 @@ public:
     int batch_size() const { return _batch_size; }
 
     // During low memory mode, there will be at most 4 scanners running and 
every scanner will
-    // cache at most 2MB data. So that every instance will keep 8MB buffer.
+    // cache at most 1MB of data, so every instance keeps at most 4MB of buffered data.
     bool low_memory_mode() const;
 
     // TODO(yiguolei) add this as session variable
     int32_t low_memory_mode_scan_bytes_per_scanner() const {
-        return 2 * 1024 * 1024; // 2MB
+        return 1 * 1024 * 1024; // 1MB
     }
 
     int32_t low_memory_mode_scanners() const { return 4; }
@@ -190,9 +192,6 @@ public:
 
     bool _should_reset_thread_name = true;
 
-    const static int LOW_MEMORY_MODE_SCAN_BYTES = 2 * 1024 * 1024; // 2MB
-    const static int LOW_MEMORY_MODE_MAX_SCANNERS = 4;
-
 protected:
     /// Four criteria to determine whether to increase the parallelism of the 
scanners
     /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 2352f10ca0c..87eda999b41 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -271,8 +271,12 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
 
             size_t raw_bytes_read = 0;
             bool first_read = true;
+            // Set to true once a second block is cached because the first one is full.
+            bool has_first_full_block = false;
 
-            while (!eos && raw_bytes_read < raw_bytes_threshold) {
+            // In low memory mode, each scan task returns at most 2 blocks to reduce memory usage.
+            while (!eos && raw_bytes_read < raw_bytes_threshold &&
+                   !(ctx->low_memory_mode() && has_first_full_block)) {
                 if (UNLIKELY(ctx->done())) {
                     eos = true;
                     break;
@@ -318,6 +322,9 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
                                          block_size);
                 } else {
+                    if (!scan_task->cached_blocks.empty()) {
+                        has_first_full_block = true;
+                    }
                     ctx->inc_block_usage(free_block->allocated_bytes());
                     
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to