This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new f3d4f8d98b3 add low memory mode in scan operator (#40662)
f3d4f8d98b3 is described below

commit f3d4f8d98b37725ace0d62f9678c92f696ba56f4
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Sep 11 15:04:02 2024 +0800

    add low memory mode in scan operator (#40662)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/pipeline/exec/operator.h                    |  2 +
 be/src/pipeline/pipeline_task.cpp                  |  1 +
 be/src/pipeline/task_scheduler.cpp                 |  3 +-
 be/src/runtime/query_context.h                     | 49 +++++++++++++++
 .../workload_group/workload_group_manager.cpp      | 10 +++
 be/src/vec/exec/scan/scanner_context.cpp           | 72 ++++++++++++++--------
 be/src/vec/exec/scan/scanner_context.h             | 16 ++++-
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  9 ++-
 8 files changed, 134 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 265cf31f648..c6b49317375 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -198,6 +198,8 @@ public:
 
     void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
 
+    bool low_memory_mode() { return _state->get_query_ctx()->low_memory_mode(); }
+
 protected:
     friend class OperatorXBase;
 
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index fe44f8a4b3f..8051bb5d4e6 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -388,6 +388,7 @@ Status PipelineTask::execute(bool* eos) {
                                << ", sink name: " << _sink->get_name()
                                << ", node id: " << _sink->node_id() << " 
failed: " << st.to_string()
                                << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                    _state->get_query_ctx()->set_low_memory_mode();
                     bool is_high_wartermark = false;
                     bool is_low_wartermark = false;
                      workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
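
For readers skimming the hunk above: the single added line makes a memory-related sink failure flip the whole query into low memory mode before the watermark check runs. A minimal standalone sketch of that ordering, with QueryCtx and WorkloadGroup as hypothetical stand-ins for the real QueryContext and workload group classes (the thresholds are illustrative, not the actual configuration):

    #include <atomic>

    struct QueryCtx {
        std::atomic<bool> low_memory_mode{false};
        void set_low_memory_mode() { low_memory_mode = true; }
    };

    struct WorkloadGroup {
        double used_bytes = 0;
        double limit_bytes = 1;
        // Mirrors check_mem_used(&low, &high); 80%/95% are made-up thresholds.
        void check_mem_used(bool* low, bool* high) const {
            double r = used_bytes / limit_bytes;
            *low = r >= 0.8;
            *high = r >= 0.95;
        }
    };

    // Mark the query first, so every operator polling the flag starts
    // shrinking its buffers, then read the watermarks to decide what to do
    // with the paused task.
    void on_sink_write_failed(QueryCtx& ctx, const WorkloadGroup& wg,
                              bool* low_wm, bool* high_wm) {
        ctx.set_low_memory_mode();
        wg.check_mem_used(low_wm, high_wm);
    }
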
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 715feceed98..30f3302d429 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -58,9 +58,8 @@ TaskScheduler::~TaskScheduler() {
 
 Status TaskScheduler::start() {
     int cores = _task_queue->cores();
-    // Init the thread pool with cores+1 thread
+    // Init the thread pool with cores threads
     // some for pipeline task running
-    // 1 for spill disk query handler
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
                             .set_min_threads(cores)
                             .set_max_threads(cores)
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index eb2beb2ba05..fdbfc7c7217 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -264,6 +264,54 @@ public:
         }
     }
 
+    // A query runs in low memory mode when:
+    // 1. The query has spill enabled and the wg's low water mark is reached; if buffers are not released, spill to disk will be triggered, which is very expensive.
+    // 2. The query does not have spill enabled, but the wg's high water mark is reached; if buffers are not released, the query will be cancelled.
+    // 3. The process reached its soft mem_limit; if buffers are not released, the query will be cancelled.
+    // 4. The query failed to reserve memory.
+    // Under low memory mode, the query should release buffers such as the scan operator block
+    // queue, union operator queue, exchange sink buffer, and streaming agg buffers.
+    bool low_memory_mode() {
+        if (_low_memory_mode) {
+            return true;
+        }
+
+        // If less than 100MB of process memory is left, enter low memory mode
+        if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(100 * 1024 * 1024)) {
+            _low_memory_mode = true;
+            LOG(INFO) << "Query " << print_id(_query_id)
+                      << " goes to low memory mode due to exceed process soft 
memory limit";
+            return true;
+        }
+
+        if (_workload_group) {
+            bool is_low_wartermark = false;
+            bool is_high_wartermark = false;
+            _workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+            if (is_high_wartermark) {
+                LOG(INFO)
+                        << "Query " << print_id(_query_id)
+                        << " goes to low memory mode due to workload group 
high water mark reached";
+                _low_memory_mode = true;
+                return true;
+            }
+
+            if (is_low_wartermark &&
+                ((_query_options.__isset.enable_join_spill && _query_options.enable_join_spill) ||
+                 (_query_options.__isset.enable_sort_spill && _query_options.enable_sort_spill) ||
+                 (_query_options.__isset.enable_agg_spill && _query_options.enable_agg_spill))) {
+                LOG(INFO) << "Query " << print_id(_query_id)
+                          << " goes to low memory mode due to workload group 
low water mark "
+                             "reached and the query enable spill";
+                _low_memory_mode = true;
+                return true;
+            }
+        }
+
+        return _low_memory_mode;
+    }
+
+    void set_low_memory_mode() { _low_memory_mode = true; }
+
 private:
     int _timeout_second;
     TUniqueId _query_id;
@@ -313,6 +361,7 @@ private:
     std::mutex _pipeline_map_write_lock;
 
     std::atomic<int64_t> _spill_threshold {0};
+    std::atomic<bool> _low_memory_mode = false;
 
     std::mutex _profile_mutex;
     timespec _query_arrival_timestamp;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 39ab564db63..431eaa248ac 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -335,6 +335,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
         wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
 
         if (!is_low_wartermark && !is_high_wartermark) {
+            // TODO: should check if there is a large reserve size in the query's operators.
+            // If one exists, find that query and spill it.
             LOG(INFO) << "**** there are " << queries_list.size() << " to 
resume";
             for (const auto& query : queries_list) {
                 LOG(INFO) << "**** resume paused query: " << query.query_id();
@@ -359,6 +361,14 @@ void WorkloadGroupMgr::handle_paused_queries() {
         size_t max_memory_usage = 0;
         auto it_to_remove = queries_list.end();
 
+        // TODO: should check buffer-type memory first; if enough of it can be released, there is no need to spill to disk.
+        // Buffer memory includes:
+        // 1. caches: page cache, segment cache...
+        // 2. memtables: load memtable
+        // 3. scan queue, exchange sink buffer, union queue
+        // 4. streaming aggs
+        // If we cannot reclaim enough memory from these buffers (< 10%), then spill to disk.
+
        for (auto query_it = queries_list.begin(); query_it != queries_list.end();) {
             const auto query_ctx = query_it->query_ctx_.lock();
             // The query is finished during in paused list.
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index cbb3d0f5723..ed50c195d74 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -266,7 +266,10 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             update_peak_memory_usage(-current_block->allocated_bytes());
             // consume current block
             block->swap(*current_block);
-            return_free_block(std::move(current_block));
+            // If under low memory mode, do not return the free block; it would occupy too much memory.
+            if (!_local_state->low_memory_mode()) {
+                return_free_block(std::move(current_block));
+            }
         } else {
             // This scan task do not have any cached blocks.
             _blocks_queue.pop_front();
@@ -275,37 +278,54 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
                 _num_finished_scanners++;
                 std::weak_ptr<ScannerDelegate> next_scanner;
                 // submit one of the remaining scanners
-                if (_scanners.try_dequeue(next_scanner)) {
-                    auto submit_status = submit_scan_task(std::make_shared<ScanTask>(next_scanner));
-                    if (!submit_status.ok()) {
-                        _process_status = submit_status;
-                        _set_scanner_done();
-                        return _process_status;
-                    }
-                } else {
-                    // no more scanner to be scheduled
-                    // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners
-                    int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners;
+                // If under low memory mode, there should be at most 4 scanners running
+                if (_num_running_scanners > low_memory_mode_scanners() &&
+                    _local_state->low_memory_mode()) {
                     _num_running_scanners--;
-                    for (int i = 0; i < free_blocks_for_each; ++i) {
-                        vectorized::BlockUPtr removed_block;
-                        if (_free_blocks.try_dequeue(removed_block)) {
-                            _block_memory_usage -= block->allocated_bytes();
+                } else {
+                    if (_scanners.try_dequeue(next_scanner)) {
+                        auto submit_status =
+                                submit_scan_task(std::make_shared<ScanTask>(next_scanner));
+                        if (!submit_status.ok()) {
+                            _process_status = submit_status;
+                            _set_scanner_done();
+                            return _process_status;
+                        }
+                    } else {
+                        // no more scanner to be scheduled
+                        // `_free_blocks` serves all running scanners; it may be too large for the remaining scanners
+                        int free_blocks_for_each =
+                                _free_blocks.size_approx() / _num_running_scanners;
+                        _num_running_scanners--;
+                        for (int i = 0; i < free_blocks_for_each; ++i) {
+                            vectorized::BlockUPtr removed_block;
+                            if (_free_blocks.try_dequeue(removed_block)) {
+                                _block_memory_usage -= removed_block->allocated_bytes();
+                            }
                         }
                     }
                 }
             } else {
-                // resubmit current running scanner to read the next block
-                Status submit_status = submit_scan_task(scan_task);
-                if (!submit_status.ok()) {
-                    _process_status = submit_status;
-                    _set_scanner_done();
-                    return _process_status;
+                if (_local_state->low_memory_mode() &&
+                    _num_running_scanners > low_memory_mode_scanners()) {
+                    _num_running_scanners--;
+                    // put the scanner back into the queue since it is not eos
+                    _scanners.enqueue(scan_task->scanner);
+                } else {
+                    // resubmit current running scanner to read the next block
+                    Status submit_status = submit_scan_task(scan_task);
+                    if (!submit_status.ok()) {
+                        _process_status = submit_status;
+                        _set_scanner_done();
+                        return _process_status;
+                    }
                 }
             }
         }
-        // scale up
-        RETURN_IF_ERROR(_try_to_scale_up());
+        if (!_local_state->low_memory_mode()) {
+            // scale up only when not under low memory mode
+            RETURN_IF_ERROR(_try_to_scale_up());
+        }
     }
 
    if (_num_finished_scanners == _all_scanners.size() && _blocks_queue.empty()) {
@@ -488,4 +508,8 @@ void ScannerContext::update_peak_memory_usage(int64_t usage) {
     _local_state->_scanner_peak_memory_usage->add(usage);
 }
 
+bool ScannerContext::low_memory_mode() const {
+    return _local_state->low_memory_mode();
+}
+
 } // namespace doris::vectorized
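
The free-block change above is the subtle part of this file: a consumed block is normally recycled into _free_blocks for reuse, but under low memory mode it is simply destroyed so its allocation returns to the system. A simplified model of that decision (Block and BlockPool are stand-ins, not the real vectorized::Block or the lock-free queue):

    #include <cstddef>
    #include <memory>
    #include <queue>
    #include <vector>

    struct Block {
        std::vector<char> data;
        std::size_t allocated_bytes() const { return data.capacity(); }
    };

    struct BlockPool {
        std::queue<std::unique_ptr<Block>> free_blocks;
        std::size_t cached_bytes = 0;

        // Under low memory mode the block is dropped on the floor: the
        // unique_ptr frees it at the end of this function instead of the
        // pool holding the memory for the next read.
        void maybe_recycle(std::unique_ptr<Block> block, bool low_memory) {
            if (low_memory) return;
            cached_bytes += block->allocated_bytes();
            free_blocks.push(std::move(block));
        }
    };
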
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 03c4e5a4f1b..cb9bd8e71b0 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -171,6 +171,17 @@ public:
 
     int batch_size() const { return _batch_size; }
 
+    // During low memory mode, there will be at most 4 scanners running, and every scanner will
+    // cache at most 2MB of data, so every instance will keep at most an 8MB buffer.
+    bool low_memory_mode() const;
+
+    // TODO(yiguolei) add this as session variable
+    int32_t low_memory_mode_scan_bytes_per_scanner() const {
+        return 2 * 1024 * 1024; // 2MB
+    }
+
+    int32_t low_memory_mode_scanners() const { return 4; }
+
     // the unique id of this context
     std::string ctx_id;
     TUniqueId _query_id;
@@ -179,6 +190,9 @@ public:
 
     bool _should_reset_thread_name = true;
 
+    const static int LOW_MEMORY_MODE_SCAN_BYTES = 2 * 1024 * 1024; // 2MB
+    const static int LOW_MEMORY_MODE_MAX_SCANNERS = 4;
+
 protected:
    /// Four criteria to determine whether to increase the parallelism of the scanners
     /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
@@ -217,7 +231,7 @@ protected:
     moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners;
     int32_t _num_scheduled_scanners = 0;
     int32_t _num_finished_scanners = 0;
-    int32_t _num_running_scanners = 0;
+    std::atomic_int _num_running_scanners = 0;
     // weak pointer for _scanners, used in stop function
     std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
     const int _num_parallel_instances;
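
The 8MB in the comment above is just the product of the two new constants. A compile-time statement of the bound (the constant names mirror the header; the static_assert itself is illustrative, not part of the commit):

    #include <cstddef>

    constexpr std::size_t kLowMemScanBytesPerScanner = 2 * 1024 * 1024; // 2MB
    constexpr std::size_t kLowMemMaxScanners = 4;

    // Worst case cached per instance: 4 scanners x 2MB = 8MB.
    constexpr std::size_t kLowMemPerInstanceBound =
            kLowMemMaxScanners * kLowMemScanBytesPerScanner;
    static_assert(kLowMemPerInstanceBound == 8 * 1024 * 1024,
                  "low memory mode should cap each instance at 8MB");
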
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 444ff4dbb0c..2352f10ca0c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -264,7 +264,14 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             }
 
             size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
-            size_t raw_bytes_read = 0; bool first_read = true;
+            if (ctx->low_memory_mode() &&
+                raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
+                raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
+            }
+
+            size_t raw_bytes_read = 0;
+            bool first_read = true;
+
             while (!eos && raw_bytes_read < raw_bytes_threshold) {
                 if (UNLIKELY(ctx->done())) {
                     eos = true;
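
The cap above shrinks the per-pass read budget rather than the row count directly: one scanner pass stops once it has read the low-memory byte budget instead of config::doris_scanner_row_bytes. Extracted as a pure function (a sketch; the names are ours, not the scheduler's):

    #include <algorithm>
    #include <cstddef>

    // Effective byte budget for one scanner pass. Under low memory mode the
    // configured threshold is capped at the per-scanner low-memory budget.
    std::size_t effective_scan_threshold(std::size_t configured_bytes,
                                         bool low_memory_mode,
                                         std::size_t low_mem_budget) {
        return low_memory_mode ? std::min(configured_bytes, low_mem_budget)
                               : configured_bytes;
    }

Combined with the 4-scanner cap in scanner_context.h, this is what keeps a low-memory instance to roughly 8MB of in-flight scan data.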

