wangbo commented on code in PR #17738: URL: https://github.com/apache/doris/pull/17738#discussion_r1137100521
########## be/src/vec/exec/scan/new_olap_scan_node.cpp: ########## @@ -404,57 +406,176 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) { } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); - std::unordered_set<std::string> disk_set; - for (auto& scan_range : _scan_ranges) { - auto tablet_id = scan_range->tablet_id; - std::string err; - TabletSharedPtr tablet = - StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); - if (tablet == nullptr) { - auto err_str = fmt::format("failed to get tablet: {}, reason: {}", tablet_id, err); - LOG(WARNING) << err_str; - return Status::InternalError(err_str); - } + bool is_duplicate_key = false; + int segment_count = 0; + std::vector<std::vector<RowsetReaderSharedPtr>> rowset_readers_vector(_scan_ranges.size()); + std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size()); + + // Split tablet segment by scanner, only use in pipeline in duplicate key + // 1. if tablet count lower than scanner thread num, count segment num of all tablet ready for scan + // TODO: some tablet may do not have segment, may need split segment all case + if (_shared_scan_opt && _scan_ranges.size() < config::doris_scanner_thread_pool_thread_num) { + for (int i = 0; i < _scan_ranges.size(); ++i) { + auto& scan_range = _scan_ranges[i]; + auto tablet_id = scan_range->tablet_id; + std::string err; + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); + if (tablet == nullptr) { + auto err_str = fmt::format("failed to get tablet: {}, reason: {}", tablet_id, err); + LOG(WARNING) << err_str; + return Status::InternalError(err_str); + } - std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges; - int size_based_scanners_per_tablet = 1; + is_duplicate_key = tablet->keys_type() == DUP_KEYS; + if (!is_duplicate_key) { + break; + } - if (config::doris_scan_range_max_mb > 0) { - size_based_scanners_per_tablet = 
std::max( - 1, (int)(tablet->tablet_footprint() / (config::doris_scan_range_max_mb << 20))); + int64_t version = 0; + std::from_chars(scan_range->version.c_str(), + scan_range->version.c_str() + scan_range->version.size(), version); + + std::shared_lock rdlock(tablet->get_header_lock()); + // acquire tablet rowset readers at the beginning of the scan node + // to prevent this case: when there are lots of olap scanners to run for example 10000 + // the rowsets maybe compacted when the last olap scanner starts + Status acquire_reader_st = + tablet->capture_rs_readers({0, version}, &rowset_readers_vector[i]); + if (!acquire_reader_st.ok()) { + LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; + std::stringstream ss; + ss << "failed to initialize storage reader. tablet=" << tablet->full_name() + << ", res=" << acquire_reader_st + << ", backend=" << BackendOptions::get_localhost(); + return Status::InternalError(ss.str()); + } + + for (const auto& rowset_reader : rowset_readers_vector[i]) { + auto num_segments = rowset_reader->rowset()->num_segments(); + tablet_rs_seg_count[i].emplace_back(num_segments); + segment_count += num_segments; + } } + } - int ranges_per_scanner = - std::max(1, (int)ranges->size() / - std::min(scanners_per_tablet, size_based_scanners_per_tablet)); - int num_ranges = ranges->size(); - for (int i = 0; i < num_ranges;) { - std::vector<doris::OlapScanRange*> scanner_ranges; - scanner_ranges.push_back((*ranges)[i].get()); - ++i; - for (int j = 1; i < num_ranges && j < ranges_per_scanner && - (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include; - ++j, ++i) { - scanner_ranges.push_back((*ranges)[i].get()); + std::unordered_set<std::string> disk_set; + auto build_new_scanner = [&](const TPaloScanRange& scan_range, + const std::vector<OlapScanRange*>& key_ranges, + const std::vector<RowsetReaderSharedPtr>& rs_readers, + const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) { + NewOlapScanner* scanner = new 
NewOlapScanner(_state, this, _limit_per_scanner, + _olap_scan_node.is_preaggregation, + _need_agg_finalize, _scanner_profile.get()); + + scanner->set_compound_filters(_compound_filters); + // add scanner to pool before doing prepare. + // so that scanner can be automatically deconstructed if prepare failed. + _scanner_pool.add(scanner); + RETURN_IF_ERROR(scanner->prepare(scan_range, key_ranges, _vconjunct_ctx_ptr.get(), + _olap_filters, _filter_predicates, _push_down_functions, + _common_vexpr_ctxs_pushdown.get(), rs_readers, + rs_reader_seg_offsets)); + scanners->push_back((VScanner*)scanner); + disk_set.insert(scanner->scan_disk()); + return Status::OK(); + }; + if (is_duplicate_key) { + // 2. Split by segment count, each scanner need scan avg segment count + auto avg_segment_count = + std::max(segment_count / config::doris_scanner_thread_pool_thread_num, 1); + for (int i = 0; i < _scan_ranges.size(); ++i) { + auto& scan_range = _scan_ranges[i]; + std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges; + int num_ranges = ranges->size(); + std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges); + for (int j = 0; j < num_ranges; ++j) { + scanner_ranges[j] = (*ranges)[j].get(); + } + + const auto& rs_seg_count = tablet_rs_seg_count[i]; + int rs_seg_count_index = 0; + int rs_seg_start_scan = 0; + int scanner_seg_occupy = 0; + std::vector<RowsetReaderSharedPtr> rs_readers; + std::vector<std::pair<int, int>> rs_reader_seg_offsets; + + while (rs_seg_count_index < rs_seg_count.size()) { + auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - rs_seg_start_scan; + rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone()); + + if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count) { + auto need_add_seg_nums = avg_segment_count - scanner_seg_occupy; + rs_reader_seg_offsets.emplace_back( + rs_seg_start_scan, + rs_seg_start_scan + need_add_seg_nums); // only scan need_add_seg_nums + 
RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers, + rs_reader_seg_offsets)); + + rs_seg_start_scan += need_add_seg_nums; + scanner_seg_occupy = 0; + rs_readers.clear(); + rs_reader_seg_offsets.clear(); + } else if (scanner_seg_occupy + max_add_seg_nums == avg_segment_count) { + rs_reader_seg_offsets.emplace_back(rs_seg_start_scan, + rs_seg_count[rs_seg_count_index]); + RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers, + rs_reader_seg_offsets)); + + rs_seg_start_scan = 0; + scanner_seg_occupy = 0; + rs_readers.clear(); + rs_reader_seg_offsets.clear(); + rs_seg_count_index++; + } else { + rs_reader_seg_offsets.emplace_back(rs_seg_start_scan, + rs_seg_count[rs_seg_count_index]); + + rs_seg_start_scan = 0; + scanner_seg_occupy += max_add_seg_nums; + rs_seg_count_index++; + } } - NewOlapScanner* scanner = new NewOlapScanner( - _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, - _need_agg_finalize, _scanner_profile.get()); - - scanner->set_compound_filters(_compound_filters); - // add scanner to pool before doing prepare. - // so that scanner can be automatically deconstructed if prepare failed. - _scanner_pool.add(scanner); - RETURN_IF_ERROR(scanner->prepare( - *scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), _olap_filters, - _filter_predicates, _push_down_functions, _common_vexpr_ctxs_pushdown.get())); - scanners->push_back((VScanner*)scanner); - disk_set.insert(scanner->scan_disk()); + // dispose some segment tail + if (!rs_readers.empty()) { + build_new_scanner(*scan_range, scanner_ranges, rs_readers, rs_reader_seg_offsets); + } } + } else { + for (auto& scan_range : _scan_ranges) { + auto tablet_id = scan_range->tablet_id; + std::string err; + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); Review Comment: The nullptr check on `tablet` is missing here. `get_tablet()` can return nullptr: both the removed code and the new duplicate-key branch above test `tablet == nullptr` and return `Status::InternalError`, so this branch needs the same check before `tablet` is used. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org