yiguolei commented on code in PR #30746: URL: https://github.com/apache/doris/pull/30746#discussion_r1477222416
########## be/src/vec/exec/scan/scanner_context.cpp: ########## @@ -31,64 +29,56 @@ #include "common/status.h" #include "pipeline/exec/scan_operator.h" #include "runtime/descriptors.h" -#include "runtime/exec_env.h" -#include "runtime/query_context.h" #include "runtime/runtime_state.h" -#include "util/pretty_printer.h" #include "util/uid_util.h" #include "vec/core/block.h" -#include "vec/exec/scan/scanner_scheduler.h" #include "vec/exec/scan/vscan_node.h" -#include "vec/exec/scan/vscanner.h" namespace doris::vectorized { using namespace std::chrono_literals; -static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0); -static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0); - ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, - pipeline::ScanLocalStateBase* local_state, - std::shared_ptr<pipeline::ScanDependency> dependency) + pipeline::ScanLocalStateBase* local_state) : HasTaskExecutionCtx(state), _state(state), - _parent(nullptr), _local_state(local_state), _output_tuple_desc(output_row_descriptor ? output_row_descriptor->tuple_descriptors().front() : output_tuple_desc), _output_row_descriptor(output_row_descriptor), - _process_status(Status::OK()), _batch_size(state->batch_size()), limit(limit_), _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * num_parallel_instances), _scanner_scheduler(state->exec_env()->scanner_scheduler()), - _scanners(scanners.begin(), scanners.end()), _all_scanners(scanners.begin(), scanners.end()), - _num_parallel_instances(num_parallel_instances), - _dependency(dependency) { + _num_parallel_instances(num_parallel_instances) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); - if (_scanners.empty()) { + // Provide more memory for wide tables, increase proportionally by multiples of 300 + _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; + if (scanners.empty()) { _is_finished = true; _set_scanner_done(); } + _scanners.enqueue_bulk(scanners.begin(), scanners.size()); if (limit < 0) { limit = -1; + } else if (limit < _batch_size) { + _batch_size = limit; } - _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4; + _max_thread_num = Review Comment: keep the same as original logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org