Tanya-W commented on code in PR #16371: URL: https://github.com/apache/doris/pull/16371#discussion_r1098397282
########## be/src/olap/schema_change.cpp: ########## @@ -958,6 +1232,270 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& return res; } +Status SchemaChangeHandler::_do_process_alter_inverted_index( + TabletSharedPtr tablet, const TAlterInvertedIndexReq& request) { + Status res = Status::OK(); + // TODO(wy): check whether the tablet's max continuous version == request.version + if (tablet->tablet_state() == TABLET_TOMBSTONED || tablet->tablet_state() == TABLET_STOPPED || + tablet->tablet_state() == TABLET_SHUTDOWN) { + LOG(WARNING) << "tablet's state=" << tablet->tablet_state() + << " cannot alter inverted index"; + return Status::Error<ErrorCode::INTERNAL_ERROR>(); + } + + std::shared_lock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock); + if (!base_migration_rlock.owns_lock()) { + return Status::Error<TRY_LOCK_FAILED>(); + } + + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->copy_from(*tablet->tablet_schema()); + if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) { + tablet_schema->clear_columns(); + for (const auto& column : request.columns) { + tablet_schema->append_column(TabletColumn(column)); + } + } + + // get rowset reader + std::vector<RowsetReaderSharedPtr> rs_readers; + DeleteHandler delete_handler; + RETURN_IF_ERROR( + _get_rowset_readers(tablet, tablet_schema, request, &rs_readers, &delete_handler)); + if (request.__isset.is_drop_op && request.is_drop_op) { + // drop index + res = _drop_inverted_index(rs_readers, tablet_schema, tablet, request); + } else { + // add index + res = _add_inverted_index(rs_readers, &delete_handler, tablet_schema, tablet, request); + } + + if (!res.ok()) { + LOG(WARNING) << "failed to alter tablet. 
tablet=" << tablet->full_name(); + return res; + } + + return Status::OK(); +} + +Status SchemaChangeHandler::_get_rowset_readers(TabletSharedPtr tablet, + const TabletSchemaSPtr& tablet_schema, + const TAlterInvertedIndexReq& request, + std::vector<RowsetReaderSharedPtr>* rs_readers, + DeleteHandler* delete_handler) { + Status res = Status::OK(); + std::vector<Version> versions_to_be_changed; + std::vector<ColumnId> return_columns; + std::vector<TOlapTableIndex> alter_inverted_indexs; + + if (request.__isset.alter_inverted_indexes) { + alter_inverted_indexs = request.alter_inverted_indexes; + } + + for (auto& inverted_index : alter_inverted_indexs) { + DCHECK_EQ(inverted_index.columns.size(), 1); + auto column_name = inverted_index.columns[0]; + auto idx = tablet_schema->field_index(column_name); + return_columns.emplace_back(idx); + } + + // obtain base tablet's push lock and header write lock to prevent loading data + { + std::lock_guard<std::mutex> tablet_lock(tablet->get_push_lock()); + std::lock_guard<std::shared_mutex> tablet_wlock(tablet->get_header_lock()); + + do { + RowsetSharedPtr max_rowset; + // get history data to rebuild inverted index and it will check if there is hold in base tablet + res = _get_versions_to_be_changed(tablet, &versions_to_be_changed, &max_rowset); + if (!res.ok()) { + LOG(WARNING) << "fail to get version to be rebuild inverted index. res=" << res; + break; + } + + // should check the max_version >= request.alter_version, if not the rebuild index is useless + if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { + LOG(WARNING) << "base tablet's max version=" + << (max_rowset == nullptr ? 0 : max_rowset->end_version()) + << " is less than request version=" << request.alter_version; + res = Status::InternalError( + "base tablet's max version={} is less than request version={}", + (max_rowset == nullptr ? 
0 : max_rowset->end_version()), + request.alter_version); + break; + } + + // init one delete handler + int64_t end_version = -1; + for (auto& version : versions_to_be_changed) { + end_version = std::max(end_version, version.second); + } + + auto& all_del_preds = tablet->delete_predicates(); + for (auto& delete_pred : all_del_preds) { + if (delete_pred->version().first > end_version) { + continue; + } + tablet_schema->merge_dropped_columns(tablet->tablet_schema(delete_pred->version())); + } + res = delete_handler->init(tablet_schema, all_del_preds, end_version); + if (!res) { + LOG(WARNING) << "init delete handler failed. tablet=" << tablet->full_name() + << ", end_version=" << end_version; + break; + } + + // acquire data sources correspond to history versions + tablet->capture_rs_readers(versions_to_be_changed, rs_readers); + if (rs_readers->size() < 1) { + LOG(WARNING) << "fail to acquire all data sources. " + << "version_num=" << versions_to_be_changed.size() + << ", data_source_num=" << rs_readers->size(); + res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(); + break; + } + + // reader_context is stack variables, it's lifetime should keep the same with rs_readers + RowsetReaderContext reader_context; + reader_context.reader_type = READER_ALTER_TABLE; + reader_context.tablet_schema = tablet_schema; + reader_context.need_ordered_result = true; + reader_context.delete_handler = delete_handler; Review Comment: I will add more regression test cases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org