zhannngchen commented on code in PR #12866: URL: https://github.com/apache/doris/pull/12866#discussion_r977743775
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id); + std::shared_ptr<segment_v2::Segment> segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +void BetaRowsetWriter::find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector<segment_v2::SegmentSharedPtr> all_segments; + load_noncompacted_segments(&all_segments); + + std::stringstream ss_all; + for (auto& segment : all_segments) { + ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << ss_all.str(); + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { + segment_v2::SegmentSharedPtr seg = all_segments[i]; + if (seg->num_rows() > small_threshold) { + if (let_big_terminate) { + is_terminated_by_big = true; + break; + } else { + rename_compacted_segment_plain(_segcompacted_point); + ++_segcompacted_point; + } + } else { + let_big_terminate = true; // break if find a big after small + segments->push_back(seg); + ++_segcompacted_point; + } + } + size_t s = segments->size(); + if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { + // start with big segments and end with small, better to do it in next + // round to compact more at once + _segcompacted_point -= s; + segments->clear(); + LOG(INFO) << "candidate segments num too small:" << s; + return; + } + if (s == 1) { // poor bachelor, let it go + LOG(INFO) << "only one candidate segment"; + rename_compacted_segment_plain(_segcompacted_point - 1); + segments->clear(); + return; + } + std::stringstream ss; + for (auto& segment : (*segments.get())) { + ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str(); +} + +SegCompactionCandidatesSharedPtr BetaRowsetWriter::get_segcompaction_candidates(bool is_last) { + SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); + if (is_last) { Review Comment: skip last several small segments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org