This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0d38a9a36d4 [feature](restore) support atomic restore (#41107)
0d38a9a36d4 is described below

commit 0d38a9a36d4c4be6754b1c7d4239277504be3b99
Author: walter <w41te...@gmail.com>
AuthorDate: Tue Sep 24 09:41:41 2024 +0800

    [feature](restore) support atomic restore (#41107)
    
    Cherry-pick #40353, #40734, #40817, #40876, #40921, #41017, #41083
---
 be/src/olap/rowset/rowset_meta_manager.cpp         |  63 ++++
 be/src/olap/rowset/rowset_meta_manager.h           |   3 +
 be/src/olap/snapshot_manager.cpp                   |  42 ++-
 be/src/olap/snapshot_manager.h                     |   1 +
 be/src/olap/tablet.cpp                             |   5 +
 be/src/olap/tablet.h                               |   1 +
 be/src/olap/tablet_manager.cpp                     |  30 +-
 be/src/olap/task/engine_storage_migration_task.cpp | 101 +++++-
 be/src/olap/task/engine_storage_migration_task.h   |   4 +-
 .../org/apache/doris/analysis/RestoreStmt.java     |   9 +
 .../org/apache/doris/backup/BackupHandler.java     |   6 +-
 .../apache/doris/backup/RestoreFileMapping.java    |  18 +-
 .../java/org/apache/doris/backup/RestoreJob.java   | 372 ++++++++++++++++++---
 .../main/java/org/apache/doris/catalog/Env.java    |   8 +
 .../java/org/apache/doris/catalog/OlapTable.java   |  19 ++
 .../org/apache/doris/catalog/TableProperty.java    |  24 +-
 .../apache/doris/common/util/PropertyAnalyzer.java |   1 +
 .../apache/doris/datasource/InternalCatalog.java   |  14 +-
 .../apache/doris/service/FrontendServiceImpl.java  |   3 +
 .../java/org/apache/doris/task/SnapshotTask.java   |  13 +-
 .../doris/backup/RestoreFileMappingTest.java       |   6 +-
 .../org/apache/doris/backup/RestoreJobTest.java    |   3 +-
 gensrc/thrift/AgentService.thrift                  |   1 +
 gensrc/thrift/FrontendService.thrift               |   1 +
 .../backup_restore/test_backup_restore_atomic.out  |  78 +++++
 .../test_backup_restore_atomic_with_view.out       |  60 ++++
 .../test_backup_restore_atomic.groovy              | 209 ++++++++++++
 .../test_backup_restore_atomic_cancel.groovy       | 128 +++++++
 .../test_backup_restore_atomic_with_alter.groovy   | 241 +++++++++++++
 .../test_backup_restore_atomic_with_view.groovy    | 124 +++++++
 30 files changed, 1509 insertions(+), 79 deletions(-)

diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp
index d89be5ab8ec..3f482427139 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -357,6 +357,69 @@ Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU
     return status;
 }
 
+Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
+                                                  Version version, RowsetBinlogMetasPB* metas_pb) {
+    Status status;
+    auto tablet_uid_str = tablet_uid.to_string();
+    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
+    auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
+    auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);
+    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key](
+                                 std::string_view key, std::string_view value) -> bool {
+        VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value);
+        if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable.
+            // All binlog metas have been scanned.
+            return false;
+        }
+
+        if (!starts_with_binlog_meta(key)) {
+            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
+            status = Status::InternalError(err_msg);
+            LOG(WARNING) << err_msg;
+            return false;
+        }
+
+        BinlogMetaEntryPB binlog_meta_entry_pb;
+        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) {
+            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
+            status = Status::InternalError(err_msg);
+            LOG(WARNING) << err_msg;
+            return false;
+        }
+
+        const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
+        auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
+        binlog_meta_pb->set_rowset_id(rowset_id);
+        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
+        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
+        binlog_meta_pb->set_meta_key(std::string {key});
+        binlog_meta_pb->set_meta(std::string {value});
+
+        auto binlog_data_key =
+                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
+        std::string binlog_data;
+        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
+        if (!status.ok()) {
+            LOG(WARNING) << status.to_string();
+            return false;
+        }
+        binlog_meta_pb->set_data_key(binlog_data_key);
+        binlog_meta_pb->set_data(binlog_data);
+
+        return true; // continue to the next binlog meta
+    };
+
+    Status iterStatus =
+            meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func);
+    if (!iterStatus.ok()) {
+        LOG(WARNING) << fmt::format(
+                "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key,
+                version.to_string(), iterStatus.to_string());
+        return iterStatus;
+    }
+    return status;
+}
+
 Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                        RowsetBinlogMetasPB* metas_pb) {
     Status status;
diff --git a/be/src/olap/rowset/rowset_meta_manager.h b/be/src/olap/rowset/rowset_meta_manager.h
index 0cfbb3383e3..b13f1e8d1d7 100644
--- a/be/src/olap/rowset/rowset_meta_manager.h
+++ b/be/src/olap/rowset/rowset_meta_manager.h
@@ -72,6 +72,9 @@ public:
     static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                           const std::vector<int64_t>& binlog_versions,
                                           RowsetBinlogMetasPB* metas_pb);
+    // Get all binlog metas of a tablet within the given version range.
+    static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
+                                          Version version, RowsetBinlogMetasPB* metas_pb);
    static Status remove_binlog(OlapMeta* meta, const std::string& suffix);
     static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                       RowsetBinlogMetasPB* metas_pb);
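
A note on the new range overload: because the binlog meta keys are binary
comparable, scanning versions [version.first, version.second] reduces to a
sorted range scan that starts at the prefix for version.first and stops at the
exclusive end prefix for version.second + 1. A minimal Java sketch of that
idea using a sorted map (illustrative only; the zero-padded key layout below
is an assumption, the real layout comes from make_binlog_meta_key_prefix):

    import java.util.TreeMap;

    public class BinlogMetaRangeScan {
        // Hypothetical key layout: fixed-width versions make string order
        // match numeric order, like the binary-comparable keys above.
        static String key(long version) {
            return String.format("binlog_meta_%020d", version);
        }

        public static void main(String[] args) {
            TreeMap<String, String> metas = new TreeMap<>();
            metas.put(key(2), "rowset-a");
            metas.put(key(3), "rowset-b");
            metas.put(key(10), "rowset-c");
            // Versions [2, 3]: inclusive begin at 2, exclusive end at 3 + 1,
            // mirroring make_binlog_meta_key_prefix(tablet_uid, version.second + 1).
            metas.subMap(key(2), true, key(3 + 1), false)
                    .forEach((k, v) -> System.out.println(k + " -> " + v));
        }
    }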
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 6e36d0756af..d05d51ddffa 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -86,16 +86,33 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s
         return Status::Error<INVALID_ARGUMENT>("output parameter cannot be null");
     }
 
-    TabletSharedPtr ref_tablet =
+    TabletSharedPtr target_tablet =
             StorageEngine::instance()->tablet_manager()->get_tablet(request.tablet_id);
 
-    DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", { ref_tablet = nullptr; })
+    DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", { target_tablet = nullptr; })
 
-    if (ref_tablet == nullptr) {
+    if (target_tablet == nullptr) {
         return Status::Error<TABLE_NOT_FOUND>("failed to get tablet. tablet={}", request.tablet_id);
     }
 
-    res = _create_snapshot_files(ref_tablet, request, snapshot_path, allow_incremental_clone);
+    TabletSharedPtr ref_tablet = target_tablet;
+    if (request.__isset.ref_tablet_id) {
+        int64_t ref_tablet_id = request.ref_tablet_id;
+        TabletSharedPtr base_tablet =
+                StorageEngine::instance()->tablet_manager()->get_tablet(ref_tablet_id);
+
+        // Some tasks, like medium migration, cause the target tablet and base tablet to stay on
+        // different disks. In this case, we fall through to the normal restore path.
+        //
+        // Otherwise, we can directly link the rowset files from the base tablet to the target tablet.
+        if (base_tablet != nullptr &&
+            base_tablet->data_dir()->path() == target_tablet->data_dir()->path()) {
+            ref_tablet = std::move(base_tablet);
+        }
+    }
+
+    res = _create_snapshot_files(ref_tablet, target_tablet, request, snapshot_path,
+                                 allow_incremental_clone);
 
     if (!res.ok()) {
         LOG(WARNING) << "failed to make snapshot. res=" << res << " tablet=" << request.tablet_id;
@@ -368,6 +385,7 @@ Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) {
 }
 
 Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet,
+                                               const TabletSharedPtr& target_tablet,
                                                const TSnapshotRequest& request,
                                                string* snapshot_path,
                                                bool* allow_incremental_clone) {
@@ -387,10 +405,10 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
         timeout_s = request.timeout;
     }
     std::string snapshot_id_path;
-    res = _calc_snapshot_id_path(ref_tablet, timeout_s, &snapshot_id_path);
+    res = _calc_snapshot_id_path(target_tablet, timeout_s, &snapshot_id_path);
     if (!res.ok()) {
-        LOG(WARNING) << "failed to calc snapshot_id_path, ref tablet="
-                     << ref_tablet->data_dir()->path();
+        LOG(WARNING) << "failed to calc snapshot_id_path, tablet="
+                     << target_tablet->data_dir()->path();
         return res;
     }
 
@@ -398,12 +416,12 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
 
     // schema_full_path_desc.filepath:
     //      /snapshot_id_path/tablet_id/schema_hash/
-    auto schema_full_path = get_schema_hash_full_path(ref_tablet, snapshot_id_path);
+    auto schema_full_path = get_schema_hash_full_path(target_tablet, snapshot_id_path);
     // header_path:
     //      /schema_full_path/tablet_id.hdr
-    auto header_path = _get_header_full_path(ref_tablet, schema_full_path);
+    auto header_path = _get_header_full_path(target_tablet, schema_full_path);
     //      /schema_full_path/tablet_id.hdr.json
-    auto json_header_path = _get_json_header_full_path(ref_tablet, schema_full_path);
+    auto json_header_path = _get_json_header_full_path(target_tablet, schema_full_path);
     bool exists = true;
     RETURN_IF_ERROR(io::global_local_filesystem()->exists(schema_full_path, &exists));
     if (exists) {
@@ -585,7 +603,9 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
                         << rs->rowset_meta()->empty();
         }
         if (!res.ok()) {
-            LOG(WARNING) << "fail to create hard link. [path=" << snapshot_id_path << "]";
+            LOG(WARNING) << "fail to create hard link. path=" << snapshot_id_path
+                         << " tablet=" << target_tablet->tablet_id()
+                         << " ref tablet=" << ref_tablet->tablet_id();
             break;
         }
 
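
The snapshot path now separates the target tablet (where the snapshot
directory is laid out) from the ref tablet (where rowset files may be
hard-linked from), and the base tablet is only reused when both replicas share
a data dir. A hedged sketch of that selection rule (names and types are
illustrative, not the BE API):

    // Illustrative sketch of the ref-tablet choice in make_snapshot.
    public final class RefTabletChooser {
        record TabletInfo(long id, String dataDirPath) {}

        // Reuse the base tablet for hard links only when it shares a disk with
        // the target tablet; otherwise fall back to the normal restore path.
        static TabletInfo chooseRefTablet(TabletInfo target, TabletInfo base) {
            if (base != null && base.dataDirPath().equals(target.dataDirPath())) {
                return base;
            }
            return target;
        }

        public static void main(String[] args) {
            TabletInfo target = new TabletInfo(1001, "/data1");
            TabletInfo base = new TabletInfo(2001, "/data2"); // moved by medium migration
            System.out.println(chooseRefTablet(target, base).id()); // prints 1001
        }
    }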
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index 9781d4f69ee..e32409dd3cd 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -81,6 +81,7 @@ private:
                                       const std::vector<RowsetSharedPtr>& consistent_rowsets);
 
     Status _create_snapshot_files(const TabletSharedPtr& ref_tablet,
+                                  const TabletSharedPtr& target_tablet,
                                   const TSnapshotRequest& request, std::string* snapshot_path,
                                   bool* allow_incremental_clone);
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index ad0cc795dc3..c1a94223eff 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3875,6 +3875,11 @@ Status Tablet::get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versio
                                                       binlog_versions, metas_pb);
 }
 
+Status Tablet::get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb) {
+    return RowsetMetaManager::get_rowset_binlog_metas(_data_dir->get_meta(), tablet_uid(),
+                                                      binlog_versions, metas_pb);
+}
+
 std::string Tablet::get_segment_filepath(std::string_view rowset_id,
                                          std::string_view segment_index) const {
     return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 490c36aa519..fb20498355f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -553,6 +553,7 @@ public:
                                        std::string_view rowset_id) const;
     Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions,
                                    RowsetBinlogMetasPB* metas_pb);
+    Status get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb);
     std::string get_segment_filepath(std::string_view rowset_id,
                                      std::string_view segment_index) const;
     std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 05a91f463ee..bfa23257cb0 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -278,9 +278,12 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector
     // we need to take the write lock on shard-1 and then the read lock on shard-2;
     // if a rollup tablet C (assume on shard-2) is created from tablet D (assume on shard-1) at the same time, we will meet a deadlock
     std::unique_lock two_tablet_lock(_two_tablet_mtx, std::defer_lock);
-    bool is_schema_change = request.__isset.base_tablet_id && request.base_tablet_id > 0;
-    bool need_two_lock = is_schema_change && ((_tablets_shards_mask & request.base_tablet_id) !=
-                                              (_tablets_shards_mask & tablet_id));
+    bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode;
+    bool is_schema_change_or_atomic_restore =
+            request.__isset.base_tablet_id && request.base_tablet_id > 0;
+    bool need_two_lock =
+            is_schema_change_or_atomic_restore &&
+            ((_tablets_shards_mask & request.base_tablet_id) != (_tablets_shards_mask & tablet_id));
     if (need_two_lock) {
         SCOPED_TIMER(ADD_TIMER(profile, "GetTwoTableLock"));
         two_tablet_lock.lock();
@@ -309,7 +312,7 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector
 
     TabletSharedPtr base_tablet = nullptr;
     // If the CreateTabletReq has base_tablet_id then it is an alter-tablet request
-    if (is_schema_change) {
+    if (is_schema_change_or_atomic_restore) {
         // if base_tablet_id's lock is different from new_tablet_id's, we need to lock it.
         if (need_two_lock) {
             SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTablet"));
@@ -322,22 +325,28 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector
         if (base_tablet == nullptr) {
             DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
             return Status::Error<TABLE_CREATE_META_ERROR>(
-                    "fail to create tablet(change schema), base tablet does not exist. "
-                    "new_tablet_id={}, base_tablet_id={}",
+                    "fail to create tablet(change schema/atomic restore), base tablet does not "
+                    "exist. new_tablet_id={}, base_tablet_id={}",
                     tablet_id, request.base_tablet_id);
         }
-        // If we are doing schema-change, we should use the same data dir
+        // If we are doing schema-change or atomic-restore, we should use the same data dir
         // TODO(lingbin): A little trick here, the directory should be determined before
         // entering this method
-        if (request.storage_medium == base_tablet->data_dir()->storage_medium()) {
+        //
+        // ATTN: Since all restored replicas will be saved to HDD, no storage_medium check is needed here.
+        if (in_restore_mode ||
+            request.storage_medium == base_tablet->data_dir()->storage_medium()) {
+            LOG(INFO) << "create tablet using the base tablet data dir. tablet_id=" << tablet_id
+                      << ", base tablet_id=" << request.base_tablet_id
+                      << ", data dir=" << base_tablet->data_dir()->path();
             stores.clear();
             stores.push_back(base_tablet->data_dir());
         }
     }
 
     // set alter type to schema-change. it is useless
-    TabletSharedPtr tablet = _internal_create_tablet_unlocked(request, is_schema_change,
-                                                              base_tablet.get(), stores, profile);
+    TabletSharedPtr tablet = _internal_create_tablet_unlocked(
+            request, is_schema_change_or_atomic_restore, base_tablet.get(), stores, profile);
     if (tablet == nullptr) {
         DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
         return Status::Error<CE_CMD_PARAMS_ERROR>("fail to create tablet. tablet_id={}",
@@ -934,6 +943,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
         if (binlog_meta_filesize > 0) {
             contain_binlog = true;
             RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb));
+            VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file;
         }
         RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file));
     }
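
create_tablet only takes the extra two-tablet lock when the base tablet and
the new tablet hash to different shards under _tablets_shards_mask. The
general way to keep such two-shard locking deadlock-free is to acquire the
locks in a canonical order; a Java sketch of that idea (illustrative only;
Doris serializes these pairs through the dedicated _two_tablet_mtx instead):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ShardLockOrdering {
        static final int SHARD_COUNT = 128; // power of two, mask = SHARD_COUNT - 1
        static final ReentrantReadWriteLock[] SHARDS = new ReentrantReadWriteLock[SHARD_COUNT];
        static {
            for (int i = 0; i < SHARD_COUNT; i++) {
                SHARDS[i] = new ReentrantReadWriteLock();
            }
        }

        // Always lock the lower shard index first so two concurrent callers
        // can never hold one shard each while waiting for the other.
        static void withTwoShards(long tabletId, long baseTabletId, Runnable action) {
            int a = (int) (tabletId & (SHARD_COUNT - 1));
            int b = (int) (baseTabletId & (SHARD_COUNT - 1));
            int first = Math.min(a, b);
            int second = Math.max(a, b);
            SHARDS[first].writeLock().lock();
            try {
                if (second != first) {
                    SHARDS[second].writeLock().lock();
                }
                try {
                    action.run();
                } finally {
                    if (second != first) {
                        SHARDS[second].writeLock().unlock();
                    }
                }
            } finally {
                SHARDS[first].writeLock().unlock();
            }
        }

        public static void main(String[] args) {
            withTwoShards(15, 42, () -> System.out.println("create tablet under both shard locks"));
        }
    }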
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index 0164aee1472..f9ba8963966 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -37,6 +37,7 @@
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
+#include "olap/pb_helper.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/snapshot_manager.h"
 #include "olap/storage_engine.h"
@@ -259,9 +260,11 @@ Status EngineStorageMigrationTask::_migrate() {
     }
 
     std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
+    RowsetBinlogMetasPB rowset_binlog_metas_pb;
     do {
         // migrate all index and data files but header file
-        res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
+        res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
+                                         &rowset_binlog_metas_pb);
         if (!res.ok()) {
             break;
         }
@@ -289,7 +292,8 @@ Status EngineStorageMigrationTask::_migrate() {
             // we take the lock to complete it to avoid long-term competition with other tasks
             if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) {
                 // force to copy the remaining data and index
-                res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
+                res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
+                                                 &rowset_binlog_metas_pb);
                 if (!res.ok()) {
                     break;
                 }
@@ -304,6 +308,16 @@ Status EngineStorageMigrationTask::_migrate() {
             }
         }
 
+        // save rowset binlog metas
+        if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) {
+            auto rowset_binlog_metas_pb_filename =
+                    fmt::format("{}/rowset_binlog_metas.pb", full_path);
+            res = write_pb(rowset_binlog_metas_pb_filename, rowset_binlog_metas_pb);
+            if (!res.ok()) {
+                break;
+            }
+        }
+
         // generate new tablet meta and write to hdr file
         res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets, end_version);
         if (!res.ok()) {
@@ -347,10 +361,91 @@ void EngineStorageMigrationTask::_generate_new_header(
 }
 
 Status EngineStorageMigrationTask::_copy_index_and_data_files(
-        const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets) const {
+        const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets,
+        RowsetBinlogMetasPB* all_binlog_metas_pb) const {
+    RowsetBinlogMetasPB rowset_binlog_metas_pb;
     for (const auto& rs : consistent_rowsets) {
         RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id()));
+
+        Version binlog_versions = rs->version();
+        RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions, &rowset_binlog_metas_pb));
+    }
+
+    // copy index binlog files.
+    for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) {
+        auto num_segments = rowset_binlog_meta.num_segments();
+        std::string_view rowset_id = rowset_binlog_meta.rowset_id();
+
+        RowsetMetaPB rowset_meta_pb;
+        if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
+            auto err_msg = fmt::format("fail to parse binlog meta data value:{}",
+                                       rowset_binlog_meta.data());
+            LOG(WARNING) << err_msg;
+            return Status::InternalError(err_msg);
+        }
+        const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
+        TabletSchema tablet_schema;
+        tablet_schema.init_from_pb(tablet_schema_pb);
+
+        // copy segment files and index files
+        for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
+            std::string segment_file_path = _tablet->get_segment_filepath(rowset_id, segment_index);
+            auto snapshot_segment_file_path =
+                    fmt::format("{}/{}_{}.binlog", full_path, rowset_id, segment_index);
+
+            Status status = io::global_local_filesystem()->copy_path(segment_file_path,
+                                                                     snapshot_segment_file_path);
+            if (!status.ok()) {
+                LOG(WARNING) << "fail to copy binlog segment file. [src=" << segment_file_path
+                             << ", dest=" << snapshot_segment_file_path << "]" << status;
+                return status;
+            }
+            VLOG_DEBUG << "copy " << segment_file_path << " to " << snapshot_segment_file_path;
+
+            if (tablet_schema.get_inverted_index_storage_format() ==
+                InvertedIndexStorageFormatPB::V1) {
+                for (const auto& index : tablet_schema.indexes()) {
+                    if (index.index_type() != IndexType::INVERTED) {
+                        continue;
+                    }
+                    auto index_id = index.index_id();
+                    auto index_file =
+                            _tablet->get_segment_index_filepath(rowset_id, segment_index, index_id);
+                    auto snapshot_segment_index_file_path =
+                            fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id,
+                                        segment_index, index_id);
+                    VLOG_DEBUG << "copy " << index_file << " to "
+                               << snapshot_segment_index_file_path;
+                    status = io::global_local_filesystem()->copy_path(
+                            index_file, snapshot_segment_index_file_path);
+                    if (!status.ok()) {
+                        LOG(WARNING)
+                                << "fail to copy binlog index file. [src=" << index_file
+                                << ", dest=" << snapshot_segment_index_file_path << "]" << status;
+                        return status;
+                    }
+                }
+            } else if (tablet_schema.has_inverted_index()) {
+                auto index_file = InvertedIndexDescriptor::get_index_file_name(segment_file_path);
+                auto snapshot_segment_index_file_path =
+                        fmt::format("{}/{}_{}.binlog-index", full_path, rowset_id, segment_index);
+                VLOG_DEBUG << "copy " << index_file << " to " << snapshot_segment_index_file_path;
+                status = io::global_local_filesystem()->copy_path(index_file,
+                                                                  snapshot_segment_index_file_path);
+                if (!status.ok()) {
+                    LOG(WARNING) << "fail to copy binlog index file. [src=" << index_file
+                                 << ", dest=" << snapshot_segment_index_file_path << "]" << status;
+                    return status;
+                }
+            }
+        }
     }
+
+    std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(),
+              rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(),
+              google::protobuf::RepeatedFieldBackInserter(
+                      all_binlog_metas_pb->mutable_rowset_binlog_metas()));
+
     return Status::OK();
 }
 
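
The migration task now also snapshots binlog files: each segment is copied to
a deterministic name, {rowset_id}_{segment_index}.binlog (plus .binlog-index
files for inverted indexes), and the collected metas are persisted as
rowset_binlog_metas.pb. A minimal sketch of the copy-and-rename step (paths
are illustrative; the source layout follows Tablet::get_segment_filepath):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class BinlogSnapshotCopy {
        // Copy tablet_path/_binlog/{rowset_id}_{segment_index}.dat to
        // snapshot_path/{rowset_id}_{segment_index}.binlog.
        static void copyBinlogSegment(Path tabletPath, Path snapshotPath,
                                      String rowsetId, long segmentIndex) throws IOException {
            Path src = tabletPath.resolve("_binlog")
                    .resolve(String.format("%s_%d.dat", rowsetId, segmentIndex));
            Path dest = snapshotPath.resolve(String.format("%s_%d.binlog", rowsetId, segmentIndex));
            Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING);
        }

        public static void main(String[] args) throws IOException {
            Path tablet = Files.createTempDirectory("tablet");
            Files.createDirectories(tablet.resolve("_binlog"));
            Files.writeString(tablet.resolve("_binlog").resolve("02000000001_0.dat"), "segment bytes");
            Path snapshot = Files.createTempDirectory("snapshot");
            copyBinlogSegment(tablet, snapshot, "02000000001", 0);
            System.out.println(Files.readString(snapshot.resolve("02000000001_0.binlog")));
        }
    }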
diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h
index c15b0576701..5e30d4a04c9 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_H
 #define DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_H
 
+#include <gen_cpp/olap_file.pb.h>
 #include <stdint.h>
 
 #include <mutex>
@@ -73,7 +74,8 @@ private:
     // TODO: hkp
     // rewrite this function
     Status _copy_index_and_data_files(const std::string& full_path,
-                                      const std::vector<RowsetSharedPtr>& consistent_rowsets) const;
+                                      const std::vector<RowsetSharedPtr>& consistent_rowsets,
+                                      RowsetBinlogMetasPB* all_binlog_metas_pb) const;
 
 private:
     // tablet to be migrated
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index 12e7000caad..82e4be09cd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -43,6 +43,7 @@ public class RestoreStmt extends AbstractBackupStmt {
     public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
     public static final String PROP_CLEAN_TABLES = "clean_tables";
     public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
+    public static final String PROP_ATOMIC_RESTORE = "atomic_restore";
 
     private boolean allowLoad = false;
     private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
@@ -54,6 +55,7 @@ public class RestoreStmt extends AbstractBackupStmt {
     private boolean isBeingSynced = false;
     private boolean isCleanTables = false;
     private boolean isCleanPartitions = false;
+    private boolean isAtomicRestore = false;
     private byte[] meta = null;
     private byte[] jobInfo = null;
 
@@ -121,6 +123,10 @@ public class RestoreStmt extends AbstractBackupStmt {
         return isCleanPartitions;
     }
 
+    public boolean isAtomicRestore() {
+        return isAtomicRestore;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
@@ -203,6 +209,9 @@ public class RestoreStmt extends AbstractBackupStmt {
         // is clean partitions
         isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions);
 
+        // is atomic restore
+        isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore);
+
         if (!copiedProperties.isEmpty()) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
                     "Unknown restore job properties: " + copiedProperties.keySet());
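
With the property parsed above, a client can opt into atomic restore per job.
A hedged usage sketch over JDBC (host, database, repository, label, and
backup timestamp are placeholders; only the atomic_restore property name is
defined by this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class AtomicRestoreExample {
        public static void main(String[] args) throws Exception {
            // Connect to the FE MySQL port and issue a RESTORE that restores
            // into temp tables first, then atomically swaps them in.
            try (Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://127.0.0.1:9030/example_db", "root", "");
                    Statement stmt = conn.createStatement()) {
                stmt.execute("RESTORE SNAPSHOT example_db.example_label"
                        + " FROM `example_repo`"
                        + " PROPERTIES (\"backup_timestamp\" = \"2024-09-24-09-41-41\","
                        + " \"atomic_restore\" = \"true\")");
            }
        }
    }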
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 5feeb8005bd..f546fc40ead 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -517,12 +517,14 @@ public class BackupHandler extends MasterDaemon implements Writable {
                     db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
                     stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(),
                     stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
-                    stmt.isCleanTables(), stmt.isCleanPartitions(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
+                    stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
+                    env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
         } else {
             restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
                 db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
                 stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
-                stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), env, repository.getId());
+                stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
+                env, repository.getId());
         }
 
         env.getEditLog().logRestoreJob(restoreJob);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java
index 07ddf6844dc..f712afbb271 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java
@@ -39,7 +39,7 @@ public class RestoreFileMapping implements Writable {
         }
 
         public IdChain(Long... ids) {
-            Preconditions.checkState(ids.length == 5);
+            Preconditions.checkState(ids.length == 6);
             chain = ids;
         }
 
@@ -63,6 +63,14 @@ public class RestoreFileMapping implements Writable {
             return chain[4];
         }
 
+        public boolean hasRefTabletId() {
+            return chain.length >= 6 && chain[5] != -1L;
+        }
+
+        public long getRefTabletId() {
+            return chain[5];
+        }
+
         @Override
         public String toString() {
             StringBuilder sb = new StringBuilder();
@@ -78,8 +86,12 @@ public class RestoreFileMapping implements Writable {
                 return false;
             }
 
+            if (((IdChain) obj).chain.length != chain.length) {
+                return false;
+            }
+
             IdChain other = (IdChain) obj;
-            for (int i = 0; i < 5; i++) {
+            for (int i = 0; i < chain.length; i++) {
                 // DO NOT use ==, Long_1 != Long_2
                 if (!chain[i].equals(other.chain[i])) {
                     return false;
@@ -92,7 +104,7 @@ public class RestoreFileMapping implements Writable {
         @Override
         public int hashCode() {
             int code = chain[0].hashCode();
-            for (int i = 1; i < 5; i++) {
+            for (int i = 1; i < chain.length; i++) {
                 code ^= chain[i].hashCode();
             }
             return code;
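
IdChain now carries six ids, the last being the optional ref tablet id, and
equals/hashCode iterate over chain.length instead of a hard-coded 5. The "DO
NOT use ==" caveat applies because the chain stores boxed Longs; a standalone
illustration of that pitfall:

    public class BoxedLongEquality {
        public static void main(String[] args) {
            // == compares references: values outside the Long cache (-128..127)
            // get distinct boxes, so equal ids can still fail an == check.
            Long a = 128L;
            Long b = 128L;
            System.out.println(a == b);      // false
            System.out.println(a.equals(b)); // true
        }
    }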
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index a35858e498a..558c9ddf33c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -61,6 +61,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.DbUtil;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.TimeUtils;
@@ -114,6 +115,8 @@ public class RestoreJob extends AbstractJob {
     private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;
     private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES;
     private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS;
+    private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE;
+    private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__";
 
     private static final Logger LOG = LogManager.getLogger(RestoreJob.class);
 
@@ -182,6 +185,8 @@ public class RestoreJob extends AbstractJob {
     private boolean isCleanTables = false;
     // Whether to delete existing partitions that are not involved in the restore.
     private boolean isCleanPartitions = false;
+    // Whether to restore the data into a temp table, and then replace the original one.
+    private boolean isAtomicRestore = false;
 
     // restore properties
     private Map<String, String> properties = Maps.newHashMap();
@@ -193,7 +198,7 @@ public class RestoreJob extends AbstractJob {
     public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
             ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
             boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
-            boolean isCleanPartitions, Env env, long repoId) {
+            boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
         super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
         this.backupTimestamp = backupTs;
         this.jobInfo = jobInfo;
@@ -210,19 +215,22 @@ public class RestoreJob extends AbstractJob {
         this.isBeingSynced = isBeingSynced;
         this.isCleanTables = isCleanTables;
         this.isCleanPartitions = isCleanPartitions;
+        this.isAtomicRestore = isAtomicRestore;
         properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
         properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
         properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
         properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
         properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions));
+        properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore));
     }
 
     public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
             ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
             boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
-            boolean isCleanPartitions, Env env, long repoId, BackupMeta backupMeta) {
+            boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) {
         this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
-                reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, env, repoId);
+                reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env,
+                repoId);
         this.backupMeta = backupMeta;
     }
 
@@ -411,6 +419,12 @@ public class RestoreJob extends AbstractJob {
         checkIfNeedCancel();
 
         if (status.ok()) {
+            if (state != RestoreJobState.PENDING && label.equals(
+                    DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) {
+                LOG.info("pause restore job by debug point: {}", this);
+                return;
+            }
+
             switch (state) {
                 case PENDING:
                     checkAndPrepareMeta();
@@ -518,8 +532,10 @@ public class RestoreJob extends AbstractJob {
         }
         Preconditions.checkNotNull(backupMeta);
 
-        // Set all restored tbls' state to RESTORE
-        // Table's origin state must be NORMAL and does not have unfinished load job.
+        // Check the olap table state.
+        //
+        // If isAtomicRestore is not set, set all restored tbls' state to RESTORE;
+        // the table's origin state must be NORMAL and must not have unfinished load jobs.
         for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
             Table tbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
             if (tbl == null) {
@@ -547,6 +563,13 @@ public class RestoreJob extends AbstractJob {
                     return;
                 }
 
+                if (isAtomicRestore) {
+                    // We will create a new OlapTable in atomic restore, so do not set the RESTORE state.
+                    // Instead, set the table to the atomic restore state, to forbid alter table operations.
+                    olapTbl.setInAtomicRestore();
+                    continue;
+                }
+
                 for (Partition partition : olapTbl.getPartitions()) {
                     if (!env.getLoadInstance().checkPartitionLoadFinished(partition.getId(), null)) {
                         status = new Status(ErrCode.COMMON_ERROR,
@@ -608,6 +631,9 @@ public class RestoreJob extends AbstractJob {
             }
         }
 
+        // the new tablets -> { local tablet, schema hash, storage medium }, used in atomic restore.
+        Map<Long, TabletRef> tabletBases = new HashMap<>();
+
         // Check and prepare meta objects.
         AgentBatchTask batchTask = new AgentBatchTask();
         db.readLock();
@@ -618,14 +644,15 @@ public class RestoreJob extends AbstractJob {
                 Table remoteTbl = backupMeta.getTable(tableName);
                 Preconditions.checkNotNull(remoteTbl);
                 Table localTbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
+                if (localTbl != null && localTbl.getType() != TableType.OLAP) {
+                    // the table already exists, but is not OLAP
+                    status = new Status(ErrCode.COMMON_ERROR,
+                            "The type of local table should be same as type of remote table: "
+                                    + remoteTbl.getName());
+                    return;
+                }
+
                 if (localTbl != null) {
-                    // table already exist, check schema
-                    if (localTbl.getType() != TableType.OLAP) {
-                        status = new Status(ErrCode.COMMON_ERROR,
-                                "The type of local table should be same as type of remote table: "
-                                        + remoteTbl.getName());
-                        return;
-                    }
                     OlapTable localOlapTbl = (OlapTable) localTbl;
                     OlapTable remoteOlapTbl = (OlapTable) remoteTbl;
 
@@ -671,28 +698,26 @@ public class RestoreJob extends AbstractJob {
                                     PartitionItem localItem = localPartInfo.getItem(localPartition.getId());
                                     PartitionItem remoteItem = remoteOlapTbl
                                             .getPartitionInfo().getItem(backupPartInfo.id);
-                                    if (localItem.equals(remoteItem)) {
-                                        // Same partition, same range
-                                        if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition,
-                                                localTbl, backupPartInfo, partitionName, tblInfo, remoteReplicaAlloc)) {
-                                            return;
-                                        }
-                                    } else {
+                                    if (!localItem.equals(remoteItem)) {
                                         // Same partition name, different range
                                         status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
                                                 + " in table " + localTbl.getName()
                                                 + " has different partition item with partition in repository");
                                         return;
                                     }
-                                } else {
-                                    // If this is a single partitioned table.
-                                    if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, localTbl,
-                                            backupPartInfo, partitionName, tblInfo, remoteReplicaAlloc)) {
-                                        return;
-                                    }
                                 }
 
-                            } else {
+                                if (isAtomicRestore) {
+                                    // skip gen file mapping for atomic restore.
+                                    continue;
+                                }
+
+                                // Same partition, same range, or a single partitioned table.
+                                if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition,
+                                        localTbl, backupPartInfo, partitionName, tblInfo, remoteReplicaAlloc)) {
+                                    return;
+                                }
+                            } else if (!isAtomicRestore) {
                                 // the partition does not exist
                                 PartitionInfo localPartitionInfo = localOlapTbl.getPartitionInfo();
                                 if (localPartitionInfo.getType() == PartitionType.RANGE
@@ -732,8 +757,10 @@ public class RestoreJob extends AbstractJob {
                     } finally {
                         localOlapTbl.readUnlock();
                     }
-                } else {
-                    // Table does not exist
+                }
+
+                // Table does not exist or atomic restore
+                if (localTbl == null || isAtomicRestore) {
                     OlapTable remoteOlapTbl = (OlapTable) remoteTbl;
                     // Retain only expected restore partitions in this table;
                     Set<String> allPartNames = remoteOlapTbl.getPartitionNames();
@@ -761,6 +788,15 @@ public class RestoreJob extends AbstractJob {
                     // DO NOT set remote table's new name here, because we will still need the origin name later
                     // remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
                     remoteOlapTbl.setState(allowLoad ? OlapTableState.RESTORE_WITH_LOAD : OlapTableState.RESTORE);
+
+                    if (isAtomicRestore && localTbl != null) {
+                        // bind the backends and base tablets from local tbl.
+                        status = bindLocalAndRemoteOlapTableReplicas((OlapTable) localTbl, remoteOlapTbl, tabletBases);
+                        if (!status.ok()) {
+                            return;
+                        }
+                    }
+
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("put remote table {} to restoredTbls", remoteOlapTbl.getName());
                     }
@@ -824,6 +860,9 @@ public class RestoreJob extends AbstractJob {
             // for now, nothing is modified in catalog
 
             // generate create replica tasks for all restored partitions
+            if (isAtomicRestore && !restoredPartitions.isEmpty()) {
+                throw new RuntimeException("atomic restore is set, but the restored partitions are not empty");
+            }
             for (Pair<String, Partition> entry : restoredPartitions) {
                 OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
                 Preconditions.checkNotNull(localTbl, localTbl.getName());
@@ -843,11 +882,12 @@ public class RestoreJob extends AbstractJob {
                 if (restoreTbl.getType() == TableType.OLAP) {
                     OlapTable restoreOlapTable = (OlapTable) restoreTbl;
                     for (Partition restorePart : restoreOlapTable.getPartitions()) {
-                        createReplicas(db, batchTask, restoreOlapTable, restorePart);
+                        createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases);
                         BackupOlapTableInfo backupOlapTableInfo = jobInfo.getOlapTableInfo(restoreOlapTable.getName());
                         genFileMapping(restoreOlapTable, restorePart, backupOlapTableInfo.id,
                                 backupOlapTableInfo.getPartInfo(restorePart.getName()),
-                                !allowLoad /* if allow load, do not overwrite when commit */);
+                                !allowLoad /* if allow load, do not overwrite when commit */,
+                                tabletBases);
                     }
                 }
                 // set restored table's new name after all 'genFileMapping'
@@ -855,6 +895,9 @@ public class RestoreJob extends AbstractJob {
                 if (Env.isStoredTableNamesLowerCase()) {
                     tableName = tableName.toLowerCase();
                 }
+                if (restoreTbl.getType() == TableType.OLAP && isAtomicRestore) {
+                    tableName = tableAliasWithAtomicRestore(tableName);
+                }
                 restoreTbl.setName(tableName);
             }
 
@@ -978,6 +1021,90 @@ public class RestoreJob extends AbstractJob {
         // No log here, PENDING state restore job will redo this method
     }
 
+    private Status bindLocalAndRemoteOlapTableReplicas(
+            OlapTable localOlapTbl, OlapTable remoteOlapTbl,
+            Map<Long, TabletRef> tabletBases) {
+        localOlapTbl.readLock();
+        try {
+            // The storage medium of the remote olap table is HDD, because we want to be able to
+            // restore the tables in another cluster, which might not have SSDs.
+            //
+            // Keep the storage medium of the new olap table the same as the old one, so that
+            // the replicas in the new olap table will not be migrated to other storage mediums.
+            remoteOlapTbl.setStorageMedium(localOlapTbl.getStorageMedium());
+            for (Partition partition : remoteOlapTbl.getPartitions()) {
+                Partition localPartition = localOlapTbl.getPartition(partition.getName());
+                if (localPartition == null) {
+                    continue;
+                }
+                // Since the replicas are bound to the same disk, the storage medium must be the same
+                // to avoid media migration.
+                TStorageMedium storageMedium = localOlapTbl.getPartitionInfo()
+                        .getDataProperty(localPartition.getId()).getStorageMedium();
+                remoteOlapTbl.getPartitionInfo().getDataProperty(partition.getId())
+                        .setStorageMedium(storageMedium);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("bind local partition {} and remote partition {} with same storage medium {}, name: {}",
+                            localPartition.getId(), partition.getId(), storageMedium, partition.getName());
+                }
+                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                    String indexName = remoteOlapTbl.getIndexNameById(index.getId());
+                    Long localIndexId = localOlapTbl.getIndexIdByName(indexName);
+                    MaterializedIndex localIndex = localIndexId == null ? null : localPartition.getIndex(localIndexId);
+                    if (localIndex == null) {
+                        continue;
+                    }
+                    int schemaHash = localOlapTbl.getSchemaHashByIndexId(localIndexId);
+                    if (schemaHash == -1) {
+                        return new Status(ErrCode.COMMON_ERROR, String.format(
+                                "schema hash of local index %d is not found, remote table=%d, remote index=%d, "
+                                + "local table=%d, local index=%d", localIndexId, remoteOlapTbl.getId(), index.getId(),
+                                localOlapTbl.getId(), localIndexId));
+                    }
+
+                    List<Tablet> localTablets = localIndex.getTablets();
+                    List<Tablet> remoteTablets = index.getTablets();
+                    if (localTablets.size() != remoteTablets.size()) {
+                        return new Status(ErrCode.COMMON_ERROR, String.format(
+                                "the size of local tablets %s is not equal to the remote %s, "
+                                + "is_atomic_restore=true, remote table=%d, remote index=%d, "
+                                + "local table=%d, local index=%d", localTablets.size(), remoteTablets.size(),
+                                remoteOlapTbl.getId(), index.getId(), localOlapTbl.getId(), localIndexId));
+                    }
+                    for (int i = 0; i < remoteTablets.size(); i++) {
+                        Tablet localTablet = localTablets.get(i);
+                        Tablet remoteTablet = remoteTablets.get(i);
+                        List<Replica> localReplicas = localTablet.getReplicas();
+                        List<Replica> remoteReplicas = remoteTablet.getReplicas();
+                        if (localReplicas.size() != remoteReplicas.size()) {
+                            return new Status(ErrCode.COMMON_ERROR, String.format(
+                                    "the size of local replicas %s is not equal to the remote %s, "
+                                    + "is_atomic_restore=true, remote table=%d, remote index=%d, "
+                                    + "local table=%d, local index=%d, local replicas=%d, remote replicas=%d",
+                                    localTablets.size(), remoteTablets.size(), remoteOlapTbl.getId(),
+                                    index.getId(), localOlapTbl.getId(), localIndexId, localReplicas.size(),
+                                    remoteReplicas.size()));
+                        }
+                        for (int j = 0; j < remoteReplicas.size(); j++) {
+                            long backendId = localReplicas.get(j).getBackendId();
+                            remoteReplicas.get(j).setBackendId(backendId);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("bind local replica {} and remote replica {} with same backend {}, table={}",
+                                        localReplicas.get(j).getId(), remoteReplicas.get(j).getId(), backendId,
+                                        localOlapTbl.getName());
+                            }
+                        }
+                        tabletBases.put(remoteTablet.getId(),
+                                new TabletRef(localTablet.getId(), schemaHash, storageMedium));
+                    }
+                }
+            }
+        } finally {
+            localOlapTbl.readUnlock();
+        }
+        return Status.OK;
+    }
+
     private void prepareAndSendSnapshotTaskForOlapTable(Database db) {
         LOG.info("begin to make snapshot. {} when restore content is ALL", this);
         // begin to make snapshots for all replicas
@@ -989,7 +1116,8 @@ public class RestoreJob extends AbstractJob {
         AgentBatchTask batchTask = new AgentBatchTask();
         db.readLock();
         try {
-            for (IdChain idChain : fileMapping.getMapping().keySet()) {
+            for (Map.Entry<IdChain, IdChain> entry : fileMapping.getMapping().entrySet()) {
+                IdChain idChain = entry.getKey();
                 OlapTable tbl = (OlapTable) db.getTableNullable(idChain.getTblId());
                 tbl.readLock();
                 try {
@@ -998,9 +1126,15 @@ public class RestoreJob extends AbstractJob {
                     Tablet tablet = index.getTablet(idChain.getTabletId());
                     Replica replica = tablet.getReplicaById(idChain.getReplicaId());
                     long signature = env.getNextId();
+                    boolean isRestoreTask = true;
+                    // We don't care about the visible version in the restore job; the end version is used.
+                    long visibleVersion = -1L;
                     SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), signature, jobId, db.getId(),
-                            tbl.getId(), part.getId(), index.getId(), tablet.getId(), part.getVisibleVersion(),
-                            tbl.getSchemaHashByIndexId(index.getId()), timeoutMs, true /* is restore task*/);
+                            tbl.getId(), part.getId(), index.getId(), tablet.getId(), visibleVersion,
+                            tbl.getSchemaHashByIndexId(index.getId()), timeoutMs, isRestoreTask);
+                    if (entry.getValue().hasRefTabletId()) {
+                        task.setRefTabletId(entry.getValue().getRefTabletId());
+                    }
                     batchTask.addTask(task);
                     unfinishedSignatureToId.put(signature, tablet.getId());
                     bePathsMap.put(replica.getBackendId(), replica.getPathHash());
@@ -1088,6 +1222,11 @@ public class RestoreJob extends AbstractJob {
     }
 
     private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable localTbl, Partition restorePart) {
+        createReplicas(db, batchTask, localTbl, restorePart, null);
+    }
+
+    private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable localTbl, Partition restorePart,
+            Map<Long, TabletRef> tabletBases) {
         Set<String> bfColumns = localTbl.getCopiedBfColumns();
         double bfFpp = localTbl.getBfFpp();
 
@@ -1102,8 +1241,12 @@ public class RestoreJob extends AbstractJob {
         for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
             MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
             for (Tablet restoreTablet : restoredIdx.getTablets()) {
+                TabletRef baseTabletRef = tabletBases == null ? null : tabletBases.get(restoreTablet.getId());
+                // All restored replicas will be saved to HDD by default.
+                TStorageMedium storageMedium = baseTabletRef == null
+                        ? TStorageMedium.HDD : baseTabletRef.storageMedium;
                 TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(),
-                        restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD);
+                        restoredIdx.getId(), indexMeta.getSchemaHash(), storageMedium);
                 Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
                 for (Replica restoreReplica : restoreTablet.getReplicas()) {
                     Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1112,7 +1255,7 @@ public class RestoreJob extends AbstractJob {
                             restoreTablet.getId(), restoreReplica.getId(), 
indexMeta.getShortKeyColumnCount(),
                             indexMeta.getSchemaHash(), 
restoreReplica.getVersion(),
                             indexMeta.getKeysType(), TStorageType.COLUMN,
-                            TStorageMedium.HDD /* all restored replicas will be saved to HDD */,
+                            storageMedium,
                             indexMeta.getSchema(), bfColumns, bfFpp, null,
                             localTbl.getCopiedIndexes(),
                             localTbl.isInMemory(),
@@ -1134,6 +1277,12 @@ public class RestoreJob extends AbstractJob {
                             localTbl.rowStorePageSize());
                     
task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat());
                     task.setInRestoreMode(true);
+                    if (baseTabletRef != null) {
+                        // ensure this replica is bound to the same backend disk as the origin table's replica.
+                        task.setBaseTablet(baseTabletRef.tabletId, 
baseTabletRef.schemaHash);
+                        LOG.info("set base tablet {} for replica {} in restore 
job {}, tablet id={}",
+                                baseTabletRef.tabletId, 
restoreReplica.getId(), jobId, restoreTablet.getId());
+                    }
                     batchTask.addTask(task);
                 }
             }
@@ -1216,6 +1365,11 @@ public class RestoreJob extends AbstractJob {
     // files in repo to files in local
     private void genFileMapping(OlapTable localTbl, Partition localPartition, Long remoteTblId,
             BackupPartitionInfo backupPartInfo, boolean overwrite) {
+        genFileMapping(localTbl, localPartition, remoteTblId, backupPartInfo, overwrite, null);
+    }
+
+    private void genFileMapping(OlapTable localTbl, Partition localPartition, Long remoteTblId,
+            BackupPartitionInfo backupPartInfo, boolean overwrite, Map<Long, TabletRef> tabletBases) {
         for (MaterializedIndex localIdx : 
localPartition.getMaterializedIndices(IndexExtState.VISIBLE)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("get index id: {}, index name: {}", localIdx.getId(),
@@ -1230,10 +1384,21 @@ public class RestoreJob extends AbstractJob {
                     LOG.debug("get tablet mapping: {} to {}, index {}", 
backupTabletInfo.id, localTablet.getId(), i);
                 }
                 for (Replica localReplica : localTablet.getReplicas()) {
-                    IdChain src = new IdChain(remoteTblId, backupPartInfo.id, backupIdxInfo.id, backupTabletInfo.id,
-                            -1L /* no replica id */);
-                    IdChain dest = new IdChain(localTbl.getId(), localPartition.getId(),
-                            localIdx.getId(), localTablet.getId(), localReplica.getId());
+                    long refTabletId = -1L;
+                    if (tabletBases != null && 
tabletBases.containsKey(localTablet.getId())) {
+                        refTabletId = 
tabletBases.get(localTablet.getId()).tabletId;
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("restored tablet {} is based on exists 
tablet {}",
+                                    localTablet.getId(), refTabletId);
+                        }
+                    }
+
+                    long noReplicaId = -1L;
+                    long noRefTabletId = -1L;
+                    IdChain src = new IdChain(remoteTblId, backupPartInfo.id, backupIdxInfo.id,
+                            backupTabletInfo.id, noReplicaId, refTabletId);
+                    IdChain dest = new IdChain(localTbl.getId(), localPartition.getId(), localIdx.getId(),
+                            localTablet.getId(), localReplica.getId(), noRefTabletId);
                     fileMapping.putMapping(dest, src, overwrite);
                 }
             }
@@ -1280,6 +1445,12 @@ public class RestoreJob extends AbstractJob {
             OlapTable olapTbl = (OlapTable) tbl;
             tbl.writeLock();
             try {
+                if (isAtomicRestore) {
+                    // Atomic restore creates new replicas of the OlapTable.
+                    olapTbl.setInAtomicRestore();
+                    continue;
+                }
+
                 olapTbl.setState(OlapTableState.RESTORE);
                 // set restore status for partitions
                 BackupOlapTableInfo tblInfo = 
jobInfo.backupOlapTableObjects.get(tableName);
@@ -1400,7 +1571,7 @@ public class RestoreJob extends AbstractJob {
     }
 
     private void downloadRemoteSnapshots() {
-        // Categorize snapshot onfos by db id.
+        // Categorize snapshot infos by db id.
         ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = 
ArrayListMultimap.create();
         for (SnapshotInfo info : snapshotInfos.values()) {
             dbToSnapshotInfos.put(info.getDbId(), info);
@@ -1500,8 +1671,9 @@ public class RestoreJob extends AbstractJob {
                                     return;
                                 }
 
+                                long refTabletId = -1L;  // no ref tablet id
                                 IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(),
-                                        info.getTabletId(), replica.getId());
+                                        info.getTabletId(), replica.getId(), refTabletId);
                                 IdChain repoIds = fileMapping.get(catalogIds);
                                 if (repoIds == null) {
                                     status = new Status(ErrCode.NOT_FOUND,
@@ -1648,8 +1820,9 @@ public class RestoreJob extends AbstractJob {
                                     return;
                                 }
 
+                                long refTabletId = -1L;  // no ref tablet id
                                 IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(),
-                                        info.getTabletId(), replica.getId());
+                                        info.getTabletId(), replica.getId(), refTabletId);
                                 IdChain repoIds = fileMapping.get(catalogIds);
                                 if (repoIds == null) {
                                     status = new Status(ErrCode.NOT_FOUND,
@@ -1791,6 +1964,14 @@ public class RestoreJob extends AbstractJob {
             return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does 
not exist");
         }
 
+        // Replace the origin tables atomically.
+        if (isAtomicRestore) {
+            Status st = atomicReplaceOlapTables(db, isReplay);
+            if (!st.ok()) {
+                return st;
+            }
+        }
+
         // set all restored partition version and version hash
         // set all tables' state to NORMAL
         setTableStateToNormalAndUpdateProperties(db, true, isReplay);
@@ -2041,6 +2222,12 @@ public class RestoreJob extends AbstractJob {
 
             // remove restored tbls
             for (Table restoreTbl : restoredTbls) {
+                if (isAtomicRestore && restoreTbl.getType() == TableType.OLAP
+                        && 
!restoreTbl.getName().startsWith(ATOMIC_RESTORE_TABLE_PREFIX)) {
+                    // In atomic restore, a table registered to the db must have a name with the
+                    // prefix; otherwise it was never registered and can be ignored here.
+                    continue;
+                }
                 LOG.info("remove restored table when cancelled: {}", 
restoreTbl.getName());
                 if (db.writeLockIfExist()) {
                     try {
@@ -2117,6 +2304,86 @@ public class RestoreJob extends AbstractJob {
         LOG.info("finished to cancel restore job. is replay: {}. {}", 
isReplay, this);
     }
 
+    private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
+        assert isAtomicRestore;
+        for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
+            String originName = jobInfo.getAliasByOriginNameIfSet(tableName);
+            if (Env.isStoredTableNamesLowerCase()) {
+                originName = originName.toLowerCase();
+            }
+            String aliasName = tableAliasWithAtomicRestore(originName);
+
+            if (!db.writeLockIfExist()) {
+                return Status.OK;
+            }
+            try {
+                Table newTbl = db.getTableNullable(aliasName);
+                if (newTbl == null) {
+                    LOG.warn("replace table from {} to {}, but the temp table 
is not found", aliasName, originName);
+                    return new Status(ErrCode.COMMON_ERROR, "replace table 
failed, the temp table "
+                            + aliasName + " is not found");
+                }
+                if (newTbl.getType() != TableType.OLAP) {
+                    LOG.warn("replace table from {} to {}, but the temp table 
is not OLAP, it type is {}",
+                            aliasName, originName, newTbl.getType());
+                    return new Status(ErrCode.COMMON_ERROR, "replace table 
failed, the temp table " + aliasName
+                            + " is not OLAP table, it is " + newTbl.getType());
+                }
+
+                OlapTable originOlapTbl = null;
+                Table originTbl = db.getTableNullable(originName);
+                if (originTbl != null) {
+                    if (originTbl.getType() != TableType.OLAP) {
+                        LOG.warn("replace table from {} to {}, but the origin 
table is not OLAP, it type is {}",
+                                aliasName, originName, originTbl.getType());
+                        return new Status(ErrCode.COMMON_ERROR, "replace table 
failed, the origin table "
+                                + originName + " is not OLAP table, it is " + 
originTbl.getType());
+                    }
+                    originOlapTbl = (OlapTable) originTbl;  // save the origin olap table; it is dropped after the swap.
+                }
+
+                // replace the table.
+                OlapTable newOlapTbl = (OlapTable) newTbl;
+                newOlapTbl.writeLock();
+                try {
+                    // Rename the new table to the origin table's name and register it in the database.
+                    db.unregisterTable(aliasName);
+                    newOlapTbl.checkAndSetName(originName, false);
+                    db.unregisterTable(originName);
+                    db.registerTable(newOlapTbl);
+
+                    // set the olap table state to normal immediately for querying
+                    newOlapTbl.setState(OlapTableState.NORMAL);
+                    LOG.info("atomic restore replace table {} name to {}, and 
set state to normal, origin table={}",
+                            newOlapTbl.getId(), originName, originOlapTbl == 
null ? -1L : originOlapTbl.getId());
+                } catch (DdlException e) {
+                    LOG.warn("atomic restore replace table {} name from {} to 
{}",
+                            newOlapTbl.getId(), aliasName, originName, e);
+                    return new Status(ErrCode.COMMON_ERROR, "replace table 
from " + aliasName + " to " + originName
+                            + " failed, reason=" + e.getMessage());
+                } finally {
+                    newOlapTbl.writeUnlock();
+                }
+
+                if (originOlapTbl != null) {
+                    // The origin table is no longer used; drop all of its tablets.
+                    originOlapTbl.writeLock();
+                    try {
+                        LOG.info("drop the origin olap table {} by atomic 
restore. table={}",
+                                originOlapTbl.getName(), 
originOlapTbl.getId());
+                        Env.getCurrentEnv().onEraseOlapTable(originOlapTbl, 
isReplay);
+                    } finally {
+                        originOlapTbl.writeUnlock();
+                    }
+                }
+            } finally {
+                db.writeUnlock();
+            }
+        }
+
+        return Status.OK;
+    }
+
     private void setTableStateToNormalAndUpdateProperties(Database db, boolean 
committed, boolean isReplay) {
         for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
             Table tbl = 
db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
@@ -2139,6 +2406,10 @@ public class RestoreJob extends AbstractJob {
                     LOG.info("table {} set state from {} to normal", 
tableName, olapTbl.getState());
                     olapTbl.setState(OlapTableState.NORMAL);
                 }
+                if (olapTbl.isInAtomicRestore()) {
+                    olapTbl.clearInAtomicRestore();
+                    LOG.info("table {} set state from atomic restore to 
normal", tableName);
+                }
 
                 BackupOlapTableInfo tblInfo = 
jobInfo.backupOlapTableObjects.get(tableName);
                 for (Map.Entry<String, BackupPartitionInfo> partitionEntry : 
tblInfo.partitions.entrySet()) {
@@ -2329,6 +2600,7 @@ public class RestoreJob extends AbstractJob {
         isBeingSynced = 
Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED));
         isCleanTables = 
Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
         isCleanPartitions = 
Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
+        isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE));
     }
 
     @Override
@@ -2338,4 +2610,20 @@ public class RestoreJob extends AbstractJob {
         sb.append(", state: ").append(state.name());
         return sb.toString();
     }
+
+    public static String tableAliasWithAtomicRestore(String tableName) {
+        return ATOMIC_RESTORE_TABLE_PREFIX + tableName;
+    }
+
+    private static class TabletRef {
+        public long tabletId;
+        public int schemaHash;
+        public TStorageMedium storageMedium;
+
+        TabletRef(long tabletId, int schemaHash, TStorageMedium storageMedium) {
+            this.tabletId = tabletId;
+            this.schemaHash = schemaHash;
+            this.storageMedium = storageMedium;
+        }
+    }
 }
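
The core of the change above is atomicReplaceOlapTables(): the restored data is
first materialized under the alias __doris_atomic_restore_prefix__<name>, and
only once it is complete does the job swap it in under the database write lock.
A minimal, self-contained sketch of that swap pattern, using a HashMap as a
hypothetical stand-in for the Doris catalog (none of these names are part of
the patch):

    import java.util.HashMap;
    import java.util.Map;

    public class AtomicSwapSketch {
        static final String PREFIX = "__doris_atomic_restore_prefix__";
        final Map<String, String> tables = new HashMap<>(); // table name -> contents

        // Runs as one critical section (db.writeLock() in Doris), so readers see
        // either the old table or the fully restored one, never a missing table.
        synchronized String swap(String originName) {
            String aliasName = PREFIX + originName;
            String restored = tables.remove(aliasName);  // db.unregisterTable(aliasName)
            if (restored == null) {
                throw new IllegalStateException("temp table " + aliasName + " is not found");
            }
            String origin = tables.remove(originName);   // db.unregisterTable(originName)
            tables.put(originName, restored);            // db.registerTable(newOlapTbl)
            return origin;                               // caller drops the origin's tablets
        }

        public static void main(String[] args) {
            AtomicSwapSketch db = new AtomicSwapSketch();
            db.tables.put("t1", "old contents");
            db.tables.put(PREFIX + "t1", "restored contents");
            String dropped = db.swap("t1");
            System.out.println("serving: " + db.tables.get("t1") + ", dropped: " + dropped);
        }
    }
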
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index b7209ba550d..f425d94fa61 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -78,6 +78,7 @@ import org.apache.doris.analysis.TableRenameClause;
 import org.apache.doris.analysis.TruncateTableStmt;
 import org.apache.doris.analysis.UninstallPluginStmt;
 import org.apache.doris.backup.BackupHandler;
+import org.apache.doris.backup.RestoreJob;
 import org.apache.doris.binlog.BinlogGcer;
 import org.apache.doris.binlog.BinlogManager;
 import org.apache.doris.blockrule.SqlBlockRuleMgr;
@@ -3507,6 +3508,10 @@ public class Env {
                     .append("\" = \"");
             sb.append(olapTable.isDuplicateWithoutKey()).append("\"");
         }
+
+        if (olapTable.isInAtomicRestore()) {
+            sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE).append("\" = \"true\"");
+        }
     }
 
     /**
@@ -4532,6 +4537,9 @@ public class Env {
                 if (db.getTable(newTableName).isPresent()) {
                     throw new DdlException("Table name[" + newTableName + "] 
is already used");
                 }
+                if (db.getTable(RestoreJob.tableAliasWithAtomicRestore(newTableName)).isPresent()) {
+                    throw new DdlException("Table name[" + newTableName + "] is already used (in restoring)");
+                }
 
                 if (table.isManagedTable()) {
                     // olap table should also check if any rollup has same 
name as "newTableName"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 48c70917da4..a8c06760e3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1865,6 +1865,10 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
             throw new DdlException("Table[" + name + "]'s state(" + 
state.toString()
                     + ") is not NORMAL. Do not allow doing ALTER ops");
         }
+        if (tableProperty != null && tableProperty.isInAtomicRestore()) {
+            throw new DdlException("Table[" + name + "] is in atomic restore 
state. "
+                    + "Do not allow doing ALTER ops");
+        }
     }
 
     public boolean isStable(SystemInfoService infoService, TabletScheduler 
tabletScheduler) {
@@ -2114,6 +2118,21 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         return "";
     }
 
+    public void setInAtomicRestore() {
+        getOrCreatTableProperty().setInAtomicRestore().buildInAtomicRestore();
+    }
+
+    public void clearInAtomicRestore() {
+        getOrCreatTableProperty().clearInAtomicRestore().buildInAtomicRestore();
+    }
+
+    public boolean isInAtomicRestore() {
+        if (tableProperty != null) {
+            return tableProperty.isInAtomicRestore();
+        }
+        return false;
+    }
+
     public boolean getEnableLightSchemaChange() {
         if (tableProperty != null) {
             return tableProperty.getUseSchemaLightChange();
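
The accessors above follow OlapTable's usual null-tolerant pattern: the
tableProperty object may not exist yet, so reads default to false, while the
ALTER guard fails fast once the flag is set. A hedged, stand-alone sketch of
the same pattern (illustrative names only, not the actual OlapTable API):

    public class AlterGuardSketch {
        static class Props { boolean inAtomicRestore; }
        private Props tableProperty; // may be null until a property is first set

        boolean isInAtomicRestore() {
            return tableProperty != null && tableProperty.inAtomicRestore;
        }

        void checkStateForAlter(String name) {
            if (isInAtomicRestore()) {
                throw new IllegalStateException(
                        "Table[" + name + "] is in atomic restore state. Do not allow doing ALTER ops");
            }
        }

        public static void main(String[] args) {
            AlterGuardSketch t = new AlterGuardSketch();
            t.checkStateForAlter("t1"); // passes: no property object yet
            t.tableProperty = new Props();
            t.tableProperty.inAtomicRestore = true;
            try {
                t.checkStateForAlter("t1");
            } catch (IllegalStateException e) {
                System.out.println(e.getMessage()); // the guard now rejects ALTER
            }
        }
    }
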
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 0d58aabea08..0c075191923 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -61,6 +61,7 @@ public class TableProperty implements Writable {
     private ReplicaAllocation replicaAlloc = 
ReplicaAllocation.DEFAULT_ALLOCATION;
     private boolean isInMemory = false;
     private short minLoadReplicaNum = -1;
+    private boolean isInAtomicRestore = false;
 
     private String storagePolicy = "";
     private Boolean isBeingSynced = null;
@@ -197,6 +198,26 @@ public class TableProperty implements Writable {
         return this;
     }
 
+    public TableProperty buildInAtomicRestore() {
+        isInAtomicRestore = Boolean.parseBoolean(properties.getOrDefault(
+                PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE, "false"));
+        return this;
+    }
+
+    public boolean isInAtomicRestore() {
+        return isInAtomicRestore;
+    }
+
+    public TableProperty setInAtomicRestore() {
+        properties.put(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE, "true");
+        return this;
+    }
+
+    public TableProperty clearInAtomicRestore() {
+        properties.remove(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE);
+        return this;
+    }
+
     public TableProperty buildEnableLightSchemaChange() {
         enableLightSchemaChange = Boolean.parseBoolean(
                 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE, 
"false"));
@@ -628,7 +649,8 @@ public class TableProperty implements Writable {
                 .buildDisableAutoCompaction()
                 .buildEnableSingleReplicaCompaction()
                 .buildTimeSeriesCompactionEmptyRowsetsThreshold()
-                .buildTimeSeriesCompactionLevelThreshold();
+                .buildTimeSeriesCompactionLevelThreshold()
+                .buildInAtomicRestore();
         if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
             // get replica num from property map and create replica allocation
             String repNum = 
tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
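
TableProperty keeps the persisted string map and a cached boolean side by side:
setInAtomicRestore()/clearInAtomicRestore() edit the map, and
buildInAtomicRestore() re-derives the cache, both after an edit and when the
persisted metadata is re-read (the chained build*() calls in the hunk above). A
self-contained sketch of that round trip (class and method names here are
stand-ins, not the real API):

    import java.util.HashMap;
    import java.util.Map;

    public class PropertySketch {
        static final String IN_ATOMIC_RESTORE = "in_atomic_restore";
        final Map<String, String> properties = new HashMap<>(); // persisted
        private boolean isInAtomicRestore = false;              // derived cache

        PropertySketch set()   { properties.put(IN_ATOMIC_RESTORE, "true"); return this; }
        PropertySketch clear() { properties.remove(IN_ATOMIC_RESTORE); return this; }

        // Re-derive the cache from the map, as a build*() method does after an
        // edit or a metadata replay.
        PropertySketch build() {
            isInAtomicRestore = Boolean.parseBoolean(
                    properties.getOrDefault(IN_ATOMIC_RESTORE, "false"));
            return this;
        }

        public static void main(String[] args) {
            PropertySketch p = new PropertySketch();
            System.out.println(p.set().build().isInAtomicRestore);   // true
            System.out.println(p.clear().build().isInAtomicRestore); // false
        }
    }
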
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 288ee72afd3..553b322076a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -78,6 +78,7 @@ public class PropertyAnalyzer {
     public static final String PROPERTIES_SCHEMA_VERSION = "schema_version";
     public static final String PROPERTIES_PARTITION_ID = "partition_id";
     public static final String PROPERTIES_VISIBLE_VERSION = "visible_version";
+    public static final String PROPERTIES_IN_ATOMIC_RESTORE = "in_atomic_restore";
 
     public static final String PROPERTIES_BF_COLUMNS = "bloom_filter_columns";
     public static final String PROPERTIES_BF_FPP = "bloom_filter_fpp";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 8f37f73a82b..e5131b1dd30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -56,6 +56,7 @@ import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TruncateTableStmt;
 import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.backup.RestoreJob;
 import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.ColocateGroupSchema;
@@ -899,10 +900,16 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 OlapTable olapTable = (OlapTable) table;
                 if ((olapTable.getState() != OlapTableState.NORMAL)) {
                     throw new DdlException("The table [" + tableName + "]'s 
state is " + olapTable.getState()
-                            + ", cannot be dropped." + " please cancel the 
operation on olap table firstly."
+                            + ", cannot be dropped. please cancel the 
operation on olap table firstly."
                             + " If you want to forcibly drop(cannot be 
recovered),"
                             + " please use \"DROP table FORCE\".");
                 }
+                if (olapTable.isInAtomicRestore()) {
+                    throw new DdlException("The table [" + tableName + "]'s 
state is in atomic restore"
+                            + ", cannot be dropped. please cancel the restore 
operation on olap table"
+                            + " firstly. If you want to forcibly drop(cannot 
be recovered),"
+                            + " please use \"DROP table FORCE\".");
+                }
             }
 
             dropTableInternal(db, table, stmt.isForceDrop(), watch, costTimes);
@@ -1168,6 +1175,11 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
             }
         }
+        if (db.getTable(RestoreJob.tableAliasWithAtomicRestore(tableName)).isPresent()) {
+            ErrorReport.reportDdlException(
+                    "table[{}] is in atomic restore, please cancel the restore operation firstly",
+                    ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+        }
 
         if (engineName.equals("olap")) {
             return createOlapTable(db, stmt);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index abe85b06bef..be05c023166 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3118,6 +3118,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         if (request.isCleanTables()) {
             properties.put(RestoreStmt.PROP_CLEAN_TABLES, "true");
         }
+        if (request.isAtomicRestore()) {
+            properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true");
+        }
 
         AbstractBackupTableRefClause restoreTableRefClause = null;
         if (request.isSetTableRefs()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java
index 71b3570f288..81177305683 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java
@@ -29,6 +29,7 @@ public class SnapshotTask extends AgentTask {
     private int schemaHash;
     private long timeoutMs;
     private boolean isRestoreTask;
+    private Long refTabletId;
 
     // Set to true if this task for AdminCopyTablet.
     // Otherwise, it is for Backup/Restore operation.
@@ -98,13 +99,23 @@ public class SnapshotTask extends AgentTask {
         return resultSnapshotPath;
     }
 
+    public void setRefTabletId(long refTabletId) {
+        assert refTabletId > 0;
+        this.refTabletId = refTabletId;
+    }
+
     public TSnapshotRequest toThrift() {
         TSnapshotRequest request = new TSnapshotRequest(tabletId, schemaHash);
-        request.setVersion(version);
         request.setListFiles(true);
         
request.setPreferredSnapshotVersion(TypesConstants.TPREFER_SNAPSHOT_REQ_VERSION);
         request.setTimeout(timeoutMs / 1000);
         request.setIsCopyTabletTask(isCopyTabletTask);
+        if (refTabletId != null) {
+            request.setRefTabletId(refTabletId);
+        }
+        if (version > 0L) {
+            request.setVersion(version);
+        }
         return request;
     }
 }
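
Two details in SnapshotTask.toThrift() above are easy to miss: refTabletId is a
boxed Long so that null can stand for "unset", and setVersion() is now called
only when version > 0, matching the restore path that passes visibleVersion =
-1. A hedged, stand-alone mirror of that optional-field handling (the map here
is a stand-in for the generated thrift setters, not the real TSnapshotRequest):

    import java.util.HashMap;
    import java.util.Map;

    public class SnapshotRequestSketch {
        Long refTabletId;   // null means "no ref tablet"
        long version = -1L; // <= 0 means "use the end version on the BE side"

        Map<String, Long> toWire() {
            Map<String, Long> request = new HashMap<>();
            if (refTabletId != null) {
                request.put("ref_tablet_id", refTabletId); // request.setRefTabletId(...)
            }
            if (version > 0L) {
                request.put("version", version);           // request.setVersion(...)
            }
            return request;
        }

        public static void main(String[] args) {
            SnapshotRequestSketch task = new SnapshotRequestSketch();
            task.refTabletId = 10007L;
            System.out.println(task.toWire()); // {ref_tablet_id=10007}; version stays unset
        }
    }
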
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java
index d37a63f6d14..85de627fa44 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java
@@ -31,14 +31,14 @@ public class RestoreFileMappingTest {
 
     @Before
     public void setUp() {
-        src = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L);
-        dest = new IdChain(10004L, 10003L, 10004L, 10007L, -1L);
+        src = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L, -1L);
+        dest = new IdChain(10004L, 10003L, 10004L, 10007L, -1L, -1L);
         fileMapping.putMapping(src, dest, true);
     }
 
     @Test
     public void test() {
-        IdChain key = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L);
+        IdChain key = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L, -1L);
         Assert.assertEquals(key, src);
         Assert.assertEquals(src, key);
         IdChain val = fileMapping.get(key);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index fb4d9fe768f..cccfdb517da 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -251,7 +251,8 @@ public class RestoreJobTest {
         db.unregisterTable(expectedRestoreTbl.getName());
 
         job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
-                new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, env, repo.getId());
+                new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false,
+                env, repo.getId());
 
         List<Table> tbls = Lists.newArrayList();
         List<Resource> resources = Lists.newArrayList();
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index a5b91dc2498..e879b54615d 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -358,6 +358,7 @@ struct TSnapshotRequest {
     11: optional Types.TVersion start_version
     12: optional Types.TVersion end_version
     13: optional bool is_copy_binlog
+    14: optional Types.TTabletId ref_tablet_id
 }
 
 struct TReleaseSnapshotRequest {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 5480a84cf69..dac90824fa2 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1211,6 +1211,7 @@ struct TRestoreSnapshotRequest {
     12: optional binary job_info
     13: optional bool clean_tables
     14: optional bool clean_partitions
+    15: optional bool atomic_restore
 }
 
 struct TRestoreSnapshotResult {
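
Both structs grow by appending optional fields with fresh ids (14 and 15),
which keeps old and new FE/BE binaries wire-compatible: a peer that does not
know a field simply never sets it. Thrift-generated Java classes expose isSet
checks for such fields; a hypothetical mirror of that behavior (a sketch, not
the generated TRestoreSnapshotRequest):

    public class RestoreRequestSketch {
        private Boolean atomicRestore; // null == field absent on the wire

        boolean isSetAtomicRestore() { return atomicRestore != null; }
        boolean isAtomicRestore()    { return Boolean.TRUE.equals(atomicRestore); }
        void setAtomicRestore(boolean v) { atomicRestore = v; }

        public static void main(String[] args) {
            RestoreRequestSketch req = new RestoreRequestSketch();
            System.out.println(req.isSetAtomicRestore()); // false: request from an old client
            req.setAtomicRestore(true);
            System.out.println(req.isAtomicRestore());    // true: FE will add the property
        }
    }
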
diff --git a/regression-test/data/backup_restore/test_backup_restore_atomic.out 
b/regression-test/data/backup_restore/test_backup_restore_atomic.out
new file mode 100644
index 00000000000..bee7a4da44f
--- /dev/null
+++ b/regression-test/data/backup_restore/test_backup_restore_atomic.out
@@ -0,0 +1,78 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+10     10
+20     20
+30     30
+40     40
+50     50
+60     60
+70     70
+80     80
+90     90
+100    100
+
+-- !sql --
+10     10
+20     20
+
+-- !sql --
+10     10
+20     20
+30     30
+40     40
+50     50
+60     60
+70     70
+80     80
+90     90
+100    100
+
+-- !sql --
+10     10
+20     20
+30     30
+40     40
+50     50
+60     60
+70     70
+80     80
+90     90
+100    100
+
+-- !sql --
+10     10
+20     20
+30     30
+40     40
+50     50
+60     60
+70     70
+80     80
+90     90
+100    100
+
+-- !sql --
+10     10
+20     20
+30     30
+40     40
+50     50
+60     60
+70     70
+80     80
+90     90
+100    100
+
+-- !sql --
+10     20
+20     40
+30     60
+40     80
+50     100
+60     120
+70     140
+80     160
+90     180
+100    200
+200    200
+
diff --git 
a/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out 
b/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out
new file mode 100644
index 00000000000..cad6dbe8fd8
--- /dev/null
+++ 
b/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out
@@ -0,0 +1,60 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1      1
+2      2
+3      3
+4      4
+5      5
+6      6
+7      7
+8      8
+9      9
+10     10
+
+-- !sql --
+6      6
+7      7
+8      8
+9      9
+10     10
+
+-- !sql --
+1      1
+2      2
+3      3
+4      4
+5      5
+6      6
+7      7
+8      8
+9      9
+10     10
+
+-- !sql --
+6      6
+7      7
+8      8
+9      9
+10     10
+
+-- !sql --
+1      1
+2      2
+3      3
+4      4
+5      5
+6      6
+7      7
+8      8
+9      9
+10     10
+11     11
+
+-- !sql --
+6      6
+7      7
+8      8
+9      9
+10     10
+11     11
+
diff --git 
a/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy 
b/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy
new file mode 100644
index 00000000000..4b87340fb35
--- /dev/null
+++ b/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_atomic", "backup_restore") {
+    String suiteName = "test_backup_restore_atomic"
+    String dbName = "${suiteName}_db_1"
+    String dbName1 = "${suiteName}_db_2"
+    String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+    String snapshotName = "${suiteName}_snapshot"
+    String tableNamePrefix = "${suiteName}_tables"
+
+    def syncer = getSyncer()
+    syncer.createS3Repository(repoName)
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName1}"
+
+    // 1. restore to the non-existent table_0
+    // 2. restore partial data to table_1
+    // 3. restore less data to table_2
+    // 4. restore incremental data to table_3
+    int numTables = 4;
+    List<String> tables = []
+    for (int i = 0; i < numTables; ++i) {
+        String tableName = "${tableNamePrefix}_${i}"
+        tables.add(tableName)
+        sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+        sql """
+            CREATE TABLE ${dbName}.${tableName} (
+                `id` LARGEINT NOT NULL,
+                `count` LARGEINT SUM DEFAULT "0"
+            )
+            AGGREGATE KEY(`id`)
+            PARTITION BY RANGE(`id`)
+            (
+                PARTITION p1 VALUES LESS THAN ("10"),
+                PARTITION p2 VALUES LESS THAN ("20"),
+                PARTITION p3 VALUES LESS THAN ("30"),
+                PARTITION p4 VALUES LESS THAN ("40"),
+                PARTITION p5 VALUES LESS THAN ("50"),
+                PARTITION p6 VALUES LESS THAN ("60"),
+                PARTITION p7 VALUES LESS THAN ("120")
+            )
+            DISTRIBUTED BY HASH(`id`) BUCKETS 2
+            PROPERTIES
+            (
+                "replication_num" = "1"
+            )
+            """
+    }
+
+    // 5. the length of the table name equals the config table_name_length_limit
+    def maxLabelLen = getFeConfig("table_name_length_limit").toInteger()
+    def maxTableName = "".padRight(maxLabelLen, "x")
+    logger.info("config table_name_length_limit = ${maxLabelLen}, table name = 
${maxTableName}")
+    sql "DROP TABLE IF EXISTS ${dbName}.${maxTableName}"
+    sql """
+        CREATE TABLE ${dbName}.${maxTableName} (
+            `id` LARGEINT NOT NULL,
+            `count` LARGEINT SUM DEFAULT "0"
+        )
+        AGGREGATE KEY(`id`)
+        PARTITION BY RANGE(`id`)
+        (
+            PARTITION p1 VALUES LESS THAN ("10"),
+            PARTITION p2 VALUES LESS THAN ("20"),
+            PARTITION p3 VALUES LESS THAN ("30"),
+            PARTITION p4 VALUES LESS THAN ("40"),
+            PARTITION p5 VALUES LESS THAN ("50"),
+            PARTITION p6 VALUES LESS THAN ("60"),
+            PARTITION p7 VALUES LESS THAN ("120")
+        )
+        DISTRIBUTED BY HASH(`id`) BUCKETS 2
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+        """
+    tables.add(maxTableName)
+
+    int numRows = 10;
+    List<String> values = []
+    for (int j = 1; j <= numRows; ++j) {
+        values.add("(${j}0, ${j}0)")
+    }
+
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_0 VALUES ${values.join(",")}"
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_1 VALUES ${values.join(",")}"
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}"
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES ${values.join(",")}"
+    sql "INSERT INTO ${dbName}.${maxTableName} VALUES ${values.join(",")}"
+
+    // the other partitions of table_1 will be dropped
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+        ON (
+            ${tableNamePrefix}_0,
+            ${tableNamePrefix}_1 PARTITION (p1, p2, p3),
+            ${tableNamePrefix}_2,
+            ${tableNamePrefix}_3,
+            ${maxTableName}
+        )
+    """
+
+    syncer.waitSnapshotFinish(dbName)
+
+    def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+    assertTrue(snapshot != null)
+
+    // drop table_0
+    sql "DROP TABLE ${dbName}.${tableNamePrefix}_0 FORCE"
+
+    // insert extra data into table_2
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}"
+
+    sql "TRUNCATE TABLE ${dbName}.${tableNamePrefix}_3"
+
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "atomic_restore" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    for (def tableName in tables) {
+        qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id"
+    }
+
+    // restore table_3 to new db
+    sql """
+        RESTORE SNAPSHOT ${dbName1}.${snapshotName}
+        FROM `${repoName}`
+        ON (${tableNamePrefix}_3)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "atomic_restore" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName1)
+
+    qt_sql "SELECT * FROM ${dbName1}.${tableNamePrefix}_3 ORDER BY id"
+
+    // add partition and insert some data.
+    sql "ALTER TABLE ${dbName}.${tableNamePrefix}_3 ADD PARTITION p8 VALUES 
LESS THAN MAXVALUE"
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES ${values.join(",")}"
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES (200, 200)"
+
+    // backup again
+    snapshotName = "${snapshotName}_1"
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+        ON (${tableNamePrefix}_3)
+    """
+
+    syncer.waitSnapshotFinish(dbName)
+
+    snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+    assertTrue(snapshot != null)
+
+    // restore with incremental data
+    sql """
+        RESTORE SNAPSHOT ${dbName1}.${snapshotName}
+        FROM `${repoName}`
+        ON (${tableNamePrefix}_3)
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "atomic_restore" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName1)
+
+    qt_sql "SELECT * FROM ${dbName1}.${tableNamePrefix}_3 ORDER BY id"
+
+    for (def tableName in tables) {
+        sql "DROP TABLE ${dbName}.${tableName} FORCE"
+    }
+    sql "DROP DATABASE ${dbName} FORCE"
+    sql "DROP DATABASE ${dbName1} FORCE"
+    sql "DROP REPOSITORY `${repoName}`"
+}
+
+
diff --git 
a/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy
 
b/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy
new file mode 100644
index 00000000000..3487c93b0d6
--- /dev/null
+++ 
b/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_atomic_cancel") {
+    String suiteName = "test_backup_restore_atomic_cancelled"
+    String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+    String dbName = "${suiteName}_db"
+    String tableName = "${suiteName}_table"
+    String tableName1 = "${suiteName}_table_1"
+    String viewName = "${suiteName}_view"
+    String snapshotName = "${suiteName}_snapshot"
+
+    def syncer = getSyncer()
+    syncer.createS3Repository(repoName)
+
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+    sql """
+        CREATE TABLE ${dbName}.${tableName} (
+            `id` LARGEINT NOT NULL,
+            `count` LARGEINT SUM DEFAULT "0")
+        AGGREGATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 2
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+        """
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}"
+    sql """
+        CREATE TABLE ${dbName}.${tableName1} (
+            `id` LARGEINT NOT NULL,
+            `count` LARGEINT SUM DEFAULT "0")
+        AGGREGATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 2
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+        """
+    sql "DROP VIEW IF EXISTS ${dbName}.${viewName}"
+    sql """
+        CREATE VIEW ${dbName}.${viewName}
+        AS
+        SELECT id, count FROM ${dbName}.${tableName}
+        WHERE id > 5
+    """
+
+    List<String> values = []
+    for (int i = 1; i <= 10; ++i) {
+        values.add("(${i}, ${i})")
+    }
+    sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+    def result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size());
+
+    sql "INSERT INTO ${dbName}.${tableName1} VALUES ${values.join(",")}"
+    result = sql "SELECT * FROM ${dbName}.${tableName1}"
+    assertEquals(result.size(), values.size());
+
+
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+    """
+
+    syncer.waitSnapshotFinish(dbName)
+
+    // alter the view and then restore; it must fail because the signatures do not match
+
+    sql """
+        ALTER VIEW ${dbName}.${viewName}
+        AS
+        SELECT id,count FROM ${dbName}.${tableName}
+        WHERE id < 100
+
+    """
+
+    sql "INSERT INTO ${dbName}.${tableName} VALUES (11, 11)"
+
+    def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "atomic_restore" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+
+    def restore_result = sql_return_maparray """ SHOW RESTORE FROM ${dbName} 
WHERE Label ="${snapshotName}" """
+    logger.info("show restore result: ${restore_result}")
+    assertTrue(restore_result.last().State == "CANCELLED")
+
+
+    // The cancelled restore must not affect any tables.
+    result = sql "SELECT * FROM ${dbName}.${tableName}"
+    assertEquals(result.size(), values.size() + 1);
+
+    result = sql "SELECT * FROM ${dbName}.${tableName1}"
+    assertEquals(result.size(), values.size());
+
+    sql "DROP TABLE ${dbName}.${tableName} FORCE"
+    sql "DROP TABLE ${dbName}.${tableName1} FORCE"
+    sql "DROP DATABASE ${dbName} FORCE"
+    sql "DROP REPOSITORY `${repoName}`"
+}
+
+
diff --git 
a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
 
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
new file mode 100644
index 00000000000..46a3ca5b29d
--- /dev/null
+++ 
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_atomic_with_alter", "backup_restore") {
+    if (!getFeConfig("enable_debug_points").equals("true")) {
+        logger.info("Config.enable_debug_points=true is required")
+        return
+    }
+
+    String suiteName = "test_backup_restore_atomic_with_alter"
+    String dbName = "${suiteName}_db"
+    String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+    String snapshotName = "snapshot_" + 
UUID.randomUUID().toString().replace("-", "")
+    String tableNamePrefix = "${suiteName}_tables"
+
+    def syncer = getSyncer()
+    syncer.createS3Repository(repoName)
+    sql "DROP DATABASE IF EXISTS ${dbName} FORCE"
+    sql "CREATE DATABASE ${dbName}"
+
+    // during restoring:
+    // 1. table_0 does not exist, so creating table_0 is not allowed
+    // 2. table_1 exists, so ALTER operations are not allowed
+    // 3. table_1 exists, so dropping the table is not allowed
+    // 4. table_0 does not exist, so renaming table_2 to table_0 is not allowed
+    int numTables = 3;
+    List<String> tables = []
+    for (int i = 0; i < numTables; ++i) {
+        String tableName = "${tableNamePrefix}_${i}"
+        tables.add(tableName)
+        sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+        sql """
+            CREATE TABLE ${dbName}.${tableName} (
+                `id` LARGEINT NOT NULL,
+                `count` LARGEINT SUM DEFAULT "0"
+            )
+            AGGREGATE KEY(`id`)
+            PARTITION BY RANGE(`id`)
+            (
+                PARTITION p1 VALUES LESS THAN ("10"),
+                PARTITION p2 VALUES LESS THAN ("20"),
+                PARTITION p3 VALUES LESS THAN ("30"),
+                PARTITION p4 VALUES LESS THAN ("40"),
+                PARTITION p5 VALUES LESS THAN ("50"),
+                PARTITION p6 VALUES LESS THAN ("60"),
+                PARTITION p7 VALUES LESS THAN ("120")
+            )
+            DISTRIBUTED BY HASH(`id`) BUCKETS 2
+            PROPERTIES
+            (
+                "replication_num" = "1"
+            )
+            """
+    }
+
+    int numRows = 10;
+    List<String> values = []
+    for (int j = 1; j <= numRows; ++j) {
+        values.add("(${j}0, ${j}0)")
+    }
+
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_0 VALUES ${values.join(",")}"
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_1 VALUES ${values.join(",")}"
+    sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}"
+
+    // only backup table 0,1
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+        ON (
+            ${tableNamePrefix}_0,
+            ${tableNamePrefix}_1
+        )
+    """
+
+    syncer.waitSnapshotFinish(dbName)
+
+    def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+    assertTrue(snapshot != null)
+
+    // drop table_0
+    sql "DROP TABLE ${dbName}.${tableNamePrefix}_0 FORCE"
+
+    // pause the restore job via a debug point, so the checks below run while it is in progress
+    GetDebugPoint().enableDebugPointForAllFEs("FE.PAUSE_NON_PENDING_RESTORE_JOB", [value:snapshotName])
+
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "reserve_replica" = "true",
+            "atomic_restore" = "true"
+        )
+    """
+
+    boolean restore_paused = false
+    for (int k = 0; k < 60; k++) {
+        def records = sql_return_maparray """ SHOW RESTORE FROM ${dbName} 
WHERE Label = "${snapshotName}" """
+        if (records.size() == 1 && records[0].State != 'PENDING') {
+            restore_paused = true
+            break
+        }
+        logger.info("SHOW RESTORE result: ${records}")
+        sleep(3000)
+    }
+    assertTrue(restore_paused)
+
+    // 0. table_1 has in_atomic_restore property
+    def show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 
"""
+    logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}")
+    assertTrue(show_result[0][1].contains("in_atomic_restore"))
+
+    // 1. create a restoring table (one that did not exist before)
+    expectExceptionLike({ ->
+        sql """
+            CREATE TABLE ${dbName}.${tableNamePrefix}_0
+            (
+                `id` LARGEINT NOT NULL,
+                `count` LARGEINT SUM DEFAULT "0"
+            )
+            AGGREGATE KEY(`id`)
+            PARTITION BY RANGE(`id`)
+            (
+                PARTITION p1 VALUES LESS THAN ("10"),
+                PARTITION p2 VALUES LESS THAN ("20"),
+                PARTITION p3 VALUES LESS THAN ("30"),
+                PARTITION p4 VALUES LESS THAN ("40"),
+                PARTITION p5 VALUES LESS THAN ("50"),
+                PARTITION p6 VALUES LESS THAN ("60"),
+                PARTITION p7 VALUES LESS THAN ("120")
+            )
+            DISTRIBUTED BY HASH(`id`) BUCKETS 2
+            PROPERTIES
+            (
+                "replication_num" = "1"
+            )
+        """
+    }, "is in atomic restore, please cancel the restore operation firstly")
+
+    // 2. alter is not allowed
+    expectExceptionLike({
+        sql """
+            ALTER TABLE ${dbName}.${tableNamePrefix}_1
+            ADD PARTITION p8 VALUES LESS THAN("200")
+        """
+    }, "Do not allow doing ALTER ops")
+    expectExceptionLike({
+        sql """
+            ALTER TABLE ${dbName}.${tableNamePrefix}_1
+            DROP PARTITION p1
+        """
+    }, "Do not allow doing ALTER ops")
+    expectExceptionLike({
+        sql """
+            ALTER TABLE ${dbName}.${tableNamePrefix}_1
+            MODIFY PARTITION p1 SET ("key"="value")
+        """
+    }, "Do not allow doing ALTER ops")
+    expectExceptionLike({
+        sql """
+            ALTER TABLE ${dbName}.${tableNamePrefix}_1
+            ADD COLUMN new_col INT DEFAULT "0" AFTER count
+        """
+    }, "Do not allow doing ALTER ops")
+    expectExceptionLike({
+        sql """
+            ALTER TABLE ${dbName}.${tableNamePrefix}_1
+            DROP COLUMN count
+        """
+    }, "Do not allow doing ALTER ops")
+    expectExceptionLike({
+        sql """
+            ALTER TABLE ${dbName}.${tableNamePrefix}_1
+            SET ("is_being_synced"="false")
+        """
+    }, "Do not allow doing ALTER ops")
+    expectExceptionLike({
+        sql """
+            ALTER TABLE ${dbName}.${tableNamePrefix}_1
+            RENAME newTableName
+        """
+    }, "Do not allow doing ALTER ops")
+    // the temp table does not allow renaming either
+    expectExceptionLike({
+        sql """
+            ALTER TABLE 
${dbName}.__doris_atomic_restore_prefix__${tableNamePrefix}_1
+            RENAME newTableName
+        """
+    }, "Do not allow doing ALTER ops")
+    // 3. drop table is not allowed
+    expectExceptionLike({
+        sql """
+            DROP TABLE ${dbName}.${tableNamePrefix}_1
+        """
+    }, "state is in atomic restore")
+    expectExceptionLike({
+        sql """
+            DROP TABLE 
${dbName}.__doris_atomic_restore_prefix__${tableNamePrefix}_1
+        """
+    }, "state is RESTORE")
+    // 4. the table name is occupied
+    expectExceptionLike({
+        sql """
+            ALTER TABLE ${dbName}.${tableNamePrefix}_2
+            RENAME ${tableNamePrefix}_0
+        """
+    }, "is already used (in restoring)")
+
+
+    sql "CANCEL RESTORE FROM ${dbName}"
+
+    // 5. After the restore job is cancelled, the in_atomic_restore property is removed.
+    show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """
+    logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}")
+    assertFalse(show_result[0][1].contains("in_atomic_restore"))
+
+    for (def tableName in tables) {
+        sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE"
+    }
+    sql "DROP DATABASE ${dbName} FORCE"
+    sql "DROP REPOSITORY `${repoName}`"
+}
+
+
+
diff --git 
a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy
 
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy
new file mode 100644
index 00000000000..9d090281364
--- /dev/null
+++ 
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_atomic_with_view", "backup_restore") {
+    String suiteName = "backup_restore_atomic_with_view"
+    String dbName = "${suiteName}_db"
+    String dbName1 = "${suiteName}_db_1"
+    String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+    String snapshotName = "${suiteName}_snapshot"
+    String tableName = "${suiteName}_table"
+    String viewName = "${suiteName}_view"
+
+    def syncer = getSyncer()
+    syncer.createS3Repository(repoName)
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+    sql "CREATE DATABASE IF NOT EXISTS ${dbName1}"
+
+    int numRows = 10;
+    sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE"
+    sql "DROP VIEW IF EXISTS ${dbName}.${viewName}"
+    sql """
+        CREATE TABLE ${dbName}.${tableName} (
+            `id` LARGEINT NOT NULL,
+            `count` LARGEINT SUM DEFAULT "0"
+        )
+        AGGREGATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 2
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+        """
+    List<String> values = []
+    for (int j = 1; j <= numRows; ++j) {
+        values.add("(${j}, ${j})")
+    }
+    sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+
+    sql """CREATE VIEW ${dbName}.${viewName} (id, count)
+        AS
+        SELECT * FROM ${dbName}.${tableName} WHERE count > 5
+        """
+
+    qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id ASC"
+    qt_sql "SELECT * FROM ${dbName}.${viewName} ORDER BY id ASC"
+
+    sql """
+        BACKUP SNAPSHOT ${dbName}.${snapshotName}
+        TO `${repoName}`
+    """
+
+    syncer.waitSnapshotFinish(dbName)
+
+    def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+    assertTrue(snapshot != null)
+
+    // restore the table and view into the new database
+    sql "DROP TABLE IF EXISTS ${dbName1}.${tableName} FORCE"
+    sql "DROP VIEW IF EXISTS ${dbName1}.${viewName}"
+
+    sql """
+        RESTORE SNAPSHOT ${dbName1}.${snapshotName}
+        FROM `${repoName}`
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "atomic_restore" = "true",
+            "reserve_replica" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName1)
+
+    qt_sql "SELECT * FROM ${dbName1}.${tableName} ORDER BY id ASC"
+    qt_sql "SELECT * FROM ${dbName1}.${viewName} ORDER BY id ASC"
+    def show_view_result = sql_return_maparray "SHOW VIEW FROM ${tableName} FROM ${dbName1}"
+    logger.info("show view result: ${show_view_result}")
+    assertTrue(show_view_result.size() == 1);
+    def show_view = show_view_result[0]['Create View']
+    assertTrue(show_view.contains("${dbName1}"))
+    assertTrue(show_view.contains("${tableName}"))
+
+    // restore over an existing view
+    sql """
+        RESTORE SNAPSHOT ${dbName}.${snapshotName}
+        FROM `${repoName}`
+        PROPERTIES
+        (
+            "backup_timestamp" = "${snapshot}",
+            "atomic_restore" = "true",
+            "reserve_replica" = "true"
+        )
+    """
+
+    syncer.waitAllRestoreFinish(dbName)
+    def restore_result = sql_return_maparray """ SHOW RESTORE FROM ${dbName} 
WHERE Label ="${snapshotName}" """
+    logger.info("show restore result: ${restore_result}")
+    assertTrue(restore_result.last().State == "FINISHED")
+
+    // The view can read the incremental data.
+    sql "INSERT INTO ${dbName}.${tableName} VALUES (11, 11)"
+
+    qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id ASC"
+    qt_sql "SELECT * FROM ${dbName}.${viewName} ORDER BY id ASC"
+
+    sql "DROP REPOSITORY `${repoName}`"
+}
+
+

