This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8a44c180bf3d9aa842cd92a54c3f6c5f802949fa Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Sat Mar 2 01:02:48 2024 +0800 [opt](scan) read scan ranges in the order of partitions (#31630) --- be/src/pipeline/exec/file_scan_operator.cpp | 55 +++++++++++++++++++++++------ be/src/vec/exec/scan/new_file_scan_node.cpp | 55 +++++++++++++++++++++++------ 2 files changed, 88 insertions(+), 22 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index ac193147dfb..9d48fce2552 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -73,20 +73,53 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, _scan_ranges = scan_ranges; } else { // There is no need for the number of scanners to exceed the number of threads in thread pool. - _scan_ranges.clear(); - auto range_iter = scan_ranges.begin(); - for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) { - _scan_ranges.push_back(*range_iter); + _scan_ranges.resize(max_scanners); + std::vector<TScanRangeParams>& scan_ranges_ = + const_cast<std::vector<TScanRangeParams>&>(scan_ranges); + auto& first_ranges = scan_ranges_[0].scan_range.ext_scan_range.file_scan_range.ranges; + if (first_ranges[0].__isset.columns_from_path_keys && + !first_ranges[0].columns_from_path_keys.empty()) { + int num_keys = first_ranges[0].columns_from_path_keys.size(); + // In the insert statement, reading data in partition order can reduce the memory usage of BE + // and prevent the generation of smaller tables. + std::sort(scan_ranges_.begin(), scan_ranges_.end(), + [&num_keys](TScanRangeParams r1, TScanRangeParams r2) { + auto& path1 = r1.scan_range.ext_scan_range.file_scan_range.ranges[0] + .columns_from_path; + auto& path2 = r2.scan_range.ext_scan_range.file_scan_range.ranges[0] + .columns_from_path; + for (int i = 0; i < num_keys; ++i) { + if (path1[i] < path2[i]) { + return true; + } + } + return false; + }); } - for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) { - if (i == max_scanners) { - i = 0; + int num_ranges = scan_ranges.size() / max_scanners; + int num_add_one = scan_ranges.size() - num_ranges * max_scanners; + int scan_index = 0; + int range_index = 0; + for (int i = 0; i < num_add_one; ++i) { + _scan_ranges[scan_index] = scan_ranges_[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges; j++) { + auto& merged_ranges = scan_ranges_[range_index++] + .scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } + } + for (int i = num_add_one; i < max_scanners; ++i) { + _scan_ranges[scan_index] = scan_ranges_[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges - 1; j++) { + auto& merged_ranges = scan_ranges_[range_index++] + .scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges; - auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - _scan_ranges.shrink_to_fit(); LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } if (scan_ranges.size() > 0 && diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 2ce80f4463a..a0ae03a8647 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -71,20 +71,53 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state, _scan_ranges = scan_ranges; } else { // There is no need for the number of scanners to exceed the number of threads in thread pool. - _scan_ranges.clear(); - auto range_iter = scan_ranges.begin(); - for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) { - _scan_ranges.push_back(*range_iter); + _scan_ranges.resize(max_scanners); + std::vector<TScanRangeParams>& scan_ranges_ = + const_cast<std::vector<TScanRangeParams>&>(scan_ranges); + auto& first_ranges = scan_ranges_[0].scan_range.ext_scan_range.file_scan_range.ranges; + if (first_ranges[0].__isset.columns_from_path_keys && + !first_ranges[0].columns_from_path_keys.empty()) { + int num_keys = first_ranges[0].columns_from_path_keys.size(); + // In the insert statement, reading data in partition order can reduce the memory usage of BE + // and prevent the generation of smaller tables. + std::sort(scan_ranges_.begin(), scan_ranges_.end(), + [&num_keys](TScanRangeParams r1, TScanRangeParams r2) { + auto& path1 = r1.scan_range.ext_scan_range.file_scan_range.ranges[0] + .columns_from_path; + auto& path2 = r2.scan_range.ext_scan_range.file_scan_range.ranges[0] + .columns_from_path; + for (int i = 0; i < num_keys; ++i) { + if (path1[i] < path2[i]) { + return true; + } + } + return false; + }); } - for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) { - if (i == max_scanners) { - i = 0; + int num_ranges = scan_ranges.size() / max_scanners; + int num_add_one = scan_ranges.size() - num_ranges * max_scanners; + int scan_index = 0; + int range_index = 0; + for (int i = 0; i < num_add_one; ++i) { + _scan_ranges[scan_index] = scan_ranges_[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges; j++) { + auto& merged_ranges = scan_ranges_[range_index++] + .scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } + } + for (int i = num_add_one; i < max_scanners; ++i) { + _scan_ranges[scan_index] = scan_ranges_[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges - 1; j++) { + auto& merged_ranges = scan_ranges_[range_index++] + .scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges; - auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - _scan_ranges.shrink_to_fit(); LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } if (scan_ranges.size() > 0 && --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org