This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f5e1601df [bug](scanner) Improve limit query performance on 
olapScannode and avoid infinite loop (#11301)
4f5e1601df is described below

commit 4f5e1601dfe5fb04bc941fe5d5b98d8da658a8db
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Mon Aug 1 13:50:12 2022 +0800

    [bug](scanner) Improve limit query performance on olapScannode and avoid 
infinite loop (#11301)
    
    1. Fix a bug where querying a table with large columns could cause an infinite loop.
    2. Optimize queries with a LIMIT clause: when the limit value is relatively small,
    reduce the scanner parallelism to avoid unnecessary resource consumption. This
    increases the number of such queries the system can serve concurrently and
    improves query speed by more than 60%.
---
 be/src/exec/olap_scan_node.cpp       | 19 ++++++++--
 be/src/exec/olap_scan_node.h         |  2 +
 be/src/exec/olap_scanner.cpp         | 22 +++++++----
 be/src/exec/olap_scanner.h           |  4 ++
 be/src/runtime/fragment_mgr.cpp      |  9 +++++
 be/src/runtime/query_fragments_ctx.h | 11 ++++++
 be/src/runtime/row_batch.cpp         |  2 +-
 be/src/vec/exec/volap_scan_node.cpp  | 71 ++++++++++++++++++++++++------------
 8 files changed, 104 insertions(+), 36 deletions(-)

diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index cabf14ae62..f8d53a9e43 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -95,7 +95,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 
         _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
     }
-
+    _batch_size = _limit == -1 ? state->batch_size()
+                               : 
std::min(static_cast<int64_t>(state->batch_size()), _limit);
     return Status::OK();
 }
 
@@ -273,7 +274,6 @@ Status OlapScanNode::open(RuntimeState* state) {
 Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* 
eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
-
     // check if Canceled.
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_row_batches_lock);
@@ -939,6 +939,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) 
{
             OlapScanner* scanner =
                     new OlapScanner(state, this, 
_olap_scan_node.is_preaggregation,
                                     _need_agg_finalize, *scan_range, 
_scanner_mem_tracker.get());
+            scanner->set_batch_size(_batch_size);
             // add scanner to pool before doing prepare.
             // so that scanner can be automatically deconstructed if prepare 
failed.
             _scanner_pool.add(scanner);
@@ -1489,7 +1490,12 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
         }
     }
 
-    ThreadPoolToken* thread_token = 
state->get_query_fragments_ctx()->get_token();
+    ThreadPoolToken* thread_token = nullptr;
+    if (limit() != -1 && limit() < 1024) {
+        thread_token = state->get_query_fragments_ctx()->get_serial_token();
+    } else {
+        thread_token = state->get_query_fragments_ctx()->get_token();
+    }
 
     /*********************************
      * The basic strategy of priority scheduling:
@@ -1739,7 +1745,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
                        << ", fragment id=" << 
print_id(_runtime_state->fragment_instance_id());
             break;
         }
-        RowBatch* row_batch = new RowBatch(this->row_desc(), 
state->batch_size());
+        RowBatch* row_batch = new RowBatch(this->row_desc(), _batch_size);
         row_batch->set_scanner_id(scanner->id());
         status = scanner->get_batch(_runtime_state, row_batch, &eos);
         if (!status.ok()) {
@@ -1757,6 +1763,10 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
             raw_bytes_read += 
row_batch->tuple_data_pool()->total_reserved_bytes();
         }
         raw_rows_read = scanner->raw_rows_read();
+        if (limit() != -1 && raw_rows_read >= limit()) {
+            eos = true;
+            break;
+        }
     }
 
     {
@@ -1775,6 +1785,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
             std::lock_guard<SpinLock> guard(_status_mutex);
             global_status_ok = _status.ok();
         }
+
         if (UNLIKELY(!global_status_ok)) {
             eos = true;
             for (auto rb : row_batchs) {
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 5d5b700356..ea2b0dc70d 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -206,6 +206,8 @@ protected:
     // object is.
     ObjectPool _scanner_pool;
 
+    size_t _batch_size = 0;
+
     std::shared_ptr<std::thread> _transfer_thread;
 
     // Keeps track of total splits and the number finished.
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index c725aa1458..a2dac28d30 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -63,11 +63,7 @@ Status OlapScanner::prepare(
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     set_tablet_reader();
     // set limit to reduce end of rowset and segment mem use
-    _tablet_reader->set_batch_size(
-            _parent->limit() == -1
-                    ? _parent->_runtime_state->batch_size()
-                    : 
std::min(static_cast<int64_t>(_parent->_runtime_state->batch_size()),
-                               _parent->limit()));
+    _tablet_reader->set_batch_size(_parent->_batch_size);
 
     // Get olap table
     TTabletId tablet_id = scan_range.tablet_id;
@@ -314,9 +310,15 @@ Status OlapScanner::_init_return_columns(bool 
need_seq_col) {
 Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) 
{
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     // 2. Allocate Row's Tuple buf
+    Status st = Status::OK();
     uint8_t* tuple_buf =
-            batch->tuple_data_pool()->allocate(state->batch_size() * 
_tuple_desc->byte_size());
-    bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size());
+            batch->tuple_data_pool()->allocate(_batch_size * 
_tuple_desc->byte_size(), &st);
+    RETURN_NOT_OK_STATUS_WITH_WARN(st, "Allocate mem for row batch failed");
+    if (tuple_buf == nullptr) {
+        LOG(WARNING) << "Allocate mem for row batch failed.";
+        return Status::RuntimeError("Allocate mem for row batch failed.");
+    }
+    bzero(tuple_buf, _batch_size * _tuple_desc->byte_size());
     Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf);
 
     std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker));
@@ -329,6 +331,11 @@ Status OlapScanner::get_batch(RuntimeState* state, 
RowBatch* batch, bool* eof) {
         ObjectPool tmp_object_pool;
         // release the memory of the object which can't pass the conjuncts.
         ObjectPool unused_object_pool;
+        if (batch->tuple_data_pool()->total_reserved_bytes() >= 
raw_bytes_threshold) {
+            return Status::RuntimeError(
+                    "Scanner row bytes buffer is too small, please try to 
increase be config "
+                    "'doris_scanner_row_bytes'.");
+        }
         while (true) {
             // Batch is full or reach raw_rows_threshold or 
raw_bytes_threshold, break
             if (batch->is_full() ||
@@ -631,6 +638,7 @@ void OlapScanner::_update_realtime_counter() {
     COUNTER_UPDATE(_parent->_raw_rows_counter, stats.raw_rows_read);
     // if raw_rows_read is reset, scanNode will scan all table rows which may 
cause BE crash
     _raw_rows_read += stats.raw_rows_read;
+
     _tablet_reader->mutable_stats()->raw_rows_read = 0;
 }
 
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index e95e31d106..ea128dc9dd 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -90,6 +90,8 @@ public:
 
     TabletStorageType get_storage_type();
 
+    void set_batch_size(size_t batch_size) { _batch_size = batch_size; }
+
 protected:
     Status _init_tablet_reader_params(
             const std::vector<OlapScanRange*>& key_ranges, const 
std::vector<TCondition>& filters,
@@ -140,6 +142,8 @@ protected:
     int64_t _raw_rows_read = 0;
     int64_t _compressed_bytes_read = 0;
 
+    size_t _batch_size = 0;
+
     // number rows filtered by pushed condition
     int64_t _num_rows_pushed_cond_filtered = 0;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7b16f853ce..b2f00c862d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -640,6 +640,15 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
                 
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
             }
         }
+        if (params.__isset.fragment && params.fragment.__isset.plan &&
+            params.fragment.plan.nodes.size() > 0) {
+            for (auto& node : params.fragment.plan.nodes) {
+                if (node.limit > 0 && node.limit < 1024) {
+                    fragments_ctx->set_serial_thread_token();
+                    break;
+                }
+            }
+        }
 
         {
             // Find _fragments_ctx_map again, in case some other request has 
already
diff --git a/be/src/runtime/query_fragments_ctx.h 
b/be/src/runtime/query_fragments_ctx.h
index 0c91f1b305..78d84cb4b2 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -59,9 +59,15 @@ public:
                     ThreadPool::ExecutionMode::CONCURRENT, cpu_limit);
         }
     }
+    void set_serial_thread_token() {
+        _serial_thread_token = 
_exec_env->limited_scan_thread_pool()->new_token(
+                ThreadPool::ExecutionMode::SERIAL, 1);
+    }
 
     ThreadPoolToken* get_token() { return _thread_token.get(); }
 
+    ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); }
+
     void set_ready_to_execute() {
         {
             std::lock_guard<std::mutex> l(_start_lock);
@@ -108,6 +114,11 @@ private:
     // If this token is not set, the scanner will be executed in 
"_scan_thread_pool" in exec env.
     std::unique_ptr<ThreadPoolToken> _thread_token;
 
+    // A token used to submit olap scanners to the "_limited_scan_thread_pool" serially. It is
+    // used for queries like `select * limit 1`: it limits the max scanner thread count to 1
+    // so that such a query does not consume too many resources.
+    std::unique_ptr<ThreadPoolToken> _serial_thread_token;
+
     std::mutex _start_lock;
     std::condition_variable _start_cond;
     // Only valid when _need_wait_execution_trigger is set to true in 
FragmentExecState.
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 2b1f0a09c1..9c8ac6d3aa 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -499,7 +499,7 @@ size_t RowBatch::get_batch_size(const PRowBatch& batch) {
 void RowBatch::acquire_state(RowBatch* src) {
     // DCHECK(_row_desc.equals(src->_row_desc));
     DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row);
-    DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
+    // DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
     DCHECK_EQ(_auxiliary_mem_usage, 0);
 
     // The destination row batch should be empty.
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 993475cfec..4b4ddf313f 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -1876,35 +1876,58 @@ int 
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
     }
 
     // post volap scanners to thread-pool
-    PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
     auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
-    PriorityThreadPool* remote_thread_pool = 
state->exec_env()->remote_scan_thread_pool();
+    ThreadPoolToken* thread_token = nullptr;
+    if (_limit > -1 && _limit < 1024) {
+        thread_token = state->get_query_fragments_ctx()->get_serial_token();
+    } else {
+        thread_token = state->get_query_fragments_ctx()->get_token();
+    }
     auto iter = olap_scanners.begin();
-    while (iter != olap_scanners.end()) {
-        PriorityThreadPool::Task task;
-        task.work_function = [this, scanner = *iter, parent_span = cur_span] {
-            opentelemetry::trace::Scope scope {parent_span};
-            this->scanner_thread(scanner);
-        };
-        task.priority = _nice;
-        task.queue_id = 
state->exec_env()->store_path_to_index((*iter)->scan_disk());
-        (*iter)->start_wait_worker_timer();
-
-        TabletStorageType type = (*iter)->get_storage_type();
-        bool ret = false;
-        COUNTER_UPDATE(_scanner_sched_counter, 1);
-        if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
-            ret = thread_pool->offer(task);
-        } else {
-            ret = remote_thread_pool->offer(task);
+    if (thread_token != nullptr) {
+        while (iter != olap_scanners.end()) {
+            auto s = thread_token->submit_func([this, scanner = *iter, 
parent_span = cur_span] {
+                opentelemetry::trace::Scope scope {parent_span};
+                this->scanner_thread(scanner);
+            });
+            if (s.ok()) {
+                (*iter)->start_wait_worker_timer();
+                COUNTER_UPDATE(_scanner_sched_counter, 1);
+                olap_scanners.erase(iter++);
+            } else {
+                LOG(FATAL) << "Failed to assign scanner task to thread pool! " 
<< s.get_error_msg();
+            }
+            ++_total_assign_num;
         }
+    } else {
+        PriorityThreadPool* thread_pool = 
state->exec_env()->scan_thread_pool();
+        PriorityThreadPool* remote_thread_pool = 
state->exec_env()->remote_scan_thread_pool();
+        while (iter != olap_scanners.end()) {
+            PriorityThreadPool::Task task;
+            task.work_function = [this, scanner = *iter, parent_span = 
cur_span] {
+                opentelemetry::trace::Scope scope {parent_span};
+                this->scanner_thread(scanner);
+            };
+            task.priority = _nice;
+            task.queue_id = 
state->exec_env()->store_path_to_index((*iter)->scan_disk());
+            (*iter)->start_wait_worker_timer();
+
+            TabletStorageType type = (*iter)->get_storage_type();
+            bool ret = false;
+            COUNTER_UPDATE(_scanner_sched_counter, 1);
+            if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+                ret = thread_pool->offer(task);
+            } else {
+                ret = remote_thread_pool->offer(task);
+            }
 
-        if (ret) {
-            olap_scanners.erase(iter++);
-        } else {
-            LOG(FATAL) << "Failed to assign scanner task to thread pool!";
+            if (ret) {
+                olap_scanners.erase(iter++);
+            } else {
+                LOG(FATAL) << "Failed to assign scanner task to thread pool!";
+            }
+            ++_total_assign_num;
         }
-        ++_total_assign_num;
     }
 
     return assigned_thread_num;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to