zhannngchen commented on code in PR #12866: URL: https://github.com/apache/doris/pull/12866#discussion_r977716456
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return 
writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); Review Comment: DCHECK_EQ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org