This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ea708d9bafb [fix](cluster key) fix mow cluster key with schema change (#40372)
ea708d9bafb is described below

commit ea708d9bafb1206c0c8e3387c6794e348e30afa3
Author: meiyi <myime...@gmail.com>
AuthorDate: Sat Sep 14 20:02:55 2024 +0800

    [fix](cluster key) fix mow cluster key with schema change (#40372)
---
 be/src/olap/memtable.cpp                           |   9 +-
 be/src/olap/merger.cpp                             |  56 ++--
 be/src/olap/merger.h                               |   3 +-
 be/src/olap/rowset/segcompaction.cpp               |   4 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  24 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |   8 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |   2 +-
 .../apache/doris/alter/SchemaChangeHandler.java    |   5 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  24 ++
 .../java/org/apache/doris/analysis/ColumnDef.java  |   4 +
 .../org/apache/doris/analysis/CreateTableStmt.java |   5 +-
 .../java/org/apache/doris/analysis/KeysDesc.java   |  67 ++--
 .../java/org/apache/doris/backup/RestoreJob.java   |  11 +
 .../java/org/apache/doris/catalog/OlapTable.java   |  11 +
 .../cloud/datasource/CloudInternalCatalog.java     |   3 +-
 .../apache/doris/datasource/InternalCatalog.java   |  49 ++-
 .../org/apache/doris/master/ReportHandler.java     |  12 +-
 .../trees/plans/commands/info/CreateTableInfo.java |  94 +++---
 .../unique_with_mow_c_p0/test_schema_change_ck.out | 365 +++++++++++++++++++++
 .../unique_with_mow_c_p0/test_create_table.groovy  |   2 +-
 .../unique_with_mow_c_p0/test_schema_change.groovy |   6 +-
 .../test_schema_change_ck.groovy                   | 262 +++++++++++++++
 22 files changed, 849 insertions(+), 177 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 4f66a361650..671e07d7556 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -323,9 +323,14 @@ Status MemTable::_sort_by_cluster_keys() {
     }
     Tie tie = Tie(0, mutable_block.rows());
 
-    for (auto i : _tablet_schema->cluster_key_idxes()) {
+    for (auto cid : _tablet_schema->cluster_key_idxes()) {
+        auto index = _tablet_schema->field_index(cid);
+        if (index == -1) {
+            return Status::InternalError("could not find cluster key column with unique_id=" +
+                                         std::to_string(cid) + " in tablet schema");
+        }
         auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
-            return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
+            return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, index, -1);
         };
         _sort_one_column(row_in_blocks, tie, cmp);
     }
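
The hunk above exists because cluster_key_idxes() now stores column unique ids
rather than schema positions, so after a schema change each id must be resolved
back to a position via field_index() before the block can be compared. A
standalone sketch of that resolution step (the Schema type and names here are
illustrative stand-ins, not the Doris API):

    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    // Hypothetical stand-in for TabletSchema: maps a column unique id to its
    // current position in the schema, or -1 if the column was dropped.
    struct Schema {
        std::unordered_map<int32_t, int32_t> uid_to_pos;
        int32_t field_index(int32_t uid) const {
            auto it = uid_to_pos.find(uid);
            return it == uid_to_pos.end() ? -1 : it->second;
        }
    };

    // Resolve cluster key unique ids to schema positions. Indexing the block
    // by the raw unique id (the old behavior) breaks once a schema change
    // shifts column positions away from their unique ids.
    bool resolve_cluster_key_positions(const Schema& schema,
                                       const std::vector<int32_t>& cluster_key_uids,
                                       std::vector<int32_t>* positions) {
        for (int32_t uid : cluster_key_uids) {
            int32_t pos = schema.field_index(uid);
            if (pos == -1) {
                return false; // column not found: the caller reports an error
            }
            positions->push_back(pos);
        }
        return true;
    }
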
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index cba828785d9..ab034123ac8 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -57,30 +57,6 @@
 #include "vec/olap/vertical_merge_iterator.h"
 
 namespace doris {
-namespace {
-
-// for mow with cluster key table, the key group also contains cluster key columns.
-// the `key_group_cluster_key_idxes` marks the positions of cluster key columns in key group.
-void _generate_key_group_cluster_key_idxes(const TabletSchema& tablet_schema,
-                                           std::vector<std::vector<uint32_t>>& column_groups,
-                                           std::vector<uint32_t>& key_group_cluster_key_idxes) {
-    if (column_groups.empty() || tablet_schema.cluster_key_idxes().empty()) {
-        return;
-    }
-
-    auto& key_column_group = column_groups[0];
-    for (const auto& index_in_tablet_schema : tablet_schema.cluster_key_idxes()) {
-        for (auto j = 0; j < key_column_group.size(); ++j) {
-            auto cid = key_column_group[j];
-            if (cid == index_in_tablet_schema) {
-                key_group_cluster_key_idxes.emplace_back(j);
-                break;
-            }
-        }
-    }
-}
-
-} // namespace
 
 Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
                               const TabletSchema& cur_tablet_schema,
@@ -183,7 +159,8 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
 // split columns into several groups, make sure all keys in one group
 // unique_key should consider sequence&delete column
 void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
-                                    std::vector<std::vector<uint32_t>>* column_groups) {
+                                    std::vector<std::vector<uint32_t>>* column_groups,
+                                    std::vector<uint32_t>* key_group_cluster_key_idxes) {
     uint32_t num_key_cols = tablet_schema.num_key_columns();
     uint32_t total_cols = tablet_schema.num_columns();
     std::vector<uint32_t> key_columns;
@@ -206,8 +183,24 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
         }
         if (!tablet_schema.cluster_key_idxes().empty()) {
             for (const auto& cid : tablet_schema.cluster_key_idxes()) {
-                if (cid >= num_key_cols) {
-                    key_columns.emplace_back(cid);
+                auto idx = tablet_schema.field_index(cid);
+                DCHECK(idx >= 0) << "could not find cluster key column with unique_id=" << cid
+                                 << " in tablet schema, table_id=" << tablet_schema.table_id();
+                if (idx >= num_key_cols) {
+                    key_columns.emplace_back(idx);
+                }
+            }
+            // tablet schema unique ids: [1, 2, 5, 3, 6, 4], [1 2] is key columns
+            // cluster key unique ids: [3, 1, 4]
+            // the key_columns should be [0, 1, 3, 5]
+            // the key_group_cluster_key_idxes should be [2, 0, 3]
+            for (const auto& cid : tablet_schema.cluster_key_idxes()) {
+                auto idx = tablet_schema.field_index(cid);
+                for (auto i = 0; i < key_columns.size(); ++i) {
+                    if (idx == key_columns[i]) {
+                        key_group_cluster_key_idxes->emplace_back(i);
+                        break;
+                    }
                 }
             }
         }
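
The worked example in the comment can be reproduced with a small standalone
sketch (not the Doris code path; field_index is reimplemented here as a plain
linear search over the unique-id array from the comment):

    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
        std::vector<int32_t> schema_uids = {1, 2, 5, 3, 6, 4}; // uid per position
        std::vector<int32_t> cluster_key_uids = {3, 1, 4};
        int32_t num_key_cols = 2; // positions 0 and 1 (uids 1, 2) are key columns

        auto field_index = [&](int32_t uid) -> int32_t {
            for (size_t i = 0; i < schema_uids.size(); ++i) {
                if (schema_uids[i] == uid) return static_cast<int32_t>(i);
            }
            return -1;
        };

        // key group: the key columns, then cluster key columns that are not keys
        std::vector<int32_t> key_columns = {0, 1};
        for (int32_t uid : cluster_key_uids) {
            int32_t pos = field_index(uid);
            if (pos >= num_key_cols) key_columns.push_back(pos);
        }
        // key_columns == {0, 1, 3, 5}

        // position of each cluster key column inside the key group
        std::vector<int32_t> key_group_cluster_key_idxes;
        for (int32_t uid : cluster_key_uids) {
            int32_t pos = field_index(uid);
            for (size_t i = 0; i < key_columns.size(); ++i) {
                if (key_columns[i] == pos) {
                    key_group_cluster_key_idxes.push_back(static_cast<int32_t>(i));
                    break;
                }
            }
        }
        for (int32_t v : key_group_cluster_key_idxes) std::cout << v << ' ';
        std::cout << '\n'; // prints: 2 0 3
    }
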
@@ -218,14 +211,12 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
     if (!key_columns.empty()) {
         column_groups->emplace_back(std::move(key_columns));
     }
-    auto&& cluster_key_idxes = tablet_schema.cluster_key_idxes();
 
     std::vector<uint32_t> value_columns;
 
     for (uint32_t i = num_key_cols; i < total_cols; ++i) {
         if (i == sequence_col_idx || i == delete_sign_idx ||
-            cluster_key_idxes.end() !=
-                    std::find(cluster_key_idxes.begin(), cluster_key_idxes.end(), i)) {
+            key_columns.end() != std::find(key_columns.begin(), key_columns.end(), i)) {
             continue;
         }
 
@@ -460,11 +451,8 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
                                      int64_t merge_way_num, Statistics* stats_output) {
     LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
     std::vector<std::vector<uint32_t>> column_groups;
-    vertical_split_columns(tablet_schema, &column_groups);
-
     std::vector<uint32_t> key_group_cluster_key_idxes;
-    _generate_key_group_cluster_key_idxes(tablet_schema, column_groups,
-                                          key_group_cluster_key_idxes);
+    vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes);
 
     vectorized::RowSourcesBuffer row_sources_buf(
             tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type);
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index cb05162b3bc..7d430cde7f3 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -66,7 +66,8 @@ public:
 
     // for vertical compaction
     static void vertical_split_columns(const TabletSchema& tablet_schema,
-                                       std::vector<std::vector<uint32_t>>* column_groups);
+                                       std::vector<std::vector<uint32_t>>* column_groups,
+                                       std::vector<uint32_t>* key_group_cluster_key_idxes);
     static Status vertical_compact_one_group(
             BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema,
             bool is_key, const std::vector<uint32_t>& column_group,
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 374056f7b9d..fc8baf952c1 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -248,7 +248,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
     }
 
     std::vector<std::vector<uint32_t>> column_groups;
-    Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups);
+    std::vector<uint32_t> key_group_cluster_key_idxes;
+    Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups,
+                                   &key_group_cluster_key_idxes);
     vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
                                                  ReaderType::READER_SEGMENT_COMPACTION);
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 0e9b55d99b8..225677f5d1f 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -126,7 +126,7 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
             _key_index_size.clear();
             _num_sort_key_columns = _tablet_schema->cluster_key_idxes().size();
             for (auto cid : _tablet_schema->cluster_key_idxes()) {
-                const auto& column = _tablet_schema->column(cid);
+                const auto& column = _tablet_schema->column_by_uid(cid);
                 _key_coders.push_back(get_key_coder(column.type()));
                 _key_index_size.push_back(column.index_length());
             }
@@ -755,17 +755,31 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
             // 2. generate short key index (use cluster key)
             key_columns.clear();
             for (const auto& cid : _tablet_schema->cluster_key_idxes()) {
-                for (size_t id = 0; id < _column_writers.size(); ++id) {
-                    // olap data convertor always start from id = 0
-                    if (cid == _column_ids[id]) {
-                        auto converted_result = _olap_data_convertor->convert_column_data(id);
+                // find cluster key index in tablet schema
+                auto cluster_key_index = _tablet_schema->field_index(cid);
+                if (cluster_key_index == -1) {
+                    return Status::InternalError(
+                            "could not find cluster key column with 
unique_id=" +
+                            std::to_string(cid) + " in tablet schema");
+                }
+                bool found = false;
+                for (auto i = 0; i < _column_ids.size(); ++i) {
+                    if (_column_ids[i] == cluster_key_index) {
+                        auto converted_result = _olap_data_convertor->convert_column_data(i);
                         if (!converted_result.first.ok()) {
                             return converted_result.first;
                         }
                         key_columns.push_back(converted_result.second);
+                        found = true;
                         break;
                     }
                 }
+                if (!found) {
+                    return Status::InternalError(
+                            "could not found cluster key column with 
unique_id=" +
+                            std::to_string(cid) +
+                            ", tablet schema index=" + 
std::to_string(cluster_key_index));
+                }
             }
             RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
         } else if (_is_mow()) {
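
The lookup above is now a two-step mapping: the cluster key stores a column
unique id, field_index() turns it into a tablet schema position, and that
position is matched against _column_ids to locate the data convertor slot. A
minimal sketch of the same two-step resolution (names are illustrative, not
the SegmentWriter API):

    #include <cstdint>
    #include <optional>
    #include <vector>

    // schema_uids: unique id at each schema position (plays field_index()).
    // column_ids:  schema position written at each convertor slot (_column_ids).
    std::optional<size_t> convertor_slot_for_cluster_key(
            int32_t uid, const std::vector<int32_t>& schema_uids,
            const std::vector<int32_t>& column_ids) {
        int32_t pos = -1;
        for (size_t i = 0; i < schema_uids.size(); ++i) {
            if (schema_uids[i] == uid) {
                pos = static_cast<int32_t>(i);
                break;
            }
        }
        if (pos == -1) return std::nullopt; // uid missing from the schema
        for (size_t slot = 0; slot < column_ids.size(); ++slot) {
            if (column_ids[slot] == pos) return slot;
        }
        return std::nullopt; // position not written by this segment
    }
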
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5663c3779df..4863f2c0401 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -130,7 +130,7 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
             _key_index_size.clear();
             _num_sort_key_columns = _tablet_schema->cluster_key_idxes().size();
             for (auto cid : _tablet_schema->cluster_key_idxes()) {
-                const auto& column = _tablet_schema->column(cid);
+                const auto& column = _tablet_schema->column_by_uid(cid);
                 _key_coders.push_back(get_key_coder(column.type()));
                 _key_index_size.push_back(column.index_length());
             }
@@ -714,6 +714,7 @@ Status VerticalSegmentWriter::write_batch() {
 
     std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
     vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+    // the key is cluster key column unique id
     std::map<uint32_t, vectorized::IOlapColumnDataAccessor*> cid_to_column;
     for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
         RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
@@ -732,11 +733,12 @@ Status VerticalSegmentWriter::write_batch() {
             if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) {
                 seq_column = column;
             }
+            auto column_unique_id = _tablet_schema->column(cid).unique_id();
             if (_is_mow_with_cluster_key() &&
                 std::find(_tablet_schema->cluster_key_idxes().begin(),
                           _tablet_schema->cluster_key_idxes().end(),
-                          cid) != _tablet_schema->cluster_key_idxes().end()) {
-                cid_to_column[cid] = column;
+                          column_unique_id) != _tablet_schema->cluster_key_idxes().end()) {
+                cid_to_column[column_unique_id] = column;
             }
             RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
                                                          data.num_rows));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 3a29c0c542e..62eff357875 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -233,7 +233,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                 TStorageMedium storageMedium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
                 TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId);
                 MaterializedIndex rollupIndex = entry.getValue();
-
                 Map<Long, Long> tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId);
                 for (Tablet rollupTablet : rollupIndex.getTablets()) {
                     long rollupTabletId = rollupTablet.getId();
@@ -276,6 +275,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                         if (this.storageFormat != null) {
                             createReplicaTask.setStorageFormat(this.storageFormat);
                         }
+                        // rollup replica does not need to set mow cluster keys
                         batchTask.addTask(createReplicaTask);
                     } // end for rollupReplicas
                 } // end for rollupTablets
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index c47344f14c5..43857b2e898 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -310,7 +310,7 @@ public class SchemaChangeHandler extends AlterHandler {
         boolean lightSchemaChange = olapTable.getEnableLightSchemaChange();
         /*
          * UNIQUE:
-         *      Can not drop any key column.
+         *      Can not drop any key column or cluster key column
          * AGGREGATION:
          *      Can not drop any key column if it has a value with REPLACE method
          */
@@ -844,9 +844,6 @@ public class SchemaChangeHandler extends AlterHandler {
                 if (!column.isVisible()) {
                     newSchema.add(column);
                 }
-                if (column.isClusterKey()) {
-                    throw new DdlException("Can not modify column order in 
Unique data model table");
-                }
             }
         }
         if (newSchema.size() != targetIndexSchema.size()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index f44d9416e72..c514bf6306e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -71,6 +71,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Table;
 import com.google.common.collect.Table.Cell;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -216,6 +217,20 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         partitionOriginIndexIdMap.clear();
     }
 
+    private boolean isShadowIndexOfBase(long shadowIdxId, OlapTable tbl) {
+        if (indexIdToName.get(shadowIdxId).startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
+            String shadowIndexName = indexIdToName.get(shadowIdxId);
+            String indexName = shadowIndexName
+                    .substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length());
+            long indexId = tbl.getIndexIdByName(indexName);
+            LOG.info("shadow index id: {}, shadow index name: {}, pointer to 
index id: {}, index name: {}, "
+                            + "base index id: {}, table_id: {}", shadowIdxId, 
shadowIndexName, indexId, indexName,
+                    tbl.getBaseIndexId(), tbl.getId());
+            return indexId == tbl.getBaseIndexId();
+        }
+        return false;
+    }
+
     protected void createShadowIndexReplica() throws AlterCancelException {
         Database db = Env.getCurrentInternalCatalog()
                 .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
@@ -261,6 +276,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
                     short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId);
                     List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
+                    List<Integer> clusterKeyIndexes = null;
+                    if (shadowIdxId == tbl.getBaseIndexId() || isShadowIndexOfBase(shadowIdxId, tbl)) {
+                        clusterKeyIndexes = OlapTable.getClusterKeyIndexes(shadowSchema);
+                    }
                     int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
                     long originIndexId = indexIdMap.get(shadowIdxId);
                     int originSchemaHash = tbl.getSchemaHashByIndexId(originIndexId);
@@ -309,6 +328,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                             }
                             createReplicaTask.setInvertedIndexFileStorageFormat(tbl
                                                     .getInvertedIndexFileStorageFormat());
+                            if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+                                createReplicaTask.setClusterKeyIndexes(clusterKeyIndexes);
+                                LOG.info("table: {}, partition: {}, index: {}, tablet: {}, cluster key indexes: {}",
+                                        tableId, partitionId, shadowIdxId, shadowTabletId, clusterKeyIndexes);
+                            }
                             batchTask.addTask(createReplicaTask);
                         } // end for rollupReplicas
                     } // end for rollupTablets
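
isShadowIndexOfBase() keys off the shadow-index naming convention: a schema
change materializes each new index under the old name plus a shadow prefix, so
stripping the prefix recovers the index the shadow replaces. A minimal sketch
of that check, assuming the prefix value "__doris_shadow_" (the real constant
is SchemaChangeHandler.SHADOW_NAME_PREFIX; the value here is an assumption):

    #include <optional>
    #include <string>

    // Returns the base index name a shadow index points at, or nullopt when
    // the name carries no shadow prefix.
    std::optional<std::string> base_index_of_shadow(const std::string& index_name) {
        static const std::string kShadowPrefix = "__doris_shadow_"; // assumed value
        if (index_name.rfind(kShadowPrefix, 0) == 0) {
            return index_name.substr(kShadowPrefix.size());
        }
        return std::nullopt;
    }
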
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
index 33474f8263c..625a3b3b131 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
@@ -354,6 +354,10 @@ public class ColumnDef {
         return visible;
     }
 
+    public int getClusterKeyId() {
+        return this.clusterKeyId;
+    }
+
     public void setClusterKeyId(int clusterKeyId) {
         this.clusterKeyId = clusterKeyId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 865489a113e..d3f37b632ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -421,9 +421,6 @@ public class CreateTableStmt extends DdlStmt implements NotFallbackInParser {
 
             keysDesc.analyze(columnDefs);
             if (!CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames())) {
-                if (Config.isCloudMode()) {
-                    throw new AnalysisException("Cluster key is not supported 
in cloud mode");
-                }
                 if (!enableUniqueKeyMergeOnWrite) {
                     throw new AnalysisException("Cluster keys only support 
unique keys table which enabled "
                             + 
PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
@@ -503,7 +500,7 @@ public class CreateTableStmt extends DdlStmt implements NotFallbackInParser {
                             columnDef.getType().getPrimitiveType() + " column can't support aggregation "
                                     + columnDef.getAggregateType());
                 }
-                if (columnDef.isKey()) {
+                if (columnDef.isKey() || columnDef.getClusterKeyId() != -1) {
                     throw new AnalysisException(columnDef.getType().getPrimitiveType()
                             + " can only be used in the non-key column of the duplicate table at present.");
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
index e7359657ef2..0076ce74de3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
@@ -34,7 +34,6 @@ public class KeysDesc implements Writable {
     private KeysType type;
     private List<String> keysColumnNames;
     private List<String> clusterKeysColumnNames;
-    private List<Integer> clusterKeysColumnIds = null;
 
     public KeysDesc() {
         this.type = KeysType.AGG_KEYS;
@@ -51,12 +50,6 @@ public class KeysDesc implements Writable {
         this.clusterKeysColumnNames = clusterKeyColumnNames;
     }
 
-    public KeysDesc(KeysType type, List<String> keysColumnNames, List<String> clusterKeyColumnNames,
-                    List<Integer> clusterKeysColumnIds) {
-        this(type, keysColumnNames, clusterKeyColumnNames);
-        this.clusterKeysColumnIds = clusterKeysColumnIds;
-    }
-
     public KeysType getKeysType() {
         return type;
     }
@@ -69,10 +62,6 @@ public class KeysDesc implements Writable {
         return clusterKeysColumnNames;
     }
 
-    public List<Integer> getClusterKeysColumnIds() {
-        return clusterKeysColumnIds;
-    }
-
     public boolean containsCol(String colName) {
         return keysColumnNames.contains(colName);
     }
@@ -90,17 +79,6 @@ public class KeysDesc implements Writable {
             throw new AnalysisException("The number of key columns should be 
less than the number of columns.");
         }
 
-        if (clusterKeysColumnNames != null) {
-            if (Config.isCloudMode()) {
-                throw new AnalysisException("Cluster key is not supported in 
cloud mode");
-            }
-            if (type != KeysType.UNIQUE_KEYS) {
-                throw new AnalysisException("Cluster keys only support unique 
keys table.");
-            }
-            clusterKeysColumnIds = Lists.newArrayList();
-            analyzeClusterKeys(cols);
-        }
-
         for (int i = 0; i < keysColumnNames.size(); ++i) {
             String name = cols.get(i).getName();
             if (!keysColumnNames.get(i).equalsIgnoreCase(name)) {
@@ -135,39 +113,48 @@ public class KeysDesc implements Writable {
         }
 
         if (clusterKeysColumnNames != null) {
-            int minKeySize = keysColumnNames.size() < clusterKeysColumnNames.size() ? keysColumnNames.size()
-                    : clusterKeysColumnNames.size();
-            boolean sameKey = true;
-            for (int i = 0; i < minKeySize; ++i) {
-                if (!keysColumnNames.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
-                    sameKey = false;
-                    break;
-                }
-            }
-            if (sameKey) {
-                throw new AnalysisException("Unique keys and cluster keys 
should be different.");
-            }
+            analyzeClusterKeys(cols);
         }
     }
 
     private void analyzeClusterKeys(List<ColumnDef> cols) throws AnalysisException {
-        for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
+        if (Config.isCloudMode()) {
+            throw new AnalysisException("Cluster key is not supported in cloud 
mode");
+        }
+        if (type != KeysType.UNIQUE_KEYS) {
+            throw new AnalysisException("Cluster keys only support unique keys 
table");
+        }
+        // check that cluster keys are not duplicated
+        for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
             String name = clusterKeysColumnNames.get(i);
-            // check if key is duplicate
             for (int j = 0; j < i; j++) {
                 if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
                     throw new AnalysisException("Duplicate cluster key 
column[" + name + "].");
                 }
             }
-            // check if key exists and generate key column ids
+        }
+        // check that cluster keys are not equal to primary keys
+        int minKeySize = Math.min(keysColumnNames.size(), clusterKeysColumnNames.size());
+        boolean sameKey = true;
+        for (int i = 0; i < minKeySize; i++) {
+            if (!keysColumnNames.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
+                sameKey = false;
+                break;
+            }
+        }
+        if (sameKey) {
+            throw new AnalysisException("Unique keys and cluster keys should 
be different.");
+        }
+        // check that cluster key column exists
+        for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
+            String name = clusterKeysColumnNames.get(i);
             for (int j = 0; j < cols.size(); j++) {
                 if (cols.get(j).getName().equalsIgnoreCase(name)) {
-                    cols.get(j).setClusterKeyId(clusterKeysColumnIds.size());
-                    clusterKeysColumnIds.add(j);
+                    cols.get(j).setClusterKeyId(i);
                     break;
                 }
                 if (j == cols.size() - 1) {
-                    throw new AnalysisException("Key cluster column[" + name + 
"] doesn't exist.");
+                    throw new AnalysisException("Cluster key column[" + name + 
"] doesn't exist.");
                 }
             }
         }
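
The "should be different" rule above rejects a cluster key list whose leading
min(n, m) names match the unique key in order, since such a CLUSTER BY would
merely repeat the existing key ordering. A standalone sketch of the predicate
(illustrative only; the Java code applies equalsIgnoreCase the same way):

    #include <algorithm>
    #include <cctype>
    #include <string>
    #include <vector>

    static bool equals_ignore_case(const std::string& a, const std::string& b) {
        return a.size() == b.size() &&
               std::equal(a.begin(), a.end(), b.begin(), [](char x, char y) {
                   return std::tolower(static_cast<unsigned char>(x)) ==
                          std::tolower(static_cast<unsigned char>(y));
               });
    }

    // True when the cluster keys duplicate the leading unique keys, e.g.
    // UNIQUE KEY(k1, k2) with CLUSTER BY(k1, k2, k3) is rejected, while
    // CLUSTER BY(k2, k1) passes because the first names already differ.
    bool cluster_keys_repeat_unique_keys(const std::vector<std::string>& unique_keys,
                                         const std::vector<std::string>& cluster_keys) {
        size_t n = std::min(unique_keys.size(), cluster_keys.size());
        for (size_t i = 0; i < n; ++i) {
            if (!equals_ignore_case(unique_keys[i], cluster_keys[i])) return false;
        }
        return true;
    }
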
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 13a6d3a8051..27ad19e1762 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -100,6 +100,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Table.Cell;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -1239,6 +1240,10 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
             MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
             List<Index> indexes = restoredIdx.getId() == localTbl.getBaseIndexId()
                                     ? localTbl.getCopiedIndexes() : null;
+            List<Integer> clusterKeyIndexes = null;
+            if (indexMeta.getIndexId() == localTbl.getBaseIndexId() || localTbl.isShadowIndex(indexMeta.getIndexId())) {
+                clusterKeyIndexes = OlapTable.getClusterKeyIndexes(indexMeta.getSchema());
+            }
             for (Tablet restoreTablet : restoredIdx.getTablets()) {
                 TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(),
                         restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD);
@@ -1282,6 +1287,12 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
                         LOG.info("set base tablet {} for replica {} in restore 
job {}, tablet id={}",
                                 baseTablet.first, restoreReplica.getId(), 
jobId, restoreTablet.getId());
                     }
+                    if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+                        task.setClusterKeyIndexes(clusterKeyIndexes);
+                        LOG.info("table: {}, partition: {}, index: {}, tablet: 
{}, cluster key indexes: {}",
+                                localTbl.getId(), restorePart.getId(), 
restoredIdx.getId(), restoreTablet.getId(),
+                                clusterKeyIndexes);
+                    }
                     batchTask.addTask(task);
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 20737d9a035..9728a9e4154 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -110,6 +110,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -3131,6 +3132,16 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
         }
     }
 
+    public static List<Integer> getClusterKeyIndexes(List<Column> columns) {
+        Map<Integer, Integer> clusterKeyIndexes = new TreeMap<>();
+        for (Column column : columns) {
+            if (column.isClusterKey()) {
+                clusterKeyIndexes.put(column.getClusterKeyId(), column.getUniqueId());
+            }
+        }
+        return clusterKeyIndexes.isEmpty() ? null : new ArrayList<>(clusterKeyIndexes.values());
+    }
+
     public long getVisibleVersionTime() {
         return tableAttributes.getVisibleVersionTime();
     }
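
getClusterKeyIndexes() leans on TreeMap iteration order: entries come back
sorted by cluster key id, so the returned list holds column unique ids in
CLUSTER BY declaration order no matter where the columns sit in the schema.
The same idea in a C++ sketch (std::map is likewise ordered by key; the
ColumnInfo type is an illustrative stand-in for Column):

    #include <cstdint>
    #include <map>
    #include <vector>

    struct ColumnInfo {
        int32_t cluster_key_id; // -1 when the column is not a cluster key
        int32_t unique_id;
    };

    // Collect column unique ids ordered by cluster key id; an empty result
    // plays the role of the Java method returning null.
    std::vector<int32_t> cluster_key_unique_ids(const std::vector<ColumnInfo>& columns) {
        std::map<int32_t, int32_t> by_cluster_key_id;
        for (const ColumnInfo& col : columns) {
            if (col.cluster_key_id != -1) {
                by_cluster_key_id[col.cluster_key_id] = col.unique_id;
            }
        }
        std::vector<int32_t> uids;
        for (const auto& [ck_id, uid] : by_cluster_key_id) {
            uids.push_back(uid);
        }
        return uids;
    }
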
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index c1c58f7b898..78044f2190d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -101,8 +101,7 @@ public class CloudInternalCatalog extends InternalCatalog {
                                                    String storagePolicy,
                                                    IdGeneratorBuffer idGeneratorBuffer,
                                                    BinlogConfig binlogConfig,
-                                                   boolean isStorageMediumSpecified,
-                                                   List<Integer> clusterKeyIndexes)
+                                                   boolean isStorageMediumSpecified)
             throws DdlException {
         // create base index first.
         Preconditions.checkArgument(tbl.getBaseIndexId() != -1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 77fe701f204..03c33a21e94 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1770,8 +1770,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                     singlePartitionDesc.isInMemory(),
                     singlePartitionDesc.getTabletType(),
                     storagePolicy, idGeneratorBuffer,
-                    binlogConfig, dataProperty.isStorageMediumSpecified(), null);
-            // TODO cluster key ids
+                    binlogConfig, dataProperty.isStorageMediumSpecified());
 
             // check again
             olapTable = db.getOlapTableOrDdlException(tableName);
@@ -2086,8 +2085,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                                                    String storagePolicy,
                                                    IdGeneratorBuffer idGeneratorBuffer,
                                                    BinlogConfig binlogConfig,
-                                                   boolean isStorageMediumSpecified,
-                                                   List<Integer> clusterKeyIndexes)
+                                                   boolean isStorageMediumSpecified)
             throws DdlException {
         // create base index first.
         Preconditions.checkArgument(tbl.getBaseIndexId() != -1);
@@ -2145,6 +2143,11 @@ public class InternalCatalog implements CatalogIf<Database> {
             short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
             TStorageType storageType = indexMeta.getStorageType();
             List<Column> schema = indexMeta.getSchema();
+            List<Integer> clusterKeyIndexes = null;
+            if (indexId == tbl.getBaseIndexId()) {
+                // only base and shadow index need cluster key indexes
+                clusterKeyIndexes = OlapTable.getClusterKeyIndexes(schema);
+            }
             KeysType keysType = indexMeta.getKeysType();
             List<Index> indexes = indexId == tbl.getBaseIndexId() ? tbl.getCopiedIndexes() : null;
             int totalTaskNum = index.getTablets().size() * totalReplicaNum;
@@ -2176,7 +2179,11 @@ public class InternalCatalog implements CatalogIf<Database> {
 
                     task.setStorageFormat(tbl.getStorageFormat());
                     task.setInvertedIndexFileStorageFormat(tbl.getInvertedIndexFileStorageFormat());
-                    task.setClusterKeyIndexes(clusterKeyIndexes);
+                    if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+                        task.setClusterKeyIndexes(clusterKeyIndexes);
+                        LOG.info("table: {}, partition: {}, index: {}, tablet: 
{}, cluster key indexes: {}",
+                                tbl.getId(), partitionId, indexId, tabletId, 
clusterKeyIndexes);
+                    }
                     batchTask.addTask(task);
                     // add to AgentTaskQueue for handling finish report.
                     // not for resending task
@@ -2649,8 +2656,8 @@ public class InternalCatalog implements CatalogIf<Database> {
         olapTable.setRowStorePageSize(rowStorePageSize);
 
         // check data sort properties
-        int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnIds()) ? keysDesc.keysColumnSize() :
-                keysDesc.getClusterKeysColumnIds().size();
+        int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames()) ? keysDesc.keysColumnSize() :
+                keysDesc.getClusterKeysColumnNames().size();
         DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
                 keyColumnSize, storageFormat);
         olapTable.setDataSortInfo(dataSortInfo);
@@ -2662,6 +2669,10 @@ public class InternalCatalog implements CatalogIf<Database> {
             } catch (AnalysisException e) {
                 throw new DdlException(e.getMessage());
             }
+            if (enableUniqueKeyMergeOnWrite && !enableLightSchemaChange && !CollectionUtils.isEmpty(
+                    keysDesc.getClusterKeysColumnNames())) {
+                throw new DdlException("Unique merge-on-write table with cluster keys must enable light schema change");
+            }
         }
         olapTable.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
 
@@ -2990,18 +3001,15 @@ public class InternalCatalog implements CatalogIf<Database> {
             throw new DdlException(e.getMessage());
         }
 
-        // analyse group commit interval ms
-        int groupCommitIntervalMs;
         try {
-            groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties);
+            int groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties);
             olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs);
         } catch (Exception e) {
             throw new DdlException(e.getMessage());
         }
 
-        int groupCommitDataBytes;
         try {
-            groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties);
+            int groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties);
             olapTable.setGroupCommitDataBytes(groupCommitDataBytes);
         } catch (Exception e) {
             throw new DdlException(e.getMessage());
@@ -3057,8 +3065,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                         storagePolicy,
                         idGeneratorBuffer,
                         binlogConfigForTask,
-                        partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(),
-                        keysDesc.getClusterKeysColumnIds());
+                        partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
                 afterCreatePartitions(db.getId(), olapTable.getId(), null,
                         olapTable.getIndexIdList(), true);
                 olapTable.addPartition(partition);
@@ -3142,8 +3149,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                             partitionInfo.getTabletType(entry.getValue()),
                             partionStoragePolicy, idGeneratorBuffer,
                             binlogConfigForTask,
-                            dataProperty.isStorageMediumSpecified(),
-                            keysDesc.getClusterKeysColumnIds());
+                            dataProperty.isStorageMediumSpecified());
                     olapTable.addPartition(partition);
                     olapTable.getPartitionInfo().getDataProperty(partition.getId())
                             .setStoragePolicy(partionStoragePolicy);
@@ -3566,14 +3572,6 @@ public class InternalCatalog implements CatalogIf<Database> {
                 Env.getCurrentInvertedIndex().deleteTablet(tabletId);
             }
         };
-        Map<Integer, Integer> clusterKeyMap = new TreeMap<>();
-        for (int i = 0; i < olapTable.getBaseSchema().size(); i++) {
-            Column column = olapTable.getBaseSchema().get(i);
-            if (column.getClusterKeyId() != -1) {
-                clusterKeyMap.put(column.getClusterKeyId(), i);
-            }
-        }
-        List<Integer> clusterKeyIdxes = clusterKeyMap.values().stream().collect(Collectors.toList());
         try {
             long bufferSize = IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl, origPartitions.values());
             IdGeneratorBuffer idGeneratorBuffer =
@@ -3609,8 +3607,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                        copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
                        olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
                        idGeneratorBuffer, binlogConfig,
-                        copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified(),
-                        clusterKeyIdxes);
+                        copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified());
                 newPartitions.add(newPartition);
             }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index f7702a49554..a4a5273e8ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -96,6 +96,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -964,10 +965,19 @@ public class ReportHandler extends Daemon {
                                             objectPool,
                                             olapTable.rowStorePageSize(),
                                             olapTable.variantEnableFlattenNested());
-
                                     createReplicaTask.setIsRecoverTask(true);
                                     createReplicaTask.setInvertedIndexFileStorageFormat(olapTable
                                                                 .getInvertedIndexFileStorageFormat());
+                                    if (indexId == olapTable.getBaseIndexId() || olapTable.isShadowIndex(indexId)) {
+                                        List<Integer> clusterKeyIndexes = OlapTable.getClusterKeyIndexes(
+                                                indexMeta.getSchema());
+                                        if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+                                            createReplicaTask.setClusterKeyIndexes(clusterKeyIndexes);
+                                            LOG.info("table: {}, partition: {}, index: {}, tablet: {}, "
+                                                            + "cluster key indexes: {}", tableId, partitionId, indexId,
+                                                    tabletId, clusterKeyIndexes);
+                                        }
+                                    }
                                     createReplicaBatchTask.addTask(createReplicaTask);
                                 } else {
                                     // just set this replica as bad
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 730c6f115a3..04ce3786bb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -131,7 +131,6 @@ public class CreateTableInfo {
     private boolean isExternal = false;
     private String clusterName = null;
     private List<String> clusterKeysColumnNames = null;
-    private List<Integer> clusterKeysColumnIds = null;
     private PartitionTableInfo partitionTableInfo; // get when validate
 
     /**
@@ -424,9 +423,6 @@ public class CreateTableInfo {
 
             validateKeyColumns();
             if (!clusterKeysColumnNames.isEmpty()) {
-                if (Config.isCloudMode()) {
-                    throw new AnalysisException("Cluster key is not supported 
in cloud mode");
-                }
                 if (!isEnableMergeOnWrite) {
                     throw new AnalysisException(
                             "Cluster keys only support unique keys table which 
enabled "
@@ -736,50 +732,6 @@ public class CreateTableInfo {
                     "The number of key columns should be less than the number 
of columns.");
         }
 
-        if (!clusterKeysColumnNames.isEmpty()) {
-            if (Config.isCloudMode()) {
-                throw new AnalysisException("Cluster key is not supported in 
cloud mode");
-            }
-            if (keysType != KeysType.UNIQUE_KEYS) {
-                throw new AnalysisException("Cluster keys only support unique 
keys table.");
-            }
-            clusterKeysColumnIds = Lists.newArrayList();
-            for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
-                String name = clusterKeysColumnNames.get(i);
-                // check if key is duplicate
-                for (int j = 0; j < i; j++) {
-                    if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
-                        throw new AnalysisException("Duplicate cluster key 
column[" + name + "].");
-                    }
-                }
-                // check if key exists and generate key column ids
-                for (int j = 0; j < columns.size(); j++) {
-                    if (columns.get(j).getName().equalsIgnoreCase(name)) {
-                        columns.get(j).setClusterKeyId(clusterKeysColumnIds.size());
-                        clusterKeysColumnIds.add(j);
-                        break;
-                    }
-                    if (j == columns.size() - 1) {
-                        throw new AnalysisException(
-                                "Key cluster column[" + name + "] doesn't 
exist.");
-                    }
-                }
-            }
-
-            int minKeySize = keys.size() < clusterKeysColumnNames.size() ? keys.size()
-                    : clusterKeysColumnNames.size();
-            boolean sameKey = true;
-            for (int i = 0; i < minKeySize; ++i) {
-                if (!keys.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
-                    sameKey = false;
-                    break;
-                }
-            }
-            if (sameKey) {
-                throw new AnalysisException("Unique keys and cluster keys 
should be different.");
-            }
-        }
-
         for (int i = 0; i < keys.size(); ++i) {
             String name = columns.get(i).getName();
             if (!keys.get(i).equalsIgnoreCase(name)) {
@@ -815,6 +767,50 @@ public class CreateTableInfo {
                 }
             }
         }
+
+        if (!clusterKeysColumnNames.isEmpty()) {
+            // the same code as KeysDesc#analyzeClusterKeys
+            if (Config.isCloudMode()) {
+                throw new AnalysisException("Cluster key is not supported in 
cloud mode");
+            }
+            if (keysType != KeysType.UNIQUE_KEYS) {
+                throw new AnalysisException("Cluster keys only support unique 
keys table");
+            }
+            // check that cluster keys are not duplicated
+            for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
+                String name = clusterKeysColumnNames.get(i);
+                for (int j = 0; j < i; j++) {
+                    if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
+                        throw new AnalysisException("Duplicate cluster key 
column[" + name + "].");
+                    }
+                }
+            }
+            // check that cluster keys are not equal to primary keys
+            int minKeySize = Math.min(keys.size(), clusterKeysColumnNames.size());
+            boolean sameKey = true;
+            for (int i = 0; i < minKeySize; ++i) {
+                if (!keys.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
+                    sameKey = false;
+                    break;
+                }
+            }
+            if (sameKey) {
+                throw new AnalysisException("Unique keys and cluster keys 
should be different.");
+            }
+            // check that cluster key column exists
+            for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
+                String name = clusterKeysColumnNames.get(i);
+                for (int j = 0; j < columns.size(); j++) {
+                    if (columns.get(j).getName().equalsIgnoreCase(name)) {
+                        columns.get(j).setClusterKeyId(i);
+                        break;
+                    }
+                    if (j == columns.size() - 1) {
+                        throw new AnalysisException("Cluster key column[" + 
name + "] doesn't exist.");
+                    }
+                }
+            }
+        }
     }
 
     /**
@@ -858,7 +854,7 @@ public class CreateTableInfo {
         return new CreateTableStmt(ifNotExists, isExternal,
                 new TableName(ctlName, dbName, tableName),
                 catalogColumns, catalogIndexes, engineName,
-                new KeysDesc(keysType, keys, clusterKeysColumnNames, clusterKeysColumnIds),
+                new KeysDesc(keysType, keys, clusterKeysColumnNames),
                partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties,
                 comment, addRollups, null);
     }
diff --git a/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out b/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out
new file mode 100644
index 00000000000..50028960ab1
--- /dev/null
+++ b/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out
@@ -0,0 +1,365 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_original --
+11     28      38
+10     29      39
+
+-- !select_add_c4 --
+11     28      38      \N
+10     29      39      \N
+13     27      36      40
+12     26      37      40
+
+-- !select_add_c5 --
+11     \N      28      38      \N
+10     \N      29      39      \N
+13     \N      27      36      40
+12     \N      26      37      40
+15     50      20      34      40
+14     50      20      35      40
+
+-- !select_add_c6 --
+11     \N      28      \N      38      \N
+10     \N      29      \N      39      \N
+13     \N      27      \N      36      40
+12     \N      26      \N      37      40
+15     50      20      \N      34      40
+14     50      20      \N      35      40
+17     50      20      60      32      40
+16     50      20      60      33      40
+
+-- !select_add_k2 --
+11     \N      \N      28      \N      38      \N
+10     \N      \N      29      \N      39      \N
+13     \N      \N      27      \N      36      40
+12     \N      \N      26      \N      37      40
+15     \N      50      20      \N      34      40
+14     \N      50      20      \N      35      40
+17     \N      50      20      60      32      40
+16     \N      50      20      60      33      40
+19     200     \N      20      \N      30      \N
+18     200     \N      20      \N      31      \N
+
+-- !select_drop_c4 --
+11     \N      \N      28      \N      38
+10     \N      \N      29      \N      39
+13     \N      \N      27      \N      36
+12     \N      \N      26      \N      37
+15     \N      50      20      \N      34
+14     \N      50      20      \N      35
+17     \N      50      20      60      32
+16     \N      50      20      60      33
+19     200     \N      20      \N      30
+18     200     \N      20      \N      31
+119    200     \N      20      \N      30
+118    200     \N      20      \N      31
+
+-- !select_drop_c5 --
+11     \N      28      \N      38
+10     \N      29      \N      39
+13     \N      27      \N      36
+12     \N      26      \N      37
+15     \N      20      \N      34
+14     \N      20      \N      35
+17     \N      20      60      32
+16     \N      20      60      33
+19     200     20      \N      30
+18     200     20      \N      31
+119    200     20      \N      30
+118    200     20      \N      31
+117    200     20      \N      32
+116    200     20      \N      33
+
+-- !select_drop_c6 --
+11     \N      28      38
+10     \N      29      39
+13     \N      27      36
+12     \N      26      37
+15     \N      20      34
+14     \N      20      35
+17     \N      20      32
+16     \N      20      33
+19     200     20      30
+18     200     20      31
+119    200     20      30
+118    200     20      31
+117    200     20      32
+116    200     20      33
+115    200     25      34
+114    200     24      35
+
+-- !select_reorder --
+11     \N      38      28
+10     \N      39      29
+13     \N      36      27
+12     \N      37      26
+15     \N      34      20
+14     \N      35      20
+17     \N      32      20
+16     \N      33      20
+19     200     30      20
+18     200     31      20
+119    200     30      20
+118    200     31      20
+117    200     32      20
+116    200     33      20
+115    200     34      25
+114    200     35      24
+113    200     36      23
+112    200     37      22
+
+-- !select_modify_k2 --
+11     \N      38      28
+10     \N      39      29
+13     \N      36      27
+12     \N      37      26
+15     \N      34      20
+14     \N      35      20
+17     \N      32      20
+16     \N      33      20
+19     200     30      20
+18     200     31      20
+119    200     30      20
+118    200     31      20
+117    200     32      20
+116    200     33      20
+115    200     34      25
+114    200     35      24
+113    200     36      23
+112    200     37      22
+111    200     38      21
+110    200     39      20
+
+-- !select_create_mv_base --
+11     \N      38      28
+10     \N      39      29
+13     \N      36      27
+12     \N      37      26
+15     \N      34      20
+14     \N      35      20
+17     \N      32      20
+16     \N      33      20
+19     200     30      20
+18     200     31      20
+119    200     30      20
+118    200     31      20
+117    200     32      20
+116    200     33      20
+115    200     34      25
+114    200     35      24
+113    200     36      23
+112    200     37      22
+111    200     38      21
+110    200     39      20
+211    200     38      21
+210    200     39      20
+
+-- !select_create_mv_mv --
+10     39
+11     38
+12     37
+13     36
+14     35
+15     34
+16     33
+17     32
+18     31
+19     30
+118    31
+119    30
+116    33
+117    32
+114    35
+115    34
+112    37
+113    36
+110    39
+111    38
+210    39
+211    38
+
+-- !select_create_rollup_base --
+11     \N      38      28
+10     \N      39      29
+13     \N      36      27
+12     \N      37      26
+15     \N      34      20
+14     \N      35      20
+17     \N      32      20
+16     \N      33      20
+19     200     30      20
+18     200     31      20
+119    200     30      20
+118    200     31      20
+117    200     32      20
+116    200     33      20
+115    200     34      25
+114    200     35      24
+113    200     36      23
+112    200     37      22
+111    200     38      21
+110    200     39      20
+211    200     38      21
+210    200     39      20
+311    200     38      21
+310    200     39      20
+
+-- !select_create_rollup_roll --
+\N     10      29
+\N     11      28
+\N     12      26
+\N     13      27
+\N     14      20
+\N     15      20
+\N     16      20
+\N     17      20
+200    18      20
+200    19      20
+200    118     20
+200    119     20
+200    116     20
+200    117     20
+200    114     24
+200    115     25
+200    112     22
+200    113     23
+200    110     20
+200    111     21
+200    210     20
+200    211     21
+200    310     20
+200    311     21
+
+-- !select_add_partition --
+10011  200     38      21
+10010  200     39      20
+11     \N      38      28
+10     \N      39      29
+13     \N      36      27
+12     \N      37      26
+15     \N      34      20
+14     \N      35      20
+17     \N      32      20
+16     \N      33      20
+19     200     30      20
+18     200     31      20
+119    200     30      20
+118    200     31      20
+117    200     32      20
+116    200     33      20
+115    200     34      25
+114    200     35      24
+113    200     36      23
+112    200     37      22
+111    200     38      21
+110    200     39      20
+211    200     38      21
+210    200     39      20
+311    200     38      21
+310    200     39      20
+
+-- !select_truncate --
+13     \N      36      27
+12     \N      37      26
+11     \N      38      28
+10     \N      39      29
+
+-- !select_rollup_base --
+12     22      31      41      51
+11     21      32      42      52
+
+-- !select_rollup_roll --
+21     11      42      32
+22     12      41      31
+
+-- !select_rollup_base_sc --
+12     22      31      41      51
+11     21      32      42      52
+
+-- !select_rollup_roll_sc --
+21     11      42      32
+22     12      41      31
+
+-- !select_rollup_base_sc1 --
+12     22      31      41      51
+11     21      32      42      52
+14     24      33      43      53
+13     23      34      44      54
+
+-- !select_rollup_roll_sc1 --
+21     11      42      32
+22     12      41      31
+23     13      44      34
+24     14      43      33
+
+-- !select_restore_base2 --
+12     22      31      41      51
+11     21      32      42      52
+14     24      33      43      53
+13     23      34      44      54
+16     26      33      43      53
+15     25      34      44      54
+
+-- !select_restore_roll2 --
+21     11      42      32
+22     12      41      31
+23     13      44      34
+24     14      43      33
+25     15      44      34
+26     16      43      33
+
+-- !select_restore_base --
+12     22      31      41      51
+11     21      32      42      52
+14     24      33      43      53
+13     23      34      44      54
+
+-- !select_restore_roll --
+21     11      42      32
+22     12      41      31
+23     13      44      34
+24     14      43      33
+
+-- !select_restore_base1 --
+12     22      31      41      51
+11     21      32      42      52
+14     24      33      43      53
+13     23      34      44      54
+18     28      33      43      53
+17     27      34      44      54
+
+-- !select_restore_roll1 --
+21     11      42      32
+22     12      41      31
+23     13      44      34
+24     14      43      33
+27     17      44      34
+28     18      43      33
+
+-- !select_restore_base2 --
+12     22      31      41      51
+11     21      32      42      52
+14     24      33      43      53
+13     23      34      44      54
+
+-- !select_restore_roll2 --
+21     11      42      32
+22     12      41      31
+23     13      44      34
+24     14      43      33
+
+-- !select_restore_base3 --
+12     22      31      41      51
+11     21      32      42      52
+14     24      33      43      53
+13     23      34      44      54
+18     28      33      43      53
+17     27      34      44      54
+
+-- !select_restore_roll4 --
+21     11      42      32
+22     12      41      31
+23     13      44      34
+24     14      43      33
+27     17      44      34
+28     18      43      33
+
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy b/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy
index 8cd7cb6d198..0abb7d8f1a9 100644
--- a/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy
+++ b/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy
@@ -81,7 +81,7 @@ suite("test_create_table") {
                     "enable_unique_key_merge_on_write" = "true"
              );
         """
-        exception "Key cluster column[c_addresses] doesn't exist"
+        exception "Cluster key column[c_addresses] doesn't exist"
     }
 
     // mow unique table with duplicate cluster keys
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy
index 9abee82f7c0..37c96e79a6b 100644
--- a/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy
+++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy
@@ -48,7 +48,7 @@ suite("test_schema_change") {
             `min_dwell_time` INT DEFAULT "99999" COMMENT "用户最小停留时间")
         UNIQUE KEY(`user_id`, `date`, `city`, `age`, `sex`)
         CLUSTER BY(`cost`, `comment`)
-        DISTRIBUTED BY HASH(`user_id`)
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
         PROPERTIES ( "replication_num" = "1",
                      "enable_unique_key_merge_on_write" = "true"
         );
@@ -237,12 +237,12 @@ suite("test_schema_change") {
     }
 
     // 5. modify column order should succeed (temporarily throws an exception)
-    test {
+    /*test {
         sql """
             alter table ${tableName} ORDER BY (`user_id`, `date`, `city`, `age`, `sex`, `max_dwell_time`, `comment`, `min_dwell_time`, `last_visit_date_not_null`, `cost`, `score`, `last_update_date`);
         """
         exception "Can not modify column order in Unique data model table"
-    }
+    }*/
     /*assertTrue(getAlterTableState(), "alter column order should succeed");
     {
         sql """ INSERT INTO ${tableName}
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy
new file mode 100644
index 00000000000..840badb6310
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_schema_change_ck") {
+    def db = "regression_test_unique_with_mow_c_p0"
+    def tableName = "test_schema_change_ck"
+
+    def getAlterTableState = {
+        waitForSchemaChangeDone {
+            sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${tableName}' 
ORDER BY createtime DESC LIMIT 1 """
+            time 600
+        }
+        return true
+    }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    test {
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int(11) NULL, 
+                `c2` int(11) NULL, 
+                `c3` int(11) NULL
+            ) unique KEY(`c1`) 
+            cluster by(`c3`, `c2`) 
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true",
+                "light_schema_change" = "false"
+            );
+        """
+        exception "Unique merge-on-write table with cluster keys must enable light schema change"
+    }
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int(11) NULL, 
+            `c2` int(11) NULL, 
+            `c3` int(11) NULL
+        ) unique KEY(`c1`)
+        cluster by(`c3`, `c2`)
+        PARTITION BY RANGE(`c1`)
+        ( 
+            PARTITION `p_10000` VALUES [("0"), ("10000")) 
+        )
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true"
+        );
+    """
+
+    sql """ INSERT INTO ${tableName} VALUES (11, 28, 38), (10, 29, 39) """
+    qt_select_original """select * from ${tableName}"""
+
+    /****** add value column ******/
+    // after cluster key
+    sql """ alter table ${tableName} ADD column c4 int(11) after c3; """
+    assertTrue(getAlterTableState(), "add column should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, c4) VALUES (13, 27, 36, 40), (12, 26, 37, 40) """
+    qt_select_add_c4 """select * from ${tableName}"""
+
+    // before cluster key
+    sql """ alter table ${tableName} ADD column c5 int(11) after c1; """
+    assertTrue(getAlterTableState(), "add column should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, c4, c5) VALUES (15, 20, 34, 40, 50), (14, 20, 35, 40, 50) """
+    qt_select_add_c5 """select * from ${tableName}"""
+
+    // in the middle of the cluster key columns
+    sql """ alter table ${tableName} ADD column c6 int(11) after c2; """
+    assertTrue(getAlterTableState(), "add column should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, c4, c5, c6) VALUES (17, 20, 32, 40, 50, 60), (16, 20, 33, 40, 50, 60) """
+    qt_select_add_c6 """select * from ${tableName}"""
+
+    /****** add key column ******/
+    sql """ alter table ${tableName} ADD column k2 int(11) key after c1; """
+    assertTrue(getAlterTableState(), "add column should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (19, 20, 30, 200), (18, 20, 31, 200) """
+    qt_select_add_k2 """select * from ${tableName}"""
+
+    /****** TODO: adding a cluster key column is not supported ******/
+
+    /****** drop value column ******/
+    sql """ alter table ${tableName} drop column c4; """
+    assertTrue(getAlterTableState(), "drop column should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (119, 20, 30, 200), (118, 20, 31, 200) """
+    qt_select_drop_c4 """select * from ${tableName}"""
+
+    sql """ alter table ${tableName} drop column c5; """
+    assertTrue(getAlterTableState(), "drop column should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (117, 20, 32, 200), (116, 20, 33, 200) """
+    qt_select_drop_c5 """select * from ${tableName}"""
+
+    sql """ alter table ${tableName} drop column c6; """
+    assertTrue(getAlterTableState(), "drop column should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (115, 25, 34, 200), (114, 24, 35, 200) """
+    qt_select_drop_c6 """select * from ${tableName}"""
+
+    /****** drop key column ******/
+    test {
+        sql """ alter table ${tableName} drop column k2; """
+        exception "Can not drop key column in Unique data model table"
+    }
+
+    /****** TODO: dropping a cluster key column is not supported ******/
+    test {
+        sql """ alter table ${tableName} drop column c3; """
+        exception "Can not drop cluster key column in Unique data model table"
+    }
+
+    /****** reorder ******/
+    sql """ alter table ${tableName} order by(c1, k2, c3, c2); """
+    assertTrue(getAlterTableState(), "reorder should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (113, 23, 36, 200), (112, 22, 37, 200) """
+    qt_select_reorder """select * from ${tableName}"""
+
+    /****** modify key column data type ******/
+    sql """ alter table ${tableName} modify column k2 BIGINT key; """
+    assertTrue(getAlterTableState(), "modify should succeed")
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (111, 21, 38, 200), (110, 20, 39, 200) """
+    qt_select_modify_k2 """select * from ${tableName}"""
+
+    /****** TODO: modifying a cluster key column's data type is not supported ******/
+    test {
+        sql """ alter table ${tableName} modify column c2 BIGINT; """
+        exception "Can not modify cluster key column"
+    }
+
+    /****** create mv ******/
+    def mv_name = "k2_c3"
+    sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
+    createMV """ create materialized view ${mv_name} as select c1, c3 from ${tableName}; """
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (211, 21, 38, 200), (210, 20, 39, 200) """
+    qt_select_create_mv_base """select * from ${tableName}"""
+    qt_select_create_mv_mv """select c1, c3 from ${tableName}"""
+
+    /****** create rollup ******/
+    sql """ alter table ${tableName} ADD ROLLUP r1(k2, c1, c2); """
+    waitForSchemaChangeDone {
+        sql """show alter table rollup where tablename='${tableName}' order by 
createtime desc limit 1"""
+        time 600
+    }
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (311, 21, 38, 
200), (310, 20, 39, 200) """
+    qt_select_create_rollup_base """select * from ${tableName}"""
+    qt_select_create_rollup_roll """select k2, c1, c2 from ${tableName}"""
+
+    /****** add partition ******/
+    sql "ALTER TABLE ${tableName} ADD PARTITION p_20000 VALUES [('10000'), 
('20000'));"
+    for (int i = 0; i < 10; i++) {
+        List<List<Object>> partitions = sql "show partitions from 
${tableName};"
+        logger.info("partitions: ${partitions}")
+        if (partitions.size() < 2 && i < 10) {
+            sleep(50)
+            continue
+        }
+        assertEquals(partitions.size(), 2)
+    }
+    sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (10011, 21, 38, 
200), (10010, 20, 39, 200) """
+    qt_select_add_partition """select * from ${tableName}"""
+
+    /****** one SQL statement containing multiple column changes ******/
+
+    /****** truncate table ******/
+    sql """ TRUNCATE TABLE ${tableName} """
+    sql """ INSERT INTO ${tableName}(c1, c2, c3) VALUES (11, 28, 38), (10, 29, 
39), (12, 26, 37), (13, 27, 36) """
+    qt_select_truncate """select * from ${tableName}"""
+
+    /****** create table with rollup ******/
+    tableName = tableName + "_rollup"
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(11) NULL, 
+            `k2` int(11) NULL,
+            `c3` int(11) NULL, 
+            `c4` int(11) NULL,
+            `c5` int(11) NULL
+        ) unique KEY(`k1`, `k2`)
+        cluster by(`c4`, `c5`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        ROLLUP (
+            r1 (k2, k1, c4, c3)
+        )
+        PROPERTIES (
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true"
+        );
+    """
+    sql """ INSERT INTO ${tableName} VALUES (11, 21, 32, 42, 52), (12, 22, 31, 
41, 51); """
+    qt_select_rollup_base """select * from ${tableName};"""
+    qt_select_rollup_roll """select k2, k1, c4, c3 from ${tableName};"""
+
+    /****** reorder a specified rollup index, not the base index ******/
+    sql """ ALTER TABLE ${tableName} ORDER BY(k2, k1, c3, c4) from r1; """
+    assertTrue(getAlterTableState(), "reorder rollup should succeed")
+    qt_select_rollup_base_sc """select * from ${tableName};"""
+    qt_select_rollup_roll_sc """select k2, k1, c4, c3 from ${tableName};"""
+    sql """ INSERT INTO ${tableName} VALUES (13, 23, 34, 44, 54), (14, 24, 33, 
43, 53); """
+    qt_select_rollup_base_sc1 """select * from ${tableName};"""
+    qt_select_rollup_roll_sc1 """select k2, k1, c4, c3 from ${tableName};"""
+
+    /****** backup restore ******/
+    if (!isCloudMode()) {
+        def repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+        def backup = tableName + "_bak"
+        def syncer = getSyncer()
+        syncer.createS3Repository(repoName)
+        def result = sql """ show tablets from ${tableName}; """
+        logger.info("tablets 0: ${result}")
+
+        // backup
+        sql """ BACKUP SNAPSHOT ${context.dbName}.${backup} TO ${repoName} ON 
(${tableName}) properties("type"="full"); """
+        syncer.waitSnapshotFinish()
+        def snapshot = syncer.getSnapshotTimestamp(repoName, backup)
+        assertTrue(snapshot != null)
+        sql """ INSERT INTO ${tableName} VALUES (15, 25, 34, 44, 54), (16, 26, 
33, 43, 53); """
+        qt_select_restore_base2 """select * from ${tableName};"""
+        qt_select_restore_roll2 """select k2, k1, c4, c3 from ${tableName};"""
+
+        // restore
+        logger.info(""" RESTORE SNAPSHOT ${context.dbName}.${backup} FROM `${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" = "${snapshot}","replication_num" = "1" ) """)
+        sql """ RESTORE SNAPSHOT ${context.dbName}.${backup} FROM `${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" = "${snapshot}","replication_num" = "1" ) """
+        syncer.waitAllRestoreFinish(context.dbName)
+        result = sql """ show tablets from ${tableName}; """
+        logger.info("tablets 1: ${result}")
+        qt_select_restore_base """select * from ${tableName};"""
+        qt_select_restore_roll """select k2, k1, c4, c3 from ${tableName};"""
+        sql """ INSERT INTO ${tableName} VALUES (17, 27, 34, 44, 54), (18, 28, 
33, 43, 53); """
+        qt_select_restore_base1 """select * from ${tableName};"""
+        qt_select_restore_roll1 """select k2, k1, c4, c3 from ${tableName};"""
+
+        // restore again after dropping the table
+        sql """ drop table ${tableName}; """
+        sql """ RESTORE SNAPSHOT ${context.dbName}.${backup} FROM `${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" = "${snapshot}","replication_num" = "1" ) """
+        syncer.waitAllRestoreFinish(context.dbName)
+        result = sql """ show tablets from ${tableName}; """
+        logger.info("tablets 2: ${result}")
+        qt_select_restore_base2 """select * from ${tableName};"""
+        qt_select_restore_roll2 """select k2, k1, c4, c3 from ${tableName};"""
+        sql """ INSERT INTO ${tableName} VALUES (17, 27, 34, 44, 54), (18, 28, 
33, 43, 53); """
+        qt_select_restore_base3 """select * from ${tableName};"""
+        qt_select_restore_roll4 """select k2, k1, c4, c3 from ${tableName};"""
+
+        sql "DROP REPOSITORY `${repoName}`"
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
