zhannngchen commented on code in PR #12866: URL: https://github.com/apache/doris/pull/12866#discussion_r977657612
########## be/src/olap/olap_server.cpp: ########## @@ -700,6 +708,19 @@ Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) { return Status::OK(); } +Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer, + SegCompactionCandidatesSharedPtr segments) { + writer->do_segcompaction(segments); + return Status::OK(); +} + +Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer, + SegCompactionCandidatesSharedPtr segments) { + _seg_compaction_thread_pool->submit_func( Review Comment: ditto ########## be/src/olap/olap_server.cpp: ########## @@ -700,6 +708,19 @@ Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) { return Status::OK(); } +Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer, + SegCompactionCandidatesSharedPtr segments) { + writer->do_segcompaction(segments); Review Comment: should be `return writer->do_segcompaction(segments);` ? ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); Review Comment: should return here? ########## be/src/common/config.h: ########## @@ -875,6 +878,12 @@ CONF_Bool(enable_new_load_scan_node, "false"); // Temp config. True to use new file scanner. 
Will remove after fully test. CONF_Bool(enable_new_file_scanner, "false"); +CONF_Bool(enable_segcompaction, "false"); // currently only support vectorized storage +// Trigger segcompaction if the num of segments in a rowset exceeds this threshold. +CONF_Int32(segcompaction_threshold_segment_num, "10"); + +CONF_Int32(segcompaction_small_threshold, "1000000"); Review Comment: use 1048576 instead. ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = 
_create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) Review Comment: `if (writer != nullptr && writer->get_data_dir())` ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != 
Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); Review Comment: RETURN_IF_ERROR ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool 
is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. 
+ WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { Review Comment: Wouldn't comparing `seg_id` and `_num_segcompacted` be faster?
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return 
writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification Review Comment: What does this comment mean?
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return 
writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); Review Comment: We should not crash the whole process just because an error happens during segment compaction. ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter>
BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? 
maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); Review Comment: ditto; we should also let the caller know that something went wrong. ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer
ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. 
+ WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 
1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); Review Comment: ditto; don't use assert casually. ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto
merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? 
maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id); + std::shared_ptr<segment_v2::Segment> segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +void BetaRowsetWriter::find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector<segment_v2::SegmentSharedPtr> all_segments; + load_noncompacted_segments(&all_segments); + + std::stringstream ss_all; + for (auto& segment : all_segments) { + ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << ss_all.str(); + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { Review Comment: We'd better add some comments explaining this strategy.
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return 
writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id); + std::shared_ptr<segment_v2::Segment> segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +void BetaRowsetWriter::find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector<segment_v2::SegmentSharedPtr> all_segments; + load_noncompacted_segments(&all_segments); + + std::stringstream ss_all; + for (auto& segment : all_segments) { + ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << ss_all.str(); + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { + segment_v2::SegmentSharedPtr seg = all_segments[i]; + if (seg->num_rows() > small_threshold) { + if (let_big_terminate) { + is_terminated_by_big = true; + break; + } else { + rename_compacted_segment_plain(_segcompacted_point); + ++_segcompacted_point; + } + } else { + let_big_terminate = true; // break if find a big after small + 
segments->push_back(seg); + ++_segcompacted_point; + } + } + size_t s = segments->size(); + if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { + // start with big segments and end with small, better to do it in next + // round to compact more at once + _segcompacted_point -= s; + segments->clear(); + LOG(INFO) << "candidate segments num too small:" << s; Review Comment: This should not be logged at INFO level: it is emitted too frequently, and users will be confused by seeing lots of these messages... ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; +
std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? 
maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id); + std::shared_ptr<segment_v2::Segment> segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +void BetaRowsetWriter::find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector<segment_v2::SegmentSharedPtr> all_segments; + load_noncompacted_segments(&all_segments); + + std::stringstream ss_all; + for (auto& segment : all_segments) { + ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << ss_all.str(); + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { + segment_v2::SegmentSharedPtr seg = all_segments[i]; + if (seg->num_rows() > small_threshold) { + if (let_big_terminate) { + is_terminated_by_big = true; + break; + } else { + rename_compacted_segment_plain(_segcompacted_point); + ++_segcompacted_point; + } + } else { + let_big_terminate = true; // break if find a big after small + 
segments->push_back(seg); + ++_segcompacted_point; + } + } + size_t s = segments->size(); + if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { + // start with big segments and end with small, better to do it in next + // round to compact more at once + _segcompacted_point -= s; + segments->clear(); + LOG(INFO) << "candidate segments num too small:" << s; + return; + } + if (s == 1) { // poor bachelor, let it go + LOG(INFO) << "only one candidate segment"; + rename_compacted_segment_plain(_segcompacted_point - 1); Review Comment: handle error ########## be/src/olap/rowset/beta_rowset_writer.h: ########## @@ -73,42 +77,81 @@ class BetaRowsetWriter : public RowsetWriter { return Status::OK(); } + Status do_segcompaction(SegCompactionCandidatesSharedPtr segments); + private: template <typename RowType> Status _add_row(const RowType& row); Status _add_block(const vectorized::Block* block, std::unique_ptr<segment_v2::SegmentWriter>* writer); + Status _add_block_for_segcompaction(const vectorized::Block* block, + std::unique_ptr<segment_v2::SegmentWriter>* writer); + Status _do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer, + bool is_segcompaction, int64_t begin, int64_t end); Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer); + Status _create_segment_writer_for_segcompaction( + std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end); Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer); void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta); + void segcompaction_if_necessary(); + void segcompaction_ramaining_if_necessary(); + vectorized::VMergeIterator* get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments, + std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat); + std::unique_ptr<segment_v2::SegmentWriter> create_segcompaction_writer(uint64_t begin, + uint64_t end); + Status 
delete_original_segments(uint32_t begin, uint32_t end); + void rename_compacted_segments(int64_t begin, int64_t end); + void rename_compacted_segment_plain(uint64_t seg_id); + Status load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments); + void find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments); + SegCompactionCandidatesSharedPtr get_segcompaction_candidates(bool is_last); + void wait_flying_segcompaction(); + bool is_segcompacted() { return (_num_segcompacted > 0) ? true : false; } + void clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end); private: RowsetWriterContext _context; std::shared_ptr<RowsetMeta> _rowset_meta; std::atomic<int32_t> _num_segment; + std::atomic<int32_t> _segcompacted_point; // segemnts before this point have + // already been segment compacted + std::atomic<int32_t> _num_segcompacted; // index for segment compaction /// When flushing the memtable in the load process, we do not use this writer but an independent writer. /// Because we want to flush memtables in parallel. /// In other processes, such as merger or schema change, we will use this unified writer for data writing. std::unique_ptr<segment_v2::SegmentWriter> _segment_writer; - mutable SpinLock _lock; // lock to protect _wblocks. - std::vector<io::FileWriterPtr> _file_writers; + mutable SpinLock _lock; // lock to protect _wblocks. + io::FileWriterPtr _file_writer; // writer for current active segment + io::FileWriterPtr _segcompaction_file_writer; // counters and statistics maintained during data write std::atomic<int64_t> _num_rows_written; std::atomic<int64_t> _total_data_size; std::atomic<int64_t> _total_index_size; // TODO rowset Zonemap + struct statistics { + int64_t row_num; + int64_t data_size; + int64_t index_size; + KeyBoundsPB key_bounds; + }; + std::map<uint32_t, statistics> _segid_statistics_map; Review Comment: map is not thread safe, we should hold a lock when writing it. 
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return 
writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { Review Comment: If this method fails, we should not let the load process succeed, right? Should this method return a Status as well? ########## be/src/olap/rowset/beta_rowset_writer.h: ########## @@ -73,42 +77,81 @@ class BetaRowsetWriter : public RowsetWriter { return Status::OK(); } + Status do_segcompaction(SegCompactionCandidatesSharedPtr segments); + private: template <typename RowType> Status _add_row(const RowType& row); Status _add_block(const vectorized::Block* block, std::unique_ptr<segment_v2::SegmentWriter>* writer); + Status _add_block_for_segcompaction(const vectorized::Block* block, + std::unique_ptr<segment_v2::SegmentWriter>* writer); + Status _do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer, + bool is_segcompaction, int64_t begin, int64_t end); Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer); + Status _create_segment_writer_for_segcompaction( + std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end); Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer); void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta); + void segcompaction_if_necessary(); Review Comment: private method should
start with _ ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -309,12 +641,23 @@ Status BetaRowsetWriter::_create_segment_writer( DCHECK(file_writer != nullptr); segment_v2::SegmentWriterOptions writer_options; writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; - writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segment, - _context.tablet_schema, _context.data_dir, - _context.max_rows_per_segment, writer_options)); - { - std::lock_guard<SpinLock> l(_lock); - _file_writers.push_back(std::move(file_writer)); + + if (is_segcompaction) { + writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segcompacted + 1, Review Comment: Is the only difference between these two branches the segment_id parameter? ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr =
vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? 
maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); Review Comment: the returned error status seems useless? 
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return 
writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id); + std::shared_ptr<segment_v2::Segment> segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +void BetaRowsetWriter::find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector<segment_v2::SegmentSharedPtr> all_segments; + load_noncompacted_segments(&all_segments); + + std::stringstream ss_all; + for (auto& segment : all_segments) { + ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << ss_all.str(); + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { + segment_v2::SegmentSharedPtr seg = all_segments[i]; + if (seg->num_rows() > small_threshold) { + if (let_big_terminate) { + is_terminated_by_big = true; + break; + } else { + rename_compacted_segment_plain(_segcompacted_point); + ++_segcompacted_point; + } + } else { + let_big_terminate = true; // break if find a big after small + 
segments->push_back(seg); + ++_segcompacted_point; Review Comment: Would it be better to update _segcompacted_point only after the segment compaction task succeeds? If you update this variable and the task fails, you will have no chance to retry on the segments you just chose. ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer 
for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id); + std::shared_ptr<segment_v2::Segment> segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +void BetaRowsetWriter::find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector<segment_v2::SegmentSharedPtr> all_segments; + load_noncompacted_segments(&all_segments); + + std::stringstream ss_all; + for (auto& segment : all_segments) { + ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << ss_all.str(); + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { + segment_v2::SegmentSharedPtr seg = all_segments[i]; + if (seg->num_rows() > small_threshold) { + if (let_big_terminate) { + is_terminated_by_big = true; + break; + } else { + rename_compacted_segment_plain(_segcompacted_point); + ++_segcompacted_point; + } + } else { + let_big_terminate = true; // break if find a big after small + 
segments->push_back(seg); + ++_segcompacted_point; + } + } + size_t s = segments->size(); + if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { + // start with big segments and end with small, better to do it in next + // round to compact more at once + _segcompacted_point -= s; + segments->clear(); + LOG(INFO) << "candidate segments num too small:" << s; + return; + } + if (s == 1) { // poor bachelor, let it go + LOG(INFO) << "only one candidate segment"; + rename_compacted_segment_plain(_segcompacted_point - 1); + segments->clear(); + return; + } + std::stringstream ss; + for (auto& segment : (*segments.get())) { + ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str(); +} + +SegCompactionCandidatesSharedPtr BetaRowsetWriter::get_segcompaction_candidates(bool is_last) { + SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); + if (is_last) { + load_noncompacted_segments(segments.get()); + if (segments->size() == 1) { + LOG(INFO) << "only one last candidate segment"; + rename_compacted_segment_plain(_segcompacted_point); + segments->clear(); + } + } else { + find_longest_consecutive_small_segment(segments); + } + return segments; +} + +void BetaRowsetWriter::segcompaction_if_necessary() { + if (!config::enable_segcompaction || !config::enable_storage_vectorization) { + return; + } + if (!_is_doing_segcompaction && Review Comment: Two threads can both see `_is_doing_segcompaction` as false, and enter the following block? 
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return 
writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id); + std::shared_ptr<segment_v2::Segment> segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +void BetaRowsetWriter::find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector<segment_v2::SegmentSharedPtr> all_segments; + load_noncompacted_segments(&all_segments); + + std::stringstream ss_all; + for (auto& segment : all_segments) { + ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << ss_all.str(); + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { + segment_v2::SegmentSharedPtr seg = all_segments[i]; + if (seg->num_rows() > small_threshold) { + if (let_big_terminate) { + is_terminated_by_big = true; + break; + } else { + rename_compacted_segment_plain(_segcompacted_point); + ++_segcompacted_point; + } + } else { + let_big_terminate = true; // break if find a big after small + 
segments->push_back(seg); + ++_segcompacted_point; + } + } + size_t s = segments->size(); + if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { + // start with big segments and end with small, better to do it in next + // round to compact more at once + _segcompacted_point -= s; + segments->clear(); + LOG(INFO) << "candidate segments num too small:" << s; + return; + } + if (s == 1) { // poor bachelor, let it go + LOG(INFO) << "only one candidate segment"; + rename_compacted_segment_plain(_segcompacted_point - 1); + segments->clear(); + return; + } + std::stringstream ss; + for (auto& segment : (*segments.get())) { + ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str(); +} + +SegCompactionCandidatesSharedPtr BetaRowsetWriter::get_segcompaction_candidates(bool is_last) { + SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); + if (is_last) { Review Comment: 1. Should we check `segcompaction_small_threshold` here as well? E.g., suppose you compacted 10 segments and, for some reason, it ran very slowly. By the time it finished, all load data had been written and 100 more segments had been generated, all of them larger than 1MB. Shouldn't we skip compacting those? 2. In the same scenario, if 100 segments are left, should we run segment compaction on all of them at once? 
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -219,17 +516,35 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) { if (block->rows() == 0) { return Status::OK(); } + segcompaction_if_necessary(); std::unique_ptr<segment_v2::SegmentWriter> writer; RETURN_NOT_OK(_create_segment_writer(&writer)); RETURN_NOT_OK(_add_block(block, &writer)); RETURN_NOT_OK(_flush_segment_writer(&writer)); return Status::OK(); } +void BetaRowsetWriter::wait_flying_segcompaction() { + uint64_t begin_wait = GetCurrentTimeMicros(); + while (_is_doing_segcompaction) { // TODO: memory barrier? + // change sync wait to async? + std::unique_lock<std::mutex> l(_segcompacting_cond_lock); Review Comment: the lock is not used anywhere else, maybe thread token is more suitable? ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = 
vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? 
maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. 
elapsed time:" << elapsed + << "us"; + + return Status::OK(); +} + +Status BetaRowsetWriter::load_noncompacted_segments( + std::vector<segment_v2::SegmentSharedPtr>* segments) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) { + auto seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto cache_path = + BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id); + std::shared_ptr<segment_v2::Segment> segment; + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema, + &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED); + } + segments->push_back(std::move(segment)); + } + return Status::OK(); +} + +void BetaRowsetWriter::find_longest_consecutive_small_segment( + SegCompactionCandidatesSharedPtr segments) { + std::vector<segment_v2::SegmentSharedPtr> all_segments; + load_noncompacted_segments(&all_segments); + + std::stringstream ss_all; + for (auto& segment : all_segments) { + ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "all noncompacted segments num:" << all_segments.size() + << " list of segments:" << ss_all.str(); + + bool is_terminated_by_big = false; + bool let_big_terminate = false; + size_t small_threshold = config::segcompaction_small_threshold; + for (int64_t i = 0; i < all_segments.size(); ++i) { + segment_v2::SegmentSharedPtr seg = all_segments[i]; + if (seg->num_rows() > small_threshold) { + if (let_big_terminate) { + is_terminated_by_big = true; + break; + } else { + rename_compacted_segment_plain(_segcompacted_point); + ++_segcompacted_point; + } + } else { + let_big_terminate = true; // break if find a big after small + 
segments->push_back(seg); + ++_segcompacted_point; + } + } + size_t s = segments->size(); + if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { + // start with big segments and end with small, better to do it in next + // round to compact more at once + _segcompacted_point -= s; + segments->clear(); + LOG(INFO) << "candidate segments num too small:" << s; + return; + } + if (s == 1) { // poor bachelor, let it go + LOG(INFO) << "only one candidate segment"; + rename_compacted_segment_plain(_segcompacted_point - 1); + segments->clear(); + return; + } + std::stringstream ss; + for (auto& segment : (*segments.get())) { + ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; + } + LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str(); +} + +SegCompactionCandidatesSharedPtr BetaRowsetWriter::get_segcompaction_candidates(bool is_last) { + SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); + if (is_last) { + load_noncompacted_segments(segments.get()); + if (segments->size() == 1) { + LOG(INFO) << "only one last candidate segment"; + rename_compacted_segment_plain(_segcompacted_point); + segments->clear(); + } + } else { + find_longest_consecutive_small_segment(segments); + } + return segments; +} + +void BetaRowsetWriter::segcompaction_if_necessary() { + if (!config::enable_segcompaction || !config::enable_storage_vectorization) { + return; + } + if (!_is_doing_segcompaction && + ((_num_segment - _segcompacted_point) >= config::segcompaction_threshold_segment_num)) { + _is_doing_segcompaction = true; + SegCompactionCandidatesSharedPtr segments = get_segcompaction_candidates(false); + if (segments->size() > 0) { + LOG(INFO) << "submit segcompaction task, segment num:" << _num_segment + << ", segcompacted_point:" << _segcompacted_point; + StorageEngine::instance()->submit_seg_compaction_task(this, segments); + } else { + _is_doing_segcompaction = false; 
+ } + } +} + +void BetaRowsetWriter::segcompaction_ramaining_if_necessary() { + if (!config::enable_segcompaction || !config::enable_storage_vectorization) { + return; + } + if (!is_segcompacted() || _segcompacted_point == _num_segment) { + // no need if never segcompact before or all segcompacted + return; + } + CHECK_EQ(_is_doing_segcompaction, false); Review Comment: DCHECK_EQ ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +110,284 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, nullptr); + merge_itr->init(read_options); + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, 
begin, end); + if (status != Status::OK()) { + writer = nullptr; + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + } + if (writer->get_data_dir()) + LOG(INFO) << "segcompaction segment writer created for begin:" << begin << " end:" << end + << " path:" << writer->get_data_dir()->path(); + return writer; +} + +Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); +} + +// todo: will rename only do the job? maybe need deep modification +void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " + << dst_seg_path; + if (src_seg_path.compare(dst_seg_path) != 0) { + CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); + CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + statistics org = _segid_statistics_map[seg_id + 1]; + _segid_statistics_map.emplace(_num_segcompacted, org); + clear_statistics_for_deleting_segments(seg_id, seg_id); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + DCHECK_EQ(ret, 0); + } +} + +void BetaRowsetWriter::clear_statistics_for_deleting_segments(uint64_t begin, uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin + 1 + << " to:" << end + 1; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i + 1); + } +} + +Status BetaRowsetWriter::do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. 
Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + vectorized::VMergeIterator* reader = get_segcompaction_reader(segments, schema, stat.get()); + auto writer = create_segcompaction_writer(begin, end); + assert(writer != nullptr); + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader->next_batch(&block); + if (status != Status::OK()) { + assert(status.is_end_of_file()); + break; + } + _add_block_for_segcompaction(&block, &writer); + block.clear_column_data(); + } + clear_statistics_for_deleting_segments(begin, end); + RETURN_NOT_OK(_flush_segment_writer(&writer)); + delete_original_segments(begin, end); + rename_compacted_segments(begin, end); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + CHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed + << "us"; + Review Comment: we should also check correctness for segment compaction, refer to `Compaction::check_correctness` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org