This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 12c061294833427b9f8c8e6e25d7128b3c4f7ad7
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Fri Sep 6 17:49:24 2024 +0800

    [opt](scanner profile) More counter for scanner (#40144)
    
    New profile metrics to monitor the schedule process of scanner.
    ```
    VScanner:
        ...
        -  PeakMemoryUsage:  16.31  MB
        -  PeakRunningScanner:  1
        ...
    ```
    In general, the value of `PeakMemoryUsage` is increased when any of the
    scan tasks gets a block, decreased when ScanOperator consumes block from
    block_queue.
---
 be/src/pipeline/exec/scan_operator.cpp             |   8 ++
 be/src/pipeline/exec/scan_operator.h               |   2 +
 be/src/vec/exec/scan/scanner_context.cpp           |  49 +++++++---
 be/src/vec/exec/scan/scanner_context.h             |  13 +--
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  24 ++++-
 be/src/vec/exec/scan/vscanner.cpp                  |   1 +
 .../suites/query_profile/scanner_profile.groovy    | 104 +++++++++++++++++++++
 7 files changed, 177 insertions(+), 24 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index d7af0053944..cd65d61f641 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -995,6 +995,9 @@ Status ScanLocalState<Derived>::_start_scanners(
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
             state()->scan_queue_mem_limit(), _scan_dependency,
+            // NOTE: This will logic makes _max_thread_num of ScannerContext 
to be C(num of cores) * 2
+            // For a query with C/2 instance and M scan node, scan task of 
this query will be C/2 * M * C*2
+            // and will be C*C*N at most.
             // 1. If data distribution is ignored , we use 1 instance to scan.
             // 2. Else if this operator is not file scan operator, we use 
config::doris_scanner_thread_pool_thread_num scanners to scan.
             // 3. Else, file scanner will consume much memory so we use 
config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num 
scanners to scan.
@@ -1055,6 +1058,9 @@ Status ScanLocalState<Derived>::_init_profile() {
     _memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, 
"MemoryUsage", 1);
     _free_blocks_memory_usage =
             _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", 
TUnit::BYTES, "MemoryUsage", 1);
+    _scanner_peak_memory_usage =
+            _scanner_profile->AddHighWaterMarkCounter("PeakMemoryUsage", 
TUnit::BYTES);
+
     _newly_create_free_blocks_num =
             ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", 
TUnit::UNIT);
     _scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, 
"NumScaleUpScanners", TUnit::UNIT);
@@ -1073,6 +1079,8 @@ Status ScanLocalState<Derived>::_init_profile() {
 
     _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, 
"MaxScannerThreadNum", TUnit::UNIT);
 
+    _peak_running_scanner =
+            _scanner_profile->AddHighWaterMarkCounter("PeakRunningScanner", 
TUnit::UNIT);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 823440f52c0..f84153e596d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -109,6 +109,8 @@ protected:
     RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     // Max num of scanner thread
     RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
     // time of get block from scanner
     RuntimeProfile::Counter* _scan_timer = nullptr;
     RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index bab11616c77..5cc20c214c1 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -73,11 +73,23 @@ ScannerContext::ScannerContext(
         limit = -1;
     }
     MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
+    // _max_thread_num controls how many scanners of this ScanOperator can be 
submitted to scheduler at a time.
+    // The overall target of our system is to make full utilization of the 
resources.
+    // At the same time, we dont want too many tasks are queued by scheduler, 
that makes the query
+    // waiting too long, and existing task can not be scheduled in time.
+    // First of all, we try to make sure _max_thread_num of a ScanNode of a 
query on a single backend is less than
+    // config::doris_scanner_thread_pool_thread_num.
+    // For example, on a 64-core machine, the default value of 
config::doris_scanner_thread_pool_thread_num will be 64*2 =128.
+    // and the num_parallel_instances of this scan operator will be 64/2=32.
+    // For a query who has two scan nodes, the _max_thread_num of each scan 
node instance will be 128 / 32 = 4.
+    // We have 32 instances of this scan operator, so for the ScanNode, we 
have 4 * 32 = 128 scanner tasks can be submitted at a time.
+    // Remember that we have to ScanNode in this query, so the total number of 
scanner tasks can be submitted at a time is 128 * 2 = 256.
     _max_thread_num =
             _state->num_scanner_threads() > 0
                     ? _state->num_scanner_threads()
                     : config::doris_scanner_thread_pool_thread_num / 
num_parallel_instances;
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
+    // In some situation, there are not too many big tablets involed, so we 
can reduce the thread number.
     _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
     // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread.
@@ -116,7 +128,6 @@ Status ScannerContext::init() {
     _scanner_sched_counter = _local_state->_scanner_sched_counter;
     _newly_create_free_blocks_num = 
_local_state->_newly_create_free_blocks_num;
     _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
-    _free_blocks_memory_usage_mark = _local_state->_free_blocks_memory_usage;
     _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
     _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
 
@@ -157,9 +168,11 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool 
force) {
     vectorized::BlockUPtr block = nullptr;
     if (_free_blocks.try_dequeue(block)) {
         DCHECK(block->mem_reuse());
-        _free_blocks_memory_usage -= block->allocated_bytes();
-        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
-    } else if (_free_blocks_memory_usage < _max_bytes_in_queue || force) {
+        _block_memory_usage -= block->allocated_bytes();
+        // A free block is reused, so the memory usage should be decreased
+        // The caller of get_free_block will increase the memory usage
+        update_peak_memory_usage(-block->allocated_bytes());
+    } else if (_block_memory_usage < _max_bytes_in_queue || force) {
         _newly_create_free_blocks_num->update(1);
         block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 
0,
                                                  true /*ignore invalid 
slots*/);
@@ -168,11 +181,13 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool 
force) {
 }
 
 void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
-    if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) 
{
-        _free_blocks_memory_usage += block->allocated_bytes();
-        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+    if (block->mem_reuse() && _block_memory_usage < _max_bytes_in_queue) {
+        size_t block_size_to_reuse = block->allocated_bytes();
+        _block_memory_usage += block_size_to_reuse;
         block->clear_column_data();
-        _free_blocks.enqueue(std::move(block));
+        if (_free_blocks.enqueue(std::move(block))) {
+            update_peak_memory_usage(block_size_to_reuse);
+        }
     }
 }
 
@@ -242,8 +257,8 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
             if (_estimated_block_size > block_size) {
                 _estimated_block_size = block_size;
             }
-            _free_blocks_memory_usage -= block_size;
-            _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+            _block_memory_usage -= block_size;
+            update_peak_memory_usage(-current_block->allocated_bytes());
             // consume current block
             block->swap(*current_block);
             return_free_block(std::move(current_block));
@@ -263,8 +278,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
                     for (int i = 0; i < free_blocks_for_each; ++i) {
                         vectorized::BlockUPtr removed_block;
                         if (_free_blocks.try_dequeue(removed_block)) {
-                            _free_blocks_memory_usage -= 
block->allocated_bytes();
-                            
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+                            _block_memory_usage -= block->allocated_bytes();
                         }
                     }
                 }
@@ -314,8 +328,7 @@ void ScannerContext::_try_to_scale_up() {
         int num_add = int(std::min(_num_running_scanners * SCALE_UP_RATIO,
                                    _max_thread_num * MAX_SCALE_UP_RATIO - 
_num_running_scanners));
         if (_estimated_block_size > 0) {
-            int most_add =
-                    (_max_bytes_in_queue - _free_blocks_memory_usage) / 
_estimated_block_size;
+            int most_add = (_max_bytes_in_queue - _block_memory_usage) / 
_estimated_block_size;
             num_add = std::min(num_add, most_add);
         }
         for (int i = 0; i < num_add; ++i) {
@@ -445,4 +458,12 @@ void ScannerContext::_set_scanner_done() {
     _dependency->set_always_ready();
 }
 
+void ScannerContext::update_peak_running_scanner(int num) {
+    _local_state->_peak_running_scanner->add(num);
+}
+
+void ScannerContext::update_peak_memory_usage(int64_t usage) {
+    _local_state->_scanner_peak_memory_usage->add(usage);
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index d97fc731fe5..f93d01eef88 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -122,10 +122,12 @@ public:
 
     vectorized::BlockUPtr get_free_block(bool force);
     void return_free_block(vectorized::BlockUPtr block);
-    inline void inc_free_block_usage(size_t usage) {
-        _free_blocks_memory_usage += usage;
-        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
-    }
+    inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
+
+    // Caller should make sure the pipeline task is still running when calling 
this function
+    void update_peak_running_scanner(int num);
+    // Caller should make sure the pipeline task is still running when calling 
this function
+    void update_peak_memory_usage(int64_t usage);
 
     // Get next block from blocks queue. Called by ScanNode/ScanOperator
     // Set eos to true if there is no more data to read.
@@ -223,7 +225,6 @@ protected:
     RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
     RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark = 
nullptr;
     RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
     RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
     QueryThreadContext _query_thread_context;
@@ -231,7 +232,7 @@ protected:
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;
-    std::atomic_long _free_blocks_memory_usage = 0;
+    std::atomic_long _block_memory_usage = 0;
     int64_t _last_scale_up_time = 0;
     int64_t _last_fetch_time = 0;
     int64_t _total_wait_block_time = 0;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 351f5d4e275..e30983932ee 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -34,6 +34,7 @@
 #include "common/status.h"
 #include "olap/tablet.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/async_io.h" // IWYU pragma: keep
@@ -210,6 +211,9 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
         return;
     }
 
+    ctx->update_peak_running_scanner(1);
+    Defer defer([&] { ctx->update_peak_running_scanner(-1); });
+
     std::shared_ptr<ScannerDelegate> scanner_delegate = 
scan_task->scanner.lock();
     if (scanner_delegate == nullptr) {
         return;
@@ -267,13 +271,18 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                 if (free_block == nullptr) {
                     break;
                 }
+                // We got a new created block or a reused block.
+                ctx->update_peak_memory_usage(free_block->allocated_bytes());
+                ctx->update_peak_memory_usage(-free_block->allocated_bytes());
                 status = scanner->get_block_after_projects(state, 
free_block.get(), &eos);
+                // Projection will truncate useless columns, makes block size 
change.
+                auto free_block_bytes = free_block->allocated_bytes();
+                ctx->update_peak_memory_usage(free_block_bytes);
                 first_read = false;
                 if (!status.ok()) {
                     LOG(WARNING) << "Scan thread read VScanner failed: " << 
status.to_string();
                     break;
                 }
-                auto free_block_bytes = free_block->allocated_bytes();
                 raw_bytes_read += free_block_bytes;
                 if (!scan_task->cached_blocks.empty() &&
                     scan_task->cached_blocks.back().first->rows() + 
free_block->rows() <=
@@ -281,18 +290,25 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     size_t block_size = 
scan_task->cached_blocks.back().first->allocated_bytes();
                     vectorized::MutableBlock mutable_block(
                             scan_task->cached_blocks.back().first.get());
+                    
ctx->update_peak_memory_usage(-mutable_block.allocated_bytes());
                     status = mutable_block.merge(*free_block);
+                    
ctx->update_peak_memory_usage(mutable_block.allocated_bytes());
                     if (!status.ok()) {
                         LOG(WARNING) << "Block merge failed: " << 
status.to_string();
                         break;
                     }
+                    scan_task->cached_blocks.back().second = 
mutable_block.allocated_bytes();
                     scan_task->cached_blocks.back().first.get()->set_columns(
                             std::move(mutable_block.mutable_columns()));
+
+                    // Return block succeed or not, this free_block is not 
used by this scan task any more.
+                    ctx->update_peak_memory_usage(-free_block_bytes);
+                    // If block can be reused, its memory usage will be added 
back.
                     ctx->return_free_block(std::move(free_block));
-                    ctx->inc_free_block_usage(
-                            
scan_task->cached_blocks.back().first->allocated_bytes() - block_size);
+                    
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
+                                         block_size);
                 } else {
-                    ctx->inc_free_block_usage(free_block->allocated_bytes());
+                    ctx->inc_block_usage(free_block->allocated_bytes());
                     
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }
             } // end for while
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 43d791caffa..a78f8956025 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -143,6 +143,7 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
     }
 
     if (state->is_cancelled()) {
+        // TODO: Should return the specific ErrorStatus instead of just 
Cancelled.
         return Status::Cancelled("cancelled");
     }
     *eof = *eof || _should_stop;
diff --git a/regression-test/suites/query_profile/scanner_profile.groovy 
b/regression-test/suites/query_profile/scanner_profile.groovy
new file mode 100644
index 00000000000..38216d211e6
--- /dev/null
+++ b/regression-test/suites/query_profile/scanner_profile.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonOutput
+import groovy.json.JsonSlurper
+import groovy.json.StringEscapeUtils
+
+
+def getProfileList = {
+    def dst = 'http://' + context.config.feHttpAddress
+    def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+    conn.setRequestMethod("GET")
+    def encoding = 
Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + 
+            (context.config.feHttpPassword == null ? "" : 
context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    return conn.getInputStream().getText()
+}
+
+
+def getProfile = { id ->
+        def dst = 'http://' + context.config.feHttpAddress
+        def conn = new URL(dst + 
"/api/profile/text/?query_id=$id").openConnection()
+        conn.setRequestMethod("GET")
+        def encoding = 
Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + 
+                (context.config.feHttpPassword == null ? "" : 
context.config.feHttpPassword)).getBytes("UTF-8"))
+        conn.setRequestProperty("Authorization", "Basic ${encoding}")
+        return conn.getInputStream().getText()
+}
+
+suite('scanner_profile') {
+    sql """
+        DROP TABLE IF EXISTS scanner_profile;
+    """
+    sql """
+        CREATE TABLE if not exists `scanner_profile` (
+            `id` INT,
+            `name` varchar(32)
+        ) ENGINE=OLAP
+        DISTRIBUTED BY HASH(`id`) BUCKETS 10
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    // Insert data to table
+    sql """
+        insert into scanner_profile values 
+        (1, "A"),(2, "B"),(3, "C"),(4, 
"D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
+    """
+    sql """
+        insert into scanner_profile values 
+        (10, "A"),(20, "B"),(30, "C"),(40, 
"D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
+    """
+    sql """
+        insert into scanner_profile values 
+        (101, "A"),(201, "B"),(301, "C"),(401, 
"D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
+    """
+    sql """
+        insert into scanner_profile values 
+        (1010, "A"),(2010, "B"),(3010, "C"),(4010, 
"D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
+    """
+
+    def uuidString = UUID.randomUUID().toString()
+    sql "set enable_profile=true"
+    // With Limit, MaxScannerThreadNum = 1
+    sql """
+        select "with_limit_1_${uuidString}", * from scanner_profile limit 10;
+    """
+    
+    def wholeString = getProfileList()
+    List profileData = new JsonSlurper().parseText(wholeString).data.rows
+    String queryIdWithLimit1 = "";
+    
+
+    logger.info("{}", uuidString)
+
+    for (def profileItem in profileData) {
+        if (profileItem["Sql 
Statement"].toString().contains("with_limit_1_${uuidString}")) {
+            queryIdWithLimit1 = profileItem["Profile ID"].toString()
+            logger.info("profileItem: {}", profileItem)
+        }
+    }
+
+    logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)
+
+    assertTrue(queryIdWithLimit1 != "")
+    def String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
+    logger.info("query profile {}", profileWithLimit1)
+    assertTrue(profileWithLimit1.contains("- PeakRunningScanner: 1"))
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to