This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0d38a9a36d4 [feature](restore) support atomic restore (#41107) 0d38a9a36d4 is described below commit 0d38a9a36d4c4be6754b1c7d4239277504be3b99 Author: walter <w41te...@gmail.com> AuthorDate: Tue Sep 24 09:41:41 2024 +0800 [feature](restore) support atomic restore (#41107) Cherry-pick #40353, #40734, #40817, #40876, #40921, #41017, #41083 --- be/src/olap/rowset/rowset_meta_manager.cpp | 63 ++++ be/src/olap/rowset/rowset_meta_manager.h | 3 + be/src/olap/snapshot_manager.cpp | 42 ++- be/src/olap/snapshot_manager.h | 1 + be/src/olap/tablet.cpp | 5 + be/src/olap/tablet.h | 1 + be/src/olap/tablet_manager.cpp | 30 +- be/src/olap/task/engine_storage_migration_task.cpp | 101 +++++- be/src/olap/task/engine_storage_migration_task.h | 4 +- .../org/apache/doris/analysis/RestoreStmt.java | 9 + .../org/apache/doris/backup/BackupHandler.java | 6 +- .../apache/doris/backup/RestoreFileMapping.java | 18 +- .../java/org/apache/doris/backup/RestoreJob.java | 372 ++++++++++++++++++--- .../main/java/org/apache/doris/catalog/Env.java | 8 + .../java/org/apache/doris/catalog/OlapTable.java | 19 ++ .../org/apache/doris/catalog/TableProperty.java | 24 +- .../apache/doris/common/util/PropertyAnalyzer.java | 1 + .../apache/doris/datasource/InternalCatalog.java | 14 +- .../apache/doris/service/FrontendServiceImpl.java | 3 + .../java/org/apache/doris/task/SnapshotTask.java | 13 +- .../doris/backup/RestoreFileMappingTest.java | 6 +- .../org/apache/doris/backup/RestoreJobTest.java | 3 +- gensrc/thrift/AgentService.thrift | 1 + gensrc/thrift/FrontendService.thrift | 1 + .../backup_restore/test_backup_restore_atomic.out | 78 +++++ .../test_backup_restore_atomic_with_view.out | 60 ++++ .../test_backup_restore_atomic.groovy | 209 ++++++++++++ .../test_backup_restore_atomic_cancel.groovy | 128 +++++++ .../test_backup_restore_atomic_with_alter.groovy | 241 +++++++++++++ .../test_backup_restore_atomic_with_view.groovy | 124 +++++++ 30 files changed, 1509 insertions(+), 79 deletions(-) diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp index d89be5ab8ec..3f482427139 100644 --- a/be/src/olap/rowset/rowset_meta_manager.cpp +++ b/be/src/olap/rowset/rowset_meta_manager.cpp @@ -357,6 +357,69 @@ Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU return status; } +Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, + Version version, RowsetBinlogMetasPB* metas_pb) { + Status status; + auto tablet_uid_str = tablet_uid.to_string(); + auto prefix_key = make_binlog_meta_key_prefix(tablet_uid); + auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first); + auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1); + auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key]( + std::string_view key, std::string_view value) -> bool { + VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value); + if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable. 
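+                // (Assumption: the version in the meta key is encoded so that plain
+                // bytewise comparison preserves numeric order; once a key sorts past
+                // end_key, the whole [first, second] range has been covered.)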
+ // All binlog meta has been scanned + return false; + } + + if (!starts_with_binlog_meta(key)) { + auto err_msg = fmt::format("invalid binlog meta key:{}", key); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + + BinlogMetaEntryPB binlog_meta_entry_pb; + if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) { + auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + + const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); + auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); + binlog_meta_pb->set_rowset_id(rowset_id); + binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); + binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); + binlog_meta_pb->set_meta_key(std::string {key}); + binlog_meta_pb->set_meta(std::string {value}); + + auto binlog_data_key = + make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); + std::string binlog_data; + status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); + if (!status.ok()) { + LOG(WARNING) << status.to_string(); + return false; + } + binlog_meta_pb->set_data_key(binlog_data_key); + binlog_meta_pb->set_data(binlog_data); + + return false; + }; + + Status iterStatus = + meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func); + if (!iterStatus.ok()) { + LOG(WARNING) << fmt::format( + "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key, + version.to_string(), iterStatus.to_string()); + return iterStatus; + } + return status; +} + Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, RowsetBinlogMetasPB* metas_pb) { Status status; diff --git a/be/src/olap/rowset/rowset_meta_manager.h b/be/src/olap/rowset/rowset_meta_manager.h index 0cfbb3383e3..b13f1e8d1d7 100644 --- a/be/src/olap/rowset/rowset_meta_manager.h +++ b/be/src/olap/rowset/rowset_meta_manager.h @@ -72,6 +72,9 @@ public: static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, const std::vector<int64_t>& binlog_versions, RowsetBinlogMetasPB* metas_pb); + // get all binlog metas of a tablet in version. + static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + Version version, RowsetBinlogMetasPB* metas_pb); static Status remove_binlog(OlapMeta* meta, const std::string& suffix); static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, RowsetBinlogMetasPB* metas_pb); diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index 6e36d0756af..d05d51ddffa 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -86,16 +86,33 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s return Status::Error<INVALID_ARGUMENT>("output parameter cannot be null"); } - TabletSharedPtr ref_tablet = + TabletSharedPtr target_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request.tablet_id); - DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", { ref_tablet = nullptr; }) + DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", { target_tablet = nullptr; }) - if (ref_tablet == nullptr) { + if (target_tablet == nullptr) { return Status::Error<TABLE_NOT_FOUND>("failed to get tablet. 
tablet={}", request.tablet_id); } - res = _create_snapshot_files(ref_tablet, request, snapshot_path, allow_incremental_clone); + TabletSharedPtr ref_tablet = target_tablet; + if (request.__isset.ref_tablet_id) { + int64_t ref_tablet_id = request.ref_tablet_id; + TabletSharedPtr base_tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(ref_tablet_id); + + // Some tasks, like medium migration, cause the target tablet and base tablet to stay on + // different disks. In this case, we fall through to the normal restore path. + // + // Otherwise, we can directly link the rowset files from the base tablet to the target tablet. + if (base_tablet != nullptr && + base_tablet->data_dir()->path() == target_tablet->data_dir()->path()) { + ref_tablet = std::move(base_tablet); + } + } + + res = _create_snapshot_files(ref_tablet, target_tablet, request, snapshot_path, + allow_incremental_clone); if (!res.ok()) { LOG(WARNING) << "failed to make snapshot. res=" << res << " tablet=" << request.tablet_id; @@ -368,6 +385,7 @@ Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) { } Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet, + const TabletSharedPtr& target_tablet, const TSnapshotRequest& request, string* snapshot_path, bool* allow_incremental_clone) { @@ -387,10 +405,10 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet timeout_s = request.timeout; } std::string snapshot_id_path; - res = _calc_snapshot_id_path(ref_tablet, timeout_s, &snapshot_id_path); + res = _calc_snapshot_id_path(target_tablet, timeout_s, &snapshot_id_path); if (!res.ok()) { - LOG(WARNING) << "failed to calc snapshot_id_path, ref tablet=" - << ref_tablet->data_dir()->path(); + LOG(WARNING) << "failed to calc snapshot_id_path, tablet=" + << target_tablet->data_dir()->path(); return res; } @@ -398,12 +416,12 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet // schema_full_path_desc.filepath: // /snapshot_id_path/tablet_id/schema_hash/ - auto schema_full_path = get_schema_hash_full_path(ref_tablet, snapshot_id_path); + auto schema_full_path = get_schema_hash_full_path(target_tablet, snapshot_id_path); // header_path: // /schema_full_path/tablet_id.hdr - auto header_path = _get_header_full_path(ref_tablet, schema_full_path); + auto header_path = _get_header_full_path(target_tablet, schema_full_path); // /schema_full_path/tablet_id.hdr.json - auto json_header_path = _get_json_header_full_path(ref_tablet, schema_full_path); + auto json_header_path = _get_json_header_full_path(target_tablet, schema_full_path); bool exists = true; RETURN_IF_ERROR(io::global_local_filesystem()->exists(schema_full_path, &exists)); if (exists) { @@ -585,7 +603,9 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet << rs->rowset_meta()->empty(); } if (!res.ok()) { - LOG(WARNING) << "fail to create hard link. [path=" << snapshot_id_path << "]"; + LOG(WARNING) << "fail to create hard link. 
path=" << snapshot_id_path + << " tablet=" << target_tablet->tablet_id() + << " ref tablet=" << ref_tablet->tablet_id(); break; } diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h index 9781d4f69ee..e32409dd3cd 100644 --- a/be/src/olap/snapshot_manager.h +++ b/be/src/olap/snapshot_manager.h @@ -81,6 +81,7 @@ private: const std::vector<RowsetSharedPtr>& consistent_rowsets); Status _create_snapshot_files(const TabletSharedPtr& ref_tablet, + const TabletSharedPtr& target_tablet, const TSnapshotRequest& request, std::string* snapshot_path, bool* allow_incremental_clone); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index ad0cc795dc3..c1a94223eff 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3875,6 +3875,11 @@ Status Tablet::get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versio binlog_versions, metas_pb); } +Status Tablet::get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb) { + return RowsetMetaManager::get_rowset_binlog_metas(_data_dir->get_meta(), tablet_uid(), + binlog_versions, metas_pb); +} + std::string Tablet::get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const { return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 490c36aa519..fb20498355f 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -553,6 +553,7 @@ public: std::string_view rowset_id) const; Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions, RowsetBinlogMetasPB* metas_pb); + Status get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb); std::string get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const; std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 05a91f463ee..bfa23257cb0 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -278,9 +278,12 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector // we need use write lock on shard-1 and then use read lock on shard-2 // if there have create rollup tablet C(assume on shard-2) from tablet D(assume on shard-1) at the same time, we will meet deadlock std::unique_lock two_tablet_lock(_two_tablet_mtx, std::defer_lock); - bool is_schema_change = request.__isset.base_tablet_id && request.base_tablet_id > 0; - bool need_two_lock = is_schema_change && ((_tablets_shards_mask & request.base_tablet_id) != - (_tablets_shards_mask & tablet_id)); + bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; + bool is_schema_change_or_atomic_restore = + request.__isset.base_tablet_id && request.base_tablet_id > 0; + bool need_two_lock = + is_schema_change_or_atomic_restore && + ((_tablets_shards_mask & request.base_tablet_id) != (_tablets_shards_mask & tablet_id)); if (need_two_lock) { SCOPED_TIMER(ADD_TIMER(profile, "GetTwoTableLock")); two_tablet_lock.lock(); @@ -309,7 +312,7 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector TabletSharedPtr base_tablet = nullptr; // If the CreateTabletReq has base_tablet_id then it is a alter-tablet request - if (is_schema_change) { + if (is_schema_change_or_atomic_restore) { // if base_tablet_id's lock diffrent with new_tablet_id, we need lock it. 
if (need_two_lock) { SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTablet")); @@ -322,22 +325,28 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector if (base_tablet == nullptr) { DorisMetrics::instance()->create_tablet_requests_failed->increment(1); return Status::Error<TABLE_CREATE_META_ERROR>( - "fail to create tablet(change schema), base tablet does not exist. " - "new_tablet_id={}, base_tablet_id={}", + "fail to create tablet(change schema/atomic restore), base tablet does not " + "exist. new_tablet_id={}, base_tablet_id={}", tablet_id, request.base_tablet_id); } - // If we are doing schema-change, we should use the same data dir + // If we are doing schema-change or atomic-restore, we should use the same data dir // TODO(lingbin): A litter trick here, the directory should be determined before // entering this method - if (request.storage_medium == base_tablet->data_dir()->storage_medium()) { + // + // ATTN: Since all restored replicas will be saved to HDD, so no storage_medium check here. + if (in_restore_mode || + request.storage_medium == base_tablet->data_dir()->storage_medium()) { + LOG(INFO) << "create tablet use the base tablet data dir. tablet_id=" << tablet_id + << ", base tablet_id=" << request.base_tablet_id + << ", data dir=" << base_tablet->data_dir()->path(); stores.clear(); stores.push_back(base_tablet->data_dir()); } } // set alter type to schema-change. it is useless - TabletSharedPtr tablet = _internal_create_tablet_unlocked(request, is_schema_change, - base_tablet.get(), stores, profile); + TabletSharedPtr tablet = _internal_create_tablet_unlocked( + request, is_schema_change_or_atomic_restore, base_tablet.get(), stores, profile); if (tablet == nullptr) { DorisMetrics::instance()->create_tablet_requests_failed->increment(1); return Status::Error<CE_CMD_PARAMS_ERROR>("fail to create tablet. tablet_id={}", @@ -934,6 +943,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, if (binlog_meta_filesize > 0) { contain_binlog = true; RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); + VLOG_DEBUG << "load rowset binlog metas from file. 
file_path=" << binlog_metas_file; } RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 0164aee1472..f9ba8963966 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -37,6 +37,7 @@ #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/pb_helper.h" #include "olap/rowset/rowset_meta.h" #include "olap/snapshot_manager.h" #include "olap/storage_engine.h" @@ -259,9 +260,11 @@ Status EngineStorageMigrationTask::_migrate() { } std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets); + RowsetBinlogMetasPB rowset_binlog_metas_pb; do { // migrate all index and data files but header file - res = _copy_index_and_data_files(full_path, temp_consistent_rowsets); + res = _copy_index_and_data_files(full_path, temp_consistent_rowsets, + &rowset_binlog_metas_pb); if (!res.ok()) { break; } @@ -289,7 +292,8 @@ Status EngineStorageMigrationTask::_migrate() { // we take the lock to complete it to avoid long-term competition with other tasks if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) { // force to copy the remaining data and index - res = _copy_index_and_data_files(full_path, temp_consistent_rowsets); + res = _copy_index_and_data_files(full_path, temp_consistent_rowsets, + &rowset_binlog_metas_pb); if (!res.ok()) { break; } @@ -304,6 +308,16 @@ Status EngineStorageMigrationTask::_migrate() { } } + // save rowset binlog metas + if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) { + auto rowset_binlog_metas_pb_filename = + fmt::format("{}/rowset_binlog_metas.pb", full_path); + res = write_pb(rowset_binlog_metas_pb_filename, rowset_binlog_metas_pb); + if (!res.ok()) { + break; + } + } + // generate new tablet meta and write to hdr file res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets, end_version); if (!res.ok()) { @@ -347,10 +361,91 @@ void EngineStorageMigrationTask::_generate_new_header( } Status EngineStorageMigrationTask::_copy_index_and_data_files( - const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets) const { + const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets, + RowsetBinlogMetasPB* all_binlog_metas_pb) const { + RowsetBinlogMetasPB rowset_binlog_metas_pb; for (const auto& rs : consistent_rowsets) { RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id())); + + Version binlog_versions = rs->version(); + RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions, &rowset_binlog_metas_pb)); + } + + // copy index binlog files. 
+ for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) { + auto num_segments = rowset_binlog_meta.num_segments(); + std::string_view rowset_id = rowset_binlog_meta.rowset_id(); + + RowsetMetaPB rowset_meta_pb; + if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) { + auto err_msg = fmt::format("fail to parse binlog meta data value:{}", + rowset_binlog_meta.data()); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema(); + TabletSchema tablet_schema; + tablet_schema.init_from_pb(tablet_schema_pb); + + // copy segment files and index files + for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { + std::string segment_file_path = _tablet->get_segment_filepath(rowset_id, segment_index); + auto snapshot_segment_file_path = + fmt::format("{}/{}_{}.binlog", full_path, rowset_id, segment_index); + + Status status = io::global_local_filesystem()->copy_path(segment_file_path, + snapshot_segment_file_path); + if (!status.ok()) { + LOG(WARNING) << "fail to copy binlog segment file. [src=" << segment_file_path + << ", dest=" << snapshot_segment_file_path << "]" << status; + return status; + } + VLOG_DEBUG << "copy " << segment_file_path << " to " << snapshot_segment_file_path; + + if (tablet_schema.get_inverted_index_storage_format() == + InvertedIndexStorageFormatPB::V1) { + for (const auto& index : tablet_schema.indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + auto index_id = index.index_id(); + auto index_file = + _tablet->get_segment_index_filepath(rowset_id, segment_index, index_id); + auto snapshot_segment_index_file_path = + fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id, + segment_index, index_id); + VLOG_DEBUG << "copy " << index_file << " to " + << snapshot_segment_index_file_path; + status = io::global_local_filesystem()->copy_path( + index_file, snapshot_segment_index_file_path); + if (!status.ok()) { + LOG(WARNING) + << "fail to copy binlog index file. [src=" << index_file + << ", dest=" << snapshot_segment_index_file_path << "]" << status; + return status; + } + } + } else if (tablet_schema.has_inverted_index()) { + auto index_file = InvertedIndexDescriptor::get_index_file_name(segment_file_path); + auto snapshot_segment_index_file_path = + fmt::format("{}/{}_{}.binlog-index", full_path, rowset_id, segment_index); + VLOG_DEBUG << "copy " << index_file << " to " << snapshot_segment_index_file_path; + status = io::global_local_filesystem()->copy_path(index_file, + snapshot_segment_index_file_path); + if (!status.ok()) { + LOG(WARNING) << "fail to copy binlog index file. 
[src=" << index_file + << ", dest=" << snapshot_segment_index_file_path << "]" << status; + return status; + } + } + } } + + std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(), + rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(), + google::protobuf::RepeatedFieldBackInserter( + all_binlog_metas_pb->mutable_rowset_binlog_metas())); + return Status::OK(); } diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h index c15b0576701..5e30d4a04c9 100644 --- a/be/src/olap/task/engine_storage_migration_task.h +++ b/be/src/olap/task/engine_storage_migration_task.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_H #define DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_H +#include <gen_cpp/olap_file.pb.h> #include <stdint.h> #include <mutex> @@ -73,7 +74,8 @@ private: // TODO: hkp // rewrite this function Status _copy_index_and_data_files(const std::string& full_path, - const std::vector<RowsetSharedPtr>& consistent_rowsets) const; + const std::vector<RowsetSharedPtr>& consistent_rowsets, + RowsetBinlogMetasPB* all_binlog_metas_pb) const; private: // tablet to do migrated diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index 12e7000caad..82e4be09cd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -43,6 +43,7 @@ public class RestoreStmt extends AbstractBackupStmt { public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable"; public static final String PROP_CLEAN_TABLES = "clean_tables"; public static final String PROP_CLEAN_PARTITIONS = "clean_partitions"; + public static final String PROP_ATOMIC_RESTORE = "atomic_restore"; private boolean allowLoad = false; private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; @@ -54,6 +55,7 @@ public class RestoreStmt extends AbstractBackupStmt { private boolean isBeingSynced = false; private boolean isCleanTables = false; private boolean isCleanPartitions = false; + private boolean isAtomicRestore = false; private byte[] meta = null; private byte[] jobInfo = null; @@ -121,6 +123,10 @@ public class RestoreStmt extends AbstractBackupStmt { return isCleanPartitions; } + public boolean isAtomicRestore() { + return isAtomicRestore; + } + @Override public void analyze(Analyzer analyzer) throws UserException { if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) { @@ -203,6 +209,9 @@ public class RestoreStmt extends AbstractBackupStmt { // is clean partitions isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions); + // is atomic restore + isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore); + if (!copiedProperties.isEmpty()) { ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, "Unknown restore job properties: " + copiedProperties.keySet()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 5feeb8005bd..f546fc40ead 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -517,12 +517,14 @@ public class BackupHandler extends MasterDaemon implements Writable { db.getId(), 
db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), - stmt.isCleanTables(), stmt.isCleanPartitions(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); + stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), + env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); } else { restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), - stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), env, repository.getId()); + stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), + env, repository.getId()); } env.getEditLog().logRestoreJob(restoreJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java index 07ddf6844dc..f712afbb271 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java @@ -39,7 +39,7 @@ public class RestoreFileMapping implements Writable { } public IdChain(Long... ids) { - Preconditions.checkState(ids.length == 5); + Preconditions.checkState(ids.length == 6); chain = ids; } @@ -63,6 +63,14 @@ public class RestoreFileMapping implements Writable { return chain[4]; } + public boolean hasRefTabletId() { + return chain.length >= 6 && chain[5] != -1L; + } + + public long getRefTabletId() { + return chain[5]; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -78,8 +86,12 @@ public class RestoreFileMapping implements Writable { return false; } + if (((IdChain) obj).chain.length != chain.length) { + return false; + } + IdChain other = (IdChain) obj; - for (int i = 0; i < 5; i++) { + for (int i = 0; i < chain.length; i++) { // DO NOT use ==, Long_1 != Long_2 if (!chain[i].equals(other.chain[i])) { return false; @@ -92,7 +104,7 @@ public class RestoreFileMapping implements Writable { @Override public int hashCode() { int code = chain[0].hashCode(); - for (int i = 1; i < 5; i++) { + for (int i = 1; i < chain.length; i++) { code ^= chain[i].hashCode(); } return code; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index a35858e498a..558c9ddf33c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -61,6 +61,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.TimeUtils; @@ -114,6 +115,8 @@ public class RestoreJob extends AbstractJob { private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED; private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES; private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS; + private static final String 
PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE; + private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__"; private static final Logger LOG = LogManager.getLogger(RestoreJob.class); @@ -182,6 +185,8 @@ public class RestoreJob extends AbstractJob { private boolean isCleanTables = false; // Whether to delete existing partitions that are not involved in the restore. private boolean isCleanPartitions = false; + // Whether to restore the data into a temp table, and then replace the origin one. + private boolean isAtomicRestore = false; // restore properties private Map<String, String> properties = Maps.newHashMap(); @@ -193,7 +198,7 @@ public class RestoreJob extends AbstractJob { public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, Env env, long repoId) { + boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) { super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId); this.backupTimestamp = backupTs; this.jobInfo = jobInfo; @@ -210,19 +215,22 @@ public class RestoreJob extends AbstractJob { this.isBeingSynced = isBeingSynced; this.isCleanTables = isCleanTables; this.isCleanPartitions = isCleanPartitions; + this.isAtomicRestore = isAtomicRestore; properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica)); properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable)); properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced)); properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables)); properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions)); + properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore)); } public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, Env env, long repoId, BackupMeta backupMeta) { + boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) { this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica, - reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, env, repoId); + reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env, + repoId); this.backupMeta = backupMeta; } @@ -411,6 +419,12 @@ public class RestoreJob extends AbstractJob { checkIfNeedCancel(); if (status.ok()) { + if (state != RestoreJobState.PENDING && label.equals( + DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) { + LOG.info("pause restore job by debug point: {}", this); + return; + } + switch (state) { case PENDING: checkAndPrepareMeta(); @@ -518,8 +532,10 @@ public class RestoreJob extends AbstractJob { } Preconditions.checkNotNull(backupMeta); - // Set all restored tbls' state to RESTORE - // Table's origin state must be NORMAL and does not have unfinished load job. + // Check the olap table state. 
+ // + // If isAtomicRestore is not set, set all restored tbls' state to RESTORE, + // the table's origin state must be NORMAL and does not have unfinished load job. for (String tableName : jobInfo.backupOlapTableObjects.keySet()) { Table tbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName)); if (tbl == null) { @@ -547,6 +563,13 @@ public class RestoreJob extends AbstractJob { return; } + if (isAtomicRestore) { + // We will create new OlapTable in atomic restore, so does not set the RESTORE state. + // Instead, set table in atomic restore state, to forbid the alter table operation. + olapTbl.setInAtomicRestore(); + continue; + } + for (Partition partition : olapTbl.getPartitions()) { if (!env.getLoadInstance().checkPartitionLoadFinished(partition.getId(), null)) { status = new Status(ErrCode.COMMON_ERROR, @@ -608,6 +631,9 @@ public class RestoreJob extends AbstractJob { } } + // the new tablets -> { local tablet, schema hash, storage medium }, used in atomic restore. + Map<Long, TabletRef> tabletBases = new HashMap<>(); + // Check and prepare meta objects. AgentBatchTask batchTask = new AgentBatchTask(); db.readLock(); @@ -618,14 +644,15 @@ public class RestoreJob extends AbstractJob { Table remoteTbl = backupMeta.getTable(tableName); Preconditions.checkNotNull(remoteTbl); Table localTbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName)); + if (localTbl != null && localTbl.getType() != TableType.OLAP) { + // table already exist, but is not OLAP + status = new Status(ErrCode.COMMON_ERROR, + "The type of local table should be same as type of remote table: " + + remoteTbl.getName()); + return; + } + if (localTbl != null) { - // table already exist, check schema - if (localTbl.getType() != TableType.OLAP) { - status = new Status(ErrCode.COMMON_ERROR, - "The type of local table should be same as type of remote table: " - + remoteTbl.getName()); - return; - } OlapTable localOlapTbl = (OlapTable) localTbl; OlapTable remoteOlapTbl = (OlapTable) remoteTbl; @@ -671,28 +698,26 @@ public class RestoreJob extends AbstractJob { PartitionItem localItem = localPartInfo.getItem(localPartition.getId()); PartitionItem remoteItem = remoteOlapTbl .getPartitionInfo().getItem(backupPartInfo.id); - if (localItem.equals(remoteItem)) { - // Same partition, same range - if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, - localTbl, backupPartInfo, partitionName, tblInfo, remoteReplicaAlloc)) { - return; - } - } else { + if (!localItem.equals(remoteItem)) { // Same partition name, different range status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName + " in table " + localTbl.getName() + " has different partition item with partition in repository"); return; } - } else { - // If this is a single partitioned table. - if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, localTbl, - backupPartInfo, partitionName, tblInfo, remoteReplicaAlloc)) { - return; - } } - } else { + if (isAtomicRestore) { + // skip gen file mapping for atomic restore. + continue; + } + + // Same partition, same range or a single partitioned table. 
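+                            // (Note: with atomic restore the loop continued above, so this
+                            // mapping only runs for in-place restores; atomic restore maps
+                            // files onto the temp ATOMIC_RESTORE_TABLE_PREFIX table later.)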
+ if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, + localTbl, backupPartInfo, partitionName, tblInfo, remoteReplicaAlloc)) { + return; + } + } else if (!isAtomicRestore) { // partitions does not exist PartitionInfo localPartitionInfo = localOlapTbl.getPartitionInfo(); if (localPartitionInfo.getType() == PartitionType.RANGE @@ -732,8 +757,10 @@ public class RestoreJob extends AbstractJob { } finally { localOlapTbl.readUnlock(); } - } else { - // Table does not exist + } + + // Table does not exist or atomic restore + if (localTbl == null || isAtomicRestore) { OlapTable remoteOlapTbl = (OlapTable) remoteTbl; // Retain only expected restore partitions in this table; Set<String> allPartNames = remoteOlapTbl.getPartitionNames(); @@ -761,6 +788,15 @@ public class RestoreJob extends AbstractJob { // DO NOT set remote table's new name here, cause we will still need the origin name later // remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name)); remoteOlapTbl.setState(allowLoad ? OlapTableState.RESTORE_WITH_LOAD : OlapTableState.RESTORE); + + if (isAtomicRestore && localTbl != null) { + // bind the backends and base tablets from local tbl. + status = bindLocalAndRemoteOlapTableReplicas((OlapTable) localTbl, remoteOlapTbl, tabletBases); + if (!status.ok()) { + return; + } + } + if (LOG.isDebugEnabled()) { LOG.debug("put remote table {} to restoredTbls", remoteOlapTbl.getName()); } @@ -824,6 +860,9 @@ public class RestoreJob extends AbstractJob { // for now, nothing is modified in catalog // generate create replica tasks for all restored partitions + if (isAtomicRestore && !restoredPartitions.isEmpty()) { + throw new RuntimeException("atomic restore is set, but the restored partitions is not empty"); + } for (Pair<String, Partition> entry : restoredPartitions) { OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first); Preconditions.checkNotNull(localTbl, localTbl.getName()); @@ -843,11 +882,12 @@ public class RestoreJob extends AbstractJob { if (restoreTbl.getType() == TableType.OLAP) { OlapTable restoreOlapTable = (OlapTable) restoreTbl; for (Partition restorePart : restoreOlapTable.getPartitions()) { - createReplicas(db, batchTask, restoreOlapTable, restorePart); + createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases); BackupOlapTableInfo backupOlapTableInfo = jobInfo.getOlapTableInfo(restoreOlapTable.getName()); genFileMapping(restoreOlapTable, restorePart, backupOlapTableInfo.id, backupOlapTableInfo.getPartInfo(restorePart.getName()), - !allowLoad /* if allow load, do not overwrite when commit */); + !allowLoad /* if allow load, do not overwrite when commit */, + tabletBases); } } // set restored table's new name after all 'genFileMapping' @@ -855,6 +895,9 @@ public class RestoreJob extends AbstractJob { if (Env.isStoredTableNamesLowerCase()) { tableName = tableName.toLowerCase(); } + if (restoreTbl.getType() == TableType.OLAP && isAtomicRestore) { + tableName = tableAliasWithAtomicRestore(tableName); + } restoreTbl.setName(tableName); } @@ -978,6 +1021,90 @@ public class RestoreJob extends AbstractJob { // No log here, PENDING state restore job will redo this method } + private Status bindLocalAndRemoteOlapTableReplicas( + OlapTable localOlapTbl, OlapTable remoteOlapTbl, + Map<Long, TabletRef> tabletBases) { + localOlapTbl.readLock(); + try { + // The storage medium of the remote olap table's storage is HDD, because we want to + // restore the tables in another cluster might without SSD. 
+ // + // Keep the storage medium of the new olap table the same as the old one, so that + // the replicas in the new olap table will not be migrated to other storage mediums. + remoteOlapTbl.setStorageMedium(localOlapTbl.getStorageMedium()); + for (Partition partition : remoteOlapTbl.getPartitions()) { + Partition localPartition = localOlapTbl.getPartition(partition.getName()); + if (localPartition == null) { + continue; + } + // Since the replicas are bound to the same disk, the storage medium must be the same + // to avoid media migration. + TStorageMedium storageMedium = localOlapTbl.getPartitionInfo() + .getDataProperty(localPartition.getId()).getStorageMedium(); + remoteOlapTbl.getPartitionInfo().getDataProperty(partition.getId()) + .setStorageMedium(storageMedium); + if (LOG.isDebugEnabled()) { + LOG.debug("bind local partition {} and remote partition {} with same storage medium {}, name: {}", + localPartition.getId(), partition.getId(), storageMedium, partition.getName()); + } + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { + String indexName = remoteOlapTbl.getIndexNameById(index.getId()); + Long localIndexId = localOlapTbl.getIndexIdByName(indexName); + MaterializedIndex localIndex = localIndexId == null ? null : localPartition.getIndex(localIndexId); + if (localIndex == null) { + continue; + } + int schemaHash = localOlapTbl.getSchemaHashByIndexId(localIndexId); + if (schemaHash == -1) { + return new Status(ErrCode.COMMON_ERROR, String.format( + "schema hash of local index %d is not found, remote table=%d, remote index=%d, " + + "local table=%d, local index=%d", localIndexId, remoteOlapTbl.getId(), index.getId(), + localOlapTbl.getId(), localIndexId)); + } + + List<Tablet> localTablets = localIndex.getTablets(); + List<Tablet> remoteTablets = index.getTablets(); + if (localTablets.size() != remoteTablets.size()) { + return new Status(ErrCode.COMMON_ERROR, String.format( + "the size of local tablet %s is not equals to the remote %s, " + + "is_atomic_restore=true, remote table=%d, remote index=%d, " + + "local table=%d, local index=%d", localTablets.size(), remoteTablets.size(), + remoteOlapTbl.getId(), index.getId(), localOlapTbl.getId(), localIndexId)); + } + for (int i = 0; i < remoteTablets.size(); i++) { + Tablet localTablet = localTablets.get(i); + Tablet remoteTablet = remoteTablets.get(i); + List<Replica> localReplicas = localTablet.getReplicas(); + List<Replica> remoteReplicas = remoteTablet.getReplicas(); + if (localReplicas.size() != remoteReplicas.size()) { + return new Status(ErrCode.COMMON_ERROR, String.format( + "the size of local replicas %s is not equals to the remote %s, " + + "is_atomic_restore=true, remote table=%d, remote index=%d, " + + "local table=%d, local index=%d, local replicas=%d, remote replicas=%d", + localTablets.size(), remoteTablets.size(), remoteOlapTbl.getId(), + index.getId(), localOlapTbl.getId(), localIndexId, localReplicas.size(), + remoteReplicas.size())); + } + for (int j = 0; j < remoteReplicas.size(); j++) { + long backendId = localReplicas.get(j).getBackendId(); + remoteReplicas.get(j).setBackendId(backendId); + if (LOG.isDebugEnabled()) { + LOG.debug("bind local replica {} and remote replica {} with same backend {}, table={}", + localReplicas.get(j).getId(), remoteReplicas.get(j).getId(), backendId, + localOlapTbl.getName()); + } + } + tabletBases.put(remoteTablet.getId(), + new TabletRef(localTablet.getId(), schemaHash, storageMedium)); + } + } + } + } finally { + localOlapTbl.readUnlock(); 
+ } + return Status.OK; + } + private void prepareAndSendSnapshotTaskForOlapTable(Database db) { LOG.info("begin to make snapshot. {} when restore content is ALL", this); // begin to make snapshots for all replicas @@ -989,7 +1116,8 @@ public class RestoreJob extends AbstractJob { AgentBatchTask batchTask = new AgentBatchTask(); db.readLock(); try { - for (IdChain idChain : fileMapping.getMapping().keySet()) { + for (Map.Entry<IdChain, IdChain> entry : fileMapping.getMapping().entrySet()) { + IdChain idChain = entry.getKey(); OlapTable tbl = (OlapTable) db.getTableNullable(idChain.getTblId()); tbl.readLock(); try { @@ -998,9 +1126,15 @@ public class RestoreJob extends AbstractJob { Tablet tablet = index.getTablet(idChain.getTabletId()); Replica replica = tablet.getReplicaById(idChain.getReplicaId()); long signature = env.getNextId(); + boolean isRestoreTask = true; + // We don't care the visible version in restore job, the end version is used. + long visibleVersion = -1L; SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), signature, jobId, db.getId(), - tbl.getId(), part.getId(), index.getId(), tablet.getId(), part.getVisibleVersion(), - tbl.getSchemaHashByIndexId(index.getId()), timeoutMs, true /* is restore task*/); + tbl.getId(), part.getId(), index.getId(), tablet.getId(), visibleVersion, + tbl.getSchemaHashByIndexId(index.getId()), timeoutMs, isRestoreTask); + if (entry.getValue().hasRefTabletId()) { + task.setRefTabletId(entry.getValue().getRefTabletId()); + } batchTask.addTask(task); unfinishedSignatureToId.put(signature, tablet.getId()); bePathsMap.put(replica.getBackendId(), replica.getPathHash()); @@ -1088,6 +1222,11 @@ public class RestoreJob extends AbstractJob { } private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable localTbl, Partition restorePart) { + createReplicas(db, batchTask, localTbl, restorePart, null); + } + + private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable localTbl, Partition restorePart, + Map<Long, TabletRef> tabletBases) { Set<String> bfColumns = localTbl.getCopiedBfColumns(); double bfFpp = localTbl.getBfFpp(); @@ -1102,8 +1241,12 @@ public class RestoreJob extends AbstractJob { for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) { MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId()); for (Tablet restoreTablet : restoredIdx.getTablets()) { + TabletRef baseTabletRef = tabletBases == null ? null : tabletBases.get(restoreTablet.getId()); + // All restored replicas will be saved to HDD by default. + TStorageMedium storageMedium = baseTabletRef == null + ? 
TStorageMedium.HDD : baseTabletRef.storageMedium; TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(), - restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD); + restoredIdx.getId(), indexMeta.getSchemaHash(), storageMedium); Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica); @@ -1112,7 +1255,7 @@ public class RestoreJob extends AbstractJob { restoreTablet.getId(), restoreReplica.getId(), indexMeta.getShortKeyColumnCount(), indexMeta.getSchemaHash(), restoreReplica.getVersion(), indexMeta.getKeysType(), TStorageType.COLUMN, - TStorageMedium.HDD /* all restored replicas will be saved to HDD */, + storageMedium, indexMeta.getSchema(), bfColumns, bfFpp, null, localTbl.getCopiedIndexes(), localTbl.isInMemory(), @@ -1134,6 +1277,12 @@ public class RestoreJob extends AbstractJob { localTbl.rowStorePageSize()); task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat()); task.setInRestoreMode(true); + if (baseTabletRef != null) { + // ensure this replica is bound to the same backend disk as the origin table's replica. + task.setBaseTablet(baseTabletRef.tabletId, baseTabletRef.schemaHash); + LOG.info("set base tablet {} for replica {} in restore job {}, tablet id={}", + baseTabletRef.tabletId, restoreReplica.getId(), jobId, restoreTablet.getId()); + } batchTask.addTask(task); } } @@ -1216,6 +1365,11 @@ public class RestoreJob extends AbstractJob { // files in repo to files in local private void genFileMapping(OlapTable localTbl, Partition localPartition, Long remoteTblId, BackupPartitionInfo backupPartInfo, boolean overwrite) { + genFileMapping(localTbl, localPartition, remoteTblId, backupPartInfo, overwrite, null); + } + + private void genFileMapping(OlapTable localTbl, Partition localPartition, Long remoteTblId, + BackupPartitionInfo backupPartInfo, boolean overwrite, Map<Long, TabletRef> tabletBases) { for (MaterializedIndex localIdx : localPartition.getMaterializedIndices(IndexExtState.VISIBLE)) { if (LOG.isDebugEnabled()) { LOG.debug("get index id: {}, index name: {}", localIdx.getId(), @@ -1230,10 +1384,21 @@ public class RestoreJob extends AbstractJob { LOG.debug("get tablet mapping: {} to {}, index {}", backupTabletInfo.id, localTablet.getId(), i); } for (Replica localReplica : localTablet.getReplicas()) { - IdChain src = new IdChain(remoteTblId, backupPartInfo.id, backupIdxInfo.id, backupTabletInfo.id, - -1L /* no replica id */); - IdChain dest = new IdChain(localTbl.getId(), localPartition.getId(), - localIdx.getId(), localTablet.getId(), localReplica.getId()); + long refTabletId = -1L; + if (tabletBases != null && tabletBases.containsKey(localTablet.getId())) { + refTabletId = tabletBases.get(localTablet.getId()).tabletId; + if (LOG.isDebugEnabled()) { + LOG.debug("restored tablet {} is based on exists tablet {}", + localTablet.getId(), refTabletId); + } + } + + long noReplicaId = -1L; + long noRefTabletId = -1L; + IdChain src = new IdChain(remoteTblId, backupPartInfo.id, backupIdxInfo.id, + backupTabletInfo.id, noReplicaId, refTabletId); + IdChain dest = new IdChain(localTbl.getId(), localPartition.getId(), localIdx.getId(), + localTablet.getId(), localReplica.getId(), noRefTabletId); fileMapping.putMapping(dest, src, overwrite); } } @@ -1280,6 +1445,12 @@ public class RestoreJob extends AbstractJob { OlapTable olapTbl = (OlapTable) tbl; tbl.writeLock(); 
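            // (With atomic restore the data lands in a separate table named with
            // ATOMIC_RESTORE_TABLE_PREFIX, so the origin table below is only marked
            // in_atomic_restore, blocking ALTER/DROP until the final swap, instead of
            // being moved to the RESTORE state; enabled via "atomic_restore" = "true".)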
try { + if (isAtomicRestore) { + // Atomic restore will creates new replica of the OlapTable. + olapTbl.setInAtomicRestore(); + continue; + } + olapTbl.setState(OlapTableState.RESTORE); // set restore status for partitions BackupOlapTableInfo tblInfo = jobInfo.backupOlapTableObjects.get(tableName); @@ -1400,7 +1571,7 @@ public class RestoreJob extends AbstractJob { } private void downloadRemoteSnapshots() { - // Categorize snapshot onfos by db id. + // Categorize snapshot infos by db id. ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create(); for (SnapshotInfo info : snapshotInfos.values()) { dbToSnapshotInfos.put(info.getDbId(), info); @@ -1500,8 +1671,9 @@ public class RestoreJob extends AbstractJob { return; } + long refTabletId = -1L; // no ref tablet id IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(), - info.getTabletId(), replica.getId()); + info.getTabletId(), replica.getId(), refTabletId); IdChain repoIds = fileMapping.get(catalogIds); if (repoIds == null) { status = new Status(ErrCode.NOT_FOUND, @@ -1648,8 +1820,9 @@ public class RestoreJob extends AbstractJob { return; } + long refTabletId = -1L; // no ref tablet id IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(), - info.getTabletId(), replica.getId()); + info.getTabletId(), replica.getId(), refTabletId); IdChain repoIds = fileMapping.get(catalogIds); if (repoIds == null) { status = new Status(ErrCode.NOT_FOUND, @@ -1791,6 +1964,14 @@ public class RestoreJob extends AbstractJob { return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist"); } + // replace the origin tables in atomic. + if (isAtomicRestore) { + Status st = atomicReplaceOlapTables(db, isReplay); + if (!st.ok()) { + return st; + } + } + // set all restored partition version and version hash // set all tables' state to NORMAL setTableStateToNormalAndUpdateProperties(db, true, isReplay); @@ -2041,6 +2222,12 @@ public class RestoreJob extends AbstractJob { // remove restored tbls for (Table restoreTbl : restoredTbls) { + if (isAtomicRestore && restoreTbl.getType() == TableType.OLAP + && !restoreTbl.getName().startsWith(ATOMIC_RESTORE_TABLE_PREFIX)) { + // In atomic restore, a table registered to db must have a name with the prefix, + // otherwise, it has not been registered and can be ignored here. + continue; + } LOG.info("remove restored table when cancelled: {}", restoreTbl.getName()); if (db.writeLockIfExist()) { try { @@ -2117,6 +2304,86 @@ public class RestoreJob extends AbstractJob { LOG.info("finished to cancel restore job. is replay: {}. 
{}", isReplay, this); } + private Status atomicReplaceOlapTables(Database db, boolean isReplay) { + assert isAtomicRestore; + for (String tableName : jobInfo.backupOlapTableObjects.keySet()) { + String originName = jobInfo.getAliasByOriginNameIfSet(tableName); + if (Env.isStoredTableNamesLowerCase()) { + originName = originName.toLowerCase(); + } + String aliasName = tableAliasWithAtomicRestore(originName); + + if (!db.writeLockIfExist()) { + return Status.OK; + } + try { + Table newTbl = db.getTableNullable(aliasName); + if (newTbl == null) { + LOG.warn("replace table from {} to {}, but the temp table is not found", aliasName, originName); + return new Status(ErrCode.COMMON_ERROR, "replace table failed, the temp table " + + aliasName + " is not found"); + } + if (newTbl.getType() != TableType.OLAP) { + LOG.warn("replace table from {} to {}, but the temp table is not OLAP, it type is {}", + aliasName, originName, newTbl.getType()); + return new Status(ErrCode.COMMON_ERROR, "replace table failed, the temp table " + aliasName + + " is not OLAP table, it is " + newTbl.getType()); + } + + OlapTable originOlapTbl = null; + Table originTbl = db.getTableNullable(originName); + if (originTbl != null) { + if (originTbl.getType() != TableType.OLAP) { + LOG.warn("replace table from {} to {}, but the origin table is not OLAP, it type is {}", + aliasName, originName, originTbl.getType()); + return new Status(ErrCode.COMMON_ERROR, "replace table failed, the origin table " + + originName + " is not OLAP table, it is " + originTbl.getType()); + } + originOlapTbl = (OlapTable) originTbl; // save the origin olap table, then drop it. + } + + // replace the table. + OlapTable newOlapTbl = (OlapTable) newTbl; + newOlapTbl.writeLock(); + try { + // rename new table name to origin table name and add the new table to database. + db.unregisterTable(aliasName); + newOlapTbl.checkAndSetName(originName, false); + db.unregisterTable(originName); + db.registerTable(newOlapTbl); + + // set the olap table state to normal immediately for querying + newOlapTbl.setState(OlapTableState.NORMAL); + LOG.info("atomic restore replace table {} name to {}, and set state to normal, origin table={}", + newOlapTbl.getId(), originName, originOlapTbl == null ? -1L : originOlapTbl.getId()); + } catch (DdlException e) { + LOG.warn("atomic restore replace table {} name from {} to {}", + newOlapTbl.getId(), aliasName, originName, e); + return new Status(ErrCode.COMMON_ERROR, "replace table from " + aliasName + " to " + originName + + " failed, reason=" + e.getMessage()); + } finally { + newOlapTbl.writeUnlock(); + } + + if (originOlapTbl != null) { + // The origin table is not used anymore, need to drop all its tablets. + originOlapTbl.writeLock(); + try { + LOG.info("drop the origin olap table {} by atomic restore. 
table={}", + originOlapTbl.getName(), originOlapTbl.getId()); + Env.getCurrentEnv().onEraseOlapTable(originOlapTbl, isReplay); + } finally { + originOlapTbl.writeUnlock(); + } + } + } finally { + db.writeUnlock(); + } + } + + return Status.OK; + } + private void setTableStateToNormalAndUpdateProperties(Database db, boolean committed, boolean isReplay) { for (String tableName : jobInfo.backupOlapTableObjects.keySet()) { Table tbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName)); @@ -2139,6 +2406,10 @@ public class RestoreJob extends AbstractJob { LOG.info("table {} set state from {} to normal", tableName, olapTbl.getState()); olapTbl.setState(OlapTableState.NORMAL); } + if (olapTbl.isInAtomicRestore()) { + olapTbl.clearInAtomicRestore(); + LOG.info("table {} set state from atomic restore to normal", tableName); + } BackupOlapTableInfo tblInfo = jobInfo.backupOlapTableObjects.get(tableName); for (Map.Entry<String, BackupPartitionInfo> partitionEntry : tblInfo.partitions.entrySet()) { @@ -2329,6 +2600,7 @@ public class RestoreJob extends AbstractJob { isBeingSynced = Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED)); isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES)); isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS)); + isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE)); } @Override @@ -2338,4 +2610,20 @@ public class RestoreJob extends AbstractJob { sb.append(", state: ").append(state.name()); return sb.toString(); } + + public static String tableAliasWithAtomicRestore(String tableName) { + return ATOMIC_RESTORE_TABLE_PREFIX + tableName; + } + + private static class TabletRef { + public long tabletId; + public int schemaHash; + public TStorageMedium storageMedium; + + TabletRef(long tabletId, int schemaHash, TStorageMedium storageMedium) { + this.tabletId = tabletId; + this.schemaHash = schemaHash; + this.storageMedium = storageMedium; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index b7209ba550d..f425d94fa61 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -78,6 +78,7 @@ import org.apache.doris.analysis.TableRenameClause; import org.apache.doris.analysis.TruncateTableStmt; import org.apache.doris.analysis.UninstallPluginStmt; import org.apache.doris.backup.BackupHandler; +import org.apache.doris.backup.RestoreJob; import org.apache.doris.binlog.BinlogGcer; import org.apache.doris.binlog.BinlogManager; import org.apache.doris.blockrule.SqlBlockRuleMgr; @@ -3507,6 +3508,10 @@ public class Env { .append("\" = \""); sb.append(olapTable.isDuplicateWithoutKey()).append("\""); } + + if (olapTable.isInAtomicRestore()) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE).append("\" = \"true\""); + } } /** @@ -4532,6 +4537,9 @@ public class Env { if (db.getTable(newTableName).isPresent()) { throw new DdlException("Table name[" + newTableName + "] is already used"); } + if (db.getTable(RestoreJob.tableAliasWithAtomicRestore(newTableName)).isPresent()) { + throw new DdlException("Table name[" + newTableName + "] is already used (in restoring)"); + } if (table.isManagedTable()) { // olap table should also check if any rollup has same name as "newTableName" diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 48c70917da4..a8c06760e3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1865,6 +1865,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { throw new DdlException("Table[" + name + "]'s state(" + state.toString() + ") is not NORMAL. Do not allow doing ALTER ops"); } + if (tableProperty != null && tableProperty.isInAtomicRestore()) { + throw new DdlException("Table[" + name + "] is in atomic restore state. " + + "Do not allow doing ALTER ops"); + } } public boolean isStable(SystemInfoService infoService, TabletScheduler tabletScheduler) { @@ -2114,6 +2118,21 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { return ""; } + public void setInAtomicRestore() { + getOrCreatTableProperty().setInAtomicRestore().buildInAtomicRestore(); + } + + public void clearInAtomicRestore() { + getOrCreatTableProperty().clearInAtomicRestore().buildInAtomicRestore(); + } + + public boolean isInAtomicRestore() { + if (tableProperty != null) { + return tableProperty.isInAtomicRestore(); + } + return false; + } + public boolean getEnableLightSchemaChange() { if (tableProperty != null) { return tableProperty.getUseSchemaLightChange(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 0d58aabea08..0c075191923 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -61,6 +61,7 @@ public class TableProperty implements Writable { private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; private boolean isInMemory = false; private short minLoadReplicaNum = -1; + private boolean isInAtomicRestore = false; private String storagePolicy = ""; private Boolean isBeingSynced = null; @@ -197,6 +198,26 @@ public class TableProperty implements Writable { return this; } + public TableProperty buildInAtomicRestore() { + isInAtomicRestore = Boolean.parseBoolean(properties.getOrDefault( + PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE, "false")); + return this; + } + + public boolean isInAtomicRestore() { + return isInAtomicRestore; + } + + public TableProperty setInAtomicRestore() { + properties.put(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE, "true"); + return this; + } + + public TableProperty clearInAtomicRestore() { + properties.remove(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE); + return this; + } + public TableProperty buildEnableLightSchemaChange() { enableLightSchemaChange = Boolean.parseBoolean( properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE, "false")); @@ -628,7 +649,8 @@ public class TableProperty implements Writable { .buildDisableAutoCompaction() .buildEnableSingleReplicaCompaction() .buildTimeSeriesCompactionEmptyRowsetsThreshold() - .buildTimeSeriesCompactionLevelThreshold(); + .buildTimeSeriesCompactionLevelThreshold() + .buildInAtomicRestore(); if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) { // get replica num from property map and create replica allocation String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 288ee72afd3..553b322076a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -78,6 +78,7 @@ public class PropertyAnalyzer { public static final String PROPERTIES_SCHEMA_VERSION = "schema_version"; public static final String PROPERTIES_PARTITION_ID = "partition_id"; public static final String PROPERTIES_VISIBLE_VERSION = "visible_version"; + public static final String PROPERTIES_IN_ATOMIC_RESTORE = "in_atomic_restore"; public static final String PROPERTIES_BF_COLUMNS = "bloom_filter_columns"; public static final String PROPERTIES_BF_FPP = "bloom_filter_fpp"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 8f37f73a82b..e5131b1dd30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -56,6 +56,7 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.TruncateTableStmt; import org.apache.doris.analysis.TypeDef; +import org.apache.doris.backup.RestoreJob; import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.ColocateGroupSchema; @@ -899,10 +900,16 @@ public class InternalCatalog implements CatalogIf<Database> { OlapTable olapTable = (OlapTable) table; if ((olapTable.getState() != OlapTableState.NORMAL)) { throw new DdlException("The table [" + tableName + "]'s state is " + olapTable.getState() - + ", cannot be dropped." + " please cancel the operation on olap table firstly." + + ", cannot be dropped. Please cancel the operation on the olap table first." + " If you want to forcibly drop (cannot be recovered)," + " please use \"DROP table FORCE\"."); } + if (olapTable.isInAtomicRestore()) { + throw new DdlException("The table [" + tableName + "]'s state is in atomic restore" + + ", cannot be dropped. Please cancel the restore operation on the olap table" + + " first. If you want to forcibly drop (cannot be recovered)," + + " please use \"DROP table FORCE\"."); + } } dropTableInternal(db, table, stmt.isForceDrop(), watch, costTimes); @@ -1168,6 +1175,11 @@ public class InternalCatalog implements CatalogIf<Database> { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } } + if (db.getTable(RestoreJob.tableAliasWithAtomicRestore(tableName)).isPresent()) { + ErrorReport.reportDdlException( + "table[{}] is in atomic restore, please cancel the restore operation firstly", + ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); + } if (engineName.equals("olap")) { return createOlapTable(db, stmt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index abe85b06bef..be05c023166 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3118,6 +3118,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (request.isCleanTables()) { properties.put(RestoreStmt.PROP_CLEAN_TABLES, "true"); } + if (request.isAtomicRestore()) { + properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true"); + } AbstractBackupTableRefClause restoreTableRefClause = null; if (request.isSetTableRefs()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java index 71b3570f288..81177305683 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java @@ -29,6 +29,7 @@ public class SnapshotTask extends AgentTask { private int schemaHash; private long timeoutMs; private boolean isRestoreTask; + private Long refTabletId; // Set to true if this task is for AdminCopyTablet. // Otherwise, it is for Backup/Restore operation.
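The SnapshotTask hunk below pairs the new field with a common pattern for optional thrift fields: refTabletId is boxed as Long so that "absent" is representable, and toThrift() calls a setter only when there is a real value, leaving the optional field unset on the wire otherwise (note the version guard, which avoids sending an unset version). A standalone sketch of that guard, where SnapshotRequest is a stand-in for the thrift-generated TSnapshotRequest rather than the real class:

    // Stand-in for the generated struct; real code uses TSnapshotRequest.
    class SnapshotRequest {
        Long refTabletId; // null means "field not set on the wire"
        Long version;

        void setRefTabletId(long id) { this.refTabletId = id; }
        void setVersion(long v) { this.version = v; }
    }

    class SnapshotTaskSketch {
        private Long refTabletId; // boxed: absence is representable
        private long version;     // <= 0 means "no version constraint"

        void setRefTabletId(long refTabletId) {
            assert refTabletId > 0;
            this.refTabletId = refTabletId;
        }

        SnapshotRequest toRequest() {
            SnapshotRequest request = new SnapshotRequest();
            if (refTabletId != null) { // only serialize when a ref tablet was provided
                request.setRefTabletId(refTabletId);
            }
            if (version > 0L) {        // skip the optional field for unset versions
                request.setVersion(version);
            }
            return request;
        }
    }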
@@ -98,13 +99,23 @@ public class SnapshotTask extends AgentTask { return resultSnapshotPath; } + public void setRefTabletId(long refTabletId) { + assert refTabletId > 0; + this.refTabletId = refTabletId; + } + public TSnapshotRequest toThrift() { TSnapshotRequest request = new TSnapshotRequest(tabletId, schemaHash); - request.setVersion(version); request.setListFiles(true); request.setPreferredSnapshotVersion(TypesConstants.TPREFER_SNAPSHOT_REQ_VERSION); request.setTimeout(timeoutMs / 1000); request.setIsCopyTabletTask(isCopyTabletTask); + if (refTabletId != null) { + request.setRefTabletId(refTabletId); + } + if (version > 0L) { + request.setVersion(version); + } return request; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java index d37a63f6d14..85de627fa44 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java @@ -31,14 +31,14 @@ public class RestoreFileMappingTest { @Before public void setUp() { - src = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L); - dest = new IdChain(10004L, 10003L, 10004L, 10007L, -1L); + src = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L, -1L); + dest = new IdChain(10004L, 10003L, 10004L, 10007L, -1L, -1L); fileMapping.putMapping(src, dest, true); } @Test public void test() { - IdChain key = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L); + IdChain key = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L, -1L); Assert.assertEquals(key, src); Assert.assertEquals(src, key); IdChain val = fileMapping.get(key); diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index fb4d9fe768f..cccfdb517da 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -251,7 +251,8 @@ public class RestoreJobTest { db.unregisterTable(expectedRestoreTbl.getName()); job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false, - new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, env, repo.getId()); + new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, + env, repo.getId()); List<Table> tbls = Lists.newArrayList(); List<Resource> resources = Lists.newArrayList(); diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index a5b91dc2498..e879b54615d 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -358,6 +358,7 @@ struct TSnapshotRequest { 11: optional Types.TVersion start_version 12: optional Types.TVersion end_version 13: optional bool is_copy_binlog + 14: optional Types.TTabletId ref_tablet_id } struct TReleaseSnapshotRequest { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5480a84cf69..dac90824fa2 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1211,6 +1211,7 @@ struct TRestoreSnapshotRequest { 12: optional binary job_info 13: optional bool clean_tables 14: optional bool clean_partitions + 15: optional bool atomic_restore } struct TRestoreSnapshotResult { diff --git a/regression-test/data/backup_restore/test_backup_restore_atomic.out 
b/regression-test/data/backup_restore/test_backup_restore_atomic.out new file mode 100644 index 00000000000..bee7a4da44f --- /dev/null +++ b/regression-test/data/backup_restore/test_backup_restore_atomic.out @@ -0,0 +1,78 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +10 10 +20 20 +30 30 +40 40 +50 50 +60 60 +70 70 +80 80 +90 90 +100 100 + +-- !sql -- +10 10 +20 20 + +-- !sql -- +10 10 +20 20 +30 30 +40 40 +50 50 +60 60 +70 70 +80 80 +90 90 +100 100 + +-- !sql -- +10 10 +20 20 +30 30 +40 40 +50 50 +60 60 +70 70 +80 80 +90 90 +100 100 + +-- !sql -- +10 10 +20 20 +30 30 +40 40 +50 50 +60 60 +70 70 +80 80 +90 90 +100 100 + +-- !sql -- +10 10 +20 20 +30 30 +40 40 +50 50 +60 60 +70 70 +80 80 +90 90 +100 100 + +-- !sql -- +10 20 +20 40 +30 60 +40 80 +50 100 +60 120 +70 140 +80 160 +90 180 +100 200 +200 200 + diff --git a/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out b/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out new file mode 100644 index 00000000000..cad6dbe8fd8 --- /dev/null +++ b/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out @@ -0,0 +1,60 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 + +-- !sql -- +6 6 +7 7 +8 8 +9 9 +10 10 + +-- !sql -- +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 + +-- !sql -- +6 6 +7 7 +8 8 +9 9 +10 10 + +-- !sql -- +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 +11 11 + +-- !sql -- +6 6 +7 7 +8 8 +9 9 +10 10 +11 11 + diff --git a/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy b/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy new file mode 100644 index 00000000000..4b87340fb35 --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_restore_atomic", "backup_restore") { + String suiteName = "test_backup_restore_atomic" + String dbName = "${suiteName}_db_1" + String dbName1 = "${suiteName}_db_2" + String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + String snapshotName = "${suiteName}_snapshot" + String tableNamePrefix = "${suiteName}_tables" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "CREATE DATABASE IF NOT EXISTS ${dbName1}" + + // 1. restore to not exists table_0 + // 2. restore partial data to table_1 + // 3. restore less data to table_2 + // 4. 
restore incremental data to table_3 + int numTables = 4; + List<String> tables = [] + for (int i = 0; i < numTables; ++i) { + String tableName = "${tableNamePrefix}_${i}" + tables.add(tableName) + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + CREATE TABLE ${dbName}.${tableName} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION p1 VALUES LESS THAN ("10"), + PARTITION p2 VALUES LESS THAN ("20"), + PARTITION p3 VALUES LESS THAN ("30"), + PARTITION p4 VALUES LESS THAN ("40"), + PARTITION p5 VALUES LESS THAN ("50"), + PARTITION p6 VALUES LESS THAN ("60"), + PARTITION p7 VALUES LESS THAN ("120") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + } + + // 5. the len of table name equals to the config table_name_length_limit + def maxLabelLen = getFeConfig("table_name_length_limit").toInteger() + def maxTableName = "".padRight(maxLabelLen, "x") + logger.info("config table_name_length_limit = ${maxLabelLen}, table name = ${maxTableName}") + sql "DROP TABLE IF EXISTS ${dbName}.${maxTableName}" + sql """ + CREATE TABLE ${dbName}.${maxTableName} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION p1 VALUES LESS THAN ("10"), + PARTITION p2 VALUES LESS THAN ("20"), + PARTITION p3 VALUES LESS THAN ("30"), + PARTITION p4 VALUES LESS THAN ("40"), + PARTITION p5 VALUES LESS THAN ("50"), + PARTITION p6 VALUES LESS THAN ("60"), + PARTITION p7 VALUES LESS THAN ("120") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + tables.add(maxTableName) + + int numRows = 10; + List<String> values = [] + for (int j = 1; j <= numRows; ++j) { + values.add("(${j}0, ${j}0)") + } + + sql "INSERT INTO ${dbName}.${tableNamePrefix}_0 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_1 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${maxTableName} VALUES ${values.join(",")}" + + // the other partitions of table_1 will be drop + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON ( + ${tableNamePrefix}_0, + ${tableNamePrefix}_1 PARTITION (p1, p2, p3), + ${tableNamePrefix}_2, + ${tableNamePrefix}_3, + ${maxTableName} + ) + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // drop table_0 + sql "DROP TABLE ${dbName}.${tableNamePrefix}_0 FORCE" + + // insert external data to table_2 + sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}" + + sql "TRUNCATE TABLE ${dbName}.${tableNamePrefix}_3" + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "atomic_restore" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + for (def tableName in tables) { + qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id" + } + + // restore table_3 to new db + sql """ + RESTORE SNAPSHOT ${dbName1}.${snapshotName} + FROM `${repoName}` + ON (${tableNamePrefix}_3) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "atomic_restore" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName1) + + qt_sql 
"SELECT * FROM ${dbName1}.${tableNamePrefix}_3 ORDER BY id" + + // add partition and insert some data. + sql "ALTER TABLE ${dbName}.${tableNamePrefix}_3 ADD PARTITION p8 VALUES LESS THAN MAXVALUE" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES (200, 200)" + + // backup again + snapshotName = "${snapshotName}_1" + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableNamePrefix}_3) + """ + + syncer.waitSnapshotFinish(dbName) + + snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // restore with incremental data + sql """ + RESTORE SNAPSHOT ${dbName1}.${snapshotName} + FROM `${repoName}` + ON (${tableNamePrefix}_3) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "atomic_restore" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName1) + + qt_sql "SELECT * FROM ${dbName1}.${tableNamePrefix}_3 ORDER BY id" + + for (def tableName in tables) { + sql "DROP TABLE ${dbName}.${tableName} FORCE" + } + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP DATABASE ${dbName1} FORCE" + sql "DROP REPOSITORY `${repoName}`" +} + + diff --git a/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy b/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy new file mode 100644 index 00000000000..3487c93b0d6 --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_backup_restore_atomic_cancel") { + String suiteName = "test_backup_restore_atomic_cancelled" + String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + String dbName = "${suiteName}_db" + String tableName = "${suiteName}_table" + String tableName1 = "${suiteName}_table_1" + String viewName = "${suiteName}_view" + String snapshotName = "${suiteName}_snapshot" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + CREATE TABLE ${dbName}.${tableName} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0") + AGGREGATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}" + sql """ + CREATE TABLE ${dbName}.${tableName1} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0") + AGGREGATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + sql "DROP VIEW IF EXISTS ${dbName}.${viewName}" + sql """ + CREATE VIEW ${dbName}.${viewName} + AS + SELECT id, count FROM ${dbName}.${tableName} + WHERE id > 5 + """ + + List<String> values = [] + for (int i = 1; i <= 10; ++i) { + values.add("(${i}, ${i})") + } + sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}" + def result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size()); + + sql "INSERT INTO ${dbName}.${tableName1} VALUES ${values.join(",")}" + result = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(result.size(), values.size()); + + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + """ + + syncer.waitSnapshotFinish(dbName) + + // alter view and restore, it must failed because the signatures are not matched + + sql """ + ALTER VIEW ${dbName}.${viewName} + AS + SELECT id,count FROM ${dbName}.${tableName} + WHERE id < 100 + + """ + + sql "INSERT INTO ${dbName}.${tableName} VALUES (11, 11)" + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "atomic_restore" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + def restore_result = sql_return_maparray """ SHOW RESTORE FROM ${dbName} WHERE Label ="${snapshotName}" """ + restore_result.last() + logger.info("show restore result: ${restore_result}") + assertTrue(restore_result.last().State == "CANCELLED") + + + // Do not affect any tables. + result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), values.size() + 1); + + result = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(result.size(), values.size()); + + sql "DROP TABLE ${dbName}.${tableName} FORCE" + sql "DROP TABLE ${dbName}.${tableName1} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" +} + + diff --git a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy new file mode 100644 index 00000000000..46a3ca5b29d --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_restore_atomic_with_alter", "backup_restore") { + if (!getFeConfig("enable_debug_points").equals("true")) { + logger.info("Config.enable_debug_points=true is required") + return + } + + String suiteName = "test_backup_restore_atomic_with_alter" + String dbName = "${suiteName}_db" + String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + String snapshotName = "snapshot_" + UUID.randomUUID().toString().replace("-", "") + String tableNamePrefix = "${suiteName}_tables" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + sql "DROP DATABASE IF EXISTS ${dbName} FORCE" + sql "CREATE DATABASE ${dbName}" + + // during restoring, if: + // 1. table_0 not exists, create table_0 is not allowed + // 2. table_1 exists, alter operation is not allowed + // 3. table_1 exists, drop table is not allowed + // 4. table_0 not exists, rename table_2 to table_0 is not allowed + int numTables = 3; + List<String> tables = [] + for (int i = 0; i < numTables; ++i) { + String tableName = "${tableNamePrefix}_${i}" + tables.add(tableName) + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + CREATE TABLE ${dbName}.${tableName} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION p1 VALUES LESS THAN ("10"), + PARTITION p2 VALUES LESS THAN ("20"), + PARTITION p3 VALUES LESS THAN ("30"), + PARTITION p4 VALUES LESS THAN ("40"), + PARTITION p5 VALUES LESS THAN ("50"), + PARTITION p6 VALUES LESS THAN ("60"), + PARTITION p7 VALUES LESS THAN ("120") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + } + + int numRows = 10; + List<String> values = [] + for (int j = 1; j <= numRows; ++j) { + values.add("(${j}0, ${j}0)") + } + + sql "INSERT INTO ${dbName}.${tableNamePrefix}_0 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_1 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}" + + // only backup table 0,1 + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON ( + ${tableNamePrefix}_0, + ${tableNamePrefix}_1 + ) + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // drop table_0 + sql "DROP TABLE ${dbName}.${tableNamePrefix}_0 FORCE" + + // disable restore + GetDebugPoint().enableDebugPointForAllFEs("FE.PAUSE_NON_PENDING_RESTORE_JOB", [value:snapshotName]) + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "atomic_restore" = "true" + ) + """ + + boolean restore_paused = false + for (int k = 0; k < 60; k++) { + def records = sql_return_maparray 
""" SHOW RESTORE FROM ${dbName} WHERE Label = "${snapshotName}" """ + if (records.size() == 1 && records[0].State != 'PENDING') { + restore_paused = true + break + } + logger.info("SHOW RESTORE result: ${records}") + sleep(3000) + } + assertTrue(restore_paused) + + // 0. table_1 has in_atomic_restore property + def show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """ + logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}") + assertTrue(show_result[0][1].contains("in_atomic_restore")) + + // 1. create a restoring table (not exists before) + expectExceptionLike({ -> + sql """ + CREATE TABLE ${dbName}.${tableNamePrefix}_0 + ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION p1 VALUES LESS THAN ("10"), + PARTITION p2 VALUES LESS THAN ("20"), + PARTITION p3 VALUES LESS THAN ("30"), + PARTITION p4 VALUES LESS THAN ("40"), + PARTITION p5 VALUES LESS THAN ("50"), + PARTITION p6 VALUES LESS THAN ("60"), + PARTITION p7 VALUES LESS THAN ("120") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + }, "is in atomic restore, please cancel the restore operation firstly") + + // 2. alter is not allowed + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + ADD PARTITION p8 VALUES LESS THAN("200") + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + DROP PARTITION p1 + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + MODIFY PARTITION p1 SET ("key"="value") + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + ADD COLUMN new_col INT DEFAULT "0" AFTER count + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + DROP COLUMN count + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + SET ("is_being_synced"="false") + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + RENAME newTableName + """ + }, "Do not allow doing ALTER ops") + // BTW, the tmp table also don't allow rename + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.__doris_atomic_restore_prefix__${tableNamePrefix}_1 + RENAME newTableName + """ + }, "Do not allow doing ALTER ops") + // 3. drop table is not allowed + expectExceptionLike({ + sql """ + DROP TABLE ${dbName}.${tableNamePrefix}_1 + """ + }, "state is in atomic restore") + expectExceptionLike({ + sql """ + DROP TABLE ${dbName}.__doris_atomic_restore_prefix__${tableNamePrefix}_1 + """ + }, "state is RESTORE") + // 4. the table name is occupied + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_2 + RENAME ${tableNamePrefix}_0 + """ + }, "is already used (in restoring)") + + + sql "CANCEL RESTORE FROM ${dbName}" + + // 5. The restore job is cancelled, the in_atomic_restore property has been removed. 
+ show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """ + logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}") + assertFalse(show_result[0][1].contains("in_atomic_restore")) + + for (def tableName in tables) { + sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE" + } + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" +} + + + diff --git a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy new file mode 100644 index 00000000000..9d090281364 --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_restore_atomic_with_view", "backup_restore") { + String suiteName = "backup_restore_atomic_with_view" + String dbName = "${suiteName}_db" + String dbName1 = "${suiteName}_db_1" + String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + String snapshotName = "${suiteName}_snapshot" + String tableName = "${suiteName}_table" + String viewName = "${suiteName}_view" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "CREATE DATABASE IF NOT EXISTS ${dbName1}" + + int numRows = 10; + sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE" + sql "DROP VIEW IF EXISTS ${dbName}.${viewName}" + sql """ + CREATE TABLE ${dbName}.${tableName} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + List<String> values = [] + for (int j = 1; j <= numRows; ++j) { + values.add("(${j}, ${j})") + } + sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}" + + sql """CREATE VIEW ${dbName}.${viewName} (id, count) + AS + SELECT * FROM ${dbName}.${tableName} WHERE count > 5 + """ + + qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id ASC" + qt_sql "SELECT * FROM ${dbName}.${viewName} ORDER BY id ASC" + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // restore new view + sql "DROP TABLE IF EXISTS ${dbName1}.${tableName} FORCE" + sql "DROP VIEW IF EXISTS ${dbName1}.${viewName}" + + sql """ + RESTORE SNAPSHOT ${dbName1}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "atomic_restore" = "true", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName1) + + qt_sql "SELECT * FROM ${dbName1}.${tableName} 
ORDER BY id ASC" + qt_sql "SELECT * FROM ${dbName1}.${viewName} ORDER BY id ASC" + def show_view_result = sql_return_maparray "SHOW VIEW FROM ${tableName} FROM ${dbName1}" + logger.info("show view result: ${show_view_result}") + assertTrue(show_view_result.size() == 1); + def show_view = show_view_result[0]['Create View'] + assertTrue(show_view.contains("${dbName1}")) + assertTrue(show_view.contains("${tableName}")) + + // restore an existing view + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "atomic_restore" = "true", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + def restore_result = sql_return_maparray """ SHOW RESTORE FROM ${dbName} WHERE Label ="${snapshotName}" """ + logger.info("show restore result: ${restore_result}") + assertTrue(restore_result.last().State == "FINISHED") + + // The view should read the incremental data. + sql "INSERT INTO ${dbName}.${tableName} VALUES (11, 11)" + + qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id ASC" + qt_sql "SELECT * FROM ${dbName}.${viewName} ORDER BY id ASC" + + sql "DROP REPOSITORY `${repoName}`" +}
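Taken together, these suites exercise atomic restore end to end: overwrite restore into existing tables, cancellation that leaves the originals untouched, DDL fencing while a job runs, and views across restores. For readers who want to drive the feature by hand rather than through the regression framework, a minimal JDBC sketch follows; the endpoint, credentials, database, repository, snapshot label, and backup timestamp are all placeholder assumptions, and the stock MySQL driver is assumed on the classpath since Doris speaks the MySQL protocol:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class AtomicRestoreExample {
        public static void main(String[] args) throws Exception {
            // Placeholder endpoint/credentials; Doris FE exposes a MySQL-protocol port.
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:mysql://127.0.0.1:9030/example_db", "root", "");
                 Statement stmt = conn.createStatement()) {
                // "atomic_restore" keeps the existing table readable until the
                // restored replacement is swapped in atomically.
                stmt.execute("RESTORE SNAPSHOT example_db.example_snapshot "
                        + "FROM `example_repo` PROPERTIES ("
                        + "\"backup_timestamp\" = \"2024-09-24-00-00-00\", "
                        + "\"reserve_replica\" = \"true\", "
                        + "\"atomic_restore\" = \"true\")");

                // Poll the job the same way the suites above do.
                try (ResultSet rs = stmt.executeQuery("SHOW RESTORE FROM example_db")) {
                    while (rs.next()) {
                        System.out.println(rs.getString("Label") + ": "
                                + rs.getString("State"));
                    }
                }
            }
        }
    }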