This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fbcf3380971 [refactor](scanner) refactoring and optimizing scanner scheduling (#30746) fbcf3380971 is described below commit fbcf33809719140253a40c913c480f5d0f103830 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Wed Feb 7 18:08:24 2024 +0800 [refactor](scanner) refactoring and optimizing scanner scheduling (#30746) --- be/src/pipeline/exec/file_scan_operator.cpp | 8 +- be/src/pipeline/exec/scan_operator.cpp | 27 +- be/src/pipeline/exec/scan_operator.h | 3 +- be/src/runtime/runtime_state.h | 8 +- be/src/vec/exec/scan/new_file_scan_node.cpp | 9 +- be/src/vec/exec/scan/pip_scanner_context.h | 242 +--------- be/src/vec/exec/scan/scanner_context.cpp | 515 +++++++++------------ be/src/vec/exec/scan/scanner_context.h | 213 ++++----- be/src/vec/exec/scan/scanner_scheduler.cpp | 297 ++++-------- be/src/vec/exec/scan/scanner_scheduler.h | 44 +- be/src/vec/exec/scan/vscan_node.cpp | 25 +- be/src/vec/exec/scan/vscan_node.h | 3 +- .../java/org/apache/doris/qe/SessionVariable.java | 42 +- gensrc/thrift/PaloInternalService.thrift | 2 + 14 files changed, 499 insertions(+), 939 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 51fa60f067d..ac193147dfb 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -38,8 +38,10 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s } auto& p = _parent->cast<FileScanOperatorX>(); - size_t shard_num = - std::min<size_t>(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size()); + size_t shard_num = std::min<size_t>( + config::doris_scanner_thread_pool_thread_num / state()->query_parallel_instance_num(), + _scan_ranges.size()); + shard_num = std::max(shard_num, (size_t)1); _kv_cache.reset(new vectorized::ShardedKVCache(shard_num)); for (auto& scan_range : _scan_ranges) { 
std::unique_ptr<vectorized::VFileScanner> scanner = vectorized::VFileScanner::create_unique( @@ -62,7 +64,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, const std::vector<TScanRangeParams>& scan_ranges) { int max_scanners = config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); - max_scanners = max_scanners == 0 ? 1 : max_scanners; + max_scanners = std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1); // For select * from table limit 10; should just use one thread. if (should_run_serial()) { max_scanners = 1; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index d9fced39b05..f19bed90a9e 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -160,7 +160,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) { if (_scanner_ctx) { DCHECK(!_eos && _num_scanners->value() > 0); RETURN_IF_ERROR(_scanner_ctx->init()); - RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); } _opened = true; return status; @@ -1288,16 +1287,14 @@ Status ScanLocalState<Derived>::_init_profile() { profile()->add_child(_scanner_profile.get(), true, nullptr); _memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1); - _queued_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter( - "QueuedBlocks", TUnit::BYTES, "MemoryUsage", 1); _free_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1); _newly_create_free_blocks_num = ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT); + _scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, "NumScaleUpScanners", TUnit::UNIT); // time of transfer thread to wait for block from scan thread _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime"); _scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", 
TUnit::UNIT); - _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT); _scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, "ScannerCtxSchedTime"); _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime"); @@ -1456,14 +1453,10 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized: }}; if (state->is_cancelled()) { - // ISSUE: https://github.com/apache/doris/issues/16360 - // _scanner_ctx may be null here, see: `VScanNode::alloc_resource` (_eos == null) if (local_state._scanner_ctx) { - local_state._scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled")); - return local_state._scanner_ctx->status(); - } else { - return Status::Cancelled("query cancelled"); + local_state._scanner_ctx->stop_scanners(state); } + return Status::Cancelled("Query cancelled in ScanOperator"); } if (local_state._eos) { @@ -1471,21 +1464,11 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized: return Status::OK(); } - vectorized::BlockUPtr scan_block = nullptr; bool eos = false; - RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, &scan_block, &eos, 0)); - if (eos) { - source_state = SourceState::FINISHED; - DCHECK(scan_block == nullptr); - return Status::OK(); - } - - // get scanner's block memory - block->swap(*scan_block); - local_state._scanner_ctx->return_free_block(std::move(scan_block)); + RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, block, &eos, 0)); local_state.reached_limit(block, source_state); - if (eos) { + if (eos || source_state == SourceState::FINISHED) { source_state = SourceState::FINISHED; // reach limit, stop the scanners. 
local_state._scanner_ctx->stop_scanners(state); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 8fac0b946ea..add2249276f 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -142,7 +142,6 @@ protected: std::shared_ptr<RuntimeProfile> _scanner_profile; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; - RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr; RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr; RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr; @@ -160,8 +159,8 @@ protected: // time of filter output block from scanner RuntimeProfile::Counter* _filter_timer = nullptr; RuntimeProfile::Counter* _memory_usage_counter = nullptr; - RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr; RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr; + RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; // rows read from the scanner (including those discarded by (pre)filters) RuntimeProfile::Counter* _rows_read_counter = nullptr; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 91443ef9492..38053d3cb68 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -128,7 +128,13 @@ public: : _query_options.query_timeout; } int max_io_buffers() const { return _query_options.max_io_buffers; } - int num_scanner_threads() const { return _query_options.num_scanner_threads; } + int num_scanner_threads() const { + return _query_options.__isset.num_scanner_threads ? _query_options.num_scanner_threads : 0; + } + double scanner_scale_up_ratio() const { + return _query_options.__isset.scanner_scale_up_ratio ? 
_query_options.scanner_scale_up_ratio + : 0; + } TQueryType::type query_type() const { return _query_options.query_type; } int64_t timestamp_ms() const { return _timestamp_ms; } int32_t nano_seconds() const { return _nano_seconds; } diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index da33538b8c3..2ce80f4463a 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -62,7 +62,7 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state, const std::vector<TScanRangeParams>& scan_ranges) { int max_scanners = config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); - max_scanners = max_scanners == 0 ? 1 : max_scanners; + max_scanners = std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1); // For select * from table limit 10; should just use one thread. if (should_run_serial()) { max_scanners = 1; @@ -116,9 +116,10 @@ Status NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { return Status::OK(); } - // TODO: determine kv cache shard num - size_t shard_num = - std::min<size_t>(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size()); + size_t shard_num = std::min<size_t>( + config::doris_scanner_thread_pool_thread_num / _state->query_parallel_instance_num(), + _scan_ranges.size()); + shard_num = std::max(shard_num, (size_t)1); _kv_cache.reset(new ShardedKVCache(shard_num)); for (auto& scan_range : _scan_ranges) { std::unique_ptr<VFileScanner> scanner = diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 62f6f9edb21..b69f7c031d4 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -36,129 +36,6 @@ public: : vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor, scanners, limit_, max_bytes_in_blocks_queue, num_parallel_instances) {} - - Status 
get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, - int id) override { - { - std::unique_lock l(_transfer_lock); - if (state->is_cancelled()) { - set_status_on_error(Status::Cancelled("cancelled"), false); - } - - if (!status().ok()) { - return _process_status; - } - } - - std::vector<vectorized::BlockUPtr> merge_blocks; - { - std::unique_lock<std::mutex> l(*_queue_mutexs[id]); - // The pipeline maybe wake up by scanner.done. If there are still any data - // in the queue, should read the data first and then check if the scanner.done - // if done, then eos is returned to indicate that the scan operator finished. - if (_blocks_queues[id].empty()) { - *eos = done(); - return Status::OK(); - } - if (_process_status.is<ErrorCode::CANCELLED>()) { - *eos = true; - return Status::OK(); - } - *block = std::move(_blocks_queues[id].front()); - _blocks_queues[id].pop_front(); - - auto rows = (*block)->rows(); - while (!_blocks_queues[id].empty()) { - const auto add_rows = (*_blocks_queues[id].front()).rows(); - if (rows + add_rows < state->batch_size()) { - rows += add_rows; - merge_blocks.emplace_back(std::move(_blocks_queues[id].front())); - _blocks_queues[id].pop_front(); - } else { - break; - } - } - - if (_blocks_queues[id].empty()) { - this->reschedule_scanner_ctx(); - } - } - - _current_used_bytes -= (*block)->allocated_bytes(); - if (!merge_blocks.empty()) { - vectorized::MutableBlock m(block->get()); - for (auto& merge_block : merge_blocks) { - _current_used_bytes -= merge_block->allocated_bytes(); - static_cast<void>(m.merge(*merge_block)); - return_free_block(std::move(merge_block)); - } - (*block)->set_columns(std::move(m.mutable_columns())); - } - - // after return free blocks, should try to reschedule the scanner - if (should_be_scheduled()) { - this->reschedule_scanner_ctx(); - } - - return Status::OK(); - } - - void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override { - const int queue_size = 
_blocks_queues.size(); - const int block_size = blocks.size(); - if (block_size == 0) { - return; - } - int64_t local_bytes = 0; - - for (const auto& block : blocks) { - auto st = validate_block_schema(block.get()); - if (!st.ok()) { - set_status_on_error(st, false); - } - local_bytes += block->allocated_bytes(); - } - - for (int i = 0; i < queue_size && i < block_size; ++i) { - int queue = _next_queue_to_feed; - { - std::lock_guard<std::mutex> l(*_queue_mutexs[queue]); - for (int j = i; j < block_size; j += queue_size) { - _blocks_queues[queue].emplace_back(std::move(blocks[j])); - } - } - _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; - } - _current_used_bytes += local_bytes; - } - - bool empty_in_queue(int id) override { - std::unique_lock<std::mutex> l(*_queue_mutexs[id]); - return _blocks_queues[id].empty(); - } - - Status init() override { - for (int i = 0; i < _num_parallel_instances; ++i) { - _queue_mutexs.emplace_back(std::make_unique<std::mutex>()); - _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>()); - } - return ScannerContext::init(); - } - - std::string debug_string() override { - auto res = ScannerContext::debug_string(); - for (int i = 0; i < _blocks_queues.size(); ++i) { - res += " queue " + std::to_string(i) + ":size " + - std::to_string(_blocks_queues[i].size()); - } - return res; - } - -protected: - int _next_queue_to_feed = 0; - std::vector<std::unique_ptr<std::mutex>> _queue_mutexs; - std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues; - std::atomic_int64_t _current_used_bytes = 0; }; class PipXScannerContext final : public vectorized::ScannerContext { @@ -172,117 +49,38 @@ public: int64_t limit_, int64_t max_bytes_in_blocks_queue, std::shared_ptr<pipeline::ScanDependency> dependency) : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, - limit_, max_bytes_in_blocks_queue, 1, local_state, - dependency) {} - Status get_block_from_queue(RuntimeState* state, 
vectorized::BlockUPtr* block, bool* eos, - int id) override { - if (_blocks_queue_buffered.empty()) { - std::unique_lock l(_transfer_lock); - if (state->is_cancelled()) { - set_status_on_error(Status::Cancelled("cancelled"), false); - } - - if (!status().ok()) { - return _process_status; - } - - if (_blocks_queue.empty()) { - *eos = done(); - return Status::OK(); - } - if (_process_status.is<ErrorCode::CANCELLED>()) { - *eos = true; - return Status::OK(); - } - - _blocks_queue_buffered = std::move(_blocks_queue); - } - - // `get_block_from_queue` should not be called concurrently from multiple threads, - // so here no need to lock. - *block = std::move(_blocks_queue_buffered.front()); - _blocks_queue_buffered.pop_front(); + limit_, max_bytes_in_blocks_queue, 1, local_state) { + _dependency = dependency; + } - std::vector<vectorized::BlockUPtr> merge_blocks; - auto rows = (*block)->rows(); - while (!_blocks_queue_buffered.empty()) { - const auto add_rows = (*_blocks_queue_buffered.front()).rows(); - if (rows + add_rows < state->batch_size()) { - rows += add_rows; - merge_blocks.emplace_back(std::move(_blocks_queue_buffered.front())); - _blocks_queue_buffered.pop_front(); - } else { - break; - } + void append_block_to_queue(std::shared_ptr<vectorized::ScanTask> scan_task) override { + vectorized::ScannerContext::append_block_to_queue(scan_task); + if (_dependency) { + _dependency->set_ready(); } + } - if (_blocks_queue_buffered.empty()) { - std::unique_lock l(_transfer_lock); - if (_blocks_queue.empty()) { - this->reschedule_scanner_ctx(); + Status get_block_from_queue(RuntimeState* state, vectorized::Block* block, bool* eos, int id, + bool wait = true) override { + Status st = vectorized::ScannerContext::get_block_from_queue(state, block, eos, id, wait); + std::lock_guard<std::mutex> l(_transfer_lock); + if (_blocks_queue.empty()) { + if (_dependency) { _dependency->block(); - } else { - _blocks_queue_buffered = std::move(_blocks_queue); - } - } - - 
_cur_bytes_in_queue -= (*block)->allocated_bytes(); - if (!merge_blocks.empty()) { - vectorized::MutableBlock m(block->get()); - for (auto& merge_block : merge_blocks) { - _cur_bytes_in_queue -= merge_block->allocated_bytes(); - static_cast<void>(m.merge(*merge_block)); - if (merge_block->mem_reuse()) { - _free_blocks_buffered.emplace_back(std::move(merge_block)); - } } - (*block)->set_columns(std::move(m.mutable_columns())); - } - return_free_blocks(); - - // after return free blocks, should try to reschedule the scanner - if (should_be_scheduled()) { - this->reschedule_scanner_ctx(); } - - return Status::OK(); + return st; } - void reschedule_scanner_ctx() override { - if (done()) { - return; - } - auto state = _scanner_scheduler->submit(shared_from_this()); - //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times? - if (state.ok()) { - _num_scheduling_ctx++; - } else { - set_status_on_error(state, false); +protected: + void _set_scanner_done() override { + if (_dependency) { + _dependency->set_scanner_done(); } } private: - void return_free_blocks() { - if (_free_blocks_buffered.empty()) { - return; - } - - size_t total_bytes = 0; - for (auto& block : _free_blocks_buffered) { - const auto bytes = block->allocated_bytes(); - block->clear_column_data(); - _estimated_block_bytes = std::max(bytes, (size_t)16); - total_bytes += bytes; - } - _free_blocks_memory_usage->add(total_bytes); - const auto count = _free_blocks_buffered.size(); - _free_blocks.enqueue_bulk(std::make_move_iterator(_free_blocks_buffered.begin()), count); - _free_blocks_buffered.clear(); - _serving_blocks_num -= count; - } - - std::vector<vectorized::BlockUPtr> _free_blocks_buffered; - std::list<vectorized::BlockUPtr> _blocks_queue_buffered; + std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr; }; } // namespace doris::pipeline diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 
be143b9f729..45e934ee790 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -17,12 +17,10 @@ #include "scanner_context.h" -#include <bthread/bthread.h> #include <fmt/format.h> #include <gen_cpp/Metrics_types.h> #include <glog/logging.h> -#include <algorithm> #include <mutex> #include <ostream> #include <utility> @@ -31,64 +29,57 @@ #include "common/status.h" #include "pipeline/exec/scan_operator.h" #include "runtime/descriptors.h" -#include "runtime/exec_env.h" -#include "runtime/query_context.h" #include "runtime/runtime_state.h" -#include "util/pretty_printer.h" #include "util/uid_util.h" #include "vec/core/block.h" -#include "vec/exec/scan/scanner_scheduler.h" #include "vec/exec/scan/vscan_node.h" -#include "vec/exec/scan/vscanner.h" namespace doris::vectorized { using namespace std::chrono_literals; -static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0); -static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0); - ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, - pipeline::ScanLocalStateBase* local_state, - std::shared_ptr<pipeline::ScanDependency> dependency) + pipeline::ScanLocalStateBase* local_state) : HasTaskExecutionCtx(state), _state(state), - _parent(nullptr), _local_state(local_state), _output_tuple_desc(output_row_descriptor ? 
output_row_descriptor->tuple_descriptors().front() : output_tuple_desc), _output_row_descriptor(output_row_descriptor), - _process_status(Status::OK()), _batch_size(state->batch_size()), limit(limit_), _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * num_parallel_instances), _scanner_scheduler(state->exec_env()->scanner_scheduler()), - _scanners(scanners.begin(), scanners.end()), _all_scanners(scanners.begin(), scanners.end()), - _num_parallel_instances(num_parallel_instances), - _dependency(dependency) { + _num_parallel_instances(num_parallel_instances) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); - if (_scanners.empty()) { + // Provide more memory for wide tables, increase proportionally by multiples of 300 + _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; + if (scanners.empty()) { _is_finished = true; _set_scanner_done(); } + _scanners.enqueue_bulk(scanners.begin(), scanners.size()); if (limit < 0) { limit = -1; } - _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4; + MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio(); + _max_thread_num = _state->num_scanner_threads() > 0 + ? _state->num_scanner_threads() + : config::doris_scanner_thread_pool_thread_num / + state->query_parallel_instance_num(); _max_thread_num *= num_parallel_instances; _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; - DCHECK(_max_thread_num > 0); - _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size()); + _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size()); // 1. Calculate max concurrency // For select * from table limit 10; should just use one thread. 
if ((_parent && _parent->should_run_serial()) || @@ -104,45 +95,9 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state) - : HasTaskExecutionCtx(state), - _state(state), - _parent(parent), - _local_state(local_state), - _output_tuple_desc(output_row_descriptor - ? output_row_descriptor->tuple_descriptors().front() - : output_tuple_desc), - _output_row_descriptor(output_row_descriptor), - _process_status(Status::OK()), - _batch_size(state->batch_size()), - limit(limit_), - _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * - num_parallel_instances), - _scanner_scheduler(state->exec_env()->scanner_scheduler()), - _scanners(scanners.begin(), scanners.end()), - _all_scanners(scanners.begin(), scanners.end()), - _num_parallel_instances(num_parallel_instances) { - DCHECK(_output_row_descriptor == nullptr || - _output_row_descriptor->tuple_descriptors().size() == 1); - _query_id = _state->get_query_ctx()->query_id(); - ctx_id = UniqueId::gen_uid().to_string(); - if (_scanners.empty()) { - _is_finished = true; - _set_scanner_done(); - } - if (limit < 0) { - limit = -1; - } - _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4; - _max_thread_num *= num_parallel_instances; - _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; - DCHECK(_max_thread_num > 0); - _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size()); - // 1. Calculate max concurrency - // For select * from table limit 10; should just use one thread. 
- if ((_parent && _parent->should_run_serial()) || - (_local_state && _local_state->should_run_serial())) { - _max_thread_num = 1; - } + : ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, limit_, + max_bytes_in_blocks_queue, num_parallel_instances, local_state) { + _parent = parent; } // After init function call, should not access _parent @@ -150,43 +105,21 @@ Status ScannerContext::init() { if (_parent) { _scanner_profile = _parent->_scanner_profile; _scanner_sched_counter = _parent->_scanner_sched_counter; - _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter; - _scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time; - _free_blocks_memory_usage = _parent->_free_blocks_memory_usage; _newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num; - _queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage; _scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer; + _free_blocks_memory_usage_mark = _parent->_free_blocks_memory_usage; + _scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time; + _scale_up_scanners_counter = _parent->_scale_up_scanners_counter; } else { _scanner_profile = _local_state->_scanner_profile; _scanner_sched_counter = _local_state->_scanner_sched_counter; - _scanner_ctx_sched_counter = _local_state->_scanner_ctx_sched_counter; - _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time; - _free_blocks_memory_usage = _local_state->_free_blocks_memory_usage; _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num; - _queued_blocks_memory_usage = _local_state->_queued_blocks_memory_usage; _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer; + _free_blocks_memory_usage_mark = _local_state->_free_blocks_memory_usage; + _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time; + _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter; } - // 2. Calculate the number of free blocks that all scanners can use. 
- // The calculation logic is as follows: - // 1. Assuming that at most M rows can be scanned in one scan(config::doris_scanner_row_num), - // then figure out how many blocks are required for one scan(_block_per_scanner). - // 2. The maximum number of concurrency * the blocks required for one scan, - // that is, the number of blocks that all scanners can use. - auto doris_scanner_row_num = - limit == -1 ? config::doris_scanner_row_num - : std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit); - int real_block_size = - limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit); - _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size; - _free_blocks_capacity = _max_thread_num * _block_per_scanner; - auto block = get_free_block(); - _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16); - int min_blocks = (config::min_bytes_in_scanner_queue + _estimated_block_bytes - 1) / - _estimated_block_bytes; - _free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks); - return_free_block(std::move(block)); - #ifndef BE_TEST // 3. get thread token if (_state->get_query_ctx()) { @@ -198,8 +131,6 @@ Status ScannerContext::init() { } #endif - _num_unfinished_scanners = _scanners.size(); - if (_parent) { COUNTER_SET(_parent->_max_scanner_thread_num, (int64_t)_max_thread_num); _parent->_runtime_profile->add_info_string("UseSpecificThreadToken", @@ -210,6 +141,17 @@ Status ScannerContext::init() { thread_token == nullptr ? "False" : "True"); } + // submit `_max_thread_num` running scanners to `ScannerScheduler` + // When a running scanners is finished, it will submit one of the remaining scanners. 
+ for (int i = 0; i < _max_thread_num; ++i) { + std::weak_ptr<ScannerDelegate> next_scanner; + if (_scanners.try_dequeue(next_scanner)) { + vectorized::BlockUPtr block = get_free_block(); + submit_scan_task(std::make_shared<ScanTask>(next_scanner, std::move(block))); + _num_running_scanners++; + } + } + return Status::OK(); } @@ -220,138 +162,221 @@ std::string ScannerContext::parent_name() { vectorized::BlockUPtr ScannerContext::get_free_block() { vectorized::BlockUPtr block; if (_free_blocks.try_dequeue(block)) { + std::lock_guard<std::mutex> fl(_free_blocks_lock); DCHECK(block->mem_reuse()); - _free_blocks_memory_usage->add(-block->allocated_bytes()); - _serving_blocks_num++; + _free_blocks_memory_usage -= block->allocated_bytes(); + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); return block; } - block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, - true /*ignore invalid slots*/); - - COUNTER_UPDATE(_newly_create_free_blocks_num, 1); - - _serving_blocks_num++; - return block; + _newly_create_free_blocks_num->update(1); + return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, + true /*ignore invalid slots*/); } -void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) { - _serving_blocks_num--; - if (block->mem_reuse()) { - // Only put blocks with schema to free blocks, because colocate blocks - // need schema. 
- _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16); +void ScannerContext::return_free_block(vectorized::BlockUPtr block) { + std::lock_guard<std::mutex> fl(_free_blocks_lock); + if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) { block->clear_column_data(); - _free_blocks_memory_usage->add(block->allocated_bytes()); + _free_blocks_memory_usage += block->allocated_bytes(); + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); _free_blocks.enqueue(std::move(block)); } } -void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) { - std::lock_guard l(_transfer_lock); - auto old_bytes_in_queue = _cur_bytes_in_queue; - for (auto& b : blocks) { - auto st = validate_block_schema(b.get()); +bool ScannerContext::empty_in_queue(int id) { + std::lock_guard<std::mutex> l(_transfer_lock); + return _blocks_queue.empty(); +} + +void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) { + _scanner_sched_counter->update(1); + _num_scheduled_scanners++; + _scanner_scheduler->submit(shared_from_this(), scan_task); +} + +void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) { + if (scan_task->status_ok() && scan_task->current_block->rows() > 0) { + Status st = validate_block_schema(scan_task->current_block.get()); if (!st.ok()) { - set_status_on_error(st, false); + scan_task->set_status(st); } - _cur_bytes_in_queue += b->allocated_bytes(); - _blocks_queue.push_back(std::move(b)); } - blocks.clear(); - if (_dependency) { - _dependency->set_ready(); + std::lock_guard<std::mutex> l(_transfer_lock); + if (!scan_task->status_ok()) { + _process_status = scan_task->get_status(); + } + if (_last_scale_up_time == 0) { + _last_scale_up_time = UnixMillis(); + } + if (_blocks_queue.empty() && _last_fetch_time != 0) { + // there's no block in queue before current block, so the consumer is waiting + _total_wait_block_time += UnixMillis() - _last_fetch_time; } + 
_num_scheduled_scanners--; + _blocks_queue.emplace_back(scan_task); _blocks_queue_added_cv.notify_one(); - _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue); - g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue); } -bool ScannerContext::empty_in_queue(int id) { +Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Block* block, + bool* eos, int id, bool wait) { + if (state->is_cancelled()) { + _set_scanner_done(); + return Status::Cancelled("Query cancelled in ScannerContext"); + } std::unique_lock l(_transfer_lock); - return _blocks_queue.empty(); -} + // Wait for block from queue + if (wait) { + // scanner batch wait time + SCOPED_TIMER(_scanner_wait_batch_timer); + while (!done() && _blocks_queue.empty() && _process_status.ok()) { + _blocks_queue_added_cv.wait_for(l, 1s); + } + } + if (!_process_status.ok()) { + _set_scanner_done(); + return _process_status; + } + std::shared_ptr<ScanTask> scan_task = nullptr; + if (!_blocks_queue.empty() && !done()) { + _last_fetch_time = UnixMillis(); + scan_task = _blocks_queue.front(); + _blocks_queue.pop_front(); + } -Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, - bool* eos, int id) { - std::vector<vectorized::BlockUPtr> merge_blocks; - { - std::unique_lock l(_transfer_lock); - // Normally, the scanner scheduler will schedule ctx. - // But when the amount of data in the blocks queue exceeds the upper limit, - // the scheduler will stop scheduling. - // (if the scheduler continues to schedule, it will cause a lot of busy running). - // At this point, consumers are required to trigger new scheduling to ensure that - // data can be continuously fetched. 
- bool to_be_schedule = should_be_scheduled(); - - bool is_scheduled = false; - if (!done() && to_be_schedule && _num_running_scanners == 0) { - is_scheduled = true; - auto submit_status = _scanner_scheduler->submit(shared_from_this()); - if (!submit_status.ok()) { - set_status_on_error(submit_status, false); + if (scan_task) { + if (!scan_task->status_ok()) { + _set_scanner_done(); + return scan_task->get_status(); + } + // We can only know the block size after reading at least one block + // Just take the size of first block as `_estimated_block_size` + if (scan_task->first_block) { + std::lock_guard<std::mutex> fl(_free_blocks_lock); + size_t block_size = scan_task->current_block->allocated_bytes(); + _free_blocks_memory_usage += block_size; + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + scan_task->first_block = false; + if (block_size > _estimated_block_size) { + _estimated_block_size = block_size; } } - - // Wait for block from queue - { - SCOPED_TIMER(_scanner_wait_batch_timer); - // scanner batch wait time - while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) { - if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) { - LOG(INFO) << debug_string(); + // consume current block + block->swap(*scan_task->current_block); + if (!scan_task->current_block->mem_reuse()) { + // it depends on the memory strategy of ScanNode/ScanOperator + // we should double check `mem_reuse()` of `current_block` to make sure it can be reused + _newly_create_free_blocks_num->update(1); + scan_task->current_block = vectorized::Block::create_unique(_output_tuple_desc->slots(), + _batch_size, true); + } + if (scan_task->is_eos()) { // current scanner is finished, and no more data to read + _num_finished_scanners++; + std::weak_ptr<ScannerDelegate> next_scanner; + // submit one of the remaining scanners + if (_scanners.try_dequeue(next_scanner)) { + // reuse current running scanner, just reset some states. 
+ scan_task->reuse_scanner(next_scanner); + submit_scan_task(scan_task); + } else { + // no more scanner to be scheduled + // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners + int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners; + _num_running_scanners--; + std::lock_guard<std::mutex> fl(_free_blocks_lock); + for (int i = 0; i < free_blocks_for_each; ++i) { + vectorized::BlockUPtr removed_block; + if (_free_blocks.try_dequeue(removed_block)) { + _free_blocks_memory_usage -= block->allocated_bytes(); + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + } } - _blocks_queue_added_cv.wait_for(l, 1s); } + } else { + // resubmit current running scanner to read the next block + submit_scan_task(scan_task); } + // scale up + _try_to_scale_up(); + } - if (state->is_cancelled()) { - set_status_on_error(Status::Cancelled("cancelled"), false); - } + if (_num_finished_scanners == _all_scanners.size() && _blocks_queue.empty()) { + _set_scanner_done(); + _is_finished = true; + } + *eos = done(); + return Status::OK(); +} - if (!status().ok()) { - return status(); +void ScannerContext::_try_to_scale_up() { + // Four criteria to determine whether to increase the parallelism of the scanners + // 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up + // 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks + // 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up + // 4. 
At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num` + if (MAX_SCALE_UP_RATIO > 0 && _scanners.size_approx() > 0 && + (_num_running_scanners < _max_thread_num * MAX_SCALE_UP_RATIO) && + (_last_fetch_time - _last_scale_up_time > SCALE_UP_DURATION) && // duration > 5000ms + (_total_wait_block_time > (_last_fetch_time - _last_scale_up_time) * + WAIT_BLOCK_DURATION_RATIO)) { // too large lock time + double wait_ratio = + (double)_total_wait_block_time / (_last_fetch_time - _last_scale_up_time); + if (_last_wait_duration_ratio > 0 && wait_ratio > _last_wait_duration_ratio * 0.8) { + // when _last_wait_duration_ratio > 0, it has scaled up before. + // we need to determine if the scale-up is effective: + // the wait duration ratio after last scaling up should less than 80% of `_last_wait_duration_ratio` + return; } - if (!_blocks_queue.empty()) { - *block = std::move(_blocks_queue.front()); - _blocks_queue.pop_front(); - auto block_bytes = (*block)->allocated_bytes(); - _cur_bytes_in_queue -= block_bytes; - _queued_blocks_memory_usage->add(-block_bytes); - - auto rows = (*block)->rows(); - while (!_blocks_queue.empty()) { - auto& add_block = _blocks_queue.front(); - const auto add_rows = (*add_block).rows(); - if (rows + add_rows < state->batch_size()) { - rows += add_rows; - block_bytes = (*add_block).allocated_bytes(); - _cur_bytes_in_queue -= block_bytes; - _queued_blocks_memory_usage->add(-block_bytes); - merge_blocks.emplace_back(std::move(add_block)); - _blocks_queue.pop_front(); + std::lock_guard<std::mutex> fl(_free_blocks_lock); + bool is_scale_up = false; + // calculate the number of scanners that can be scheduled + int num_add = std::min(_num_running_scanners * SCALE_UP_RATIO, + _max_thread_num * MAX_SCALE_UP_RATIO - _num_running_scanners); + num_add = std::max(num_add, 1); + for (int i = 0; i < num_add; ++i) { + vectorized::BlockUPtr allocate_block = nullptr; + // reuse block in `_free_blocks` firstly + if 
(!_free_blocks.try_dequeue(allocate_block)) { + if (_free_blocks_memory_usage < _max_bytes_in_queue) { + _newly_create_free_blocks_num->update(1); + allocate_block = vectorized::Block::create_unique(_output_tuple_desc->slots(), + _batch_size, true); + } + } else { + // comes from `_free_blocks`, decrease first, then will be added back. + _free_blocks_memory_usage -= allocate_block->allocated_bytes(); + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + } + if (allocate_block) { + // get enough memory to launch one more scanner. + std::weak_ptr<ScannerDelegate> scale_up_scanner; + if (_scanners.try_dequeue(scale_up_scanner)) { + std::shared_ptr<ScanTask> scale_up_task = + std::make_shared<ScanTask>(scale_up_scanner, std::move(allocate_block)); + _free_blocks_memory_usage += _estimated_block_size; + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + // `first_block` is used to update `_free_blocks_memory_usage`, + // we have got the `_estimated_block_size`, no need for further updates + scale_up_task->first_block = false; + submit_scan_task(scale_up_task); + _num_running_scanners++; + _scale_up_scanners_counter->update(1); + is_scale_up = true; } else { break; } + } else { + break; } - } else { - *eos = done(); } - } - g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue); - if (!merge_blocks.empty()) { - vectorized::MutableBlock m(block->get()); - for (auto& merge_block : merge_blocks) { - static_cast<void>(m.merge(*merge_block)); - return_free_block(std::move(merge_block)); + if (is_scale_up) { + _last_wait_duration_ratio = wait_ratio; + _last_scale_up_time = UnixMillis(); + _total_wait_block_time = 0; } - (*block)->set_columns(std::move(m.mutable_columns())); } - - return Status::OK(); } Status ScannerContext::validate_block_schema(Block* block) { @@ -380,29 +405,17 @@ Status ScannerContext::validate_block_schema(Block* block) { return Status::OK(); } -void ScannerContext::inc_num_running_scanners(int32_t inc) { - std::lock_guard 
l(_transfer_lock); - _num_running_scanners += inc; - g_num_running_scanners.set_value(_num_running_scanners); -} - -void ScannerContext::set_status_on_error(const Status& status, bool need_lock) { - std::unique_lock l(_transfer_lock, std::defer_lock); - if (need_lock) { - l.lock(); - } - if (this->status().ok()) { - _process_status = status; - _blocks_queue_added_cv.notify_one(); - _should_stop = true; - _set_scanner_done(); - LOG(INFO) << "ctx is set status on error " << debug_string() - << ", call stack is: " << Status::InternalError<true>("catch error status"); - } +void ScannerContext::set_status_on_error(const Status& status) { + std::lock_guard<std::mutex> l(_transfer_lock); + _process_status = status; + _blocks_queue_added_cv.notify_one(); } void ScannerContext::stop_scanners(RuntimeState* state) { - std::unique_lock l(_transfer_lock); + std::lock_guard<std::mutex> l(_transfer_lock); + if (_should_stop) { + return; + } _should_stop = true; _set_scanner_done(); for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) { @@ -453,95 +466,15 @@ void ScannerContext::stop_scanners(RuntimeState* state) { _blocks_queue_added_cv.notify_one(); } -void ScannerContext::_set_scanner_done() { - if (_dependency) { - _dependency->set_scanner_done(); - } -} - std::string ScannerContext::debug_string() { return fmt::format( - "id: {}, total scanners: {}, scanners: {}, blocks in queue: {}," - " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {}," + "id: {}, total scanners: {}, blocks in queue: {}," + " _should_stop: {}, _is_finished: {}, free blocks: {}," " limit: {}, _num_running_scanners: {}, _max_thread_num: {}," - " _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}, " - "num_ctx_scheduled: {}, serving_blocks_num: {}, allowed_blocks_num: {}, query_id: {}", - ctx_id, _all_scanners.size(), _scanners.size(), _blocks_queue.size(), - _process_status.to_string(), _should_stop, _is_finished, _free_blocks.size_approx(), - limit, 
_num_running_scanners, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue, - _max_bytes_in_queue, num_ctx_scheduled(), _serving_blocks_num, allowed_blocks_num(), - print_id(_query_id)); -} - -void ScannerContext::reschedule_scanner_ctx() { - std::lock_guard l(_transfer_lock); - if (done()) { - return; - } - auto submit_status = _scanner_scheduler->submit(shared_from_this()); - //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times? - if (!submit_status.ok()) { - set_status_on_error(submit_status, false); - } -} - -void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner) { - std::lock_guard l(_transfer_lock); - // Use a transfer lock to avoid the scanner be scheduled concurrently. For example, that after - // calling "_scanners.push_front(scanner)", there may be other ctx in scheduler - // to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed - // before we call the following if() block. - { - --_num_running_scanners; - g_num_running_scanners.set_value(_num_running_scanners); - if (scanner->_scanner->need_to_close()) { - --_num_unfinished_scanners; - if (_num_unfinished_scanners == 0) { - _is_finished = true; - _set_scanner_done(); - _blocks_queue_added_cv.notify_one(); - return; - } - } else { - _scanners.push_front(scanner); - } - } - - if (should_be_scheduled()) { - auto submit_status = _scanner_scheduler->submit(shared_from_this()); - if (!submit_status.ok()) { - set_status_on_error(submit_status, false); - } - } -} - -// This method is called in scanner scheduler, and task context is hold -void ScannerContext::get_next_batch_of_scanners( - std::list<std::weak_ptr<ScannerDelegate>>* current_run) { - std::lock_guard l(_transfer_lock); - // Update the sched counter for profile - Defer defer {[&]() { _scanner_sched_counter->update(current_run->size()); }}; - // 1. Calculate how many scanners should be scheduled at this run. 
- // If there are enough space in blocks queue, - // the scanner number depends on the _free_blocks numbers - int thread_slot_num = get_available_thread_slot_num(); - - // 2. get #thread_slot_num scanners from ctx->scanners - // and put them into "this_run". - for (int i = 0; i < thread_slot_num && !_scanners.empty();) { - std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front(); - std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock(); - _scanners.pop_front(); - if (scanner == nullptr) { - continue; - } - if (scanner->_scanner->need_to_close()) { - static_cast<void>(scanner->_scanner->close(_state)); - } else { - current_run->push_back(scanner_ref); - i++; - } - } + " _max_bytes_in_queue: {}, query_id: {}", + ctx_id, _all_scanners.size(), _blocks_queue.size(), _should_stop, _is_finished, + _free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_thread_num, + _max_bytes_in_queue, print_id(_query_id)); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 4d936d72a13..58e6f4dae7f 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -58,6 +58,47 @@ class VScanNode; class ScannerScheduler; class SimplifiedScanScheduler; +class ScanTask { +public: + ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner, vectorized::BlockUPtr free_block) + : scanner(delegate_scanner), current_block(std::move(free_block)) {} + +private: + // whether current scanner is finished + bool eos = false; + Status status = Status::OK(); + +public: + std::weak_ptr<ScannerDelegate> scanner; + // cache the block of current loop + vectorized::BlockUPtr current_block; + // only take the size of the first block as estimated size + bool first_block = true; + uint64_t last_submit_time; // nanoseconds + + void set_status(Status _status) { + if (_status.is<ErrorCode::END_OF_FILE>()) { + // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error + eos 
= true; + } + status = _status; + } + Status get_status() const { return status; } + bool status_ok() { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); } + bool is_eos() const { return eos; } + void set_eos(bool _eos) { eos = _eos; } + + // reuse current running scanner + // reset `eos` and `status` + // `first_block` is used to update `_free_blocks_memory_usage`, and take the first block size + // as the `_estimated_block_size`. It has updated `_free_blocks_memory_usage`, so don't reset. + void reuse_scanner(std::weak_ptr<ScannerDelegate> next_scanner) { + scanner = next_scanner; + eos = false; + status = Status::OK(); + } +}; + // ScannerContext is responsible for recording the execution status // of a group of Scanners corresponding to a ScanNode. // Including how many scanners are being scheduled, and maintaining @@ -81,88 +122,49 @@ public: virtual Status init(); vectorized::BlockUPtr get_free_block(); - void return_free_block(std::unique_ptr<vectorized::Block> block); + void return_free_block(vectorized::BlockUPtr block); - // Append blocks from scanners to the blocks queue. - virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks); - // Get next block from blocks queue. Called by ScanNode + // Get next block from blocks queue. Called by ScanNode/ScanOperator // Set eos to true if there is no more data to read. - // And if eos is true, the block returned must be nullptr. - virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, - bool* eos, int id); + virtual Status get_block_from_queue(RuntimeState* state, vectorized::Block* block, bool* eos, + int id, bool wait = true); [[nodiscard]] Status validate_block_schema(Block* block); - // When a scanner complete a scan, this method will be called - // to return the scanner to the list for next scheduling. 
- void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner); + // submit the running scanner to thread pool in `ScannerScheduler` + // set the next scanned block to `ScanTask::current_block` + // set the error state to `ScanTask::status` + // set the `eos` to `ScanTask::eos` if there is no more data in current scanner + void submit_scan_task(std::shared_ptr<ScanTask> scan_task); - void set_status_on_error(const Status& status, bool need_lock = true); + // append the running scanner and its cached block to `_blocks_queue` + virtual void append_block_to_queue(std::shared_ptr<ScanTask> scan_task); - Status status() { - if (_process_status.is<ErrorCode::END_OF_FILE>()) { - return Status::OK(); - } - return _process_status; - } + void set_status_on_error(const Status& status); // Return true if this ScannerContext need no more process bool done() const { return _is_finished || _should_stop; } bool is_finished() { return _is_finished.load(); } bool should_stop() { return _should_stop.load(); } - void inc_num_running_scanners(int32_t scanner_inc); - - int get_num_running_scanners() const { return _num_running_scanners; } - - int get_num_unfinished_scanners() const { return _num_unfinished_scanners; } - - void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>* current_run); - virtual std::string debug_string(); RuntimeState* state() { return _state; } - - void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); } - - int64_t num_ctx_scheduled() { return _scanner_ctx_sched_counter->value(); } void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); } std::string parent_name(); virtual bool empty_in_queue(int id); - // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan - inline bool should_be_scheduled() const { - return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) && - (_serving_blocks_num < allowed_blocks_num()); - } - - int 
get_available_thread_slot_num() { - int thread_slot_num = 0; - thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner; - thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); - if (thread_slot_num <= 0) { - thread_slot_num = 1; - } - return thread_slot_num; - } - - int32_t allowed_blocks_num() const { - int32_t blocks_num = std::min(_free_blocks_capacity, - int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) / - _estimated_block_bytes)); - return blocks_num; - } - SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; } - virtual void reschedule_scanner_ctx(); void stop_scanners(RuntimeState* state); int32_t get_max_thread_num() const { return _max_thread_num; } void set_max_thread_num(int32_t num) { _max_thread_num = num; } + int batch_size() const { return _batch_size; } + // the unique id of this context std::string ctx_id; TUniqueId _query_id; @@ -176,10 +178,16 @@ protected: const RowDescriptor* output_row_descriptor, const std::list<std::shared_ptr<ScannerDelegate>>& scanners_, int64_t limit_, int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances, - pipeline::ScanLocalStateBase* local_state, - std::shared_ptr<pipeline::ScanDependency> dependency); + pipeline::ScanLocalStateBase* local_state); + + /// Four criteria to determine whether to increase the parallelism of the scanners + /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up + /// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks + /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up + /// 4. 
At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num` + virtual void _set_scanner_done() {}; - void _set_scanner_done(); + void _try_to_scale_up(); RuntimeState* _state = nullptr; VScanNode* _parent = nullptr; @@ -189,97 +197,52 @@ protected: const TupleDescriptor* _output_tuple_desc = nullptr; const RowDescriptor* _output_row_descriptor = nullptr; - // _transfer_lock is used to protect the critical section - // where the ScanNode and ScannerScheduler interact. - // Including access to variables such as blocks_queue, _process_status, _is_finished, etc. std::mutex _transfer_lock; - // The blocks got from scanners will be added to the "blocks_queue". - // And the upper scan node will be as a consumer to fetch blocks from this queue. - // Should be protected by "_transfer_lock" - std::list<vectorized::BlockUPtr> _blocks_queue; - // Wait in get_block_from_queue(), by ScanNode. std::condition_variable _blocks_queue_added_cv; - // Wait in clear_and_join(), by ScanNode. - std::condition_variable _ctx_finish_cv; - - // The following 3 variables control the process of the scanner scheduling. - // Use _transfer_lock to protect them. - // 1. _process_status - // indicates the global status of this scanner context. - // Set to non-ok if encounter errors. - // And if it is non-ok, the scanner process should stop. - // Set be set by either ScanNode or ScannerScheduler. - // 2. _should_stop - // Always be set by ScanNode. - // True means no more data need to be read(reach limit or closed) - // 3. _is_finished - // Always be set by ScannerScheduler. - // True means all scanners are finished to scan. - Status _process_status; + std::list<std::shared_ptr<ScanTask>> _blocks_queue; + + Status _process_status = Status::OK(); std::atomic_bool _should_stop = false; std::atomic_bool _is_finished = false; // Lazy-allocated blocks for all scanners to share, for memory reuse. 
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks; - std::atomic<int32_t> _serving_blocks_num = 0; - // The current number of free blocks available to the scanners. - // Used to limit the memory usage of the scanner. - // NOTE: this is NOT the size of `_free_blocks`. - int32_t _free_blocks_capacity = 0; - int64_t _estimated_block_bytes = 0; int _batch_size; // The limit from SQL's limit clause int64_t limit; - // Current number of running scanners. - std::atomic_int32_t _num_running_scanners = 0; - // Current number of ctx being scheduled. - // After each Scanner finishes a task, it will put the corresponding ctx - // back into the scheduling queue. - // Therefore, there will be multiple pointer of same ctx in the scheduling queue. - // Here we record the number of ctx in the scheduling queue to clean up at the end. - std::atomic_int32_t _num_scheduling_ctx = 0; - // Num of unfinished scanners. Should be set in init() - std::atomic_int32_t _num_unfinished_scanners = 0; - // Max number of scan thread for this scanner context. int32_t _max_thread_num = 0; - // How many blocks a scanner can use in one task. - int32_t _block_per_scanner = 0; - - // The current bytes of blocks in blocks queue - int64_t _cur_bytes_in_queue = 0; - // The max limit bytes of blocks in blocks queue - const int64_t _max_bytes_in_queue; - + int64_t _max_bytes_in_queue; doris::vectorized::ScannerScheduler* _scanner_scheduler; SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; - // List "scanners" saves all "unfinished" scanners. - // The scanner scheduler will pop scanners from this list, run scanner, - // and then if the scanner is not finished, will be pushed back to this list. - // Not need to protect by lock, because only one scheduler thread will access to it. - std::mutex _scanners_lock; - // Scanner's ownership belong to vscannode or scanoperator, scanner context does not own it. - // ScannerContext has to check if scanner is deconstructed before use it. 
- std::list<std::weak_ptr<ScannerDelegate>> _scanners; + moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners; + int32_t _num_scheduled_scanners = 0; + int32_t _num_finished_scanners = 0; + int32_t _num_running_scanners = 0; // weak pointer for _scanners, used in stop function std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners; - std::vector<int64_t> _finished_scanner_runtime; - std::vector<int64_t> _finished_scanner_rows_read; - std::vector<int64_t> _finished_scanner_wait_worker_time; - const int _num_parallel_instances; - std::shared_ptr<RuntimeProfile> _scanner_profile; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; - RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr; - RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr; - RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr; - RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr; RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; - - std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr; + RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark = nullptr; + RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr; + RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; + + // for scaling up the running scanners + std::mutex _free_blocks_lock; + size_t _estimated_block_size = 0; + int64_t _free_blocks_memory_usage = 0; + int64_t _last_scale_up_time = 0; + int64_t _last_fetch_time = 0; + int64_t _total_wait_block_time = 0; + double _last_wait_duration_ratio = 0; + const int64_t SCALE_UP_DURATION = 5000; // 5000ms + const float WAIT_BLOCK_DURATION_RATIO = 0.5; + const float SCALE_UP_RATIO = 0.5; + float MAX_SCALE_UP_RATIO; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index d8678bc0dc3..40fff7ed70c 
100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -69,10 +69,6 @@ ScannerScheduler::~ScannerScheduler() { return; } - for (int i = 0; i < QUEUE_NUM; i++) { - delete _pending_queues[i]; - } - delete[] _pending_queues; _deregister_metrics(); } @@ -81,18 +77,12 @@ void ScannerScheduler::stop() { return; } - for (int i = 0; i < QUEUE_NUM; i++) { - _pending_queues[i]->shutdown(); - } - _is_closed = true; - _scheduler_pool->shutdown(); _local_scan_thread_pool->shutdown(); _remote_scan_thread_pool->shutdown(); _limited_scan_thread_pool->shutdown(); - _scheduler_pool->wait(); _local_scan_thread_pool->join(); _remote_scan_thread_pool->join(); _limited_scan_thread_pool->wait(); @@ -101,24 +91,12 @@ void ScannerScheduler::stop() { } Status ScannerScheduler::init(ExecEnv* env) { - // 1. scheduling thread pool and scheduling queues - static_cast<void>(ThreadPoolBuilder("SchedulingThreadPool") - .set_min_threads(QUEUE_NUM) - .set_max_threads(QUEUE_NUM) - .build(&_scheduler_pool)); - - _pending_queues = new BlockingQueue<std::shared_ptr<ScannerContext>>*[QUEUE_NUM]; - for (int i = 0; i < QUEUE_NUM; i++) { - _pending_queues[i] = new BlockingQueue<std::shared_ptr<ScannerContext>>(INT32_MAX); - static_cast<void>(_scheduler_pool->submit_func([this, i] { this->_schedule_thread(i); })); - } - - // 2. local scan thread pool + // 1. local scan thread pool _local_scan_thread_pool = std::make_unique<PriorityThreadPool>( config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_queue_size, "local_scan"); - // 3. remote scan thread pool + // 2. remote scan thread pool _remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1 ? 
config::doris_max_remote_scanner_thread_pool_thread_num : std::max(512, CpuInfo::num_cores() * 10); @@ -128,7 +106,7 @@ Status ScannerScheduler::init(ExecEnv* env) { _remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size, "RemoteScanThreadPool"); - // 4. limited scan thread pool + // 3. limited scan thread pool static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool") .set_min_threads(config::doris_scanner_thread_pool_thread_num) .set_max_threads(config::doris_scanner_thread_pool_thread_num) @@ -139,155 +117,97 @@ Status ScannerScheduler::init(ExecEnv* env) { return Status::OK(); } -Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx) { +void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx, + std::shared_ptr<ScanTask> scan_task) { + scan_task->last_submit_time = GetCurrentTimeNanos(); if (ctx->done()) { - return Status::EndOfFile("ScannerContext is done"); - } - ctx->queue_idx = (_queue_idx++ % QUEUE_NUM); - if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) { - return Status::InternalError("failed to submit scanner context to scheduler"); - } - return Status::OK(); -} - -std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token( - ThreadPool::ExecutionMode mode, int max_concurrency) { - return _limited_scan_thread_pool->new_token(mode, max_concurrency); -} - -void ScannerScheduler::_schedule_thread(int queue_id) { - BlockingQueue<std::shared_ptr<ScannerContext>>* queue = _pending_queues[queue_id]; - while (!_is_closed) { - std::shared_ptr<ScannerContext> ctx; - bool ok = queue->blocking_get(&ctx); - if (!ok) { - // maybe closed - continue; - } - - _schedule_scanners(ctx); - // If ctx is done, no need to schedule it again. - // But should notice that there may still scanners running in scanner pool. 
+ return; } -} - -void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) { auto task_lock = ctx->task_exec_ctx(); if (task_lock == nullptr) { LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string() << " maybe finished"; return; } - MonotonicStopWatch watch; - watch.reset(); - watch.start(); - ctx->incr_num_ctx_scheduling(1); - - if (ctx->done()) { - return; - } - - std::list<std::weak_ptr<ScannerDelegate>> this_run; - ctx->get_next_batch_of_scanners(&this_run); - if (this_run.empty()) { - // There will be 2 cases when this_run is empty: - // 1. The blocks queue reaches limit. - // The consumer will continue scheduling the ctx. - // 2. All scanners are running. - // There running scanner will schedule the ctx after they are finished. - // So here we just return to stop scheduling ctx. - return; - } - - ctx->inc_num_running_scanners(this_run.size()); // Submit scanners to thread pool // TODO(cmy): How to handle this "nice"? int nice = 1; - auto iter = this_run.begin(); if (ctx->thread_token != nullptr) { - // TODO llj tg how to treat this? 
- while (iter != this_run.end()) { - std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock(); - if (scanner_delegate == nullptr) { - // Has to ++, or there is a dead loop - iter++; - continue; - } - scanner_delegate->_scanner->start_wait_worker_timer(); - auto s = ctx->thread_token->submit_func([this, scanner_ref = *iter, ctx]() { - this->_scanner_scan(this, ctx, scanner_ref); - }); - if (s.ok()) { - iter++; - } else { - ctx->set_status_on_error(s); - break; - } + std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock(); + if (scanner_delegate == nullptr) { + return; + } + + scanner_delegate->_scanner->start_wait_worker_timer(); + auto s = ctx->thread_token->submit_func( + [this, scanner_ref = scan_task, ctx]() { this->_scanner_scan(ctx, scanner_ref); }); + if (!s.ok()) { + scan_task->set_status(s); + ctx->append_block_to_queue(scan_task); + return; } } else { - while (iter != this_run.end()) { - std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock(); - if (scanner_delegate == nullptr) { - // Has to ++, or there is a dead loop - iter++; - continue; - } - scanner_delegate->_scanner->start_wait_worker_timer(); - TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); - bool ret = false; - if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { - if (auto* scan_sche = ctx->get_simple_scan_scheduler()) { - auto work_func = [this, scanner_ref = *iter, ctx]() { - this->_scanner_scan(this, ctx, scanner_ref); - }; - SimplifiedScanTask simple_scan_task = {work_func, ctx}; - ret = scan_sche->get_scan_queue()->try_put(simple_scan_task); - } else { - PriorityThreadPool::Task task; - task.work_function = [this, scanner_ref = *iter, ctx]() { - this->_scanner_scan(this, ctx, scanner_ref); - }; - task.priority = nice; - ret = _local_scan_thread_pool->offer(task); - } + std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock(); + if (scanner_delegate == nullptr) { + return; + } + + 
scanner_delegate->_scanner->start_wait_worker_timer(); + TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); + bool ret = false; + if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + if (auto* scan_sche = ctx->get_simple_scan_scheduler()) { + auto work_func = [this, scanner_ref = scan_task, ctx]() { + this->_scanner_scan(ctx, scanner_ref); + }; + SimplifiedScanTask simple_scan_task = {work_func, ctx}; + ret = scan_sche->get_scan_queue()->try_put(simple_scan_task); } else { PriorityThreadPool::Task task; - task.work_function = [this, scanner_ref = *iter, ctx]() { - this->_scanner_scan(this, ctx, scanner_ref); + task.work_function = [this, scanner_ref = scan_task, ctx]() { + this->_scanner_scan(ctx, scanner_ref); }; task.priority = nice; - ret = _remote_scan_thread_pool->offer(task); - } - if (ret) { - iter++; - } else { - ctx->set_status_on_error( - Status::InternalError("failed to submit scanner to scanner pool")); - break; + ret = _local_scan_thread_pool->offer(task); } + } else { + PriorityThreadPool::Task task; + task.work_function = [this, scanner_ref = scan_task, ctx]() { + this->_scanner_scan(ctx, scanner_ref); + }; + task.priority = nice; + ret = _remote_scan_thread_pool->offer(task); + } + if (!ret) { + scan_task->set_status( + Status::InternalError("Failed to submit scanner to scanner pool")); + ctx->append_block_to_queue(scan_task); + return; } } - ctx->incr_ctx_scheduling_time(watch.elapsed_time()); } -void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, - std::shared_ptr<ScannerContext> ctx, - std::weak_ptr<ScannerDelegate> scanner_ref) { +std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token( + ThreadPool::ExecutionMode mode, int max_concurrency) { + return _limited_scan_thread_pool->new_token(mode, max_concurrency); +} + +void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, + std::shared_ptr<ScanTask> scan_task) { + // record the time from scanner submission to actual 
execution in nanoseconds + ctx->incr_ctx_scheduling_time(GetCurrentTimeNanos() - scan_task->last_submit_time); auto task_lock = ctx->task_exec_ctx(); if (task_lock == nullptr) { - // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id) - // << " maybe finished"; return; } - //LOG_EVERY_N(INFO, 100) << "start running scanner from ctx " << ctx->debug_string(); - // will release scanner if it is the last one, task lock is hold here, to ensure - // that scanner could call scannode's method during deconstructor - std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock(); - auto& scanner = scanner_delegate->_scanner; + + std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock(); if (scanner_delegate == nullptr) { return; } + + VScannerSPtr& scanner = scanner_delegate->_scanner; SCOPED_ATTACH_TASK(scanner->runtime_state()); // for cpu hard limit, thread name should not be reset if (ctx->_should_reset_thread_name) { @@ -310,14 +230,13 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, if (!scanner->is_init()) { status = scanner->init(); if (!status.ok()) { - ctx->set_status_on_error(status); eos = true; } } + if (!eos && !scanner->is_open()) { status = scanner->open(state); if (!status.ok()) { - ctx->set_status_on_error(status); eos = true; } scanner->set_opened(); @@ -325,46 +244,21 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, static_cast<void>(scanner->try_append_late_arrival_runtime_filter()); - // Because we use thread pool to scan data from storage. One scanner can't - // use this thread too long, this can starve other query's scanner. So, we - // need yield this thread when we do enough work. However, OlapStorage read - // data in pre-aggregate mode, then we can't use storage returned data to - // judge if we need to yield. So we record all raw data read in this round - // scan, if this exceeds row number or bytes threshold, we yield this thread. 
- std::vector<vectorized::BlockUPtr> blocks; - int64_t raw_bytes_read = 0; - int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; - int num_rows_in_block = 0; - - // Only set to true when ctx->done() return true. - // Use this flag because we need distinguish eos from `should_stop`. - // If eos is true, we still need to return blocks, - // but is should_stop is true, no need to return blocks - bool should_stop = false; - // Has to wait at least one full block, or it will cause a lot of schedule task in priority - // queue, it will affect query latency and query concurrency for example ssb 3.3. - auto should_do_scan = [&, batch_size = state->batch_size(), - time = state->wait_full_block_schedule_times()]() { - if (raw_bytes_read < raw_bytes_threshold) { - return true; - } else if (num_rows_in_block < batch_size) { - return raw_bytes_read < raw_bytes_threshold * time; - } - return false; - }; - - while (!eos && should_do_scan()) { - // TODO llj task group should should_yield? + bool first_read = true; + while (!eos) { if (UNLIKELY(ctx->done())) { - // No need to set status on error here. - // Because done() maybe caused by "should_stop" - should_stop = true; + eos = true; break; } + BlockUPtr free_block = nullptr; + if (first_read) { + status = scanner->get_block_after_projects(state, scan_task->current_block.get(), &eos); + first_read = false; + } else { + free_block = ctx->get_free_block(); + status = scanner->get_block_after_projects(state, free_block.get(), &eos); + } - BlockUPtr block = ctx->get_free_block(); - - status = scanner->get_block_after_projects(state, block.get(), &eos); // The VFileScanner for external table may try to open not exist files, // Because FE file cache for external table may out of date. // So, NOT_FOUND for VFileScanner is not a fail case. 
@@ -374,49 +268,36 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, !status.is<ErrorCode::NOT_FOUND>()))) { LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string(); break; - } - VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos; - if (status.is<ErrorCode::NOT_FOUND>()) { + } else if (status.is<ErrorCode::NOT_FOUND>()) { // The only case in this "if" branch is external table file delete and fe cache has not been updated yet. // Set status to OK. status = Status::OK(); eos = true; + break; } - raw_bytes_read += block->allocated_bytes(); - num_rows_in_block += block->rows(); - if (UNLIKELY(block->rows() == 0)) { - ctx->return_free_block(std::move(block)); - } else { - if (!blocks.empty() && blocks.back()->rows() + block->rows() <= state->batch_size()) { - vectorized::MutableBlock mutable_block(blocks.back().get()); - static_cast<void>(mutable_block.merge(*block)); - blocks.back().get()->set_columns(std::move(mutable_block.mutable_columns())); - ctx->return_free_block(std::move(block)); - } else { - blocks.push_back(std::move(block)); - } + if (!first_read && free_block) { + vectorized::MutableBlock mutable_block(scan_task->current_block.get()); + static_cast<void>(mutable_block.merge(*free_block)); + scan_task->current_block->set_columns(std::move(mutable_block.mutable_columns())); + ctx->return_free_block(std::move(free_block)); + } + if (scan_task->current_block->rows() >= ctx->batch_size()) { + break; } } // end for while - // if we failed, check status. 
if (UNLIKELY(!status.ok())) { - // _transfer_done = true; - ctx->set_status_on_error(status); + scan_task->set_status(status); eos = true; - blocks.clear(); - } else if (should_stop) { - // No need to return blocks because of should_stop, just delete them - blocks.clear(); - } else if (!blocks.empty()) { - ctx->append_blocks_to_queue(blocks); } scanner->update_scan_cpu_timer(); - if (eos || should_stop) { + if (eos) { scanner->mark_to_need_to_close(); } - ctx->push_back_scanner_and_reschedule(scanner_delegate); + scan_task->set_eos(eos); + ctx->append_block_to_queue(scan_task); } void ScannerScheduler::_register_metrics() { diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 9fedd27dbd8..7a602038956 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -37,25 +37,18 @@ class BlockingQueue; namespace doris::vectorized { class ScannerDelegate; +class ScanTask; class ScannerContext; // Responsible for the scheduling and execution of all Scanners of a BE node. -// ScannerScheduler has two types of thread pools: -// 1. Scheduling thread pool -// Responsible for Scanner scheduling. -// A set of Scanners for a query will be encapsulated into a ScannerContext -// and submitted to the ScannerScheduler's scheduling queue. -// There are multiple scheduling queues in ScannerScheduler, and each scheduling queue -// is handled by a scheduling thread. -// The scheduling thread is scheduled in granularity of ScannerContext, -// that is, a group of Scanners in a ScannerContext are scheduled at a time. -// -//2. Execution thread pool -// The scheduling thread will submit the Scanners selected from the ScannerContext +// Execution thread pool +// When a ScannerContext is launched, it will submit the running scanners to this scheduler. +// The scheduling thread will submit the running scanner and its ScannerContext // to the execution thread pool to do the actual scan task. 
-// Each Scanner will act as a producer, read a group of blocks and put them into +// Each Scanner will act as a producer, read the next block and put it into // the corresponding block queue. // The corresponding ScanNode will act as a consumer to consume blocks from the block queue. +// After the block is consumed, the unfinished scanner will resubmit to this scheduler. class ScannerScheduler { public: ScannerScheduler(); @@ -63,7 +56,7 @@ public: [[nodiscard]] Status init(ExecEnv* env); - [[nodiscard]] Status submit(std::shared_ptr<ScannerContext> ctx); + void submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task); void stop(); @@ -73,32 +66,13 @@ public: int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; } private: - // scheduling thread function - void _schedule_thread(int queue_id); - // schedule scanners in a certain ScannerContext - void _schedule_scanners(std::shared_ptr<ScannerContext> ctx); - // execution thread function - void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr<ScannerContext> ctx, - std::weak_ptr<ScannerDelegate> scanner); + static void _scanner_scan(std::shared_ptr<ScannerContext> ctx, + std::shared_ptr<ScanTask> scan_task); void _register_metrics(); static void _deregister_metrics(); - // Scheduling queue number. - // TODO: make it configurable. - static const int QUEUE_NUM = 4; - // The ScannerContext will be submitted to the pending queue roundrobin. - // _queue_idx pointer to the current queue. - // Use std::atomic_uint to prevent numerical overflow from memory out of bound. - // The scheduler thread will take ctx from pending queue, schedule it, - // and put it to the _scheduling_map. - // If any scanner finish, it will take ctx from and put it to pending queue again. 
- std::atomic_uint _queue_idx = {0}; - BlockingQueue<std::shared_ptr<ScannerContext>>** _pending_queues = nullptr; - - // scheduling thread pool - std::unique_ptr<ThreadPool> _scheduler_pool; // execution thread pool // _local_scan_thread_pool is for local scan task(typically, olap scanner) // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.) diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 568b206db5e..f2f4e242bc6 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -197,7 +197,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) { if (_scanner_ctx) { DCHECK(!_eos && _num_scanners->value() > 0); RETURN_IF_ERROR(_scanner_ctx->init()); - RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); } if (_shared_scan_opt) { LOG(INFO) << "instance shared scan enabled" @@ -219,7 +218,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) { : Status::OK()); if (_scanner_ctx) { RETURN_IF_ERROR(_scanner_ctx->init()); - RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); } } @@ -246,14 +244,10 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* }}; if (state->is_cancelled()) { - // ISSUE: https://github.com/apache/doris/issues/16360 - // _scanner_ctx may be null here, see: `VScanNode::alloc_resource` (_eos == null) if (_scanner_ctx) { - _scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled")); - return _scanner_ctx->status(); - } else { - return Status::Cancelled("query cancelled"); + _scanner_ctx->stop_scanners(state); } + return Status::Cancelled("Query cancelled in ScanNode"); } if (_eos) { @@ -261,16 +255,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* return Status::OK(); } - vectorized::BlockUPtr scan_block = nullptr; - RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, &scan_block, eos, _context_queue_id)); - 
if (*eos) { - DCHECK(scan_block == nullptr); - return Status::OK(); - } - - // get scanner's block memory - block->swap(*scan_block); - _scanner_ctx->return_free_block(std::move(scan_block)); + RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, block, eos, _context_queue_id)); reached_limit(block, eos); if (*eos) { @@ -294,16 +279,14 @@ Status VScanNode::_init_profile() { runtime_profile()->add_child(_scanner_profile.get(), true, nullptr); _memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage"); - _queued_blocks_memory_usage = - _scanner_profile->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES, "MemoryUsage"); _free_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage"); _newly_create_free_blocks_num = ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT); + _scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, "NumScaleUpScanners", TUnit::UNIT); // time of transfer thread to wait for block from scan thread _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime"); _scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", TUnit::UNIT); - _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT); _scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, "ScannerCtxSchedTime"); _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime"); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 0c39d15b57f..b83e9211214 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -377,7 +377,6 @@ protected: RuntimeProfile::Counter* _filter_timer = nullptr; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; - RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr; RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr; RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; RuntimeProfile::Counter* 
_scanner_wait_worker_timer = nullptr; @@ -387,8 +386,8 @@ protected: RuntimeProfile::Counter* _max_scanner_thread_num = nullptr; RuntimeProfile::Counter* _memory_usage_counter = nullptr; - RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr; RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr; + RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; std::unordered_map<std::string, int> _colname_to_slot_id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8b5e24a52aa..8effd6adae2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -74,6 +74,8 @@ public class SessionVariable implements Serializable, Writable { public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit"; + public static final String NUM_SCANNER_THREADS = "num_scanner_threads"; + public static final String SCANNER_SCALE_UP_RATIO = "scanner_scale_up_ratio"; public static final String QUERY_TIMEOUT = "query_timeout"; public static final String ANALYZE_TIMEOUT = "analyze_timeout"; @@ -553,6 +555,20 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT) public long maxScanQueueMemByte = 2147483648L / 20; + @VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, description = { + "ScanNode扫描数据的最大并发,默认为0,采用BE的doris_scanner_thread_pool_thread_num", + "The max threads to read data of ScanNode, " + + "default 0, use doris_scanner_thread_pool_thread_num in be.conf" + }) + public int numScannerThreads = 0; + + @VariableMgr.VarAttr(name = SCANNER_SCALE_UP_RATIO, needForward = true, description = { + "ScanNode自适应的增加扫描并发,最大允许增长的并发倍率,默认为0,关闭该功能", + "The max multiple of increasing the concurrency of scanners 
adaptively, " + + "default 0, turn off scaling up" + }) + public double scannerScaleUpRatio = 0; + @VariableMgr.VarAttr(name = ENABLE_SPILLING) public boolean enableSpilling = false; @@ -1790,6 +1806,14 @@ public class SessionVariable implements Serializable, Writable { return maxScanQueueMemByte; } + public int getNumScannerThreads() { + return numScannerThreads; + } + + public double getScannerScaleUpRatio() { + return scannerScaleUpRatio; + } + public int getQueryTimeoutS() { return queryTimeoutS; } @@ -1962,7 +1986,15 @@ public class SessionVariable implements Serializable, Writable { } public void setMaxScanQueueMemByte(long scanQueueMemByte) { - this.maxScanQueueMemByte = Math.min(scanQueueMemByte, maxExecMemByte / 2); + this.maxScanQueueMemByte = scanQueueMemByte; + } + + public void setNumScannerThreads(int numScannerThreads) { + this.numScannerThreads = numScannerThreads; + } + + public void setScannerScaleUpRatio(double scannerScaleUpRatio) { + this.scannerScaleUpRatio = scannerScaleUpRatio; } public boolean isSqlQuoteShowCreate() { @@ -2771,7 +2803,9 @@ public class SessionVariable implements Serializable, Writable { public TQueryOptions toThrift() { TQueryOptions tResult = new TQueryOptions(); tResult.setMemLimit(maxExecMemByte); - tResult.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, maxExecMemByte / 20)); + tResult.setScanQueueMemLimit(maxScanQueueMemByte); + tResult.setNumScannerThreads(numScannerThreads); + tResult.setScannerScaleUpRatio(scannerScaleUpRatio); // TODO chenhao, reservation will be calculated by cost tResult.setMinReservation(0); @@ -3067,7 +3101,9 @@ public class SessionVariable implements Serializable, Writable { public TQueryOptions getQueryOptionVariables() { TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setMemLimit(maxExecMemByte); - queryOptions.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, maxExecMemByte / 20)); + queryOptions.setScanQueueMemLimit(maxScanQueueMemByte); + 
queryOptions.setNumScannerThreads(numScannerThreads); + queryOptions.setScannerScaleUpRatio(scannerScaleUpRatio); queryOptions.setQueryTimeout(queryTimeoutS); queryOptions.setInsertTimeout(insertTimeoutS); queryOptions.setAnalyzeTimeout(analyzeTimeoutS); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b0e3a50628b..c321aa2660e 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -271,6 +271,8 @@ struct TQueryOptions { 97: optional i64 parallel_scan_min_rows_per_scanner = 0; 98: optional bool skip_bad_tablet = false; + // Increase concurrency of scanners adaptively, the maximum times to scale up + 99: optional double scanner_scale_up_ratio = 0; // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org