yiguolei commented on code in PR #30746:
URL: https://github.com/apache/doris/pull/30746#discussion_r1477222377
##########
be/src/vec/exec/scan/scanner_context.cpp:
##########
@@ -31,64 +29,56 @@
 #include "common/status.h"
 #include "pipeline/exec/scan_operator.h"
 #include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
-#include "util/pretty_printer.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
-#include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/exec/scan/vscan_node.h"
-#include "vec/exec/scan/vscanner.h"
 
 namespace doris::vectorized {
 
 using namespace std::chrono_literals;
 
-static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
-static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0);
-
 ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
                                const RowDescriptor* output_row_descriptor,
                                const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                                int64_t limit_, int64_t max_bytes_in_blocks_queue,
                                const int num_parallel_instances,
-                               pipeline::ScanLocalStateBase* local_state,
-                               std::shared_ptr<pipeline::ScanDependency> dependency)
+                               pipeline::ScanLocalStateBase* local_state)
         : HasTaskExecutionCtx(state),
           _state(state),
-          _parent(nullptr),
           _local_state(local_state),
           _output_tuple_desc(output_row_descriptor
                                      ? output_row_descriptor->tuple_descriptors().front()
                                      : output_tuple_desc),
           _output_row_descriptor(output_row_descriptor),
-          _process_status(Status::OK()),
           _batch_size(state->batch_size()),
           limit(limit_),
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners.begin(), scanners.end()),
           _all_scanners(scanners.begin(), scanners.end()),
-          _num_parallel_instances(num_parallel_instances),
-          _dependency(dependency) {
+          _num_parallel_instances(num_parallel_instances) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
     ctx_id = UniqueId::gen_uid().to_string();
-    if (_scanners.empty()) {
+    // Provide more memory for wide tables, increase proportionally by multiples of 300
+    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
+    if (scanners.empty()) {
         _is_finished = true;
         _set_scanner_done();
     }
+    _scanners.enqueue_bulk(scanners.begin(), scanners.size());
     if (limit < 0) {
         limit = -1;
+    } else if (limit < _batch_size) {
+        _batch_size = limit;

Review Comment:
   Remove this logic and keep it the same as the original, since we do not know the actual reason for it.
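For readers of this thread, here is a minimal standalone C++ sketch of the two pieces of arithmetic under discussion. It is not Doris code: the base queue budget, slot counts, session batch size, and LIMIT value below are made-up examples. It shows how the integer division in slots().size() / 300 + 1 only bumps the queue budget at 300-slot boundaries, and what the batch-size clamp the reviewer wants removed would do for a small LIMIT:

    // Standalone illustration of the arithmetic in the hunk above.
    // Not Doris code: base_bytes, the slot counts, batch_size and limit are hypothetical.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
        // Queue-size scaling: integer division means 0-299 slots keep the base budget,
        // 300-599 slots double it, 600-899 triple it, and so on.
        const int64_t base_bytes = std::max<int64_t>(int64_t{2} * 1024 * 1024, 1024);
        for (int slots : {10, 299, 300, 650}) {
            int64_t scaled = base_bytes * (slots / 300 + 1);
            std::cout << "slots=" << slots << " -> max_bytes_in_queue=" << scaled << "\n";
        }

        // Batch-size clamp (the logic the review asks to drop): a small non-negative
        // limit shrinks the batch so a single block does not overshoot the LIMIT.
        int batch_size = 4096; // hypothetical session batch size
        int64_t limit = 100;   // hypothetical query LIMIT
        if (limit < 0) {
            limit = -1; // unlimited
        } else if (limit < batch_size) {
            batch_size = static_cast<int>(limit);
        }
        std::cout << "limit=" << limit << " -> batch_size=" << batch_size << "\n";
        return 0;
    }

With those example values the sketch prints a 1x budget for 10 and 299 slots, 2x for 300, 3x for 650, and clamps batch_size from 4096 down to 100; whether that clamp is actually safe for the scanner is exactly what the review comment questions.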