github-actions[bot] commented on code in PR #30839: URL: https://github.com/apache/doris/pull/30839#discussion_r1477725517
########## be/src/olap/compaction.cpp: ########## @@ -293,449 +337,327 @@ bool Compaction::handle_ordered_data_compaction() { // most rowset of current compaction is nonoverlapping // just handle nonoverlappint rowsets auto st = do_compact_ordered_rowsets(); + return st.ok(); +} + +Status CompactionMixin::execute_compact() { + uint32_t checksum_before; + uint32_t checksum_after; + bool enable_compaction_checksum = config::enable_compaction_checksum; + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_before); + RETURN_IF_ERROR(checksum_task.execute()); + } + + auto* data_dir = tablet()->data_dir(); + int64_t permits = get_compaction_permits(); + data_dir->disks_compaction_score_increment(permits); + data_dir->disks_compaction_num_increment(1); + + Status st = execute_compact_impl(permits); + + data_dir->disks_compaction_score_increment(-permits); + data_dir->disks_compaction_num_increment(-1); + if (!st.ok()) { - return false; + return st; } - return true; + + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_after); + RETURN_IF_ERROR(checksum_task.execute()); + if (checksum_before != checksum_after) { + return Status::InternalError( + "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", + checksum_before, checksum_after, _tablet->tablet_id()); + } + } + + _load_segment_to_cache(); + return Status::OK(); } -Status Compaction::do_compaction_impl(int64_t permits) { +Status CompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; if (handle_ordered_data_compaction()) { RETURN_IF_ERROR(modify_rowsets()); - - int64_t now = UnixMillis(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if 
(compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } - auto cumu_policy = _tablet->cumulative_compaction_policy(); LOG(INFO) << "succeed to do ordered data " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", disk=" << _tablet->data_dir()->path() + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num << ", output_row_num=" << _output_rowset->num_rows() << ", input_rowset_size=" << _input_rowsets_size << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" - << (cumu_policy == nullptr ? "quick" : cumu_policy->name()); + << ". elapsed time=" << watch.get_elapse_second() << "s."; + _state = CompactionState::SUCCESS; return Status::OK(); } build_basic_info(); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", permits: " << permits; - bool vertical_compaction = should_vertical_compaction(); - RowsetWriterContext ctx; - RETURN_IF_ERROR(construct_input_rowset_readers()); - RETURN_IF_ERROR(construct_output_rowset_writer(ctx, vertical_compaction)); - // 2. write merged rows to output rowset - // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool - Merger::Statistics stats; - // if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction. - // the row ID conversion matrix needs to be used for inverted index compaction. 
- if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write())) { - stats.rowid_conversion = &_rowid_conversion; + RETURN_IF_ERROR(merge_input_rowsets()); + + RETURN_IF_ERROR(do_inverted_index_compaction()); + + RETURN_IF_ERROR(modify_rowsets()); + + auto* cumu_policy = tablet()->cumulative_compaction_policy(); + DCHECK(cumu_policy); + LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << _is_vertical + << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version + << ", current_max_version=" << tablet()->max_version().second + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments + << ", input_rowset_size=" << _input_rowsets_size + << ", output_rowset_size=" << _output_rowset->data_disk_size() + << ", input_row_num=" << _input_row_num + << ", output_row_num=" << _output_rowset->num_rows() + << ", filtered_row_num=" << _stats.filtered_rows + << ", merged_row_num=" << _stats.merged_rows + << ". elapsed time=" << watch.get_elapse_second() + << "s. 
cumulative_compaction_policy=" << cumu_policy->name() + << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + + _state = CompactionState::SUCCESS; + + return Status::OK(); +} + +Status CompactionMixin::do_inverted_index_compaction() { Review Comment: warning: function 'do_inverted_index_compaction' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CompactionMixin::do_inverted_index_compaction() { ^ ``` <details> <summary>Additional context</summary> **be/src/olap/compaction.cpp:429:** 117 lines including whitespace and comments (threshold 80) ```cpp Status CompactionMixin::do_inverted_index_compaction() { ^ ``` </details> ########## be/src/olap/compaction.cpp: ########## @@ -293,449 +337,327 @@ // most rowset of current compaction is nonoverlapping // just handle nonoverlappint rowsets auto st = do_compact_ordered_rowsets(); + return st.ok(); +} + +Status CompactionMixin::execute_compact() { + uint32_t checksum_before; + uint32_t checksum_after; + bool enable_compaction_checksum = config::enable_compaction_checksum; + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_before); + RETURN_IF_ERROR(checksum_task.execute()); + } + + auto* data_dir = tablet()->data_dir(); + int64_t permits = get_compaction_permits(); + data_dir->disks_compaction_score_increment(permits); + data_dir->disks_compaction_num_increment(1); + + Status st = execute_compact_impl(permits); + + data_dir->disks_compaction_score_increment(-permits); + data_dir->disks_compaction_num_increment(-1); + if (!st.ok()) { - return false; + return st; } - return true; + + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_after); + RETURN_IF_ERROR(checksum_task.execute()); + if (checksum_before != 
checksum_after) { + return Status::InternalError( + "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", + checksum_before, checksum_after, _tablet->tablet_id()); + } + } + + _load_segment_to_cache(); + return Status::OK(); } -Status Compaction::do_compaction_impl(int64_t permits) { +Status CompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; if (handle_ordered_data_compaction()) { RETURN_IF_ERROR(modify_rowsets()); - - int64_t now = UnixMillis(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } - auto cumu_policy = _tablet->cumulative_compaction_policy(); LOG(INFO) << "succeed to do ordered data " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", disk=" << _tablet->data_dir()->path() + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num << ", output_row_num=" << _output_rowset->num_rows() << ", input_rowset_size=" << _input_rowsets_size << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" - << (cumu_policy == nullptr ? "quick" : cumu_policy->name()); + << ". elapsed time=" << watch.get_elapse_second() << "s."; + _state = CompactionState::SUCCESS; return Status::OK(); } build_basic_info(); LOG(INFO) << "start " << compaction_name() << ". 
tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", permits: " << permits; - bool vertical_compaction = should_vertical_compaction(); - RowsetWriterContext ctx; - RETURN_IF_ERROR(construct_input_rowset_readers()); - RETURN_IF_ERROR(construct_output_rowset_writer(ctx, vertical_compaction)); - // 2. write merged rows to output rowset - // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool - Merger::Statistics stats; - // if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction. - // the row ID conversion matrix needs to be used for inverted index compaction. - if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write())) { - stats.rowid_conversion = &_rowid_conversion; + RETURN_IF_ERROR(merge_input_rowsets()); + + RETURN_IF_ERROR(do_inverted_index_compaction()); + + RETURN_IF_ERROR(modify_rowsets()); + + auto* cumu_policy = tablet()->cumulative_compaction_policy(); + DCHECK(cumu_policy); + LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << _is_vertical + << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version + << ", current_max_version=" << tablet()->max_version().second + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments + << ", input_rowset_size=" << _input_rowsets_size + << ", output_rowset_size=" << _output_rowset->data_disk_size() + << ", input_row_num=" << _input_row_num + << ", output_row_num=" << _output_rowset->num_rows() + << ", filtered_row_num=" << _stats.filtered_rows + << ", merged_row_num=" << _stats.merged_rows + << ". elapsed time=" << watch.get_elapse_second() + << "s. 
cumulative_compaction_policy=" << cumu_policy->name() + << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + + _state = CompactionState::SUCCESS; + + return Status::OK(); +} + +Status CompactionMixin::do_inverted_index_compaction() { + const auto& ctx = _output_rs_writer->context(); + if (_input_row_num <= 0 || !_stats.rowid_conversion || + !config::inverted_index_compaction_enable || ctx.skip_inverted_index.empty()) { + return Status::OK(); } - Status res; - { - SCOPED_TIMER(_merge_rowsets_latency_timer); - if (vertical_compaction) { - res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &stats); - } else { - res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), &stats); + OlapStopWatch inverted_watch; + + Version version = tablet()->max_version(); + DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); + std::set<RowLocation> missed_rows; + std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; + // Convert the delete bitmap of the input rowsets to output rowset. 
+ tablet()->calc_compaction_output_rowset_delete_bitmap( + _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, &location_map, + _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); + + if (!_allow_delete_in_cumu_compaction) { + if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && + _stats.merged_rows != missed_rows.size()) { + std::string err_msg = fmt::format( + "cumulative compaction: the merged rows({}) is not equal to missed " + "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", + _stats.merged_rows, missed_rows.size(), _tablet->tablet_id(), + _tablet->table_id()); + DCHECK(false) << err_msg; + LOG(WARNING) << err_msg; } } - if (!res.ok()) { - LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res - << ", tablet=" << _tablet->tablet_id() - << ", output_version=" << _output_version; - return res; + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); + + // translation vec + // <<dest_idx_num, dest_docId>> + // the first level vector: index indicates src segment. + // the second level vector: index indicates row id of source segment, + // value indicates row id of destination segment. + // <UINT32_MAX, UINT32_MAX> indicates current row not exist. + const auto& trans_vec = _stats.rowid_conversion->get_rowid_conversion_map(); + + // source rowset,segment -> index_id + const auto& src_seg_to_id_map = _stats.rowid_conversion->get_src_segment_to_id_map(); + + // dest rowset id + RowsetId dest_rowset_id = _stats.rowid_conversion->get_dst_rowset_id(); + // dest segment id -> num rows + std::vector<uint32_t> dest_segment_num_rows; + RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); + + auto src_segment_num = src_seg_to_id_map.size(); + auto dest_segment_num = dest_segment_num_rows.size(); + + if (dest_segment_num <= 0) { + LOG(INFO) << "skip doing index compaction due to no output segments" + << ". 
tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num + << ", output row number=" << _output_rowset->num_rows() + << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; + return Status::OK(); } - COUNTER_UPDATE(_merged_rows_counter, stats.merged_rows); - COUNTER_UPDATE(_filtered_rows_counter, stats.filtered_rows); - RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset), - fmt::format("rowset writer build failed. output_version: {}", - _output_version.to_string())); - // Now we support delete in cumu compaction, to make all data in rowsets whose version - // is below output_version to be delete in the future base compaction, we should carry - // all delete predicate in the output rowset. - // Output start version > 2 means we must set the delete predicate in the output rowset - if (allow_delete_in_cumu_compaction() && _output_rowset->version().first > 2) { - DeletePredicatePB delete_predicate; - std::accumulate( - _input_rs_readers.begin(), _input_rs_readers.end(), &delete_predicate, - [](DeletePredicatePB* delete_predicate, const RowsetReaderSharedPtr& reader) { - if (reader->rowset()->rowset_meta()->has_delete_predicate()) { - delete_predicate->MergeFrom( - reader->rowset()->rowset_meta()->delete_predicate()); - } - return delete_predicate; - }); - // now version in delete_predicate is deprecated - if (!delete_predicate.in_predicates().empty() || - !delete_predicate.sub_predicates_v2().empty() || - !delete_predicate.sub_predicates().empty()) { - _output_rowset->rowset_meta()->set_delete_predicate(std::move(delete_predicate)); - } + // src index files + // format: rowsetId_segmentId + std::vector<std::string> src_index_files(src_segment_num); + for (const auto& m : src_seg_to_id_map) { + std::pair<RowsetId, uint32_t> p = m.first; + src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); } - COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size()); - 
COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows()); - COUNTER_UPDATE(_output_segments_num_counter, _output_rowset->num_segments()); + // dest index files + // format: rowsetId_segmentId + std::vector<std::string> dest_index_files(dest_segment_num); + for (int i = 0; i < dest_segment_num; ++i) { + auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); + dest_index_files[i] = prefix; + } - // 3. check correctness - RETURN_IF_ERROR(check_correctness(stats)); + // create index_writer to compaction indexes + const auto& fs = _output_rowset->rowset_meta()->fs(); + const auto& tablet_path = _tablet->tablet_path(); - if (_input_row_num > 0 && stats.rowid_conversion && config::inverted_index_compaction_enable && - !ctx.skip_inverted_index.empty()) { - OlapStopWatch inverted_watch; + // we choose the first destination segment name as the temporary index writer path + // Used to distinguish between different index compaction + auto index_writer_path = tablet_path + "/" + dest_index_files[0]; + LOG(INFO) << "start index compaction" + << ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num + << ", destination index size=" << dest_segment_num << "."; - // check rowid_conversion correctness - Version version = _tablet->max_version(); - DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); - std::set<RowLocation> missed_rows; - std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; - // Convert the delete bitmap of the input rowsets to output rowset. 
- std::size_t missed_rows_size = 0; - _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), - &output_rowset_delete_bitmap); - if (!allow_delete_in_cumu_compaction()) { - missed_rows_size = missed_rows.size(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && - stats.merged_rows != missed_rows_size) { - std::string err_msg = fmt::format( - "cumulative compaction: the merged rows({}) is not equal to missed " - "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", - stats.merged_rows, missed_rows_size, _tablet->tablet_id(), - _tablet->table_id()); - DCHECK(false) << err_msg; - LOG(WARNING) << err_msg; + auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { + LOG(WARNING) << "failed to do index compaction" + << ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id + << ". index_id=" << index_id; + for (auto& rowset : _input_rowsets) { + rowset->set_skip_index_compaction(column_uniq_id); + LOG(INFO) << "mark skipping inverted index compaction next time" + << ". 
tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id() + << ", column uniq id=" << column_uniq_id << ", index_id=" << index_id; + } + }; + + for (auto&& column_uniq_id : ctx.skip_inverted_index) { + auto index_id = _cur_tablet_schema->get_inverted_index(column_uniq_id, "")->index_id(); + try { + auto st = compact_column(index_id, src_segment_num, dest_segment_num, src_index_files, + dest_index_files, fs, index_writer_path, tablet_path, + trans_vec, dest_segment_num_rows); + if (!st.ok()) { + error_handler(index_id, column_uniq_id); + return st; } + } catch (CLuceneError& e) { + error_handler(index_id, column_uniq_id); + return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what()); } + } - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); - - // translation vec - // <<dest_idx_num, dest_docId>> - // the first level vector: index indicates src segment. - // the second level vector: index indicates row id of source segment, - // value indicates row id of destination segment. - // <UINT32_MAX, UINT32_MAX> indicates current row not exist. 
- std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec = - stats.rowid_conversion->get_rowid_conversion_map(); - - // source rowset,segment -> index_id - std::map<std::pair<RowsetId, uint32_t>, uint32_t> src_seg_to_id_map = - stats.rowid_conversion->get_src_segment_to_id_map(); - // dest rowset id - RowsetId dest_rowset_id = stats.rowid_conversion->get_dst_rowset_id(); - // dest segment id -> num rows - std::vector<uint32_t> dest_segment_num_rows; - RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); - - auto src_segment_num = src_seg_to_id_map.size(); - auto dest_segment_num = dest_segment_num_rows.size(); - - if (dest_segment_num > 0) { - // src index files - // format: rowsetId_segmentId - std::vector<std::string> src_index_files(src_segment_num); - for (const auto& m : src_seg_to_id_map) { - std::pair<RowsetId, uint32_t> p = m.first; - src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); - } + return Status::OK(); +} - // dest index files - // format: rowsetId_segmentId - std::vector<std::string> dest_index_files(dest_segment_num); - for (int i = 0; i < dest_segment_num; ++i) { - auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); - dest_index_files[i] = prefix; +void CompactionMixin::construct_skip_inverted_index(RowsetWriterContext& ctx) { + for (const auto& index : _cur_tablet_schema->indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + + auto col_unique_id = index.col_unique_ids()[0]; + auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) { + auto rowset = static_cast<BetaRowset*>(src_rs.get()); + if (rowset->is_skip_index_compaction(col_unique_id)) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] rowset[" + << rowset->rowset_id() << "] column_unique_id[" << col_unique_id + << "] skip inverted index compaction due to last failure"; + return false; } - // create index_writer to compaction indexes - const auto& fs = 
_output_rowset->rowset_meta()->fs(); - const auto& tablet_path = _tablet->tablet_path(); - - // we choose the first destination segment name as the temporary index writer path - // Used to distinguish between different index compaction - auto index_writer_path = tablet_path + "/" + dest_index_files[0]; - LOG(INFO) << "start index compaction" - << ". tablet=" << _tablet->tablet_id() - << ", source index size=" << src_segment_num - << ", destination index size=" << dest_segment_num << "."; - Status status = Status::OK(); - std::for_each( - ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(), - [&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files, - &dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows, - &status, this](int32_t column_uniq_id) { - auto index_id = _cur_tablet_schema->get_inverted_index(column_uniq_id, "") - ->index_id(); - try { - auto st = compact_column(index_id, src_segment_num, dest_segment_num, - src_index_files, dest_index_files, fs, - index_writer_path, tablet_path, trans_vec, - dest_segment_num_rows); - if (!st.ok()) { - LOG(WARNING) << "failed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ". column uniq id=" << column_uniq_id - << ". index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() - << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - } - status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( - st.msg()); - } - } catch (CLuceneError& e) { - LOG(WARNING) << "failed to do index compaction" - << ". 
tablet=" << _tablet->tablet_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() - << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - } - status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( - e.what()); - } - }); - - // check index compaction status. If status is not ok, we should return error and end this compaction round. - if (!status.ok()) { - return status; + auto& fs = rowset->rowset_meta()->fs(); + + const auto* index_meta = rowset->tablet_schema()->get_inverted_index(col_unique_id, ""); + if (index_meta == nullptr) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "] index meta is null, will skip index compaction"; + return false; } - LOG(INFO) << "succeed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ", input_rowset_size=" << _input_rowsets_size - << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; - } else { - LOG(INFO) << "skip doing index compaction due to no output segments" - << ". tablet=" << _tablet->tablet_id() - << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ". 
elapsed time=" << inverted_watch.get_elapse_second() << "s."; - } - } + for (auto i = 0; i < rowset->num_segments(); i++) { + auto segment_file = rowset->segment_file_path(i); + std::string inverted_index_src_file_path = + InvertedIndexDescriptor::get_index_file_name( + segment_file, index_meta->index_id(), + index_meta->get_index_suffix()); + bool exists = false; + if (!fs->exists(inverted_index_src_file_path, &exists).ok()) { + LOG(ERROR) << inverted_index_src_file_path << " fs->exists error"; + return false; + } - // 4. modify rowsets in memory - RETURN_IF_ERROR(modify_rowsets(&stats)); + if (!exists) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is not exists, will skip index compaction"; + return false; + } - // 5. update last success compaction time - int64_t now = UnixMillis(); - // TODO(yingchun): do the judge in Tablet class - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } + // check idx file size + int64_t file_size = 0; + if (fs->file_size(inverted_index_src_file_path, &file_size) != Status::OK()) { + LOG(ERROR) << inverted_index_src_file_path << " fs->file_size error"; + return false; + } - int64_t current_max_version = -1; - { - std::shared_lock rdlock(_tablet->get_header_lock()); - current_max_version = -1; - if (RowsetSharedPtr max_rowset = _tablet->get_rowset_with_max_version()) { - current_max_version = max_rowset->end_version(); - } - } + if (file_size == 0) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is empty file, will 
skip index compaction"; + return false; + } - auto* cumu_policy = _tablet->cumulative_compaction_policy(); - DCHECK(cumu_policy); - LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << vertical_compaction - << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", current_max_version=" << current_max_version - << ", disk=" << _tablet->data_dir()->path() << ", segments=" << _input_num_segments - << ", input_rowset_size=" << _input_rowsets_size - << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ", input_row_num=" << _input_row_num - << ", output_row_num=" << _output_rowset->num_rows() - << ", filtered_row_num=" << stats.filtered_rows - << ", merged_row_num=" << stats.merged_rows - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" << cumu_policy->name() - << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + // check index meta + std::filesystem::path p(inverted_index_src_file_path); + std::string dir_str = p.parent_path().string(); + std::string file_str = p.filename().string(); + lucene::store::Directory* dir = + DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str()); + DorisCompoundReader reader(dir, file_str.c_str()); + std::vector<std::string> files; + reader.list(&files); + reader.close(); + + // why is 3? 
+ // bkd index will write at least 3 files + if (files.size() < 3) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is corrupted, will skip index compaction"; + return false; + } + } + return true; + }; - return Status::OK(); + bool all_have_inverted_index = std::all_of(_input_rowsets.begin(), _input_rowsets.end(), + std::move(has_inverted_index)); + + if (all_have_inverted_index && + field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) { + ctx.skip_inverted_index.insert(col_unique_id); + } + } } -Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool is_vertical) { +Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) { + construct_skip_inverted_index(ctx); ctx.version = _output_version; ctx.rowset_state = VISIBLE; ctx.segments_overlap = NONOVERLAPPING; ctx.tablet_schema = _cur_tablet_schema; ctx.newest_write_timestamp = _newest_write_timestamp; ctx.write_type = DataWriteType::TYPE_COMPACTION; - if (config::inverted_index_compaction_enable && - ((_tablet->keys_type() == KeysType::UNIQUE_KEYS || - _tablet->keys_type() == KeysType::DUP_KEYS))) { - for (const auto& index : _cur_tablet_schema->indexes()) { - if (index.index_type() == IndexType::INVERTED) { - auto col_unique_id = index.col_unique_ids()[0]; - //NOTE: here src_rs may be in building index progress, so it would not contain inverted index info. 
- bool all_have_inverted_index = std::all_of( - _input_rowsets.begin(), _input_rowsets.end(), [&](const auto& src_rs) { - BetaRowsetSharedPtr rowset = - std::static_pointer_cast<BetaRowset>(src_rs); - if (rowset == nullptr) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] rowset is null, will skip index compaction"; - return false; - } - if (rowset->is_skip_index_compaction(col_unique_id)) { - LOG(WARNING) - << "tablet[" << _tablet->tablet_id() << "] rowset[" - << rowset->rowset_id() << "] column_unique_id[" - << col_unique_id - << "] skip inverted index compaction due to last failure"; - return false; - } - auto fs = rowset->rowset_meta()->fs(); - - const auto* index_meta = - rowset->tablet_schema()->get_inverted_index(col_unique_id, ""); - if (index_meta == nullptr) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id - << "] index meta is null, will skip index compaction"; - return false; - } - for (auto i = 0; i < rowset->num_segments(); i++) { - auto segment_file = rowset->segment_file_path(i); - std::string inverted_index_src_file_path = - InvertedIndexDescriptor::get_index_file_name( - segment_file, index_meta->index_id(), - index_meta->get_index_suffix()); - bool exists = false; - if (!fs->exists(inverted_index_src_file_path, &exists).ok()) { - LOG(ERROR) - << inverted_index_src_file_path << " fs->exists error"; - return false; - } - if (!exists) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is not exists, will skip index compaction"; - return false; - } - - // check idx file size - int64_t file_size = 0; - if (fs->file_size(inverted_index_src_file_path, &file_size) != - Status::OK()) { - LOG(ERROR) << inverted_index_src_file_path - << " fs->file_size error"; - return false; - } - if (file_size == 0) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << 
"]," - << inverted_index_src_file_path - << " is empty file, will skip index compaction"; - return false; - } - - // check index meta - std::filesystem::path p(inverted_index_src_file_path); - std::string dir_str = p.parent_path().string(); - std::string file_str = p.filename().string(); - lucene::store::Directory* dir = - DorisCompoundDirectoryFactory::getDirectory( - fs, dir_str.c_str()); - DorisCompoundReader reader(dir, file_str.c_str()); - std::vector<std::string> files; - reader.list(&files); - reader.close(); - - // why is 3? - // bkd index will write at least 3 files - if (files.size() < 3) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is corrupted, will skip index compaction"; - return false; - } - } - return true; - }); - if (all_have_inverted_index && - field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) { - ctx.skip_inverted_index.insert(col_unique_id); - } - } - } - } - if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { - // write output rowset to storage policy resource - auto storage_policy = get_storage_policy(_tablet->storage_policy_id()); - if (storage_policy == nullptr) { - return Status::InternalError("could not find storage_policy, storage_policy_id={}", - _tablet->storage_policy_id()); - } - auto resource = get_storage_resource(storage_policy->resource_id); - if (resource.fs == nullptr) { - return Status::InternalError("could not find resource, resouce_id={}", - storage_policy->resource_id); - } - DCHECK(atol(resource.fs->id().c_str()) == storage_policy->resource_id); - DCHECK(resource.fs->type() != io::FileSystemType::LOCAL); - ctx.fs = std::move(resource.fs); - } - _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, is_vertical)); - _pending_rs_guard = ExecEnv::GetInstance()->storage_engine().to_local().add_pending_rowset(ctx); - return Status::OK(); -} - -Status 
Compaction::construct_input_rowset_readers() { - for (auto& rowset : _input_rowsets) { - RowsetReaderSharedPtr rs_reader; - RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); - _input_rs_readers.push_back(std::move(rs_reader)); - } + _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical)); + _pending_rs_guard = _engine.add_pending_rowset(ctx); return Status::OK(); } -Status Compaction::modify_rowsets(const Merger::Statistics* stats) { +Status CompactionMixin::modify_rowsets() { Review Comment: warning: function 'modify_rowsets' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CompactionMixin::modify_rowsets() { ^ ``` <details> <summary>Additional context</summary> **be/src/olap/compaction.cpp:652:** 132 lines including whitespace and comments (threshold 80) ```cpp Status CompactionMixin::modify_rowsets() { ^ ``` </details> ########## be/src/olap/compaction.cpp: ########## @@ -293,449 +337,327 @@ // most rowset of current compaction is nonoverlapping // just handle nonoverlappint rowsets auto st = do_compact_ordered_rowsets(); + return st.ok(); +} + +Status CompactionMixin::execute_compact() { + uint32_t checksum_before; + uint32_t checksum_after; + bool enable_compaction_checksum = config::enable_compaction_checksum; + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_before); + RETURN_IF_ERROR(checksum_task.execute()); + } + + auto* data_dir = tablet()->data_dir(); + int64_t permits = get_compaction_permits(); + data_dir->disks_compaction_score_increment(permits); + data_dir->disks_compaction_num_increment(1); + + Status st = execute_compact_impl(permits); + + data_dir->disks_compaction_score_increment(-permits); + data_dir->disks_compaction_num_increment(-1); + if (!st.ok()) { - return false; + return st; } - return true; + + if (enable_compaction_checksum) { + 
EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_after); + RETURN_IF_ERROR(checksum_task.execute()); + if (checksum_before != checksum_after) { + return Status::InternalError( + "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", + checksum_before, checksum_after, _tablet->tablet_id()); + } + } + + _load_segment_to_cache(); + return Status::OK(); } -Status Compaction::do_compaction_impl(int64_t permits) { +Status CompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; if (handle_ordered_data_compaction()) { RETURN_IF_ERROR(modify_rowsets()); - - int64_t now = UnixMillis(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } - auto cumu_policy = _tablet->cumulative_compaction_policy(); LOG(INFO) << "succeed to do ordered data " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", disk=" << _tablet->data_dir()->path() + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num << ", output_row_num=" << _output_rowset->num_rows() << ", input_rowset_size=" << _input_rowsets_size << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" - << (cumu_policy == nullptr ? "quick" : cumu_policy->name()); + << ". elapsed time=" << watch.get_elapse_second() << "s."; + _state = CompactionState::SUCCESS; return Status::OK(); } build_basic_info(); LOG(INFO) << "start " << compaction_name() << ". 
tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", permits: " << permits; - bool vertical_compaction = should_vertical_compaction(); - RowsetWriterContext ctx; - RETURN_IF_ERROR(construct_input_rowset_readers()); - RETURN_IF_ERROR(construct_output_rowset_writer(ctx, vertical_compaction)); - // 2. write merged rows to output rowset - // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool - Merger::Statistics stats; - // if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction. - // the row ID conversion matrix needs to be used for inverted index compaction. - if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write())) { - stats.rowid_conversion = &_rowid_conversion; + RETURN_IF_ERROR(merge_input_rowsets()); + + RETURN_IF_ERROR(do_inverted_index_compaction()); + + RETURN_IF_ERROR(modify_rowsets()); + + auto* cumu_policy = tablet()->cumulative_compaction_policy(); + DCHECK(cumu_policy); + LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << _is_vertical + << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version + << ", current_max_version=" << tablet()->max_version().second + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments + << ", input_rowset_size=" << _input_rowsets_size + << ", output_rowset_size=" << _output_rowset->data_disk_size() + << ", input_row_num=" << _input_row_num + << ", output_row_num=" << _output_rowset->num_rows() + << ", filtered_row_num=" << _stats.filtered_rows + << ", merged_row_num=" << _stats.merged_rows + << ". elapsed time=" << watch.get_elapse_second() + << "s. 
cumulative_compaction_policy=" << cumu_policy->name() + << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + + _state = CompactionState::SUCCESS; + + return Status::OK(); +} + +Status CompactionMixin::do_inverted_index_compaction() { + const auto& ctx = _output_rs_writer->context(); + if (_input_row_num <= 0 || !_stats.rowid_conversion || + !config::inverted_index_compaction_enable || ctx.skip_inverted_index.empty()) { + return Status::OK(); } - Status res; - { - SCOPED_TIMER(_merge_rowsets_latency_timer); - if (vertical_compaction) { - res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &stats); - } else { - res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), &stats); + OlapStopWatch inverted_watch; + + Version version = tablet()->max_version(); + DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); + std::set<RowLocation> missed_rows; + std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; + // Convert the delete bitmap of the input rowsets to output rowset. 
+ tablet()->calc_compaction_output_rowset_delete_bitmap( + _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, &location_map, + _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); + + if (!_allow_delete_in_cumu_compaction) { + if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && + _stats.merged_rows != missed_rows.size()) { + std::string err_msg = fmt::format( + "cumulative compaction: the merged rows({}) is not equal to missed " + "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", + _stats.merged_rows, missed_rows.size(), _tablet->tablet_id(), + _tablet->table_id()); + DCHECK(false) << err_msg; + LOG(WARNING) << err_msg; } } - if (!res.ok()) { - LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res - << ", tablet=" << _tablet->tablet_id() - << ", output_version=" << _output_version; - return res; + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); + + // translation vec + // <<dest_idx_num, dest_docId>> + // the first level vector: index indicates src segment. + // the second level vector: index indicates row id of source segment, + // value indicates row id of destination segment. + // <UINT32_MAX, UINT32_MAX> indicates current row not exist. + const auto& trans_vec = _stats.rowid_conversion->get_rowid_conversion_map(); + + // source rowset,segment -> index_id + const auto& src_seg_to_id_map = _stats.rowid_conversion->get_src_segment_to_id_map(); + + // dest rowset id + RowsetId dest_rowset_id = _stats.rowid_conversion->get_dst_rowset_id(); + // dest segment id -> num rows + std::vector<uint32_t> dest_segment_num_rows; + RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); + + auto src_segment_num = src_seg_to_id_map.size(); + auto dest_segment_num = dest_segment_num_rows.size(); + + if (dest_segment_num <= 0) { + LOG(INFO) << "skip doing index compaction due to no output segments" + << ". 
tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num + << ", output row number=" << _output_rowset->num_rows() + << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; + return Status::OK(); } - COUNTER_UPDATE(_merged_rows_counter, stats.merged_rows); - COUNTER_UPDATE(_filtered_rows_counter, stats.filtered_rows); - RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset), - fmt::format("rowset writer build failed. output_version: {}", - _output_version.to_string())); - // Now we support delete in cumu compaction, to make all data in rowsets whose version - // is below output_version to be delete in the future base compaction, we should carry - // all delete predicate in the output rowset. - // Output start version > 2 means we must set the delete predicate in the output rowset - if (allow_delete_in_cumu_compaction() && _output_rowset->version().first > 2) { - DeletePredicatePB delete_predicate; - std::accumulate( - _input_rs_readers.begin(), _input_rs_readers.end(), &delete_predicate, - [](DeletePredicatePB* delete_predicate, const RowsetReaderSharedPtr& reader) { - if (reader->rowset()->rowset_meta()->has_delete_predicate()) { - delete_predicate->MergeFrom( - reader->rowset()->rowset_meta()->delete_predicate()); - } - return delete_predicate; - }); - // now version in delete_predicate is deprecated - if (!delete_predicate.in_predicates().empty() || - !delete_predicate.sub_predicates_v2().empty() || - !delete_predicate.sub_predicates().empty()) { - _output_rowset->rowset_meta()->set_delete_predicate(std::move(delete_predicate)); - } + // src index files + // format: rowsetId_segmentId + std::vector<std::string> src_index_files(src_segment_num); + for (const auto& m : src_seg_to_id_map) { + std::pair<RowsetId, uint32_t> p = m.first; + src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); } - COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size()); - 
COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows()); - COUNTER_UPDATE(_output_segments_num_counter, _output_rowset->num_segments()); + // dest index files + // format: rowsetId_segmentId + std::vector<std::string> dest_index_files(dest_segment_num); + for (int i = 0; i < dest_segment_num; ++i) { + auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); + dest_index_files[i] = prefix; + } - // 3. check correctness - RETURN_IF_ERROR(check_correctness(stats)); + // create index_writer to compaction indexes + const auto& fs = _output_rowset->rowset_meta()->fs(); + const auto& tablet_path = _tablet->tablet_path(); - if (_input_row_num > 0 && stats.rowid_conversion && config::inverted_index_compaction_enable && - !ctx.skip_inverted_index.empty()) { - OlapStopWatch inverted_watch; + // we choose the first destination segment name as the temporary index writer path + // Used to distinguish between different index compaction + auto index_writer_path = tablet_path + "/" + dest_index_files[0]; + LOG(INFO) << "start index compaction" + << ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num + << ", destination index size=" << dest_segment_num << "."; - // check rowid_conversion correctness - Version version = _tablet->max_version(); - DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); - std::set<RowLocation> missed_rows; - std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; - // Convert the delete bitmap of the input rowsets to output rowset. 
- std::size_t missed_rows_size = 0; - _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), - &output_rowset_delete_bitmap); - if (!allow_delete_in_cumu_compaction()) { - missed_rows_size = missed_rows.size(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && - stats.merged_rows != missed_rows_size) { - std::string err_msg = fmt::format( - "cumulative compaction: the merged rows({}) is not equal to missed " - "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", - stats.merged_rows, missed_rows_size, _tablet->tablet_id(), - _tablet->table_id()); - DCHECK(false) << err_msg; - LOG(WARNING) << err_msg; + auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { + LOG(WARNING) << "failed to do index compaction" + << ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id + << ". index_id=" << index_id; + for (auto& rowset : _input_rowsets) { + rowset->set_skip_index_compaction(column_uniq_id); + LOG(INFO) << "mark skipping inverted index compaction next time" + << ". 
tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id() + << ", column uniq id=" << column_uniq_id << ", index_id=" << index_id; + } + }; + + for (auto&& column_uniq_id : ctx.skip_inverted_index) { + auto index_id = _cur_tablet_schema->get_inverted_index(column_uniq_id, "")->index_id(); + try { + auto st = compact_column(index_id, src_segment_num, dest_segment_num, src_index_files, + dest_index_files, fs, index_writer_path, tablet_path, + trans_vec, dest_segment_num_rows); + if (!st.ok()) { + error_handler(index_id, column_uniq_id); + return st; } + } catch (CLuceneError& e) { + error_handler(index_id, column_uniq_id); + return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what()); } + } - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); - - // translation vec - // <<dest_idx_num, dest_docId>> - // the first level vector: index indicates src segment. - // the second level vector: index indicates row id of source segment, - // value indicates row id of destination segment. - // <UINT32_MAX, UINT32_MAX> indicates current row not exist. 
- std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec = - stats.rowid_conversion->get_rowid_conversion_map(); - - // source rowset,segment -> index_id - std::map<std::pair<RowsetId, uint32_t>, uint32_t> src_seg_to_id_map = - stats.rowid_conversion->get_src_segment_to_id_map(); - // dest rowset id - RowsetId dest_rowset_id = stats.rowid_conversion->get_dst_rowset_id(); - // dest segment id -> num rows - std::vector<uint32_t> dest_segment_num_rows; - RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); - - auto src_segment_num = src_seg_to_id_map.size(); - auto dest_segment_num = dest_segment_num_rows.size(); - - if (dest_segment_num > 0) { - // src index files - // format: rowsetId_segmentId - std::vector<std::string> src_index_files(src_segment_num); - for (const auto& m : src_seg_to_id_map) { - std::pair<RowsetId, uint32_t> p = m.first; - src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); - } + return Status::OK(); +} - // dest index files - // format: rowsetId_segmentId - std::vector<std::string> dest_index_files(dest_segment_num); - for (int i = 0; i < dest_segment_num; ++i) { - auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); - dest_index_files[i] = prefix; +void CompactionMixin::construct_skip_inverted_index(RowsetWriterContext& ctx) { + for (const auto& index : _cur_tablet_schema->indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + + auto col_unique_id = index.col_unique_ids()[0]; + auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) { + auto rowset = static_cast<BetaRowset*>(src_rs.get()); + if (rowset->is_skip_index_compaction(col_unique_id)) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] rowset[" + << rowset->rowset_id() << "] column_unique_id[" << col_unique_id + << "] skip inverted index compaction due to last failure"; + return false; } - // create index_writer to compaction indexes - const auto& fs = 
_output_rowset->rowset_meta()->fs(); - const auto& tablet_path = _tablet->tablet_path(); - - // we choose the first destination segment name as the temporary index writer path - // Used to distinguish between different index compaction - auto index_writer_path = tablet_path + "/" + dest_index_files[0]; - LOG(INFO) << "start index compaction" - << ". tablet=" << _tablet->tablet_id() - << ", source index size=" << src_segment_num - << ", destination index size=" << dest_segment_num << "."; - Status status = Status::OK(); - std::for_each( - ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(), - [&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files, - &dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows, - &status, this](int32_t column_uniq_id) { - auto index_id = _cur_tablet_schema->get_inverted_index(column_uniq_id, "") - ->index_id(); - try { - auto st = compact_column(index_id, src_segment_num, dest_segment_num, - src_index_files, dest_index_files, fs, - index_writer_path, tablet_path, trans_vec, - dest_segment_num_rows); - if (!st.ok()) { - LOG(WARNING) << "failed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ". column uniq id=" << column_uniq_id - << ". index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() - << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - } - status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( - st.msg()); - } - } catch (CLuceneError& e) { - LOG(WARNING) << "failed to do index compaction" - << ". 
tablet=" << _tablet->tablet_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() - << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - } - status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( - e.what()); - } - }); - - // check index compaction status. If status is not ok, we should return error and end this compaction round. - if (!status.ok()) { - return status; + auto& fs = rowset->rowset_meta()->fs(); + + const auto* index_meta = rowset->tablet_schema()->get_inverted_index(col_unique_id, ""); + if (index_meta == nullptr) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "] index meta is null, will skip index compaction"; + return false; } - LOG(INFO) << "succeed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ", input_rowset_size=" << _input_rowsets_size - << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; - } else { - LOG(INFO) << "skip doing index compaction due to no output segments" - << ". tablet=" << _tablet->tablet_id() - << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ". 
elapsed time=" << inverted_watch.get_elapse_second() << "s."; - } - } + for (auto i = 0; i < rowset->num_segments(); i++) { + auto segment_file = rowset->segment_file_path(i); + std::string inverted_index_src_file_path = + InvertedIndexDescriptor::get_index_file_name( + segment_file, index_meta->index_id(), + index_meta->get_index_suffix()); + bool exists = false; + if (!fs->exists(inverted_index_src_file_path, &exists).ok()) { + LOG(ERROR) << inverted_index_src_file_path << " fs->exists error"; + return false; + } - // 4. modify rowsets in memory - RETURN_IF_ERROR(modify_rowsets(&stats)); + if (!exists) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is not exists, will skip index compaction"; + return false; + } - // 5. update last success compaction time - int64_t now = UnixMillis(); - // TODO(yingchun): do the judge in Tablet class - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } + // check idx file size + int64_t file_size = 0; + if (fs->file_size(inverted_index_src_file_path, &file_size) != Status::OK()) { + LOG(ERROR) << inverted_index_src_file_path << " fs->file_size error"; + return false; + } - int64_t current_max_version = -1; - { - std::shared_lock rdlock(_tablet->get_header_lock()); - current_max_version = -1; - if (RowsetSharedPtr max_rowset = _tablet->get_rowset_with_max_version()) { - current_max_version = max_rowset->end_version(); - } - } + if (file_size == 0) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is empty file, will 
skip index compaction"; + return false; + } - auto* cumu_policy = _tablet->cumulative_compaction_policy(); - DCHECK(cumu_policy); - LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << vertical_compaction - << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", current_max_version=" << current_max_version - << ", disk=" << _tablet->data_dir()->path() << ", segments=" << _input_num_segments - << ", input_rowset_size=" << _input_rowsets_size - << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ", input_row_num=" << _input_row_num - << ", output_row_num=" << _output_rowset->num_rows() - << ", filtered_row_num=" << stats.filtered_rows - << ", merged_row_num=" << stats.merged_rows - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" << cumu_policy->name() - << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + // check index meta + std::filesystem::path p(inverted_index_src_file_path); + std::string dir_str = p.parent_path().string(); + std::string file_str = p.filename().string(); + lucene::store::Directory* dir = + DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str()); + DorisCompoundReader reader(dir, file_str.c_str()); + std::vector<std::string> files; + reader.list(&files); + reader.close(); + + // why is 3? 
+ // bkd index will write at least 3 files + if (files.size() < 3) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is corrupted, will skip index compaction"; + return false; + } + } + return true; + }; - return Status::OK(); + bool all_have_inverted_index = std::all_of(_input_rowsets.begin(), _input_rowsets.end(), + std::move(has_inverted_index)); + + if (all_have_inverted_index && + field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) { + ctx.skip_inverted_index.insert(col_unique_id); + } + } } -Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool is_vertical) { +Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) { + construct_skip_inverted_index(ctx); ctx.version = _output_version; ctx.rowset_state = VISIBLE; ctx.segments_overlap = NONOVERLAPPING; ctx.tablet_schema = _cur_tablet_schema; ctx.newest_write_timestamp = _newest_write_timestamp; ctx.write_type = DataWriteType::TYPE_COMPACTION; - if (config::inverted_index_compaction_enable && - ((_tablet->keys_type() == KeysType::UNIQUE_KEYS || - _tablet->keys_type() == KeysType::DUP_KEYS))) { - for (const auto& index : _cur_tablet_schema->indexes()) { - if (index.index_type() == IndexType::INVERTED) { - auto col_unique_id = index.col_unique_ids()[0]; - //NOTE: here src_rs may be in building index progress, so it would not contain inverted index info. 
- bool all_have_inverted_index = std::all_of( - _input_rowsets.begin(), _input_rowsets.end(), [&](const auto& src_rs) { - BetaRowsetSharedPtr rowset = - std::static_pointer_cast<BetaRowset>(src_rs); - if (rowset == nullptr) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] rowset is null, will skip index compaction"; - return false; - } - if (rowset->is_skip_index_compaction(col_unique_id)) { - LOG(WARNING) - << "tablet[" << _tablet->tablet_id() << "] rowset[" - << rowset->rowset_id() << "] column_unique_id[" - << col_unique_id - << "] skip inverted index compaction due to last failure"; - return false; - } - auto fs = rowset->rowset_meta()->fs(); - - const auto* index_meta = - rowset->tablet_schema()->get_inverted_index(col_unique_id, ""); - if (index_meta == nullptr) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id - << "] index meta is null, will skip index compaction"; - return false; - } - for (auto i = 0; i < rowset->num_segments(); i++) { - auto segment_file = rowset->segment_file_path(i); - std::string inverted_index_src_file_path = - InvertedIndexDescriptor::get_index_file_name( - segment_file, index_meta->index_id(), - index_meta->get_index_suffix()); - bool exists = false; - if (!fs->exists(inverted_index_src_file_path, &exists).ok()) { - LOG(ERROR) - << inverted_index_src_file_path << " fs->exists error"; - return false; - } - if (!exists) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is not exists, will skip index compaction"; - return false; - } - - // check idx file size - int64_t file_size = 0; - if (fs->file_size(inverted_index_src_file_path, &file_size) != - Status::OK()) { - LOG(ERROR) << inverted_index_src_file_path - << " fs->file_size error"; - return false; - } - if (file_size == 0) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << 
"]," - << inverted_index_src_file_path - << " is empty file, will skip index compaction"; - return false; - } - - // check index meta - std::filesystem::path p(inverted_index_src_file_path); - std::string dir_str = p.parent_path().string(); - std::string file_str = p.filename().string(); - lucene::store::Directory* dir = - DorisCompoundDirectoryFactory::getDirectory( - fs, dir_str.c_str()); - DorisCompoundReader reader(dir, file_str.c_str()); - std::vector<std::string> files; - reader.list(&files); - reader.close(); - - // why is 3? - // bkd index will write at least 3 files - if (files.size() < 3) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is corrupted, will skip index compaction"; - return false; - } - } - return true; - }); - if (all_have_inverted_index && - field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) { - ctx.skip_inverted_index.insert(col_unique_id); - } - } - } - } - if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { - // write output rowset to storage policy resource - auto storage_policy = get_storage_policy(_tablet->storage_policy_id()); - if (storage_policy == nullptr) { - return Status::InternalError("could not find storage_policy, storage_policy_id={}", - _tablet->storage_policy_id()); - } - auto resource = get_storage_resource(storage_policy->resource_id); - if (resource.fs == nullptr) { - return Status::InternalError("could not find resource, resouce_id={}", - storage_policy->resource_id); - } - DCHECK(atol(resource.fs->id().c_str()) == storage_policy->resource_id); - DCHECK(resource.fs->type() != io::FileSystemType::LOCAL); - ctx.fs = std::move(resource.fs); - } - _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, is_vertical)); - _pending_rs_guard = ExecEnv::GetInstance()->storage_engine().to_local().add_pending_rowset(ctx); - return Status::OK(); -} - -Status 
Compaction::construct_input_rowset_readers() { - for (auto& rowset : _input_rowsets) { - RowsetReaderSharedPtr rs_reader; - RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); - _input_rs_readers.push_back(std::move(rs_reader)); - } + _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical)); + _pending_rs_guard = _engine.add_pending_rowset(ctx); return Status::OK(); } -Status Compaction::modify_rowsets(const Merger::Statistics* stats) { +Status CompactionMixin::modify_rowsets() { Review Comment: warning: function 'modify_rowsets' has cognitive complexity of 52 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status CompactionMixin::modify_rowsets() { ^ ``` <details> <summary>Additional context</summary> **be/src/olap/compaction.cpp:656:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && ^ ``` **be/src/olap/compaction.cpp:673:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!_allow_delete_in_cumu_compaction) { ^ ``` **be/src/olap/compaction.cpp:675:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && ^ ``` **be/src/olap/compaction.cpp:675:** +1 ```cpp if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && ^ ``` **be/src/olap/compaction.cpp:687:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (config::enable_rowid_conversion_correctness_check) { ^ ``` **be/src/olap/compaction.cpp:688:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); ^ ``` **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/compaction.cpp:688:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp 
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); ^ ``` **be/src/common/status.h:542:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/compaction.cpp:695:** nesting level increased to 2 ```cpp SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); ^ ``` **be/src/util/trace.h:32:** expanded from macro 'SCOPED_SIMPLE_TRACE_IF_TIMEOUT' ```cpp SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) ^ ``` **be/src/util/trace.h:44:** expanded from macro 'SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT' ```cpp SCOPED_CLEANUP({ \ ^ ``` **be/src/util/scoped_cleanup.h:33:** expanded from macro 'SCOPED_CLEANUP' ```cpp auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body }); ^ ``` **be/src/olap/compaction.cpp:695:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); ^ ``` **be/src/util/trace.h:32:** expanded from macro 'SCOPED_SIMPLE_TRACE_IF_TIMEOUT' ```cpp SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) ^ ``` **be/src/util/trace.h:49:** expanded from macro 'SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT' ```cpp if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \ ^ ``` **be/src/olap/compaction.cpp:741:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!_allow_delete_in_cumu_compaction && ^ ``` **be/src/olap/compaction.cpp:741:** +1 ```cpp if (!_allow_delete_in_cumu_compaction && ^ ``` **be/src/olap/compaction.cpp:744:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (missed_rows.size() != missed_rows_size) { ^ ``` **be/src/olap/compaction.cpp:750:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (config::enable_rowid_conversion_correctness_check) { ^ ``` **be/src/olap/compaction.cpp:751:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp 
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); ^ ``` **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/compaction.cpp:751:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); ^ ``` **be/src/common/status.h:542:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/compaction.cpp:755:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); ^ ``` **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/compaction.cpp:755:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); ^ ``` **be/src/common/status.h:542:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/compaction.cpp:757:** +1, nesting level increased to 1 ```cpp } else { ^ ``` **be/src/olap/compaction.cpp:759:** nesting level increased to 2 ```cpp SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); ^ ``` **be/src/util/trace.h:32:** expanded from macro 'SCOPED_SIMPLE_TRACE_IF_TIMEOUT' ```cpp SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) ^ ``` **be/src/util/trace.h:44:** expanded from macro 'SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT' ```cpp SCOPED_CLEANUP({ \ ^ ``` **be/src/util/scoped_cleanup.h:33:** expanded from macro 'SCOPED_CLEANUP' ```cpp auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body }); ^ ``` **be/src/olap/compaction.cpp:759:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); ^ ``` **be/src/util/trace.h:32:** expanded from macro 
'SCOPED_SIMPLE_TRACE_IF_TIMEOUT' ```cpp SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) ^ ``` **be/src/util/trace.h:49:** expanded from macro 'SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT' ```cpp if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \ ^ ``` **be/src/olap/compaction.cpp:760:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); ^ ``` **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/compaction.cpp:760:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); ^ ``` **be/src/common/status.h:542:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/compaction.cpp:763:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (config::tablet_rowset_stale_sweep_by_size && ^ ``` **be/src/olap/compaction.cpp:775:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && ^ ``` **be/src/olap/compaction.cpp:779:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!st.ok()) { ^ ``` </details> ########## be/src/olap/merger.cpp: ########## @@ -51,9 +51,33 @@ #include "vec/olap/vertical_merge_iterator.h" namespace doris { +namespace { + +// for mow with cluster key table, the key group also contains cluster key columns. +// the `key_group_cluster_key_idxes` marks the positions of cluster key columns in key group. 
+void _generate_key_group_cluster_key_idxes(const TabletSchema& tablet_schema, + std::vector<std::vector<uint32_t>>& column_groups, + std::vector<uint32_t>& key_group_cluster_key_idxes) { + if (column_groups.empty() || tablet_schema.cluster_key_idxes().empty()) { + return; + } + + auto& key_column_group = column_groups[0]; + for (const auto& index_in_tablet_schema : tablet_schema.cluster_key_idxes()) { + for (auto j = 0; j < key_column_group.size(); ++j) { + auto cid = key_column_group[j]; + if (cid == index_in_tablet_schema) { + key_group_cluster_key_idxes.emplace_back(j); + break; + } + } + } +} -Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, - TabletSchemaSPtr cur_tablet_schema, +} // namespace + +Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, Review Comment: warning: method 'vmerge_rowsets' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, ``` ########## be/src/olap/merger.cpp: ########## @@ -349,8 +374,8 @@ // 2. compact groups one by one, generate a row_source_buf when compact key group // and use this row_source_buf to compact value column groups // 3. 
build output rowset -Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, - TabletSchemaSPtr tablet_schema, +Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, Review Comment: warning: method 'vertical_merge_rowsets' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, ``` ########## be/src/olap/compaction.cpp: ########## @@ -835,42 +749,42 @@ } if (config::enable_rowid_conversion_correctness_check) { - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); } - _tablet->merge_delete_bitmap(output_rowset_delete_bitmap); - RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); + tablet()->merge_delete_bitmap(output_rowset_delete_bitmap); + RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); } } else { std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock()); SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); - RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); + RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); } if (config::tablet_rowset_stale_sweep_by_size && _tablet->tablet_meta()->all_stale_rs_metas().size() >= config::tablet_rowset_stale_sweep_threshold_size) { - _tablet->delete_expired_stale_rowset(); + tablet()->delete_expired_stale_rowset(); } int64_t cur_max_version = 0; { std::shared_lock rlock(_tablet->get_header_lock()); cur_max_version = _tablet->max_version_unlocked(); - _tablet->save_meta(); + tablet()->save_meta(); } if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { auto st = TabletMetaManager::remove_old_version_delete_bitmap( - _tablet->data_dir(), _tablet->tablet_id(), 
cur_max_version); + tablet()->data_dir(), _tablet->tablet_id(), cur_max_version); if (!st.ok()) { LOG(WARNING) << "failed to remove old version delete bitmap, st: " << st; } } return Status::OK(); } -bool Compaction::_check_if_includes_input_rowsets( +bool CompactionMixin::_check_if_includes_input_rowsets( const RowsetIdUnorderedSet& commit_rowset_ids_set) const { Review Comment: warning: method '_check_if_includes_input_rowsets' can be made static [readability-convert-member-functions-to-static] ```suggestion static bool CompactionMixin::_check_if_includes_input_rowsets( const RowsetIdUnorderedSet& commit_rowset_ids_set) { ``` ########## be/src/olap/merger.cpp: ########## @@ -51,9 +51,33 @@ #include "vec/olap/vertical_merge_iterator.h" namespace doris { +namespace { + +// for mow with cluster key table, the key group also contains cluster key columns. +// the `key_group_cluster_key_idxes` marks the positions of cluster key columns in key group. +void _generate_key_group_cluster_key_idxes(const TabletSchema& tablet_schema, + std::vector<std::vector<uint32_t>>& column_groups, + std::vector<uint32_t>& key_group_cluster_key_idxes) { + if (column_groups.empty() || tablet_schema.cluster_key_idxes().empty()) { + return; + } + + auto& key_column_group = column_groups[0]; + for (const auto& index_in_tablet_schema : tablet_schema.cluster_key_idxes()) { + for (auto j = 0; j < key_column_group.size(); ++j) { + auto cid = key_column_group[j]; + if (cid == index_in_tablet_schema) { + key_group_cluster_key_idxes.emplace_back(j); + break; + } + } + } +} -Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, - TabletSchemaSPtr cur_tablet_schema, +} // namespace + +Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, Review Comment: warning: function 'vmerge_rowsets' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, ^ ``` 
<details> <summary>Additional context</summary> **be/src/olap/merger.cpp:78:** 86 lines including whitespace and comments (threshold 80) ```cpp Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, ^ ``` </details> ########## be/src/olap/merger.cpp: ########## @@ -289,8 +314,8 @@ } // for segcompaction -Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type, - TabletSchemaSPtr tablet_schema, bool is_key, +Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type, Review Comment: warning: method 'vertical_compact_one_group' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type, ``` ########## be/src/olap/merger.cpp: ########## @@ -145,10 +169,10 @@ // split columns into several groups, make sure all keys in one group // unique_key should consider sequence&delete column -void Merger::vertical_split_columns(TabletSchemaSPtr tablet_schema, +void Merger::vertical_split_columns(const TabletSchema& tablet_schema, Review Comment: warning: method 'vertical_split_columns' can be made static [readability-convert-member-functions-to-static] ```suggestion static void Merger::vertical_split_columns(const TabletSchema& tablet_schema, ``` ########## be/src/olap/compaction.cpp: ########## @@ -885,79 +799,21 @@ input_rowset_ids.begin(), input_rowset_ids.end()); } -void Compaction::gc_output_rowset() { - if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) { - if (!_output_rowset->is_local()) { - _tablet->record_unused_remote_rowset(_output_rowset->rowset_id(), - _output_rowset->rowset_meta()->resource_id(), - _output_rowset->num_segments()); - return; - } - ExecEnv::GetInstance()->storage_engine().to_local().add_unused_rowset(_output_rowset); - } -} - -// Find the longest consecutive version path in "rowset", from beginning. 
-// Two versions before and after the missing version will be saved in missing_version, -// if missing_version is not null. -Status Compaction::find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets, - std::vector<Version>* missing_version) { - if (rowsets->empty()) { - return Status::OK(); - } - RowsetSharedPtr prev_rowset = rowsets->front(); - size_t i = 1; - for (; i < rowsets->size(); ++i) { - RowsetSharedPtr rowset = (*rowsets)[i]; - if (rowset->start_version() != prev_rowset->end_version() + 1) { - if (missing_version != nullptr) { - missing_version->push_back(prev_rowset->version()); - missing_version->push_back(rowset->version()); - } - break; - } - prev_rowset = rowset; - } - - rowsets->resize(i); - return Status::OK(); -} - -Status Compaction::check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) { - if (rowsets.empty()) { - return Status::OK(); - } - RowsetSharedPtr prev_rowset = rowsets.front(); - for (size_t i = 1; i < rowsets.size(); ++i) { - RowsetSharedPtr rowset = rowsets[i]; - if (rowset->start_version() != prev_rowset->end_version() + 1) { - return Status::Error<CUMULATIVE_MISS_VERSION>( - "There are missed versions among rowsets. prev_rowset version={}-{}, rowset " - "version={}-{}", - prev_rowset->start_version(), prev_rowset->end_version(), - rowset->start_version(), rowset->end_version()); - } - prev_rowset = rowset; - } - - return Status::OK(); -} - -Status Compaction::check_correctness(const Merger::Statistics& stats) { +Status Compaction::check_correctness() { // 1. check row number - if (_input_row_num != _output_rowset->num_rows() + stats.merged_rows + stats.filtered_rows) { + if (_input_row_num != _output_rowset->num_rows() + _stats.merged_rows + _stats.filtered_rows) { return Status::Error<CHECK_LINES_ERROR>( "row_num does not match between cumulative input and output! 
tablet={}, " "input_row_num={}, merged_row_num={}, filtered_row_num={}, output_row_num={}", - _tablet->tablet_id(), _input_row_num, stats.merged_rows, stats.filtered_rows, + _tablet->tablet_id(), _input_row_num, _stats.merged_rows, _stats.filtered_rows, _output_rowset->num_rows()); } return Status::OK(); } -int64_t Compaction::get_compaction_permits() { +int64_t CompactionMixin::get_compaction_permits() { Review Comment: warning: method 'get_compaction_permits' can be made static [readability-convert-member-functions-to-static] be/src/olap/compaction.h:128: ```diff - int64_t get_compaction_permits(); + static int64_t get_compaction_permits(); ``` ########## be/src/olap/compaction.cpp: ########## @@ -885,79 +799,21 @@ input_rowset_ids.begin(), input_rowset_ids.end()); } -void Compaction::gc_output_rowset() { - if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) { - if (!_output_rowset->is_local()) { - _tablet->record_unused_remote_rowset(_output_rowset->rowset_id(), - _output_rowset->rowset_meta()->resource_id(), - _output_rowset->num_segments()); - return; - } - ExecEnv::GetInstance()->storage_engine().to_local().add_unused_rowset(_output_rowset); - } -} - -// Find the longest consecutive version path in "rowset", from beginning. -// Two versions before and after the missing version will be saved in missing_version, -// if missing_version is not null. 
-Status Compaction::find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets, - std::vector<Version>* missing_version) { - if (rowsets->empty()) { - return Status::OK(); - } - RowsetSharedPtr prev_rowset = rowsets->front(); - size_t i = 1; - for (; i < rowsets->size(); ++i) { - RowsetSharedPtr rowset = (*rowsets)[i]; - if (rowset->start_version() != prev_rowset->end_version() + 1) { - if (missing_version != nullptr) { - missing_version->push_back(prev_rowset->version()); - missing_version->push_back(rowset->version()); - } - break; - } - prev_rowset = rowset; - } - - rowsets->resize(i); - return Status::OK(); -} - -Status Compaction::check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) { - if (rowsets.empty()) { - return Status::OK(); - } - RowsetSharedPtr prev_rowset = rowsets.front(); - for (size_t i = 1; i < rowsets.size(); ++i) { - RowsetSharedPtr rowset = rowsets[i]; - if (rowset->start_version() != prev_rowset->end_version() + 1) { - return Status::Error<CUMULATIVE_MISS_VERSION>( - "There are missed versions among rowsets. prev_rowset version={}-{}, rowset " - "version={}-{}", - prev_rowset->start_version(), prev_rowset->end_version(), - rowset->start_version(), rowset->end_version()); - } - prev_rowset = rowset; - } - - return Status::OK(); -} - -Status Compaction::check_correctness(const Merger::Statistics& stats) { +Status Compaction::check_correctness() { Review Comment: warning: method 'check_correctness' can be made static [readability-convert-member-functions-to-static] be/src/olap/compaction.h:69: ```diff - Status check_correctness(); + static Status check_correctness(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org