This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit bedad15f0394970731e38a727f40369320c4ed42 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Sat Jan 27 00:06:19 2024 +0800 [enhancement](scanner) add a lower bound for bytes in scanner queue (#29624) --- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/vec/exec/scan/scanner_context.cpp | 10 ++++++++++ be/src/vec/exec/scan/scanner_scheduler.cpp | 8 ++------ 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 58bb9156b5b..42ff7e8fce7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -260,6 +260,7 @@ DEFINE_mInt32(doris_scanner_queue_size, "1024"); DEFINE_mInt32(doris_scanner_row_num, "16384"); // single read execute fragment row bytes DEFINE_mInt32(doris_scanner_row_bytes, "10485760"); +DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864"); // number of max scan keys DEFINE_mInt32(doris_max_scan_key_num, "48"); // the max number of push down values of a single column. diff --git a/be/src/common/config.h b/be/src/common/config.h index 8db84b459b8..1b75a1cbbd2 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -302,6 +302,7 @@ DECLARE_mInt32(doris_scanner_queue_size); DECLARE_mInt32(doris_scanner_row_num); // single read execute fragment row bytes DECLARE_mInt32(doris_scanner_row_bytes); +DECLARE_mInt32(min_bytes_in_scanner_queue); // number of max scan keys DECLARE_mInt32(doris_max_scan_key_num); // the max number of push down values of a single column. 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 0b19f389920..7691f159509 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -45,6 +45,9 @@ namespace doris::vectorized { using namespace std::chrono_literals; +static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0); +static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0); + ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list<std::shared_ptr<ScannerDelegate>>& scanners, @@ -179,6 +182,9 @@ Status ScannerContext::init() { _free_blocks_capacity = _max_thread_num * _block_per_scanner; auto block = get_free_block(); _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16); + int min_blocks = (config::min_bytes_in_scanner_queue + _estimated_block_bytes - 1) / + _estimated_block_bytes; + _free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks); return_free_block(std::move(block)); #ifndef BE_TEST @@ -258,6 +264,7 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& } _blocks_queue_added_cv.notify_one(); _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue); + g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue); } bool ScannerContext::empty_in_queue(int id) { @@ -334,6 +341,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo } } + g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue); if (!merge_blocks.empty()) { vectorized::MutableBlock m(block->get()); for (auto& merge_block : merge_blocks) { @@ -375,6 +383,7 @@ Status ScannerContext::validate_block_schema(Block* block) { void ScannerContext::inc_num_running_scanners(int32_t inc) { std::lock_guard l(_transfer_lock); _num_running_scanners += inc; + 
g_num_running_scanners.set_value(_num_running_scanners); } void ScannerContext::set_status_on_error(const Status& status, bool need_lock) { @@ -484,6 +493,7 @@ void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDel // before we call the following if() block. { --_num_running_scanners; + g_num_running_scanners.set_value(_num_running_scanners); if (scanner->_scanner->need_to_close()) { --_num_unfinished_scanners; if (_num_unfinished_scanners == 0) { diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index e0fb5afa74a..feab4e9ba4c 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -328,8 +328,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, // judge if we need to yield. So we record all raw data read in this round // scan, if this exceeds row number or bytes threshold, we yield this thread. std::vector<vectorized::BlockUPtr> blocks; - int64_t raw_rows_read = scanner->get_rows_read(); - int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; int64_t raw_bytes_read = 0; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; int num_rows_in_block = 0; @@ -343,11 +341,10 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, // queue, it will affect query latency and query concurrency for example ssb 3.3. 
auto should_do_scan = [&, batch_size = state->batch_size(), time = state->wait_full_block_schedule_times()]() { - if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) { + if (raw_bytes_read < raw_bytes_threshold) { return true; } else if (num_rows_in_block < batch_size) { - return raw_bytes_read < raw_bytes_threshold * time && - raw_rows_read < raw_rows_threshold * time; + return raw_bytes_read < raw_bytes_threshold * time; } return false; }; @@ -396,7 +393,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, blocks.push_back(std::move(block)); } } - raw_rows_read = scanner->get_rows_read(); } // end for while // if we failed, check status. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org