dataroaring commented on code in PR #12866: URL: https://github.com/apache/doris/pull/12866#discussion_r1007540694
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } +vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader( + SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema, + OlapReaderStatistics* stat, uint64_t* merged_row_stat) { + StorageReadOptions read_options; + read_options.stats = stat; + read_options.use_page_cache = false; + read_options.tablet_schema = _context.tablet_schema; + std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + for (auto& seg_ptr : *segments) { + std::unique_ptr<RowwiseIterator> iter; + auto s = seg_ptr->new_iterator(*schema, read_options, &iter); + if (!s.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); + return nullptr; + } + seg_iterators.push_back(std::move(iter)); + } + std::vector<RowwiseIterator*> iterators; + for (auto& owned_it : seg_iterators) { + // transfer ownership + iterators.push_back(owned_it.release()); + } + bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); + bool is_reverse = false; + auto merge_itr = + vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat); + DCHECK(merge_itr); + auto s = merge_itr->init(read_options); + if (!s.ok()) { + LOG(WARNING) << "failed to init iterator: " << s.to_string(); + return nullptr; + } + + return (vectorized::VMergeIterator*)merge_itr; +} + +std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer( + uint64_t begin, uint64_t end) { + Status status; + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + status = _create_segment_writer_for_segcompaction(&writer, begin, end); + if (status != Status::OK() || writer == nullptr) { + LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end + << " path:" << 
writer->get_data_dir()->path(); + return nullptr; + } else { + return writer; + } +} + +Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (uint32_t i = begin; i <= end; ++i) { + auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + RETURN_NOT_OK_LOG(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); + } + return Status::OK(); +} + +Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { + int ret; + auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, + _context.rowset_id, begin, end); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted++); + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + if (ret) { + return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED); + } + return Status::OK(); +} + +Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) { + if (seg_id == _num_segcompacted) { + return Status::OK(); + } + + int ret; + auto src_seg_path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); + auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, + _num_segcompacted); + LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " + << dst_seg_path; + { + std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); + DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false); + DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), + true); + Statistics org = _segid_statistics_map[seg_id]; + _segid_statistics_map.emplace(_num_segcompacted, org); + _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id); + } + ++_num_segcompacted; + ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); + if (ret) { + LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path; + return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED); + } + return Status::OK(); +} + +void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t begin, + uint64_t end) { + LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end; + for (int i = begin; i <= end; ++i) { + _segid_statistics_map.erase(i); + } +} + +Status BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics> stat, + uint64_t merged_row_stat, uint64_t row_count, + uint64_t begin, uint64_t end) { + uint64_t stat_read_row = stat->raw_rows_read; + uint64_t sum_target_row = 0; + + { + std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); + for (int i = begin; i <= end; ++i) { + sum_target_row += _segid_statistics_map[i].row_num; + } + } + + if (sum_target_row != stat_read_row) { + LOG(WARNING) << "read row_num does not match. expect read row:" << sum_target_row + << " actual read row:" << stat_read_row; + return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR); + } + + uint64_t total_row = row_count + merged_row_stat; + if (stat_read_row != total_row) { + LOG(WARNING) << "total row_num does not match. 
expect total row:" << total_row + << " actual total row:" << stat_read_row; + return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR); + } + return Status::OK(); +} + +Status BetaRowsetWriter::_do_segcompaction(SegCompactionCandidatesSharedPtr segments) { + SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(), + ThreadContext::TaskType::COMPACTION); + // throttle segcompaction task if memory depleted. + if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) { + LOG(WARNING) << "skip segcompaction due to memory shortage"; + return Status::MemoryLimitExceeded("skip segcompaction due to memory shortage"); + } + uint64_t begin = (*(segments->begin()))->id(); + uint64_t end = (*(segments->end() - 1))->id(); + LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size() + << " segments. Begin:" << begin << " End:" << end; + uint64_t begin_time = GetCurrentTimeMicros(); + + auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(), + _context.tablet_schema->columns().size()); + std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics()); + uint64_t merged_row_stat = 0; + vectorized::VMergeIterator* reader = + _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat); + if (UNLIKELY(reader == nullptr)) { + LOG(WARNING) << "failed to get segcompaction reader"; + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_READER); + } + std::unique_ptr<vectorized::VMergeIterator> reader_ptr; + reader_ptr.reset(reader); + auto writer = _create_segcompaction_writer(begin, end); + if (UNLIKELY(writer == nullptr)) { + LOG(WARNING) << "failed to get segcompaction writer"; + return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_WRITER); + } + uint64_t row_count = 0; + vectorized::Block block = _context.tablet_schema->create_block(); + while (true) { + auto status = reader_ptr->next_batch(&block); + row_count += block.rows(); + if (status != Status::OK()) { + if 
(LIKELY(status.is_end_of_file())) { + RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), + "write block failed"); + break; + } else { + LOG(WARNING) << "read block failed: " << status.to_string(); + return status; + } + } + RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), "write block failed"); + block.clear_column_data(); + } + RETURN_NOT_OK_LOG(_check_correctness(std::move(stat), merged_row_stat, row_count, begin, end), + "check correctness failed"); + { + std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); + _clear_statistics_for_deleting_segments_unsafe(begin, end); + } + RETURN_NOT_OK(_flush_segment_writer(&writer)); + RETURN_NOT_OK(_delete_original_segments(begin, end)); + RETURN_NOT_OK(_rename_compacted_segments(begin, end)); + + if (VLOG_DEBUG_IS_ON) { + std::stringstream ss; + for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) { + ss << "[" << entry.path() << "]"; + } + VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point + << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted + << " list directory:" << ss.str(); + } + + _segcompacted_point += (end - begin + 1); + + uint64_t elapsed = GetCurrentTimeMicros() - begin_time; + LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed + << "us. 
_segcompacted_point update:" << _segcompacted_point; + + return Status::OK(); +} + +void BetaRowsetWriter::segcompaction(SegCompactionCandidatesSharedPtr segments) { + Status status = _do_segcompaction(segments); + if (!status.ok()) { + int16_t errcode = status.precise_code(); + switch (errcode) { + case OLAP_ERR_SEGCOMPACTION_INIT_READER: + case OLAP_ERR_SEGCOMPACTION_INIT_WRITER: + LOG(WARNING) << "segcompaction failed, try next time:" << status; + return; + default: + LOG(WARNING) << "segcompaction fatal, terminating the write job:" << status; + // status will be checked by the next trigger of segcompaction or the final wait + _segcompaction_status.store(OLAP_ERR_OTHER_ERROR); + } + } + DCHECK_EQ(_is_doing_segcompaction, true); + _is_doing_segcompaction = false; Review Comment: We do not hold the mutex here, so the program may end up in a state where no thread is left to signal the waiting threads. For example, a waiter that checks `_is_doing_segcompaction` may see `true` and be about to block in `wait()` just as this thread clears the flag and signals; the signal is then missed and the waiter blocks forever. Clearing the flag and notifying while holding the same mutex the waiters use avoids this lost wakeup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org