AshinGau commented on code in PR #30746: URL: https://github.com/apache/doris/pull/30746#discussion_r1477311455
########## be/src/vec/exec/scan/scanner_context.cpp: ########## @@ -31,64 +29,56 @@ #include "common/status.h" #include "pipeline/exec/scan_operator.h" #include "runtime/descriptors.h" -#include "runtime/exec_env.h" -#include "runtime/query_context.h" #include "runtime/runtime_state.h" -#include "util/pretty_printer.h" #include "util/uid_util.h" #include "vec/core/block.h" -#include "vec/exec/scan/scanner_scheduler.h" #include "vec/exec/scan/vscan_node.h" -#include "vec/exec/scan/vscanner.h" namespace doris::vectorized { using namespace std::chrono_literals; -static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0); -static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0); - ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, - pipeline::ScanLocalStateBase* local_state, - std::shared_ptr<pipeline::ScanDependency> dependency) + pipeline::ScanLocalStateBase* local_state) : HasTaskExecutionCtx(state), _state(state), - _parent(nullptr), _local_state(local_state), _output_tuple_desc(output_row_descriptor ? output_row_descriptor->tuple_descriptors().front() : output_tuple_desc), _output_row_descriptor(output_row_descriptor), - _process_status(Status::OK()), _batch_size(state->batch_size()), limit(limit_), _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * num_parallel_instances), _scanner_scheduler(state->exec_env()->scanner_scheduler()), - _scanners(scanners.begin(), scanners.end()), _all_scanners(scanners.begin(), scanners.end()), - _num_parallel_instances(num_parallel_instances), - _dependency(dependency) { + _num_parallel_instances(num_parallel_instances) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); - if (_scanners.empty()) { + // Provide more memory for wide tables, increase proportionally by multiples of 300 + _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; + if (scanners.empty()) { _is_finished = true; _set_scanner_done(); } + _scanners.enqueue_bulk(scanners.begin(), scanners.size()); if (limit < 0) { limit = -1; + } else if (limit < _batch_size) { + _batch_size = limit; } - _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4; + _max_thread_num = Review Comment: Dividing by 4 is completely meaningless. The `config::doris_scanner_thread_pool_thread_num / 4` here are very large, but many parameters previously controlled concurrency, and the actual concurrency has not been achieved _max_thread_num, now it is directly satisfied _max_thread_num concurrency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org