github-actions[bot] commented on code in PR #30839: URL: https://github.com/apache/doris/pull/30839#discussion_r1478358227
########## be/src/olap/compaction.cpp: ########## @@ -293,449 +337,338 @@ bool Compaction::handle_ordered_data_compaction() { // most rowset of current compaction is nonoverlapping // just handle nonoverlappint rowsets auto st = do_compact_ordered_rowsets(); + return st.ok(); +} + +Status CompactionMixin::execute_compact() { + uint32_t checksum_before; + uint32_t checksum_after; + bool enable_compaction_checksum = config::enable_compaction_checksum; + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_before); + RETURN_IF_ERROR(checksum_task.execute()); + } + + auto* data_dir = tablet()->data_dir(); + int64_t permits = get_compaction_permits(); + data_dir->disks_compaction_score_increment(permits); + data_dir->disks_compaction_num_increment(1); + + Status st = execute_compact_impl(permits); + + data_dir->disks_compaction_score_increment(-permits); + data_dir->disks_compaction_num_increment(-1); + if (!st.ok()) { - return false; + return st; } - return true; + + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_after); + RETURN_IF_ERROR(checksum_task.execute()); + if (checksum_before != checksum_after) { + return Status::InternalError( + "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", + checksum_before, checksum_after, _tablet->tablet_id()); + } + } + + _load_segment_to_cache(); + return Status::OK(); } -Status Compaction::do_compaction_impl(int64_t permits) { +Status CompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; if (handle_ordered_data_compaction()) { RETURN_IF_ERROR(modify_rowsets()); - - int64_t now = UnixMillis(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } - auto cumu_policy = _tablet->cumulative_compaction_policy(); LOG(INFO) << "succeed to do ordered data " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", disk=" << _tablet->data_dir()->path() + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num << ", output_row_num=" << _output_rowset->num_rows() << ", input_rowset_size=" << _input_rowsets_size << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" - << (cumu_policy == nullptr ? "quick" : cumu_policy->name()); + << ". elapsed time=" << watch.get_elapse_second() << "s."; + _state = CompactionState::SUCCESS; return Status::OK(); } build_basic_info(); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", permits: " << permits; - bool vertical_compaction = should_vertical_compaction(); - RowsetWriterContext ctx; - RETURN_IF_ERROR(construct_input_rowset_readers()); - RETURN_IF_ERROR(construct_output_rowset_writer(ctx, vertical_compaction)); - // 2. 
write merged rows to output rowset - // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool - Merger::Statistics stats; - // if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction. - // the row ID conversion matrix needs to be used for inverted index compaction. - if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write())) { - stats.rowid_conversion = &_rowid_conversion; - } + RETURN_IF_ERROR(merge_input_rowsets()); - Status res; - { - SCOPED_TIMER(_merge_rowsets_latency_timer); - if (vertical_compaction) { - res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &stats); - } else { - res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), &stats); - } - } + RETURN_IF_ERROR(do_inverted_index_compaction()); - if (!res.ok()) { - LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res - << ", tablet=" << _tablet->tablet_id() - << ", output_version=" << _output_version; - return res; + RETURN_IF_ERROR(modify_rowsets()); + + auto* cumu_policy = tablet()->cumulative_compaction_policy(); + DCHECK(cumu_policy); + LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << _is_vertical + << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version + << ", current_max_version=" << tablet()->max_version().second + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments + << ", input_rowset_size=" << _input_rowsets_size + << ", output_rowset_size=" << _output_rowset->data_disk_size() + << ", input_row_num=" << _input_row_num + << ", output_row_num=" << _output_rowset->num_rows() + << ", filtered_row_num=" << _stats.filtered_rows + << ", merged_row_num=" << _stats.merged_rows + << ". elapsed time=" << watch.get_elapse_second() + << "s. 
cumulative_compaction_policy=" << cumu_policy->name() + << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + + _state = CompactionState::SUCCESS; + + return Status::OK(); +} + +Status CompactionMixin::do_inverted_index_compaction() { Review Comment: warning: function 'do_inverted_index_compaction' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CompactionMixin::do_inverted_index_compaction() { ^ ``` <details> <summary>Additional context</summary> **be/src/olap/compaction.cpp:429:** 124 lines including whitespace and comments (threshold 80) ```cpp Status CompactionMixin::do_inverted_index_compaction() { ^ ``` </details> ########## be/src/olap/compaction.cpp: ########## @@ -293,449 +337,338 @@ // most rowset of current compaction is nonoverlapping // just handle nonoverlappint rowsets auto st = do_compact_ordered_rowsets(); + return st.ok(); +} + +Status CompactionMixin::execute_compact() { + uint32_t checksum_before; + uint32_t checksum_after; + bool enable_compaction_checksum = config::enable_compaction_checksum; + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_before); + RETURN_IF_ERROR(checksum_task.execute()); + } + + auto* data_dir = tablet()->data_dir(); + int64_t permits = get_compaction_permits(); + data_dir->disks_compaction_score_increment(permits); + data_dir->disks_compaction_num_increment(1); + + Status st = execute_compact_impl(permits); + + data_dir->disks_compaction_score_increment(-permits); + data_dir->disks_compaction_num_increment(-1); + if (!st.ok()) { - return false; + return st; } - return true; + + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_after); + RETURN_IF_ERROR(checksum_task.execute()); + if (checksum_before != checksum_after) { + return Status::InternalError( + "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", + checksum_before, checksum_after, _tablet->tablet_id()); + } + } + + _load_segment_to_cache(); + return Status::OK(); } -Status Compaction::do_compaction_impl(int64_t permits) { +Status CompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; if (handle_ordered_data_compaction()) { RETURN_IF_ERROR(modify_rowsets()); - - int64_t now = UnixMillis(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } - auto cumu_policy = _tablet->cumulative_compaction_policy(); LOG(INFO) << "succeed to do ordered data " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", disk=" << _tablet->data_dir()->path() + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num << ", output_row_num=" << _output_rowset->num_rows() << ", input_rowset_size=" << _input_rowsets_size << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" - << (cumu_policy == nullptr ? 
"quick" : cumu_policy->name()); + << ". elapsed time=" << watch.get_elapse_second() << "s."; + _state = CompactionState::SUCCESS; return Status::OK(); } build_basic_info(); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", permits: " << permits; - bool vertical_compaction = should_vertical_compaction(); - RowsetWriterContext ctx; - RETURN_IF_ERROR(construct_input_rowset_readers()); - RETURN_IF_ERROR(construct_output_rowset_writer(ctx, vertical_compaction)); - // 2. write merged rows to output rowset - // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool - Merger::Statistics stats; - // if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction. - // the row ID conversion matrix needs to be used for inverted index compaction. - if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write())) { - stats.rowid_conversion = &_rowid_conversion; - } + RETURN_IF_ERROR(merge_input_rowsets()); - Status res; - { - SCOPED_TIMER(_merge_rowsets_latency_timer); - if (vertical_compaction) { - res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &stats); - } else { - res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), &stats); - } - } + RETURN_IF_ERROR(do_inverted_index_compaction()); - if (!res.ok()) { - LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res - << ", tablet=" << _tablet->tablet_id() - << ", output_version=" << _output_version; - return res; + RETURN_IF_ERROR(modify_rowsets()); + + auto* cumu_policy = tablet()->cumulative_compaction_policy(); + DCHECK(cumu_policy); + LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << _is_vertical + << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version + << ", current_max_version=" << tablet()->max_version().second + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments + << ", input_rowset_size=" << _input_rowsets_size + << ", output_rowset_size=" << _output_rowset->data_disk_size() + << ", input_row_num=" << _input_row_num + << ", output_row_num=" << _output_rowset->num_rows() + << ", filtered_row_num=" << _stats.filtered_rows + << ", merged_row_num=" << _stats.merged_rows + << ". elapsed time=" << watch.get_elapse_second() + << "s. cumulative_compaction_policy=" << cumu_policy->name() + << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + + _state = CompactionState::SUCCESS; + + return Status::OK(); +} + +Status CompactionMixin::do_inverted_index_compaction() { + const auto& ctx = _output_rs_writer->context(); + if (!config::inverted_index_compaction_enable || _input_row_num <= 0 || + !_stats.rowid_conversion || ctx.skip_inverted_index.empty()) { + return Status::OK(); } - COUNTER_UPDATE(_merged_rows_counter, stats.merged_rows); - COUNTER_UPDATE(_filtered_rows_counter, stats.filtered_rows); - RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset), - fmt::format("rowset writer build failed. 
output_version: {}", - _output_version.to_string())); - // Now we support delete in cumu compaction, to make all data in rowsets whose version - // is below output_version to be delete in the future base compaction, we should carry - // all delete predicate in the output rowset. - // Output start version > 2 means we must set the delete predicate in the output rowset - if (allow_delete_in_cumu_compaction() && _output_rowset->version().first > 2) { - DeletePredicatePB delete_predicate; - std::accumulate( - _input_rs_readers.begin(), _input_rs_readers.end(), &delete_predicate, - [](DeletePredicatePB* delete_predicate, const RowsetReaderSharedPtr& reader) { - if (reader->rowset()->rowset_meta()->has_delete_predicate()) { - delete_predicate->MergeFrom( - reader->rowset()->rowset_meta()->delete_predicate()); - } - return delete_predicate; - }); - // now version in delete_predicate is deprecated - if (!delete_predicate.in_predicates().empty() || - !delete_predicate.sub_predicates_v2().empty() || - !delete_predicate.sub_predicates().empty()) { - _output_rowset->rowset_meta()->set_delete_predicate(std::move(delete_predicate)); + OlapStopWatch inverted_watch; + + Version version = tablet()->max_version(); + DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); + std::set<RowLocation> missed_rows; + std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; + // Convert the delete bitmap of the input rowsets to output rowset. + tablet()->calc_compaction_output_rowset_delete_bitmap( + _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, &location_map, + _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); + + if (!_allow_delete_in_cumu_compaction) { + if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && + _stats.merged_rows != missed_rows.size()) { + std::string err_msg = fmt::format( + "cumulative compaction: the merged rows({}) is not equal to missed " + "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", + _stats.merged_rows, missed_rows.size(), _tablet->tablet_id(), + _tablet->table_id()); + DCHECK(false) << err_msg; + LOG(WARNING) << err_msg; } } - COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size()); - COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows()); - COUNTER_UPDATE(_output_segments_num_counter, _output_rowset->num_segments()); + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); - // 3. check correctness - RETURN_IF_ERROR(check_correctness(stats)); + // translation vec + // <<dest_idx_num, dest_docId>> + // the first level vector: index indicates src segment. + // the second level vector: index indicates row id of source segment, + // value indicates row id of destination segment. + // <UINT32_MAX, UINT32_MAX> indicates current row not exist. 
+ const auto& trans_vec = _stats.rowid_conversion->get_rowid_conversion_map(); - if (_input_row_num > 0 && stats.rowid_conversion && config::inverted_index_compaction_enable && - !ctx.skip_inverted_index.empty()) { - OlapStopWatch inverted_watch; + // source rowset,segment -> index_id + const auto& src_seg_to_id_map = _stats.rowid_conversion->get_src_segment_to_id_map(); - // check rowid_conversion correctness - Version version = _tablet->max_version(); - DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); - std::set<RowLocation> missed_rows; - std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; - // Convert the delete bitmap of the input rowsets to output rowset. - std::size_t missed_rows_size = 0; - _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), - &output_rowset_delete_bitmap); - if (!allow_delete_in_cumu_compaction()) { - missed_rows_size = missed_rows.size(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && - stats.merged_rows != missed_rows_size) { - std::string err_msg = fmt::format( - "cumulative compaction: the merged rows({}) is not equal to missed " - "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", - stats.merged_rows, missed_rows_size, _tablet->tablet_id(), - _tablet->table_id()); - DCHECK(false) << err_msg; - LOG(WARNING) << err_msg; - } - } + // dest rowset id + RowsetId dest_rowset_id = _stats.rowid_conversion->get_dst_rowset_id(); + // dest segment id -> num rows + std::vector<uint32_t> dest_segment_num_rows; + RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); - - // translation vec - // <<dest_idx_num, dest_docId>> - // the first level vector: index indicates src segment. - // the second level vector: index indicates row id of source segment, - // value indicates row id of destination segment. - // <UINT32_MAX, UINT32_MAX> indicates current row not exist. 
- std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec = - stats.rowid_conversion->get_rowid_conversion_map(); - - // source rowset,segment -> index_id - std::map<std::pair<RowsetId, uint32_t>, uint32_t> src_seg_to_id_map = - stats.rowid_conversion->get_src_segment_to_id_map(); - // dest rowset id - RowsetId dest_rowset_id = stats.rowid_conversion->get_dst_rowset_id(); - // dest segment id -> num rows - std::vector<uint32_t> dest_segment_num_rows; - RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); - - auto src_segment_num = src_seg_to_id_map.size(); - auto dest_segment_num = dest_segment_num_rows.size(); - - if (dest_segment_num > 0) { - // src index files - // format: rowsetId_segmentId - std::vector<std::string> src_index_files(src_segment_num); - for (const auto& m : src_seg_to_id_map) { - std::pair<RowsetId, uint32_t> p = m.first; - src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); - } + auto src_segment_num = src_seg_to_id_map.size(); + auto dest_segment_num = dest_segment_num_rows.size(); - // dest index files - // format: rowsetId_segmentId - std::vector<std::string> dest_index_files(dest_segment_num); - for (int i = 0; i < dest_segment_num; ++i) { - auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); - dest_index_files[i] = prefix; - } + if (dest_segment_num <= 0) { + LOG(INFO) << "skip doing index compaction due to no output segments" + << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num + << ", output row number=" << _output_rowset->num_rows() + << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; + return Status::OK(); + } - // create index_writer to compaction indexes - const auto& fs = _output_rowset->rowset_meta()->fs(); - const auto& tablet_path = _tablet->tablet_path(); - - // we choose the first destination segment name as the temporary index writer path - // Used to distinguish between different index compaction - auto index_writer_path = tablet_path + "/" + dest_index_files[0]; - LOG(INFO) << "start index compaction" - << ". tablet=" << _tablet->tablet_id() - << ", source index size=" << src_segment_num - << ", destination index size=" << dest_segment_num << "."; - Status status = Status::OK(); - std::for_each( - ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(), - [&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files, - &dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows, - &status, this](int32_t column_uniq_id) { - auto index_id = _cur_tablet_schema->get_inverted_index(column_uniq_id, "") - ->index_id(); - try { - auto st = compact_column(index_id, src_segment_num, dest_segment_num, - src_index_files, dest_index_files, fs, - index_writer_path, tablet_path, trans_vec, - dest_segment_num_rows); - if (!st.ok()) { - LOG(WARNING) << "failed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ". column uniq id=" << column_uniq_id - << ". index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() - << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - } - status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( - st.msg()); - } - } catch (CLuceneError& e) { - LOG(WARNING) << "failed to do index compaction" - << ". 
tablet=" << _tablet->tablet_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() - << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - } - status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( - e.what()); - } - }); - - // check index compaction status. If status is not ok, we should return error and end this compaction round. - if (!status.ok()) { - return status; - } + // src index files + // format: rowsetId_segmentId + std::vector<std::string> src_index_files(src_segment_num); + for (const auto& m : src_seg_to_id_map) { + std::pair<RowsetId, uint32_t> p = m.first; + src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); + } - LOG(INFO) << "succeed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ", input_rowset_size=" << _input_rowsets_size - << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; - } else { - LOG(INFO) << "skip doing index compaction due to no output segments" - << ". tablet=" << _tablet->tablet_id() - << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; - } + // dest index files + // format: rowsetId_segmentId + std::vector<std::string> dest_index_files(dest_segment_num); + for (int i = 0; i < dest_segment_num; ++i) { + auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); + dest_index_files[i] = prefix; } - // 4. modify rowsets in memory - RETURN_IF_ERROR(modify_rowsets(&stats)); + // create index_writer to compaction indexes + const auto& fs = _output_rowset->rowset_meta()->fs(); + const auto& tablet_path = _tablet->tablet_path(); - // 5. update last success compaction time - int64_t now = UnixMillis(); - // TODO(yingchun): do the judge in Tablet class - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } + // we choose the first destination segment name as the temporary index writer path + // Used to distinguish between different index compaction + auto index_writer_path = tablet_path + "/" + dest_index_files[0]; + LOG(INFO) << "start index compaction" + << ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num + << ", destination index size=" << dest_segment_num << "."; - int64_t current_max_version = -1; - { - std::shared_lock rdlock(_tablet->get_header_lock()); - current_max_version = -1; - if (RowsetSharedPtr max_rowset = _tablet->get_rowset_with_max_version()) { - current_max_version = max_rowset->end_version(); + auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { + LOG(WARNING) << "failed to do index compaction" + << ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id + << ". 
index_id=" << index_id; + for (auto& rowset : _input_rowsets) { + rowset->set_skip_index_compaction(column_uniq_id); + LOG(INFO) << "mark skipping inverted index compaction next time" + << ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id() + << ", column uniq id=" << column_uniq_id << ", index_id=" << index_id; + } + }; + + for (auto&& column_uniq_id : ctx.skip_inverted_index) { + auto index_id = _cur_tablet_schema->get_inverted_index(column_uniq_id, "")->index_id(); + try { + auto st = compact_column(index_id, src_segment_num, dest_segment_num, src_index_files, + dest_index_files, fs, index_writer_path, tablet_path, + trans_vec, dest_segment_num_rows); + if (!st.ok()) { + error_handler(index_id, column_uniq_id); + return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg()); + } + } catch (CLuceneError& e) { + error_handler(index_id, column_uniq_id); + return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what()); } } - auto* cumu_policy = _tablet->cumulative_compaction_policy(); - DCHECK(cumu_policy); - LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << vertical_compaction - << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", current_max_version=" << current_max_version - << ", disk=" << _tablet->data_dir()->path() << ", segments=" << _input_num_segments + LOG(INFO) << "succeed to do index compaction" + << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num + << ", output row number=" << _output_rowset->num_rows() << ", input_rowset_size=" << _input_rowsets_size << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ", input_row_num=" << _input_row_num - << ", output_row_num=" << _output_rowset->num_rows() - << ", filtered_row_num=" << stats.filtered_rows - << ", merged_row_num=" << stats.merged_rows - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" << cumu_policy->name() - << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; return Status::OK(); } -Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool is_vertical) { - ctx.version = _output_version; - ctx.rowset_state = VISIBLE; - ctx.segments_overlap = NONOVERLAPPING; - ctx.tablet_schema = _cur_tablet_schema; - ctx.newest_write_timestamp = _newest_write_timestamp; - ctx.write_type = DataWriteType::TYPE_COMPACTION; - if (config::inverted_index_compaction_enable && - ((_tablet->keys_type() == KeysType::UNIQUE_KEYS || - _tablet->keys_type() == KeysType::DUP_KEYS))) { - for (const auto& index : _cur_tablet_schema->indexes()) { - if (index.index_type() == IndexType::INVERTED) { - auto col_unique_id = index.col_unique_ids()[0]; - //NOTE: here src_rs may be in building index progress, so it would not contain inverted index info. 
- bool all_have_inverted_index = std::all_of( - _input_rowsets.begin(), _input_rowsets.end(), [&](const auto& src_rs) { - BetaRowsetSharedPtr rowset = - std::static_pointer_cast<BetaRowset>(src_rs); - if (rowset == nullptr) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] rowset is null, will skip index compaction"; - return false; - } - if (rowset->is_skip_index_compaction(col_unique_id)) { - LOG(WARNING) - << "tablet[" << _tablet->tablet_id() << "] rowset[" - << rowset->rowset_id() << "] column_unique_id[" - << col_unique_id - << "] skip inverted index compaction due to last failure"; - return false; - } - auto fs = rowset->rowset_meta()->fs(); - - const auto* index_meta = - rowset->tablet_schema()->get_inverted_index(col_unique_id, ""); - if (index_meta == nullptr) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id - << "] index meta is null, will skip index compaction"; - return false; - } - for (auto i = 0; i < rowset->num_segments(); i++) { - auto segment_file = rowset->segment_file_path(i); - std::string inverted_index_src_file_path = - InvertedIndexDescriptor::get_index_file_name( - segment_file, index_meta->index_id(), - index_meta->get_index_suffix()); - bool exists = false; - if (!fs->exists(inverted_index_src_file_path, &exists).ok()) { - LOG(ERROR) - << inverted_index_src_file_path << " fs->exists error"; - return false; - } - if (!exists) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is not exists, will skip index compaction"; - return false; - } - - // check idx file size - int64_t file_size = 0; - if (fs->file_size(inverted_index_src_file_path, &file_size) != - Status::OK()) { - LOG(ERROR) << inverted_index_src_file_path - << " fs->file_size error"; - return false; - } - if (file_size == 0) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is empty file, will skip index compaction"; - return false; - } - - // check index meta - std::filesystem::path p(inverted_index_src_file_path); - std::string dir_str = p.parent_path().string(); - std::string file_str = p.filename().string(); - lucene::store::Directory* dir = - DorisCompoundDirectoryFactory::getDirectory( - fs, dir_str.c_str()); - DorisCompoundReader reader(dir, file_str.c_str()); - std::vector<std::string> files; - reader.list(&files); - reader.close(); - - // why is 3? 
- // bkd index will write at least 3 files - if (files.size() < 3) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is corrupted, will skip index compaction"; - return false; - } - } - return true; - }); - if (all_have_inverted_index && - field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) { - ctx.skip_inverted_index.insert(col_unique_id); +void CompactionMixin::construct_skip_inverted_index(RowsetWriterContext& ctx) { + for (const auto& index : _cur_tablet_schema->indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + + auto col_unique_id = index.col_unique_ids()[0]; + auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) { + auto rowset = static_cast<BetaRowset*>(src_rs.get()); + if (rowset->is_skip_index_compaction(col_unique_id)) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] rowset[" + << rowset->rowset_id() << "] column_unique_id[" << col_unique_id + << "] skip inverted index compaction due to last failure"; + return false; + } + + auto& fs = rowset->rowset_meta()->fs(); + + const auto* index_meta = rowset->tablet_schema()->get_inverted_index(col_unique_id, ""); + if (index_meta == nullptr) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "] index meta is null, will skip index compaction"; + return false; + } + + for (auto i = 0; i < rowset->num_segments(); i++) { + auto segment_file = rowset->segment_file_path(i); + std::string inverted_index_src_file_path = + InvertedIndexDescriptor::get_index_file_name( + segment_file, index_meta->index_id(), + index_meta->get_index_suffix()); + bool exists = false; + if (!fs->exists(inverted_index_src_file_path, &exists).ok()) { + LOG(ERROR) << inverted_index_src_file_path << " fs->exists error"; + return false; + } + + if (!exists) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is not exists, will skip index compaction"; + return false; + } + + // check idx file size + int64_t file_size = 0; + if (fs->file_size(inverted_index_src_file_path, &file_size) != Status::OK()) { + LOG(ERROR) << inverted_index_src_file_path << " fs->file_size error"; + return false; + } + + if (file_size == 0) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is empty file, will skip index compaction"; + return false; + } + + // check index meta + std::filesystem::path p(inverted_index_src_file_path); + std::string dir_str = p.parent_path().string(); + std::string file_str = p.filename().string(); + lucene::store::Directory* dir = + DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str()); + DorisCompoundReader reader(dir, file_str.c_str()); + std::vector<std::string> files; + reader.list(&files); + reader.close(); + + // why is 3? 
+ // bkd index will write at least 3 files + if (files.size() < 3) { + LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" + << col_unique_id << "]," << inverted_index_src_file_path + << " is corrupted, will skip index compaction"; + return false; } } + return true; + }; + + bool all_have_inverted_index = std::all_of(_input_rowsets.begin(), _input_rowsets.end(), + std::move(has_inverted_index)); + + if (all_have_inverted_index && + field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) { + ctx.skip_inverted_index.insert(col_unique_id); } } - if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { - // write output rowset to storage policy resource - auto storage_policy = get_storage_policy(_tablet->storage_policy_id()); - if (storage_policy == nullptr) { - return Status::InternalError("could not find storage_policy, storage_policy_id={}", - _tablet->storage_policy_id()); - } - auto resource = get_storage_resource(storage_policy->resource_id); - if (resource.fs == nullptr) { - return Status::InternalError("could not find resource, resouce_id={}", - storage_policy->resource_id); - } - DCHECK(atol(resource.fs->id().c_str()) == storage_policy->resource_id); - DCHECK(resource.fs->type() != io::FileSystemType::LOCAL); - ctx.fs = std::move(resource.fs); - } - _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, is_vertical)); - _pending_rs_guard = ExecEnv::GetInstance()->storage_engine().to_local().add_pending_rowset(ctx); - return Status::OK(); } -Status Compaction::construct_input_rowset_readers() { - for (auto& rowset : _input_rowsets) { - RowsetReaderSharedPtr rs_reader; - RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); - _input_rs_readers.push_back(std::move(rs_reader)); +Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) { + if (config::inverted_index_compaction_enable && + ((_tablet->keys_type() == KeysType::UNIQUE_KEYS || + _tablet->keys_type() == KeysType::DUP_KEYS))) { + construct_skip_inverted_index(ctx); } + ctx.version = _output_version; + ctx.rowset_state = VISIBLE; + ctx.segments_overlap = NONOVERLAPPING; + ctx.tablet_schema = _cur_tablet_schema; + ctx.newest_write_timestamp = _newest_write_timestamp; + ctx.write_type = DataWriteType::TYPE_COMPACTION; + _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical)); + _pending_rs_guard = _engine.add_pending_rowset(ctx); return Status::OK(); } -Status Compaction::modify_rowsets(const Merger::Statistics* stats) { +Status CompactionMixin::modify_rowsets() { Review Comment: warning: function 'modify_rowsets' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CompactionMixin::modify_rowsets() { ^ ``` <details> <summary>Additional context</summary> **be/src/olap/compaction.cpp:663:** 132 lines including whitespace and comments (threshold 80) ```cpp Status CompactionMixin::modify_rowsets() { ^ ``` </details> ########## be/src/olap/compaction.cpp: ########## @@ -293,449 +337,338 @@ // most rowset of current compaction is nonoverlapping // just handle nonoverlappint rowsets auto st = do_compact_ordered_rowsets(); + return st.ok(); +} + +Status CompactionMixin::execute_compact() { + uint32_t checksum_before; + uint32_t checksum_after; + bool enable_compaction_checksum = config::enable_compaction_checksum; + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), 
&checksum_before); + RETURN_IF_ERROR(checksum_task.execute()); + } + + auto* data_dir = tablet()->data_dir(); + int64_t permits = get_compaction_permits(); + data_dir->disks_compaction_score_increment(permits); + data_dir->disks_compaction_num_increment(1); + + Status st = execute_compact_impl(permits); + + data_dir->disks_compaction_score_increment(-permits); + data_dir->disks_compaction_num_increment(-1); + if (!st.ok()) { - return false; + return st; } - return true; + + if (enable_compaction_checksum) { + EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), + _input_rowsets.back()->end_version(), &checksum_after); + RETURN_IF_ERROR(checksum_task.execute()); + if (checksum_before != checksum_after) { + return Status::InternalError( + "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", + checksum_before, checksum_after, _tablet->tablet_id()); + } + } + + _load_segment_to_cache(); + return Status::OK(); } -Status Compaction::do_compaction_impl(int64_t permits) { +Status CompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; if (handle_ordered_data_compaction()) { RETURN_IF_ERROR(modify_rowsets()); - - int64_t now = UnixMillis(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } - auto cumu_policy = _tablet->cumulative_compaction_policy(); LOG(INFO) << "succeed to do ordered data " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", disk=" << _tablet->data_dir()->path() + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num << ", output_row_num=" << _output_rowset->num_rows() << ", input_rowset_size=" << _input_rowsets_size << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << watch.get_elapse_second() - << "s. cumulative_compaction_policy=" - << (cumu_policy == nullptr ? "quick" : cumu_policy->name()); + << ". elapsed time=" << watch.get_elapse_second() << "s."; + _state = CompactionState::SUCCESS; return Status::OK(); } build_basic_info(); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", permits: " << permits; - bool vertical_compaction = should_vertical_compaction(); - RowsetWriterContext ctx; - RETURN_IF_ERROR(construct_input_rowset_readers()); - RETURN_IF_ERROR(construct_output_rowset_writer(ctx, vertical_compaction)); - // 2. write merged rows to output rowset - // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool - Merger::Statistics stats; - // if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction. - // the row ID conversion matrix needs to be used for inverted index compaction. 
- if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write())) { - stats.rowid_conversion = &_rowid_conversion; - } + RETURN_IF_ERROR(merge_input_rowsets()); - Status res; - { - SCOPED_TIMER(_merge_rowsets_latency_timer); - if (vertical_compaction) { - res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &stats); - } else { - res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), &stats); - } - } + RETURN_IF_ERROR(do_inverted_index_compaction()); - if (!res.ok()) { - LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res - << ", tablet=" << _tablet->tablet_id() - << ", output_version=" << _output_version; - return res; + RETURN_IF_ERROR(modify_rowsets()); + + auto* cumu_policy = tablet()->cumulative_compaction_policy(); + DCHECK(cumu_policy); + LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << _is_vertical + << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version + << ", current_max_version=" << tablet()->max_version().second + << ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments + << ", input_rowset_size=" << _input_rowsets_size + << ", output_rowset_size=" << _output_rowset->data_disk_size() + << ", input_row_num=" << _input_row_num + << ", output_row_num=" << _output_rowset->num_rows() + << ", filtered_row_num=" << _stats.filtered_rows + << ", merged_row_num=" << _stats.merged_rows + << ". elapsed time=" << watch.get_elapse_second() + << "s. cumulative_compaction_policy=" << cumu_policy->name() + << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + + _state = CompactionState::SUCCESS; + + return Status::OK(); +} + +Status CompactionMixin::do_inverted_index_compaction() { + const auto& ctx = _output_rs_writer->context(); + if (!config::inverted_index_compaction_enable || _input_row_num <= 0 || + !_stats.rowid_conversion || ctx.skip_inverted_index.empty()) { + return Status::OK(); } - COUNTER_UPDATE(_merged_rows_counter, stats.merged_rows); - COUNTER_UPDATE(_filtered_rows_counter, stats.filtered_rows); - RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset), - fmt::format("rowset writer build failed. output_version: {}", - _output_version.to_string())); - // Now we support delete in cumu compaction, to make all data in rowsets whose version - // is below output_version to be delete in the future base compaction, we should carry - // all delete predicate in the output rowset. 
- // Output start version > 2 means we must set the delete predicate in the output rowset - if (allow_delete_in_cumu_compaction() && _output_rowset->version().first > 2) { - DeletePredicatePB delete_predicate; - std::accumulate( - _input_rs_readers.begin(), _input_rs_readers.end(), &delete_predicate, - [](DeletePredicatePB* delete_predicate, const RowsetReaderSharedPtr& reader) { - if (reader->rowset()->rowset_meta()->has_delete_predicate()) { - delete_predicate->MergeFrom( - reader->rowset()->rowset_meta()->delete_predicate()); - } - return delete_predicate; - }); - // now version in delete_predicate is deprecated - if (!delete_predicate.in_predicates().empty() || - !delete_predicate.sub_predicates_v2().empty() || - !delete_predicate.sub_predicates().empty()) { - _output_rowset->rowset_meta()->set_delete_predicate(std::move(delete_predicate)); + OlapStopWatch inverted_watch; + + Version version = tablet()->max_version(); + DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); + std::set<RowLocation> missed_rows; + std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; + // Convert the delete bitmap of the input rowsets to output rowset. + tablet()->calc_compaction_output_rowset_delete_bitmap( + _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, &location_map, + _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); + + if (!_allow_delete_in_cumu_compaction) { + if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && + _stats.merged_rows != missed_rows.size()) { + std::string err_msg = fmt::format( + "cumulative compaction: the merged rows({}) is not equal to missed " + "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", + _stats.merged_rows, missed_rows.size(), _tablet->tablet_id(), + _tablet->table_id()); + DCHECK(false) << err_msg; + LOG(WARNING) << err_msg; } } - COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size()); - COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows()); - COUNTER_UPDATE(_output_segments_num_counter, _output_rowset->num_segments()); + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); - // 3. check correctness - RETURN_IF_ERROR(check_correctness(stats)); + // translation vec + // <<dest_idx_num, dest_docId>> + // the first level vector: index indicates src segment. + // the second level vector: index indicates row id of source segment, + // value indicates row id of destination segment. + // <UINT32_MAX, UINT32_MAX> indicates current row not exist. + const auto& trans_vec = _stats.rowid_conversion->get_rowid_conversion_map(); - if (_input_row_num > 0 && stats.rowid_conversion && config::inverted_index_compaction_enable && - !ctx.skip_inverted_index.empty()) { - OlapStopWatch inverted_watch; + // source rowset,segment -> index_id + const auto& src_seg_to_id_map = _stats.rowid_conversion->get_src_segment_to_id_map(); - // check rowid_conversion correctness - Version version = _tablet->max_version(); - DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); - std::set<RowLocation> missed_rows; - std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map; - // Convert the delete bitmap of the input rowsets to output rowset. 
- std::size_t missed_rows_size = 0; - _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), - &output_rowset_delete_bitmap); - if (!allow_delete_in_cumu_compaction()) { - missed_rows_size = missed_rows.size(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && - stats.merged_rows != missed_rows_size) { - std::string err_msg = fmt::format( - "cumulative compaction: the merged rows({}) is not equal to missed " - "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", - stats.merged_rows, missed_rows_size, _tablet->tablet_id(), - _tablet->table_id()); - DCHECK(false) << err_msg; - LOG(WARNING) << err_msg; - } - } + // dest rowset id + RowsetId dest_rowset_id = _stats.rowid_conversion->get_dst_rowset_id(); + // dest segment id -> num rows + std::vector<uint32_t> dest_segment_num_rows; + RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); - - // translation vec - // <<dest_idx_num, dest_docId>> - // the first level vector: index indicates src segment. - // the second level vector: index indicates row id of source segment, - // value indicates row id of destination segment. - // <UINT32_MAX, UINT32_MAX> indicates current row not exist. - std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec = - stats.rowid_conversion->get_rowid_conversion_map(); - - // source rowset,segment -> index_id - std::map<std::pair<RowsetId, uint32_t>, uint32_t> src_seg_to_id_map = - stats.rowid_conversion->get_src_segment_to_id_map(); - // dest rowset id - RowsetId dest_rowset_id = stats.rowid_conversion->get_dst_rowset_id(); - // dest segment id -> num rows - std::vector<uint32_t> dest_segment_num_rows; - RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); - - auto src_segment_num = src_seg_to_id_map.size(); - auto dest_segment_num = dest_segment_num_rows.size(); - - if (dest_segment_num > 0) { - // src index files - // format: rowsetId_segmentId - std::vector<std::string> src_index_files(src_segment_num); - for (const auto& m : src_seg_to_id_map) { - std::pair<RowsetId, uint32_t> p = m.first; - src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); - } + auto src_segment_num = src_seg_to_id_map.size(); + auto dest_segment_num = dest_segment_num_rows.size(); - // dest index files - // format: rowsetId_segmentId - std::vector<std::string> dest_index_files(dest_segment_num); - for (int i = 0; i < dest_segment_num; ++i) { - auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); - dest_index_files[i] = prefix; - } + if (dest_segment_num <= 0) { + LOG(INFO) << "skip doing index compaction due to no output segments" + << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num + << ", output row number=" << _output_rowset->num_rows() + << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; + return Status::OK(); + } - // create index_writer to compaction indexes - const auto& fs = _output_rowset->rowset_meta()->fs(); - const auto& tablet_path = _tablet->tablet_path(); - - // we choose the first destination segment name as the temporary index writer path - // Used to distinguish between different index compaction - auto index_writer_path = tablet_path + "/" + dest_index_files[0]; - LOG(INFO) << "start index compaction" - << ". 
tablet=" << _tablet->tablet_id() - << ", source index size=" << src_segment_num - << ", destination index size=" << dest_segment_num << "."; - Status status = Status::OK(); - std::for_each( - ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(), - [&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files, - &dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows, - &status, this](int32_t column_uniq_id) { - auto index_id = _cur_tablet_schema->get_inverted_index(column_uniq_id, "") - ->index_id(); - try { - auto st = compact_column(index_id, src_segment_num, dest_segment_num, - src_index_files, dest_index_files, fs, - index_writer_path, tablet_path, trans_vec, - dest_segment_num_rows); - if (!st.ok()) { - LOG(WARNING) << "failed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ". column uniq id=" << column_uniq_id - << ". index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() - << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - } - status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( - st.msg()); - } - } catch (CLuceneError& e) { - LOG(WARNING) << "failed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() - << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id - << ", index_id=" << index_id; - } - status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( - e.what()); - } - }); - - // check index compaction status. If status is not ok, we should return error and end this compaction round. - if (!status.ok()) { - return status; - } + // src index files + // format: rowsetId_segmentId + std::vector<std::string> src_index_files(src_segment_num); + for (const auto& m : src_seg_to_id_map) { + std::pair<RowsetId, uint32_t> p = m.first; + src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); + } - LOG(INFO) << "succeed to do index compaction" - << ". tablet=" << _tablet->tablet_id() - << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ", input_rowset_size=" << _input_rowsets_size - << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; - } else { - LOG(INFO) << "skip doing index compaction due to no output segments" - << ". tablet=" << _tablet->tablet_id() - << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; - } + // dest index files + // format: rowsetId_segmentId + std::vector<std::string> dest_index_files(dest_segment_num); + for (int i = 0; i < dest_segment_num; ++i) { + auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); + dest_index_files[i] = prefix; } - // 4. 
modify rowsets in memory - RETURN_IF_ERROR(modify_rowsets(&stats)); + // create index_writer to compaction indexes + const auto& fs = _output_rowset->rowset_meta()->fs(); + const auto& tablet_path = _tablet->tablet_path(); - // 5. update last success compaction time - int64_t now = UnixMillis(); - // TODO(yingchun): do the judge in Tablet class - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - _tablet->set_last_cumu_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { - _tablet->set_last_base_compaction_success_time(now); - } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { - _tablet->set_last_full_compaction_success_time(now); - } + // we choose the first destination segment name as the temporary index writer path + // Used to distinguish between different index compaction + auto index_writer_path = tablet_path + "/" + dest_index_files[0]; + LOG(INFO) << "start index compaction" + << ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num + << ", destination index size=" << dest_segment_num << "."; - int64_t current_max_version = -1; - { - std::shared_lock rdlock(_tablet->get_header_lock()); - current_max_version = -1; - if (RowsetSharedPtr max_rowset = _tablet->get_rowset_with_max_version()) { - current_max_version = max_rowset->end_version(); + auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { + LOG(WARNING) << "failed to do index compaction" + << ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id + << ". index_id=" << index_id; + for (auto& rowset : _input_rowsets) { + rowset->set_skip_index_compaction(column_uniq_id); + LOG(INFO) << "mark skipping inverted index compaction next time" + << ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id() + << ", column uniq id=" << column_uniq_id << ", index_id=" << index_id; + } + }; + + for (auto&& column_uniq_id : ctx.skip_inverted_index) { + auto index_id = _cur_tablet_schema->get_inverted_index(column_uniq_id, "")->index_id(); + try { + auto st = compact_column(index_id, src_segment_num, dest_segment_num, src_index_files, + dest_index_files, fs, index_writer_path, tablet_path, + trans_vec, dest_segment_num_rows); + if (!st.ok()) { + error_handler(index_id, column_uniq_id); + return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg()); + } + } catch (CLuceneError& e) { + error_handler(index_id, column_uniq_id); + return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what()); } } - auto* cumu_policy = _tablet->cumulative_compaction_policy(); - DCHECK(cumu_policy); - LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << vertical_compaction - << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version - << ", current_max_version=" << current_max_version - << ", disk=" << _tablet->data_dir()->path() << ", segments=" << _input_num_segments + LOG(INFO) << "succeed to do index compaction" + << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num + << ", output row number=" << _output_rowset->num_rows() << ", input_rowset_size=" << _input_rowsets_size << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ", input_row_num=" << _input_row_num - << ", output_row_num=" << _output_rowset->num_rows() - << ", filtered_row_num=" << stats.filtered_rows - << ", merged_row_num=" << stats.merged_rows - << ". elapsed time=" << watch.get_elapse_second() - << "s. 
cumulative_compaction_policy=" << cumu_policy->name() - << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); + << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; return Status::OK(); } -Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool is_vertical) { - ctx.version = _output_version; - ctx.rowset_state = VISIBLE; - ctx.segments_overlap = NONOVERLAPPING; - ctx.tablet_schema = _cur_tablet_schema; - ctx.newest_write_timestamp = _newest_write_timestamp; - ctx.write_type = DataWriteType::TYPE_COMPACTION; - if (config::inverted_index_compaction_enable && - ((_tablet->keys_type() == KeysType::UNIQUE_KEYS || - _tablet->keys_type() == KeysType::DUP_KEYS))) { - for (const auto& index : _cur_tablet_schema->indexes()) { - if (index.index_type() == IndexType::INVERTED) { - auto col_unique_id = index.col_unique_ids()[0]; - //NOTE: here src_rs may be in building index progress, so it would not contain inverted index info. - bool all_have_inverted_index = std::all_of( - _input_rowsets.begin(), _input_rowsets.end(), [&](const auto& src_rs) { - BetaRowsetSharedPtr rowset = - std::static_pointer_cast<BetaRowset>(src_rs); - if (rowset == nullptr) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] rowset is null, will skip index compaction"; - return false; - } - if (rowset->is_skip_index_compaction(col_unique_id)) { - LOG(WARNING) - << "tablet[" << _tablet->tablet_id() << "] rowset[" - << rowset->rowset_id() << "] column_unique_id[" - << col_unique_id - << "] skip inverted index compaction due to last failure"; - return false; - } - auto fs = rowset->rowset_meta()->fs(); - - const auto* index_meta = - rowset->tablet_schema()->get_inverted_index(col_unique_id, ""); - if (index_meta == nullptr) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id - << "] index meta is null, will skip index compaction"; - return false; - } - for (auto i = 0; i < rowset->num_segments(); i++) { - auto segment_file = rowset->segment_file_path(i); - std::string inverted_index_src_file_path = - InvertedIndexDescriptor::get_index_file_name( - segment_file, index_meta->index_id(), - index_meta->get_index_suffix()); - bool exists = false; - if (!fs->exists(inverted_index_src_file_path, &exists).ok()) { - LOG(ERROR) - << inverted_index_src_file_path << " fs->exists error"; - return false; - } - if (!exists) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is not exists, will skip index compaction"; - return false; - } - - // check idx file size - int64_t file_size = 0; - if (fs->file_size(inverted_index_src_file_path, &file_size) != - Status::OK()) { - LOG(ERROR) << inverted_index_src_file_path - << " fs->file_size error"; - return false; - } - if (file_size == 0) { - LOG(WARNING) << "tablet[" << _tablet->tablet_id() - << "] column_unique_id[" << col_unique_id << "]," - << inverted_index_src_file_path - << " is empty file, will skip index compaction"; - return false; - } - - // check index meta - std::filesystem::path p(inverted_index_src_file_path); - std::string dir_str = p.parent_path().string(); - std::string file_str = p.filename().string(); - lucene::store::Directory* dir = - DorisCompoundDirectoryFactory::getDirectory( - fs, dir_str.c_str()); - DorisCompoundReader reader(dir, file_str.c_str()); - std::vector<std::string> files; - reader.list(&files); - reader.close(); - - // why is 3? 
-                                // bkd index will write at least 3 files
-                                if (files.size() < 3) {
-                                    LOG(WARNING) << "tablet[" << _tablet->tablet_id()
-                                                 << "] column_unique_id[" << col_unique_id << "],"
-                                                 << inverted_index_src_file_path
-                                                 << " is corrupted, will skip index compaction";
-                                    return false;
-                                }
-                            }
-                            return true;
-                        });
-                if (all_have_inverted_index &&
-                    field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
-                    ctx.skip_inverted_index.insert(col_unique_id);
+void CompactionMixin::construct_skip_inverted_index(RowsetWriterContext& ctx) {
+    for (const auto& index : _cur_tablet_schema->indexes()) {
+        if (index.index_type() != IndexType::INVERTED) {
+            continue;
+        }
+
+        auto col_unique_id = index.col_unique_ids()[0];
+        auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) {
+            auto rowset = static_cast<BetaRowset*>(src_rs.get());
+            if (rowset->is_skip_index_compaction(col_unique_id)) {
+                LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] rowset["
+                             << rowset->rowset_id() << "] column_unique_id[" << col_unique_id
+                             << "] skip inverted index compaction due to last failure";
+                return false;
+            }
+
+            auto& fs = rowset->rowset_meta()->fs();
+
+            const auto* index_meta = rowset->tablet_schema()->get_inverted_index(col_unique_id, "");
+            if (index_meta == nullptr) {
+                LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id["
+                             << col_unique_id << "] index meta is null, will skip index compaction";
+                return false;
+            }
+
+            for (auto i = 0; i < rowset->num_segments(); i++) {
+                auto segment_file = rowset->segment_file_path(i);
+                std::string inverted_index_src_file_path =
+                        InvertedIndexDescriptor::get_index_file_name(
+                                segment_file, index_meta->index_id(),
+                                index_meta->get_index_suffix());
+                bool exists = false;
+                if (!fs->exists(inverted_index_src_file_path, &exists).ok()) {
+                    LOG(ERROR) << inverted_index_src_file_path << " fs->exists error";
+                    return false;
+                }
+
+                if (!exists) {
+                    LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id["
+                                 << col_unique_id << "]," << inverted_index_src_file_path
+                                 << " is not exists, will skip index compaction";
+                    return false;
+                }
+
+                // check idx file size
+                int64_t file_size = 0;
+                if (fs->file_size(inverted_index_src_file_path, &file_size) != Status::OK()) {
+                    LOG(ERROR) << inverted_index_src_file_path << " fs->file_size error";
+                    return false;
+                }
+
+                if (file_size == 0) {
+                    LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id["
+                                 << col_unique_id << "]," << inverted_index_src_file_path
+                                 << " is empty file, will skip index compaction";
+                    return false;
+                }
+
+                // check index meta
+                std::filesystem::path p(inverted_index_src_file_path);
+                std::string dir_str = p.parent_path().string();
+                std::string file_str = p.filename().string();
+                lucene::store::Directory* dir =
+                        DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str());
+                DorisCompoundReader reader(dir, file_str.c_str());
+                std::vector<std::string> files;
+                reader.list(&files);
+                reader.close();
+
+                // why is 3?
+                // bkd index will write at least 3 files
+                if (files.size() < 3) {
+                    LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id["
+                                 << col_unique_id << "]," << inverted_index_src_file_path
+                                 << " is corrupted, will skip index compaction";
+                    return false;
                }
            }
+            return true;
+        };
+
+        bool all_have_inverted_index = std::all_of(_input_rowsets.begin(), _input_rowsets.end(),
+                                                   std::move(has_inverted_index));
+
+        if (all_have_inverted_index &&
+            field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
+            ctx.skip_inverted_index.insert(col_unique_id);
        }
    }
-    if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
-        // write output rowset to storage policy resource
-        auto storage_policy = get_storage_policy(_tablet->storage_policy_id());
-        if (storage_policy == nullptr) {
-            return Status::InternalError("could not find storage_policy, storage_policy_id={}",
-                                         _tablet->storage_policy_id());
-        }
-        auto resource = get_storage_resource(storage_policy->resource_id);
-        if (resource.fs == nullptr) {
-            return Status::InternalError("could not find resource, resouce_id={}",
-                                         storage_policy->resource_id);
-        }
-        DCHECK(atol(resource.fs->id().c_str()) == storage_policy->resource_id);
-        DCHECK(resource.fs->type() != io::FileSystemType::LOCAL);
-        ctx.fs = std::move(resource.fs);
-    }
-    _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, is_vertical));
-    _pending_rs_guard = ExecEnv::GetInstance()->storage_engine().to_local().add_pending_rowset(ctx);
-    return Status::OK();
 }
-Status Compaction::construct_input_rowset_readers() {
-    for (auto& rowset : _input_rowsets) {
-        RowsetReaderSharedPtr rs_reader;
-        RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
-        _input_rs_readers.push_back(std::move(rs_reader));
+Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
+    if (config::inverted_index_compaction_enable &&
+        ((_tablet->keys_type() == KeysType::UNIQUE_KEYS ||
+          _tablet->keys_type() == KeysType::DUP_KEYS))) {
+        construct_skip_inverted_index(ctx);
     }
+    ctx.version = _output_version;
+    ctx.rowset_state = VISIBLE;
+    ctx.segments_overlap = NONOVERLAPPING;
+    ctx.tablet_schema = _cur_tablet_schema;
+    ctx.newest_write_timestamp = _newest_write_timestamp;
+    ctx.write_type = DataWriteType::TYPE_COMPACTION;
+    _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical));
+    _pending_rs_guard = _engine.add_pending_rowset(ctx);
     return Status::OK();
 }
-Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
+Status CompactionMixin::modify_rowsets() {

Review Comment:
warning: function 'modify_rowsets' has cognitive complexity of 52 (threshold 50) [readability-function-cognitive-complexity]
```cpp
Status CompactionMixin::modify_rowsets() {
^
```

<details>
<summary>Additional context</summary>

**be/src/olap/compaction.cpp:667:** +1, including nesting penalty of 0, nesting level increased to 1
```cpp
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
^
```
**be/src/olap/compaction.cpp:684:** +2, including nesting penalty of 1, nesting level increased to 2
```cpp
if (!_allow_delete_in_cumu_compaction) {
^
```
**be/src/olap/compaction.cpp:686:** +3, including nesting penalty of 2, nesting level increased to 3
```cpp
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
^
```
**be/src/olap/compaction.cpp:686:** +1
```cpp
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
^
```
**be/src/olap/compaction.cpp:698:** +2, including nesting penalty of 1, nesting level increased to 2
```cpp
if (config::enable_rowid_conversion_correctness_check) {
^
```
**be/src/olap/compaction.cpp:699:** +3, including nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map));
^
```
**be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/olap/compaction.cpp:699:** +4, including nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map));
^
```
**be/src/common/status.h:542:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/olap/compaction.cpp:706:** nesting level increased to 2
```cpp
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
^
```
**be/src/util/trace.h:32:** expanded from macro 'SCOPED_SIMPLE_TRACE_IF_TIMEOUT'
```cpp
SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
^
```
**be/src/util/trace.h:44:** expanded from macro 'SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT'
```cpp
SCOPED_CLEANUP({ \
^
```
**be/src/util/scoped_cleanup.h:33:** expanded from macro 'SCOPED_CLEANUP'
```cpp
auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
^
```
**be/src/olap/compaction.cpp:706:** +3, including nesting penalty of 2, nesting level increased to 3
```cpp
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
^
```
**be/src/util/trace.h:32:** expanded from macro 'SCOPED_SIMPLE_TRACE_IF_TIMEOUT'
```cpp
SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
^
```
**be/src/util/trace.h:49:** expanded from macro 'SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT'
```cpp
if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \
^
```
**be/src/olap/compaction.cpp:752:** +2, including nesting penalty of 1, nesting level increased to 2
```cpp
if (!_allow_delete_in_cumu_compaction &&
^
```
**be/src/olap/compaction.cpp:752:** +1
```cpp
if (!_allow_delete_in_cumu_compaction &&
^
```
**be/src/olap/compaction.cpp:755:** +3, including nesting penalty of 2, nesting level increased to 3
```cpp
if (missed_rows.size() != missed_rows_size) {
^
```
**be/src/olap/compaction.cpp:761:** +2, including nesting penalty of 1, nesting level increased to 2
```cpp
if (config::enable_rowid_conversion_correctness_check) {
^
```
**be/src/olap/compaction.cpp:762:** +3, including nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map));
^
```
**be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/olap/compaction.cpp:762:** +4, including nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map));
^
```
**be/src/common/status.h:542:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/olap/compaction.cpp:766:** +2, including nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true));
^
```
**be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/olap/compaction.cpp:766:** +3, including nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true));
^
```
**be/src/common/status.h:542:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/olap/compaction.cpp:768:** +1, nesting level increased to 1
```cpp
} else {
^
```
**be/src/olap/compaction.cpp:770:** nesting level increased to 2
```cpp
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
^
```
**be/src/util/trace.h:32:** expanded from macro 'SCOPED_SIMPLE_TRACE_IF_TIMEOUT'
```cpp
SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
^
```
**be/src/util/trace.h:44:** expanded from macro 'SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT'
```cpp
SCOPED_CLEANUP({ \
^
```
**be/src/util/scoped_cleanup.h:33:** expanded from macro 'SCOPED_CLEANUP'
```cpp
auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
^
```
**be/src/olap/compaction.cpp:770:** +3, including nesting penalty of 2, nesting level increased to 3
```cpp
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
^
```
**be/src/util/trace.h:32:** expanded from macro 'SCOPED_SIMPLE_TRACE_IF_TIMEOUT'
```cpp
SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
^
```
**be/src/util/trace.h:49:** expanded from macro 'SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT'
```cpp
if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \
^
```
**be/src/olap/compaction.cpp:771:** +2, including nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true));
^
```
**be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/olap/compaction.cpp:771:** +3, including nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true));
^
```
**be/src/common/status.h:542:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/olap/compaction.cpp:774:** +1, including nesting penalty of 0, nesting level increased to 1
```cpp
if (config::tablet_rowset_stale_sweep_by_size &&
^
```
**be/src/olap/compaction.cpp:786:** +1, including nesting penalty of 0, nesting level increased to 1
```cpp
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
^
```
**be/src/olap/compaction.cpp:790:** +2, including nesting penalty of 1, nesting level increased to 2
```cpp
if (!st.ok()) {
^
```

</details>
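The warning above comes from clang-tidy's `readability-function-cognitive-complexity` check: as the additional context shows, most of the 52 points accumulate inside the `if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && ...)` branch, where each nested `if` plus the expansions of `RETURN_IF_ERROR` and `SCOPED_SIMPLE_TRACE_IF_TIMEOUT` add their own nesting penalties. A common way back under the threshold of 50 is to move that branch into a private helper so the remaining `modify_rowsets()` body stays flat. The snippet below is only a minimal, self-contained sketch of that extract-function shape, not code from this PR: `Status`, `TabletLike`, `process_unique_key_path`, and `modify_rowsets_like` are invented stand-ins, and the `RETURN_IF_ERROR` here is a simplified copy of the real macro's do/while pattern.

```cpp
// Illustrative sketch only -- not from the PR. It mirrors the shape of
// modify_rowsets(): the deeply nested unique-key branch is pulled into a
// helper so each function stays below the cognitive-complexity threshold.
#include <cstdio>

struct Status {
    bool ok_;
    bool ok() const { return ok_; }
    static Status OK() { return Status{true}; }
};

// Simplified stand-in for the RETURN_IF_ERROR macro in be/src/common/status.h.
#define RETURN_IF_ERROR(stmt)                \
    do {                                     \
        Status _status_ = (stmt);            \
        if (!_status_.ok()) return _status_; \
    } while (false)

enum class KeysType { UNIQUE_KEYS, DUP_KEYS };

// Hypothetical tablet stand-in; the real code would keep using Tablet/_tablet.
struct TabletLike {
    KeysType keys_type = KeysType::UNIQUE_KEYS;
    bool rowid_check_enabled = true;
    Status check_rowid_conversion() { return Status::OK(); }
    Status commit_rowsets() { return Status::OK(); }
};

// The nested unique-key handling lives in its own helper, so its inner ifs and
// macro expansions no longer pile nesting penalties onto the caller.
static Status process_unique_key_path(TabletLike& tablet) {
    if (tablet.rowid_check_enabled) {
        RETURN_IF_ERROR(tablet.check_rowid_conversion());
    }
    RETURN_IF_ERROR(tablet.commit_rowsets());
    return Status::OK();
}

// The caller now only chooses between two flat calls, which is what drops the
// cognitive-complexity score of the large function.
static Status modify_rowsets_like(TabletLike& tablet) {
    if (tablet.keys_type == KeysType::UNIQUE_KEYS) {
        RETURN_IF_ERROR(process_unique_key_path(tablet));
    } else {
        RETURN_IF_ERROR(tablet.commit_rowsets());
    }
    return Status::OK();
}

int main() {
    TabletLike tablet;
    std::printf("ok=%d\n", modify_rowsets_like(tablet).ok());
    return 0;
}
```

Alternatively, if keeping the function as is is preferred, the check can be tuned per project (for example by raising `readability-function-cognitive-complexity.Threshold` in `.clang-tidy`), though that only hides the score rather than reducing it.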