This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 103c473b96 [Bug](pipeline) fix pipeline shared scan + topn optimization (#21940) 103c473b96 is described below commit 103c473b967477b0748a9e7afa9f781f65d9a5eb Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Jul 25 12:48:27 2023 +0800 [Bug](pipeline) fix pipeline shared scan + topn optimization (#21940) --- be/src/olap/merger.cpp | 26 +++-- be/src/olap/reader.cpp | 4 +- be/src/olap/reader.h | 17 ++-- be/src/olap/rowset/beta_rowset_reader.cpp | 10 +- be/src/olap/rowset/beta_rowset_reader.h | 6 +- be/src/olap/rowset/rowset_reader.h | 19 +++- be/src/olap/schema_change.cpp | 17 ++-- be/src/olap/tablet.cpp | 10 +- be/src/olap/tablet.h | 4 +- be/src/vec/exec/scan/new_olap_scan_node.cpp | 148 +++++++++++++++------------- be/src/vec/exec/scan/new_olap_scanner.cpp | 53 +++++----- be/src/vec/exec/scan/new_olap_scanner.h | 6 +- be/src/vec/olap/block_reader.cpp | 57 +++++++---- be/src/vec/olap/block_reader.h | 2 +- be/src/vec/olap/vcollect_iterator.cpp | 25 ++--- be/src/vec/olap/vcollect_iterator.h | 9 +- be/src/vec/olap/vertical_block_reader.cpp | 18 ++-- be/test/olap/tablet_test.cpp | 8 +- 18 files changed, 240 insertions(+), 199 deletions(-) diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 44fd3d510e..ee1bffa9e2 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -60,7 +60,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, TabletReader::ReaderParams reader_params; reader_params.tablet = tablet; reader_params.reader_type = reader_type; - reader_params.rs_readers = src_rowset_readers; + reader_params.rs_splits.reserve(src_rowset_readers.size()); + for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { + reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader)); + } reader_params.version = dst_rowset_writer->version(); TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>(); @@ -92,10 +95,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); // init segment rowid map for rowid conversion std::vector<uint32_t> segment_num_rows; - for (auto& rs_reader : reader_params.rs_readers) { - RETURN_IF_ERROR(rs_reader->get_segment_num_rows(&segment_num_rows)); - stats_output->rowid_conversion->init_segment_map(rs_reader->rowset()->rowset_id(), - segment_num_rows); + for (auto& rs_split : reader_params.rs_splits) { + RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows)); + stats_output->rowid_conversion->init_segment_map( + rs_split.rs_reader->rowset()->rowset_id(), segment_num_rows); } } @@ -194,7 +197,10 @@ Status Merger::vertical_compact_one_group( reader_params.is_key_column_group = is_key; reader_params.tablet = tablet; reader_params.reader_type = reader_type; - reader_params.rs_readers = src_rowset_readers; + reader_params.rs_splits.reserve(src_rowset_readers.size()); + for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) { + reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader)); + } reader_params.version = dst_rowset_writer->version(); TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>(); @@ -225,10 +231,10 @@ Status Merger::vertical_compact_one_group( stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); // init segment rowid map for rowid conversion std::vector<uint32_t> segment_num_rows; - for (auto& rs_reader : reader_params.rs_readers) { - RETURN_IF_ERROR(rs_reader->get_segment_num_rows(&segment_num_rows)); - stats_output->rowid_conversion->init_segment_map(rs_reader->rowset()->rowset_id(), - segment_num_rows); + for (auto& rs_split : reader_params.rs_splits) { + RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows)); + stats_output->rowid_conversion->init_segment_map( + rs_split.rs_reader->rowset()->rowset_id(), segment_num_rows); } } diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 468c6289d5..1a5516f273 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -153,7 +153,7 @@ bool TabletReader::_optimize_for_single_rowset( } Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { - if (read_params.rs_readers.empty()) { + if (read_params.rs_splits.empty()) { return Status::InternalError("fail to acquire data sources. tablet={}", _tablet->full_name()); } @@ -636,7 +636,7 @@ Status TabletReader::init_reader_params_and_create_block( for (auto& rowset : input_rowsets) { RowsetReaderSharedPtr rs_reader; RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); - reader_params->rs_readers.push_back(std::move(rs_reader)); + reader_params->rs_splits.push_back(RowSetSplits(std::move(rs_reader))); } std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size()); diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 231ff23d6e..249c199781 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -93,6 +93,16 @@ public: // Params for Reader, // mainly include tablet, data version and fetch range. struct ReaderParams { + bool has_single_version() const { + return (rs_splits.size() == 1 && + rs_splits[0].rs_reader->rowset()->start_version() == 0 && + !rs_splits[0].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) || + (rs_splits.size() == 2 && + rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 && + rs_splits[1].rs_reader->rowset()->start_version() == 2 && + !rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()); + } + TabletSharedPtr tablet; TabletSchemaSPtr tablet_schema; ReaderType reader_type = ReaderType::READER_QUERY; @@ -118,12 +128,7 @@ public: // For unique key table with merge-on-write DeleteBitmap* delete_bitmap {nullptr}; - - std::vector<RowsetReaderSharedPtr> rs_readers; - // if rs_readers_segment_offsets is not empty, means we only scan - // [pair.first, pair.second) segment in rs_reader, only effective in dup key - // and pipeline - std::vector<std::pair<int, int>> rs_readers_segment_offsets; + std::vector<RowSetSplits> rs_splits; // return_columns is init from query schema std::vector<uint32_t> return_columns; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 7ba0260801..bdb0e1fca6 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -76,8 +76,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile* profile) { Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context, std::vector<RowwiseIteratorUPtr>* out_iters, - const std::pair<int, int>& segment_offset, - bool use_cache) { + const RowSetSplits& rs_splits, bool use_cache) { RETURN_IF_ERROR(_rowset->load()); _context = read_context; // The segment iterator is created with its own statistics, @@ -218,7 +217,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context // create iterator for each segment auto& segments = _segment_cache_handle.get_segments(); - auto [seg_start, seg_end] = segment_offset; + auto [seg_start, seg_end] = rs_splits.segment_offsets; if (seg_start == seg_end) { seg_start = 0; seg_end = segments.size(); @@ -241,12 +240,11 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context return Status::OK(); } -Status BetaRowsetReader::init(RowsetReaderContext* read_context, - const std::pair<int, int>& segment_offset) { +Status BetaRowsetReader::init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits) { _context = read_context; _context->rowset_id = _rowset->rowset_id(); std::vector<RowwiseIteratorUPtr> iterators; - RETURN_IF_ERROR(get_segment_iterators(_context, &iterators, segment_offset)); + RETURN_IF_ERROR(get_segment_iterators(_context, &iterators, rs_splits)); // merge or union segment iterator if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) { diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 83bc94ea0b..74ef9c96a6 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -46,13 +46,11 @@ public: ~BetaRowsetReader() override { _rowset->release(); } - Status init(RowsetReaderContext* read_context, - const std::pair<int, int>& segment_offset) override; + Status init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits) override; Status get_segment_iterators(RowsetReaderContext* read_context, std::vector<RowwiseIteratorUPtr>* out_iters, - const std::pair<int, int>& segment_offset, - bool use_cache = false) override; + const RowSetSplits& rs_splits, bool use_cache = false) override; void reset_read_options() override; Status next_block(vectorized::Block* block) override; Status next_block_view(vectorized::BlockView* block_view) override; diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index dadfd0e05c..cbe005b9bd 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -37,17 +37,28 @@ class Block; class RowsetReader; using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>; +struct RowSetSplits { + RowsetReaderSharedPtr rs_reader; + + // if segment_offsets is not empty, means we only scan + // [pair.first, pair.second) segment in rs_reader, only effective in dup key + // and pipeline + std::pair<int, int> segment_offsets; + + RowSetSplits(RowsetReaderSharedPtr rs_reader_) + : rs_reader(rs_reader_), segment_offsets({0, 0}) {} + RowSetSplits() = default; +}; + class RowsetReader { public: virtual ~RowsetReader() = default; - // reader init - virtual Status init(RowsetReaderContext* read_context, - const std::pair<int, int>& segment_offset = {0, 0}) = 0; + virtual Status init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits = {}) = 0; virtual Status get_segment_iterators(RowsetReaderContext* read_context, std::vector<RowwiseIteratorUPtr>* out_iters, - const std::pair<int, int>& segment_offset = {0, 0}, + const RowSetSplits& rs_splits = {}, bool use_cache = false) = 0; virtual void reset_read_options() = 0; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index aff67f68bb..f2f69f241d 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -722,7 +722,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& // reader_context is stack variables, it's lifetime should keep the same // with rs_readers RowsetReaderContext reader_context; - std::vector<RowsetReaderSharedPtr> rs_readers; + std::vector<RowSetSplits> rs_splits; // delete handlers for new tablet DeleteHandler delete_handler; std::vector<ColumnId> return_columns; @@ -814,11 +814,11 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& } // acquire data sources correspond to history versions - base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers); - if (rs_readers.empty()) { + base_tablet->capture_rs_readers(versions_to_be_changed, &rs_splits); + if (rs_splits.empty()) { res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>( "fail to acquire all data sources. version_num={}, data_source_num={}", - versions_to_be_changed.size(), rs_readers.size()); + versions_to_be_changed.size(), rs_splits.size()); break; } auto& all_del_preds = base_tablet->delete_predicates(); @@ -846,8 +846,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; reader_context.delete_bitmap = &base_tablet->tablet_meta()->delete_bitmap(); reader_context.version = Version(0, end_version); - for (auto& rs_reader : rs_readers) { - res = rs_reader->init(&reader_context); + for (auto& rs_split : rs_splits) { + res = rs_split.rs_reader->init(&reader_context); if (!res) { LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name(); break; @@ -865,7 +865,10 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl); sc_params.base_tablet = base_tablet; sc_params.new_tablet = new_tablet; - sc_params.ref_rowset_readers = rs_readers; + sc_params.ref_rowset_readers.reserve(rs_splits.size()); + for (RowSetSplits& split : rs_splits) { + sc_params.ref_rowset_readers.emplace_back(split.rs_reader); + } sc_params.delete_handler = &delete_handler; sc_params.base_tablet_schema = base_tablet_schema; sc_params.be_exec_version = request.be_exec_version; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 3f2c1829a5..9348f78119 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -924,16 +924,16 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>& } Status Tablet::capture_rs_readers(const Version& spec_version, - std::vector<RowsetReaderSharedPtr>* rs_readers) const { + std::vector<RowSetSplits>* rs_splits) const { std::vector<Version> version_path; RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path)); - RETURN_IF_ERROR(capture_rs_readers(version_path, rs_readers)); + RETURN_IF_ERROR(capture_rs_readers(version_path, rs_splits)); return Status::OK(); } Status Tablet::capture_rs_readers(const std::vector<Version>& version_path, - std::vector<RowsetReaderSharedPtr>* rs_readers) const { - DCHECK(rs_readers != nullptr && rs_readers->empty()); + std::vector<RowSetSplits>* rs_splits) const { + DCHECK(rs_splits != nullptr && rs_splits->empty()); for (auto version : version_path) { auto it = _rs_version_map.find(version); if (it == _rs_version_map.end()) { @@ -954,7 +954,7 @@ Status Tablet::capture_rs_readers(const std::vector<Version>& version_path, return Status::Error<CAPTURE_ROWSET_READER_ERROR>( "failed to create reader for rowset:{}", it->second->rowset_id().to_string()); } - rs_readers->push_back(std::move(rs_reader)); + rs_splits->push_back(RowSetSplits(std::move(rs_reader))); } return Status::OK(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 1688dcf2a6..3efd2c89ce 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -184,10 +184,10 @@ public: Status capture_consistent_rowsets(const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const; Status capture_rs_readers(const Version& spec_version, - std::vector<RowsetReaderSharedPtr>* rs_readers) const; + std::vector<RowSetSplits>* rs_splits) const; Status capture_rs_readers(const std::vector<Version>& version_path, - std::vector<RowsetReaderSharedPtr>* rs_readers) const; + std::vector<RowSetSplits>* rs_splits) const; const std::vector<RowsetMetaSharedPtr> delete_predicates() { return _tablet_meta->delete_predicates(); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index cfd9e329f0..f12533431f 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -454,9 +454,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); bool is_duplicate_key = false; - int segment_count = 0; - std::vector<std::vector<RowsetReaderSharedPtr>> rowset_readers_vector(_scan_ranges.size()); - std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size()); + size_t segment_count = 0; + std::vector<std::vector<RowSetSplits>> rowset_splits_vector(_scan_ranges.size()); + std::vector<std::vector<size_t>> tablet_rs_seg_count(_scan_ranges.size()); // Split tablet segment by scanner, only use in pipeline in duplicate key // 1. if tablet count lower than scanner thread num, count segment num of all tablet ready for scan @@ -484,7 +484,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { // to prevent this case: when there are lots of olap scanners to run for example 10000 // the rowsets maybe compacted when the last olap scanner starts Status acquire_reader_st = - tablet->capture_rs_readers({0, version}, &rowset_readers_vector[i]); + tablet->capture_rs_readers({0, version}, &rowset_splits_vector[i]); if (!acquire_reader_st.ok()) { LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; std::stringstream ss; @@ -494,31 +494,30 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { return Status::InternalError(ss.str()); } - for (const auto& rowset_reader : rowset_readers_vector[i]) { - auto num_segments = rowset_reader->rowset()->num_segments(); + for (const auto& rowset_splits : rowset_splits_vector[i]) { + auto num_segments = rowset_splits.rs_reader->rowset()->num_segments(); tablet_rs_seg_count[i].emplace_back(num_segments); segment_count += num_segments; } } } - auto build_new_scanner = [&](const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& key_ranges, - const std::vector<RowsetReaderSharedPtr>& rs_readers, - const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) { - std::shared_ptr<NewOlapScanner> scanner = NewOlapScanner::create_shared( - _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range, - key_ranges, rs_readers, rs_reader_seg_offsets, _scanner_profile.get()); - - RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); - scanner->set_compound_filters(_compound_filters); - scanners->push_back(scanner); - return Status::OK(); - }; if (is_duplicate_key) { - // 2. Split by segment count, each scanner need scan avg segment count - auto avg_segment_count = - std::max(segment_count / config::doris_scanner_thread_pool_thread_num, 1); + auto build_new_scanner = [&](const TPaloScanRange& scan_range, + const std::vector<OlapScanRange*>& key_ranges, + const std::vector<RowSetSplits>& rs_splits) { + std::shared_ptr<NewOlapScanner> scanner = NewOlapScanner::create_shared( + _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range, + key_ranges, rs_splits, _scanner_profile.get()); + + RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); + scanner->set_compound_filters(_compound_filters); + scanners->push_back(scanner); + return Status::OK(); + }; + // 2. Split segment evenly to each scanner (e.g. each scanner need to scan `avg_segment_count_per_scanner` segments) + const auto avg_segment_count_by_scanner = + std::max(segment_count / config::doris_scanner_thread_pool_thread_num, (size_t)1); for (int i = 0; i < _scan_ranges.size(); ++i) { auto& scan_range = _scan_ranges[i]; std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges; @@ -528,69 +527,84 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { scanner_ranges[j] = (*ranges)[j].get(); } + // Segment count in current rowset vector const auto& rs_seg_count = tablet_rs_seg_count[i]; - int rs_seg_count_index = 0; - int rs_seg_start_scan = 0; - int scanner_seg_occupy = 0; - std::vector<RowsetReaderSharedPtr> rs_readers; - std::vector<std::pair<int, int>> rs_reader_seg_offsets; - while (rs_seg_count_index < rs_seg_count.size()) { + size_t rowset_idx = 0; + size_t segment_idx_to_scan = 0; + size_t num_segments_assigned = 0; + + std::vector<RowSetSplits> rs_splits; + + while (rowset_idx < rs_seg_count.size()) { // do not generator range of segment (0, 0) - if (rs_seg_count[rs_seg_count_index] == 0) { - rs_seg_start_scan = 0; - rs_seg_count_index++; + if (rs_seg_count[rowset_idx] == 0) { + segment_idx_to_scan = 0; + rowset_idx++; continue; } - auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - rs_seg_start_scan; - rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone()); - - if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count) { - auto need_add_seg_nums = avg_segment_count - scanner_seg_occupy; - rs_reader_seg_offsets.emplace_back( - rs_seg_start_scan, - rs_seg_start_scan + need_add_seg_nums); // only scan need_add_seg_nums - RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers, - rs_reader_seg_offsets)); - - rs_seg_start_scan += need_add_seg_nums; - scanner_seg_occupy = 0; - rs_readers.clear(); - rs_reader_seg_offsets.clear(); - } else if (scanner_seg_occupy + max_add_seg_nums == avg_segment_count) { - rs_reader_seg_offsets.emplace_back(rs_seg_start_scan, - rs_seg_count[rs_seg_count_index]); - RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers, - rs_reader_seg_offsets)); - - rs_seg_start_scan = 0; - scanner_seg_occupy = 0; - rs_readers.clear(); - rs_reader_seg_offsets.clear(); - rs_seg_count_index++; + const auto max_add_seg_nums = rs_seg_count[rowset_idx] - segment_idx_to_scan; + rs_splits.emplace_back(RowSetSplits()); + rs_splits.back().rs_reader = rowset_splits_vector[i][rowset_idx].rs_reader->clone(); + + // if segments assigned to current scanner are already more than the average count, + // this scanner will just scan segments equal to the average count + if (num_segments_assigned + max_add_seg_nums > avg_segment_count_by_scanner) { + auto need_add_seg_nums = avg_segment_count_by_scanner - num_segments_assigned; + rs_splits.back().segment_offsets = { + segment_idx_to_scan, + segment_idx_to_scan + need_add_seg_nums}; // only scan need_add_seg_nums + + RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_splits)); + + segment_idx_to_scan += need_add_seg_nums; + num_segments_assigned = 0; + rs_splits.clear(); + } else if (num_segments_assigned + max_add_seg_nums == + avg_segment_count_by_scanner) { + rs_splits.back().segment_offsets = {segment_idx_to_scan, + rs_seg_count[rowset_idx]}; + RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_splits)); + + segment_idx_to_scan = 0; + num_segments_assigned = 0; + rs_splits.clear(); + rowset_idx++; } else { - rs_reader_seg_offsets.emplace_back(rs_seg_start_scan, - rs_seg_count[rs_seg_count_index]); + rs_splits.back().segment_offsets = {segment_idx_to_scan, + rs_seg_count[rowset_idx]}; - rs_seg_start_scan = 0; - scanner_seg_occupy += max_add_seg_nums; - rs_seg_count_index++; + segment_idx_to_scan = 0; + num_segments_assigned += max_add_seg_nums; + rowset_idx++; } } #ifndef NDEBUG - for (const auto& offset : rs_reader_seg_offsets) { - DCHECK_NE(offset.first, offset.second); + for (const auto& rs_reader_with_segments : rs_splits) { + DCHECK_NE(rs_reader_with_segments.segment_offsets.first, + rs_reader_with_segments.segment_offsets.second); } #endif // dispose some segment tail - if (!rs_readers.empty()) { - build_new_scanner(*scan_range, scanner_ranges, rs_readers, rs_reader_seg_offsets); + if (!rs_splits.empty()) { + build_new_scanner(*scan_range, scanner_ranges, rs_splits); } } } else { + auto build_new_scanner = [&](const TPaloScanRange& scan_range, + const std::vector<OlapScanRange*>& key_ranges) { + std::shared_ptr<NewOlapScanner> scanner = NewOlapScanner::create_shared( + _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range, + key_ranges, _scanner_profile.get()); + + RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); + scanner->set_compound_filters(_compound_filters); + scanners->push_back(scanner); + return Status::OK(); + }; for (auto& scan_range : _scan_ranges) { auto tablet_id = scan_range->tablet_id; auto [tablet, status] = @@ -619,7 +633,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { ++j, ++i) { scanner_ranges.push_back((*ranges)[i].get()); } - RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, {}, {})); + RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges)); } } } diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index b945d42f6c..decf12ee91 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -63,17 +63,26 @@ namespace doris::vectorized { NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, - const std::vector<RowsetReaderSharedPtr>& rs_readers, - const std::vector<std::pair<int, int>>& rs_reader_seg_offsets, RuntimeProfile* profile) : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), _aggregation(aggregation), _version(-1), _scan_range(scan_range), _key_ranges(key_ranges) { - DCHECK(rs_readers.size() == rs_reader_seg_offsets.size()); - _tablet_reader_params.rs_readers = rs_readers; - _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets; + _tablet_schema = std::make_shared<TabletSchema>(); + _is_init = false; +} + +NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, + bool aggregation, const TPaloScanRange& scan_range, + const std::vector<OlapScanRange*>& key_ranges, + const std::vector<RowSetSplits>& rs_splits, RuntimeProfile* profile) + : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), + _aggregation(aggregation), + _version(-1), + _scan_range(scan_range), + _key_ranges(key_ranges) { + _tablet_reader_params.rs_splits = rs_splits; _tablet_schema = std::make_shared<TabletSchema>(); _is_init = false; } @@ -167,7 +176,7 @@ Status NewOlapScanner::init() { { std::shared_lock rdlock(_tablet->get_header_lock()); - if (_tablet_reader_params.rs_readers.empty()) { + if (_tablet_reader_params.rs_splits.empty()) { const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); if (rowset == nullptr) { std::stringstream ss; @@ -181,7 +190,7 @@ Status NewOlapScanner::init() { // the rowsets maybe compacted when the last olap scanner starts Version rd_version(0, _version); Status acquire_reader_st = - _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers); + _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_splits); if (!acquire_reader_st.ok()) { LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; std::stringstream ss; @@ -237,20 +246,7 @@ Status NewOlapScanner::_init_tablet_reader_params( const FilterPredicates& filter_predicates, const std::vector<FunctionFilter>& function_filters) { // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty - bool single_version = - (_tablet_reader_params.rs_readers.size() == 1 && - _tablet_reader_params.rs_readers[0]->rowset()->start_version() == 0 && - !_tablet_reader_params.rs_readers[0] - ->rowset() - ->rowset_meta() - ->is_segments_overlapping()) || - (_tablet_reader_params.rs_readers.size() == 2 && - _tablet_reader_params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 && - _tablet_reader_params.rs_readers[1]->rowset()->start_version() == 2 && - !_tablet_reader_params.rs_readers[1] - ->rowset() - ->rowset_meta() - ->is_segments_overlapping()); + const bool single_version = _tablet_reader_params.has_single_version(); auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent); if (_state->skip_storage_engine_merge()) { _tablet_reader_params.direct_mode = true; @@ -408,12 +404,13 @@ Status NewOlapScanner::_init_tablet_reader_params( // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset if (_tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { constexpr static int delayed_s = 60; - for (auto rs_reader : _tablet_reader_params.rs_readers) { + for (auto rs_reader : _tablet_reader_params.rs_splits) { uint64_t delayed_expired_timestamp = UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() + delayed_s; - rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp); - StorageEngine::instance()->add_quering_rowset(rs_reader->rowset()); + rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp( + delayed_expired_timestamp); + StorageEngine::instance()->add_quering_rowset(rs_reader.rs_reader->rowset()); } } @@ -452,10 +449,10 @@ Status NewOlapScanner::_init_return_columns() { doris::TabletStorageType NewOlapScanner::get_storage_type() { int local_reader = 0; - for (const auto& reader : _tablet_reader_params.rs_readers) { - local_reader += reader->rowset()->is_local(); + for (const auto& reader : _tablet_reader_params.rs_splits) { + local_reader += reader.rs_reader->rowset()->is_local(); } - int total_reader = _tablet_reader_params.rs_readers.size(); + int total_reader = _tablet_reader_params.rs_splits.size(); if (local_reader == total_reader) { return doris::TabletStorageType::STORAGE_TYPE_LOCAL; @@ -491,7 +488,7 @@ Status NewOlapScanner::close(RuntimeState* state) { // readers will be release when runtime state deconstructed but // deconstructor in reader references runtime state // so that it will core - _tablet_reader_params.rs_readers.clear(); + _tablet_reader_params.rs_splits.clear(); _tablet_reader.reset(); RETURN_IF_ERROR(VScanner::close(state)); diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index f44e55ee37..a411056097 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -55,10 +55,12 @@ class NewOlapScanner : public VScanner { public: NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, - const std::vector<RowsetReaderSharedPtr>& rs_readers, - const std::vector<std::pair<int, int>>& rs_reader_seg_offsets, RuntimeProfile* profile); + NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, + const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, + const std::vector<RowSetSplits>& rs_splits, RuntimeProfile* profile); + Status init() override; Status open(RuntimeState* state) override; diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index bf513d78b9..b37459974c 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -60,28 +60,51 @@ BlockReader::~BlockReader() { } } -bool BlockReader::_rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers) { +bool BlockReader::_rowsets_overlapping(const ReaderParams& read_params) { std::string cur_max_key; - for (const auto& rs_reader : rs_readers) { + const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits; + for (const auto& rs_split : rs_splits) { // version 0-1 of every tablet is empty, just skip this rowset - if (rs_reader->rowset()->version().second == 1) { + if (rs_split.rs_reader->rowset()->version().second == 1) { continue; } - if (rs_reader->rowset()->num_rows() == 0) { + if (rs_split.rs_reader->rowset()->num_rows() == 0) { continue; } - if (rs_reader->rowset()->is_segments_overlapping()) { + if (rs_split.rs_reader->rowset()->is_segments_overlapping()) { return true; } std::string min_key; - bool has_min_key = rs_reader->rowset()->min_key(&min_key); + bool has_min_key = rs_split.rs_reader->rowset()->min_key(&min_key); if (!has_min_key) { return true; } if (min_key <= cur_max_key) { return true; } - CHECK(rs_reader->rowset()->max_key(&cur_max_key)); + CHECK(rs_split.rs_reader->rowset()->max_key(&cur_max_key)); + } + + for (const auto& rs_reader : rs_splits) { + // version 0-1 of every tablet is empty, just skip this rowset + if (rs_reader.rs_reader->rowset()->version().second == 1) { + continue; + } + if (rs_reader.rs_reader->rowset()->num_rows() == 0) { + continue; + } + if (rs_reader.rs_reader->rowset()->is_segments_overlapping()) { + return true; + } + std::string min_key; + bool has_min_key = rs_reader.rs_reader->rowset()->min_key(&min_key); + if (!has_min_key) { + return true; + } + if (min_key <= cur_max_key) { + return true; + } + CHECK(rs_reader.rs_reader->rowset()->max_key(&cur_max_key)); } return false; } @@ -96,34 +119,28 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) { return res; } // check if rowsets are noneoverlapping - _is_rowsets_overlapping = _rowsets_overlapping(read_params.rs_readers); + _is_rowsets_overlapping = _rowsets_overlapping(read_params); _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key, - read_params.read_orderby_key_reverse, - read_params.rs_readers_segment_offsets); + read_params.read_orderby_key_reverse); _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt; std::vector<RowsetReaderSharedPtr> valid_rs_readers; - DCHECK(read_params.rs_readers_segment_offsets.empty() || - read_params.rs_readers_segment_offsets.size() == read_params.rs_readers.size()); - bool is_empty = read_params.rs_readers_segment_offsets.empty(); - for (int i = 0; i < read_params.rs_readers.size(); ++i) { - auto& rs_reader = read_params.rs_readers[i]; + for (int i = 0; i < read_params.rs_splits.size(); ++i) { + auto& rs_split = read_params.rs_splits[i]; // _vcollect_iter.topn_next() will init rs_reader by itself if (!_vcollect_iter.use_topn_next()) { - RETURN_IF_ERROR(rs_reader->init( - &_reader_context, - is_empty ? std::pair {0, 0} : read_params.rs_readers_segment_offsets[i])); + RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader_context, rs_split)); } - Status res = _vcollect_iter.add_child(rs_reader); + Status res = _vcollect_iter.add_child(rs_split); if (!res.ok() && !res.is<END_OF_FILE>()) { LOG(WARNING) << "failed to add child to iterator, err=" << res; return res; } if (res.ok()) { - valid_rs_readers.push_back(rs_reader); + valid_rs_readers.push_back(rs_split.rs_reader); } } diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index bf9e20def4..0fe188419e 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -90,7 +90,7 @@ private: bool _get_next_row_same(); - bool _rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers); + bool _rowsets_overlapping(const ReaderParams& read_params); VCollectIterator _vcollect_iter; IteratorRowRef _next_row {{}, -1, false}; diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index bca58c34f8..78fad44fb7 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -65,8 +65,7 @@ VCollectIterator::~VCollectIterator() { } void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, - bool is_reverse, - std::vector<std::pair<int, int>> rs_readers_segment_offsets) { + bool is_reverse) { _reader = reader; // when aggregate is enabled or key_type is DUP_KEYS, we don't merge @@ -92,21 +91,18 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS && _reader->_tablet->enable_unique_key_merge_on_write()))) { _topn_limit = _reader->_reader_context.read_orderby_key_limit; - // When we use scanner pooling + query with topn_with_limit, we need it because we initialize our rs_reader - // in out method but not upstream user. At time we init readers, we will need to use it. - _rs_readers_segment_offsets = rs_readers_segment_offsets; } else { _topn_limit = 0; } } -Status VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { +Status VCollectIterator::add_child(const RowSetSplits& rs_splits) { if (use_topn_next()) { - _rs_readers.push_back(rs_reader); + _rs_splits.push_back(rs_splits); return Status::OK(); } - std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, _reader)); + std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_splits.rs_reader, _reader)); _children.push_back(child.release()); return Status::OK(); } @@ -288,23 +284,20 @@ Status VCollectIterator::_topn_next(Block* block) { row_pos_comparator); if (_is_reverse) { - std::reverse(_rs_readers.begin(), _rs_readers.end()); + std::reverse(_rs_splits.begin(), _rs_splits.end()); } - bool segment_empty = _rs_readers_segment_offsets.empty(); - for (size_t i = 0; i < _rs_readers.size(); i++) { - const auto& rs_reader = _rs_readers[i]; + for (size_t i = 0; i < _rs_splits.size(); i++) { + const auto& rs_split = _rs_splits[i]; // init will prune segment by _reader_context.conditions and _reader_context.runtime_conditions - RETURN_IF_ERROR( - rs_reader->init(&_reader->_reader_context, - segment_empty ? std::pair {0, 0} : _rs_readers_segment_offsets[i])); + RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader->_reader_context, rs_split)); // read _topn_limit rows from this rs size_t read_rows = 0; bool eof = false; while (read_rows < _topn_limit && !eof) { block->clear_column_data(); - auto status = rs_reader->next_block(block); + auto status = rs_split.rs_reader->next_block(block); if (!status.ok()) { if (status.is<END_OF_FILE>()) { eof = true; diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h index 449e1e05d2..7faa47ac2b 100644 --- a/be/src/vec/olap/vcollect_iterator.h +++ b/be/src/vec/olap/vcollect_iterator.h @@ -53,10 +53,9 @@ public: // Hold reader point to get reader params ~VCollectIterator(); - void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse, - std::vector<std::pair<int, int>> rs_readers_segment_offsets); + void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse); - Status add_child(RowsetReaderSharedPtr rs_reader); + Status add_child(const RowSetSplits& rs_splits); Status build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers); // Get top row of the heap, nullptr if reach end. @@ -330,9 +329,7 @@ private: // for topn next size_t _topn_limit = 0; bool _topn_eof = false; - // when we use scanner pooling + query with topn_with_limit, we use it. - std::vector<RowsetReaderSharedPtr> _rs_readers; - std::vector<std::pair<int, int>> _rs_readers_segment_offsets; + std::vector<RowSetSplits> _rs_splits; // Hold reader point to access read params, such as fetch conditions. TabletReader* _reader = nullptr; diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 09757226c3..3023f28408 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -63,23 +63,23 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para return res; } _reader_context.is_vertical_compaction = true; - for (auto& rs_reader : read_params.rs_readers) { + for (auto& rs_split : read_params.rs_splits) { // segment iterator will be inited here // In vertical compaction, every group will load segment so we should cache // segment to avoid tot many s3 head request - RETURN_IF_ERROR( - rs_reader->get_segment_iterators(&_reader_context, segment_iters, {0, 0}, true)); + RETURN_IF_ERROR(rs_split.rs_reader->get_segment_iterators(&_reader_context, segment_iters, + {}, true)); // if segments overlapping, all segment iterator should be inited in // heap merge iterator. If segments are none overlapping, only first segment of this // rowset will be inited and push to heap, other segment will be inited later when current // segment reached it's end. // Use this iterator_init_flag so we can load few segments in HeapMergeIterator to save memory - if (rs_reader->rowset()->is_segments_overlapping()) { - for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) { + if (rs_split.rs_reader->rowset()->is_segments_overlapping()) { + for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments(); ++i) { iterator_init_flag->push_back(true); } } else { - for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) { + for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments(); ++i) { if (i == 0) { iterator_init_flag->push_back(true); continue; @@ -87,10 +87,10 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para iterator_init_flag->push_back(false); } } - for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) { - rowset_ids->push_back(rs_reader->rowset()->rowset_id()); + for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments(); ++i) { + rowset_ids->push_back(rs_split.rs_reader->rowset()->rowset_id()); } - rs_reader->reset_read_options(); + rs_split.rs_reader->reset_read_options(); } return Status::OK(); } diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp index 3ae55f9743..838db9b0d6 100644 --- a/be/test/olap/tablet_test.cpp +++ b/be/test/olap/tablet_test.cpp @@ -278,13 +278,13 @@ TEST_F(TestTablet, pad_rowset) { _tablet->init(); Version version(5, 5); - std::vector<RowsetReaderSharedPtr> readers; - ASSERT_FALSE(_tablet->capture_rs_readers(version, &readers).ok()); - readers.clear(); + std::vector<RowSetSplits> splits; + ASSERT_FALSE(_tablet->capture_rs_readers(version, &splits).ok()); + splits.clear(); PadRowsetAction action(nullptr, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN); action._pad_rowset(_tablet, version); - ASSERT_TRUE(_tablet->capture_rs_readers(version, &readers).ok()); + ASSERT_TRUE(_tablet->capture_rs_readers(version, &splits).ok()); } TEST_F(TestTablet, cooldown_policy) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org