This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4f5e1601df [bug](scanner) Improve limit query performance on olapScannode and avoid infinite loop (#11301) 4f5e1601df is described below commit 4f5e1601dfe5fb04bc941fe5d5b98d8da658a8db Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Mon Aug 1 13:50:12 2022 +0800 [bug](scanner) Improve limit query performance on olapScannode and avoid infinite loop (#11301) 1. Fix a bug where querying a table with large columns could cause an infinite loop. 2. Optimize the execution of queries with a LIMIT clause: when the limit value is relatively small, reduce the scanner parallelism. This cuts unnecessary resource consumption, increases the number of similar queries the system can serve concurrently, and improves query speed by more than 60%. --- be/src/exec/olap_scan_node.cpp | 19 ++++++++-- be/src/exec/olap_scan_node.h | 2 + be/src/exec/olap_scanner.cpp | 22 +++++++---- be/src/exec/olap_scanner.h | 4 ++ be/src/runtime/fragment_mgr.cpp | 9 +++++ be/src/runtime/query_fragments_ctx.h | 11 ++++++ be/src/runtime/row_batch.cpp | 2 +- be/src/vec/exec/volap_scan_node.cpp | 71 ++++++++++++++++++++++++------------ 8 files changed, 104 insertions(+), 36 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index cabf14ae62..f8d53a9e43 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -95,7 +95,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filter_ctxs[i].runtimefilter = runtime_filter; } - + _batch_size = _limit == -1 ? state->batch_size() + : std::min(static_cast<int64_t>(state->batch_size()), _limit); return Status::OK(); } @@ -273,7 +274,6 @@ Status OlapScanNode::open(RuntimeState* state) { Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - // check if Canceled.
if (state->is_cancelled()) { std::unique_lock<std::mutex> l(_row_batches_lock); @@ -939,6 +939,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { OlapScanner* scanner = new OlapScanner(state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, _scanner_mem_tracker.get()); + scanner->set_batch_size(_batch_size); // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. _scanner_pool.add(scanner); @@ -1489,7 +1490,12 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } } - ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token(); + ThreadPoolToken* thread_token = nullptr; + if (limit() != -1 && limit() < 1024) { + thread_token = state->get_query_fragments_ctx()->get_serial_token(); + } else { + thread_token = state->get_query_fragments_ctx()->get_token(); + } /********************************* * The basic strategy of priority scheduling: @@ -1739,7 +1745,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { << ", fragment id=" << print_id(_runtime_state->fragment_instance_id()); break; } - RowBatch* row_batch = new RowBatch(this->row_desc(), state->batch_size()); + RowBatch* row_batch = new RowBatch(this->row_desc(), _batch_size); row_batch->set_scanner_id(scanner->id()); status = scanner->get_batch(_runtime_state, row_batch, &eos); if (!status.ok()) { @@ -1757,6 +1763,10 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { raw_bytes_read += row_batch->tuple_data_pool()->total_reserved_bytes(); } raw_rows_read = scanner->raw_rows_read(); + if (limit() != -1 && raw_rows_read >= limit()) { + eos = true; + break; + } } { @@ -1775,6 +1785,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { std::lock_guard<SpinLock> guard(_status_mutex); global_status_ok = _status.ok(); } + if (UNLIKELY(!global_status_ok)) { eos = true; for (auto rb : row_batchs) { diff --git a/be/src/exec/olap_scan_node.h 
b/be/src/exec/olap_scan_node.h index 5d5b700356..ea2b0dc70d 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -206,6 +206,8 @@ protected: // object is. ObjectPool _scanner_pool; + size_t _batch_size = 0; + std::shared_ptr<std::thread> _transfer_thread; // Keeps track of total splits and the number finished. diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index c725aa1458..a2dac28d30 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -63,11 +63,7 @@ Status OlapScanner::prepare( SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); set_tablet_reader(); // set limit to reduce end of rowset and segment mem use - _tablet_reader->set_batch_size( - _parent->limit() == -1 - ? _parent->_runtime_state->batch_size() - : std::min(static_cast<int64_t>(_parent->_runtime_state->batch_size()), - _parent->limit())); + _tablet_reader->set_batch_size(_parent->_batch_size); // Get olap table TTabletId tablet_id = scan_range.tablet_id; @@ -314,9 +310,15 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) { Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); // 2. 
Allocate Row's Tuple buf + Status st = Status::OK(); uint8_t* tuple_buf = - batch->tuple_data_pool()->allocate(state->batch_size() * _tuple_desc->byte_size()); - bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size()); + batch->tuple_data_pool()->allocate(_batch_size * _tuple_desc->byte_size(), &st); + RETURN_NOT_OK_STATUS_WITH_WARN(st, "Allocate mem for row batch failed"); + if (tuple_buf == nullptr) { + LOG(WARNING) << "Allocate mem for row batch failed."; + return Status::RuntimeError("Allocate mem for row batch failed."); + } + bzero(tuple_buf, _batch_size * _tuple_desc->byte_size()); Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf); std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker)); @@ -329,6 +331,11 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { ObjectPool tmp_object_pool; // release the memory of the object which can't pass the conjuncts. ObjectPool unused_object_pool; + if (batch->tuple_data_pool()->total_reserved_bytes() >= raw_bytes_threshold) { + return Status::RuntimeError( + "Scanner row bytes buffer is too small, please try to increase be config " + "'doris_scanner_row_bytes'."); + } while (true) { // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break if (batch->is_full() || @@ -631,6 +638,7 @@ void OlapScanner::_update_realtime_counter() { COUNTER_UPDATE(_parent->_raw_rows_counter, stats.raw_rows_read); // if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash _raw_rows_read += stats.raw_rows_read; + _tablet_reader->mutable_stats()->raw_rows_read = 0; } diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index e95e31d106..ea128dc9dd 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -90,6 +90,8 @@ public: TabletStorageType get_storage_type(); + void set_batch_size(size_t batch_size) { _batch_size = batch_size; } + protected: Status _init_tablet_reader_params( const std::vector<OlapScanRange*>& 
key_ranges, const std::vector<TCondition>& filters, @@ -140,6 +142,8 @@ protected: int64_t _raw_rows_read = 0; int64_t _compressed_bytes_read = 0; + size_t _batch_size = 0; + // number rows filtered by pushed condition int64_t _num_rows_pushed_cond_filtered = 0; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7b16f853ce..b2f00c862d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -640,6 +640,15 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit); } } + if (params.__isset.fragment && params.fragment.__isset.plan && + params.fragment.plan.nodes.size() > 0) { + for (auto& node : params.fragment.plan.nodes) { + if (node.limit > 0 && node.limit < 1024) { + fragments_ctx->set_serial_thread_token(); + break; + } + } + } { // Find _fragments_ctx_map again, in case some other request has already diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 0c91f1b305..78d84cb4b2 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -59,9 +59,15 @@ public: ThreadPool::ExecutionMode::CONCURRENT, cpu_limit); } } + void set_serial_thread_token() { + _serial_thread_token = _exec_env->limited_scan_thread_pool()->new_token( + ThreadPool::ExecutionMode::SERIAL, 1); + } ThreadPoolToken* get_token() { return _thread_token.get(); } + ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); } + void set_ready_to_execute() { { std::lock_guard<std::mutex> l(_start_lock); @@ -108,6 +114,11 @@ private: // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env. 
std::unique_ptr<ThreadPoolToken> _thread_token; + // A token used to submit olap scanners to the "_limited_scan_thread_pool" serially. It is used for + // queries like `select * limit 1`: it limits such a query to at most one scanner thread so that it + // does not consume too many resources. + std::unique_ptr<ThreadPoolToken> _serial_thread_token; + std::mutex _start_lock; std::condition_variable _start_cond; // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState. diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 2b1f0a09c1..9c8ac6d3aa 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -499,7 +499,7 @@ size_t RowBatch::get_batch_size(const PRowBatch& batch) { void RowBatch::acquire_state(RowBatch* src) { // DCHECK(_row_desc.equals(src->_row_desc)); DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row); - DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size); + // DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size); DCHECK_EQ(_auxiliary_mem_usage, 0); // The destination row batch should be empty.
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 993475cfec..4b4ddf313f 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -1876,35 +1876,58 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per } // post volap scanners to thread-pool - PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan(); - PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool(); + ThreadPoolToken* thread_token = nullptr; + if (_limit > -1 && _limit < 1024) { + thread_token = state->get_query_fragments_ctx()->get_serial_token(); + } else { + thread_token = state->get_query_fragments_ctx()->get_token(); + } auto iter = olap_scanners.begin(); - while (iter != olap_scanners.end()) { - PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, parent_span = cur_span] { - opentelemetry::trace::Scope scope {parent_span}; - this->scanner_thread(scanner); - }; - task.priority = _nice; - task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk()); - (*iter)->start_wait_worker_timer(); - - TabletStorageType type = (*iter)->get_storage_type(); - bool ret = false; - COUNTER_UPDATE(_scanner_sched_counter, 1); - if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { - ret = thread_pool->offer(task); - } else { - ret = remote_thread_pool->offer(task); + if (thread_token != nullptr) { + while (iter != olap_scanners.end()) { + auto s = thread_token->submit_func([this, scanner = *iter, parent_span = cur_span] { + opentelemetry::trace::Scope scope {parent_span}; + this->scanner_thread(scanner); + }); + if (s.ok()) { + (*iter)->start_wait_worker_timer(); + COUNTER_UPDATE(_scanner_sched_counter, 1); + olap_scanners.erase(iter++); + } else { + LOG(FATAL) << "Failed to assign scanner task to thread pool! 
" << s.get_error_msg(); + } + ++_total_assign_num; } + } else { + PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); + PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool(); + while (iter != olap_scanners.end()) { + PriorityThreadPool::Task task; + task.work_function = [this, scanner = *iter, parent_span = cur_span] { + opentelemetry::trace::Scope scope {parent_span}; + this->scanner_thread(scanner); + }; + task.priority = _nice; + task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk()); + (*iter)->start_wait_worker_timer(); + + TabletStorageType type = (*iter)->get_storage_type(); + bool ret = false; + COUNTER_UPDATE(_scanner_sched_counter, 1); + if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + ret = thread_pool->offer(task); + } else { + ret = remote_thread_pool->offer(task); + } - if (ret) { - olap_scanners.erase(iter++); - } else { - LOG(FATAL) << "Failed to assign scanner task to thread pool!"; + if (ret) { + olap_scanners.erase(iter++); + } else { + LOG(FATAL) << "Failed to assign scanner task to thread pool!"; + } + ++_total_assign_num; } - ++_total_assign_num; } return assigned_thread_num; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org