This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ea708d9bafb [fix](cluster key) fix mow cluster key with schema change (#40372) ea708d9bafb is described below commit ea708d9bafb1206c0c8e3387c6794e348e30afa3 Author: meiyi <myime...@gmail.com> AuthorDate: Sat Sep 14 20:02:55 2024 +0800 [fix](cluster key) fix mow cluster key with schema change (#40372) --- be/src/olap/memtable.cpp | 9 +- be/src/olap/merger.cpp | 56 ++-- be/src/olap/merger.h | 3 +- be/src/olap/rowset/segcompaction.cpp | 4 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 24 +- .../rowset/segment_v2/vertical_segment_writer.cpp | 8 +- .../java/org/apache/doris/alter/RollupJobV2.java | 2 +- .../apache/doris/alter/SchemaChangeHandler.java | 5 +- .../org/apache/doris/alter/SchemaChangeJobV2.java | 24 ++ .../java/org/apache/doris/analysis/ColumnDef.java | 4 + .../org/apache/doris/analysis/CreateTableStmt.java | 5 +- .../java/org/apache/doris/analysis/KeysDesc.java | 67 ++-- .../java/org/apache/doris/backup/RestoreJob.java | 11 + .../java/org/apache/doris/catalog/OlapTable.java | 11 + .../cloud/datasource/CloudInternalCatalog.java | 3 +- .../apache/doris/datasource/InternalCatalog.java | 49 ++- .../org/apache/doris/master/ReportHandler.java | 12 +- .../trees/plans/commands/info/CreateTableInfo.java | 94 +++--- .../unique_with_mow_c_p0/test_schema_change_ck.out | 365 +++++++++++++++++++++ .../unique_with_mow_c_p0/test_create_table.groovy | 2 +- .../unique_with_mow_c_p0/test_schema_change.groovy | 6 +- .../test_schema_change_ck.groovy | 262 +++++++++++++++ 22 files changed, 849 insertions(+), 177 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 4f66a361650..671e07d7556 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -323,9 +323,14 @@ Status MemTable::_sort_by_cluster_keys() { } Tie tie = Tie(0, mutable_block.rows()); - for (auto i : _tablet_schema->cluster_key_idxes()) { + for (auto cid : _tablet_schema->cluster_key_idxes()) { + auto index = _tablet_schema->field_index(cid); + if (index == -1) { + return Status::InternalError("could not find cluster key column with unique_id=" + + std::to_string(cid) + " in tablet schema"); + } auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { - return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1); + return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, index, -1); }; _sort_one_column(row_in_blocks, tie, cmp); } diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index cba828785d9..ab034123ac8 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -57,30 +57,6 @@ #include "vec/olap/vertical_merge_iterator.h" namespace doris { -namespace { - -// for mow with cluster key table, the key group also contains cluster key columns. -// the `key_group_cluster_key_idxes` marks the positions of cluster key columns in key group. 
-void _generate_key_group_cluster_key_idxes(const TabletSchema& tablet_schema, - std::vector<std::vector<uint32_t>>& column_groups, - std::vector<uint32_t>& key_group_cluster_key_idxes) { - if (column_groups.empty() || tablet_schema.cluster_key_idxes().empty()) { - return; - } - - auto& key_column_group = column_groups[0]; - for (const auto& index_in_tablet_schema : tablet_schema.cluster_key_idxes()) { - for (auto j = 0; j < key_column_group.size(); ++j) { - auto cid = key_column_group[j]; - if (cid == index_in_tablet_schema) { - key_group_cluster_key_idxes.emplace_back(j); - break; - } - } - } -} - -} // namespace Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& cur_tablet_schema, @@ -183,7 +159,8 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, // split columns into several groups, make sure all keys in one group // unique_key should consider sequence&delete column void Merger::vertical_split_columns(const TabletSchema& tablet_schema, - std::vector<std::vector<uint32_t>>* column_groups) { + std::vector<std::vector<uint32_t>>* column_groups, + std::vector<uint32_t>* key_group_cluster_key_idxes) { uint32_t num_key_cols = tablet_schema.num_key_columns(); uint32_t total_cols = tablet_schema.num_columns(); std::vector<uint32_t> key_columns; @@ -206,8 +183,24 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema, } if (!tablet_schema.cluster_key_idxes().empty()) { for (const auto& cid : tablet_schema.cluster_key_idxes()) { - if (cid >= num_key_cols) { - key_columns.emplace_back(cid); + auto idx = tablet_schema.field_index(cid); + DCHECK(idx >= 0) << "could not find cluster key column with unique_id=" << cid + << " in tablet schema, table_id=" << tablet_schema.table_id(); + if (idx >= num_key_cols) { + key_columns.emplace_back(idx); + } + } + // tablet schema unique ids: [1, 2, 5, 3, 6, 4], [1 2] is key columns + // cluster key unique ids: [3, 1, 4] + // the key_columns should be [0, 1, 3, 5] + // the key_group_cluster_key_idxes should be [2, 1, 3] + for (const auto& cid : tablet_schema.cluster_key_idxes()) { + auto idx = tablet_schema.field_index(cid); + for (auto i = 0; i < key_columns.size(); ++i) { + if (idx == key_columns[i]) { + key_group_cluster_key_idxes->emplace_back(i); + break; + } } } } @@ -218,14 +211,12 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema, if (!key_columns.empty()) { column_groups->emplace_back(std::move(key_columns)); } - auto&& cluster_key_idxes = tablet_schema.cluster_key_idxes(); std::vector<uint32_t> value_columns; for (uint32_t i = num_key_cols; i < total_cols; ++i) { if (i == sequence_col_idx || i == delete_sign_idx || - cluster_key_idxes.end() != - std::find(cluster_key_idxes.begin(), cluster_key_idxes.end(), i)) { + key_columns.end() != std::find(key_columns.begin(), key_columns.end(), i)) { continue; } @@ -460,11 +451,8 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t int64_t merge_way_num, Statistics* stats_output) { LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id(); std::vector<std::vector<uint32_t>> column_groups; - vertical_split_columns(tablet_schema, &column_groups); - std::vector<uint32_t> key_group_cluster_key_idxes; - _generate_key_group_cluster_key_idxes(tablet_schema, column_groups, - key_group_cluster_key_idxes); + vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes); vectorized::RowSourcesBuffer row_sources_buf( 
tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type); diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index cb05162b3bc..7d430cde7f3 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -66,7 +66,8 @@ public: // for vertical compaction static void vertical_split_columns(const TabletSchema& tablet_schema, - std::vector<std::vector<uint32_t>>* column_groups); + std::vector<std::vector<uint32_t>>* column_groups, + std::vector<uint32_t>* key_group_cluster_key_idxes); static Status vertical_compact_one_group( BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema, bool is_key, const std::vector<uint32_t>& column_group, diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 374056f7b9d..fc8baf952c1 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -248,7 +248,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt } std::vector<std::vector<uint32_t>> column_groups; - Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups); + std::vector<uint32_t> key_group_cluster_key_idxes; + Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups, + &key_group_cluster_key_idxes); vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(), ReaderType::READER_SEGMENT_COMPACTION); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 0e9b55d99b8..225677f5d1f 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -126,7 +126,7 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, _key_index_size.clear(); _num_sort_key_columns = _tablet_schema->cluster_key_idxes().size(); for (auto cid : _tablet_schema->cluster_key_idxes()) { - const auto& column = _tablet_schema->column(cid); + const auto& column = _tablet_schema->column_by_uid(cid); _key_coders.push_back(get_key_coder(column.type())); _key_index_size.push_back(column.index_length()); } @@ -755,17 +755,31 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po // 2. 
generate short key index (use cluster key) key_columns.clear(); for (const auto& cid : _tablet_schema->cluster_key_idxes()) { - for (size_t id = 0; id < _column_writers.size(); ++id) { - // olap data convertor always start from id = 0 - if (cid == _column_ids[id]) { - auto converted_result = _olap_data_convertor->convert_column_data(id); + // find cluster key index in tablet schema + auto cluster_key_index = _tablet_schema->field_index(cid); + if (cluster_key_index == -1) { + return Status::InternalError( + "could not find cluster key column with unique_id=" + + std::to_string(cid) + " in tablet schema"); + } + bool found = false; + for (auto i = 0; i < _column_ids.size(); ++i) { + if (_column_ids[i] == cluster_key_index) { + auto converted_result = _olap_data_convertor->convert_column_data(i); if (!converted_result.first.ok()) { return converted_result.first; } key_columns.push_back(converted_result.second); + found = true; break; } } + if (!found) { + return Status::InternalError( + "could not found cluster key column with unique_id=" + + std::to_string(cid) + + ", tablet schema index=" + std::to_string(cluster_key_index)); + } } RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); } else if (_is_mow()) { diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 5663c3779df..4863f2c0401 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -130,7 +130,7 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 _key_index_size.clear(); _num_sort_key_columns = _tablet_schema->cluster_key_idxes().size(); for (auto cid : _tablet_schema->cluster_key_idxes()) { - const auto& column = _tablet_schema->column(cid); + const auto& column = _tablet_schema->column_by_uid(cid); _key_coders.push_back(get_key_coder(column.type())); _key_index_size.push_back(column.index_length()); } @@ -714,6 +714,7 @@ Status VerticalSegmentWriter::write_batch() { std::vector<vectorized::IOlapColumnDataAccessor*> key_columns; vectorized::IOlapColumnDataAccessor* seq_column = nullptr; + // the key is cluster key column unique id std::map<uint32_t, vectorized::IOlapColumnDataAccessor*> cid_to_column; for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); @@ -732,11 +733,12 @@ Status VerticalSegmentWriter::write_batch() { if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) { seq_column = column; } + auto column_unique_id = _tablet_schema->column(cid).unique_id(); if (_is_mow_with_cluster_key() && std::find(_tablet_schema->cluster_key_idxes().begin(), _tablet_schema->cluster_key_idxes().end(), - cid) != _tablet_schema->cluster_key_idxes().end()) { - cid_to_column[cid] = column; + column_unique_id) != _tablet_schema->cluster_key_idxes().end()) { + cid_to_column[column_unique_id] = column; } RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(), data.num_rows)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 3a29c0c542e..62eff357875 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -233,7 +233,6 @@ public class RollupJobV2 extends AlterJobV2 
implements GsonPostProcessable { TStorageMedium storageMedium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium(); TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId); MaterializedIndex rollupIndex = entry.getValue(); - Map<Long, Long> tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId); for (Tablet rollupTablet : rollupIndex.getTablets()) { long rollupTabletId = rollupTablet.getId(); @@ -276,6 +275,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); } + // rollup replica does not need to set mow cluster keys batchTask.addTask(createReplicaTask); } // end for rollupReplicas } // end for rollupTablets diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index c47344f14c5..43857b2e898 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -310,7 +310,7 @@ public class SchemaChangeHandler extends AlterHandler { boolean lightSchemaChange = olapTable.getEnableLightSchemaChange(); /* * UNIQUE: - * Can not drop any key column. + * Can not drop any key column, cluster key column * AGGREGATION: * Can not drp any key column is has value with REPLACE method */ @@ -844,9 +844,6 @@ public class SchemaChangeHandler extends AlterHandler { if (!column.isVisible()) { newSchema.add(column); } - if (column.isClusterKey()) { - throw new DdlException("Can not modify column order in Unique data model table"); - } } } if (newSchema.size() != targetIndexSchema.size()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index f44d9416e72..c514bf6306e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -71,6 +71,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Table; import com.google.common.collect.Table.Cell; import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -216,6 +217,20 @@ public class SchemaChangeJobV2 extends AlterJobV2 { partitionOriginIndexIdMap.clear(); } + private boolean isShadowIndexOfBase(long shadowIdxId, OlapTable tbl) { + if (indexIdToName.get(shadowIdxId).startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { + String shadowIndexName = indexIdToName.get(shadowIdxId); + String indexName = shadowIndexName + .substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()); + long indexId = tbl.getIndexIdByName(indexName); + LOG.info("shadow index id: {}, shadow index name: {}, pointer to index id: {}, index name: {}, " + + "base index id: {}, table_id: {}", shadowIdxId, shadowIndexName, indexId, indexName, + tbl.getBaseIndexId(), tbl.getId()); + return indexId == tbl.getBaseIndexId(); + } + return false; + } + protected void createShadowIndexReplica() throws AlterCancelException { Database db = Env.getCurrentInternalCatalog() .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist")); @@ -261,6 +276,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 { short shadowShortKeyColumnCount = 
indexShortKeyMap.get(shadowIdxId); List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId); + List<Integer> clusterKeyIndexes = null; + if (shadowIdxId == tbl.getBaseIndexId() || isShadowIndexOfBase(shadowIdxId, tbl)) { + clusterKeyIndexes = OlapTable.getClusterKeyIndexes(shadowSchema); + } int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash; long originIndexId = indexIdMap.get(shadowIdxId); int originSchemaHash = tbl.getSchemaHashByIndexId(originIndexId); @@ -309,6 +328,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } createReplicaTask.setInvertedIndexFileStorageFormat(tbl .getInvertedIndexFileStorageFormat()); + if (!CollectionUtils.isEmpty(clusterKeyIndexes)) { + createReplicaTask.setClusterKeyIndexes(clusterKeyIndexes); + LOG.info("table: {}, partition: {}, index: {}, tablet: {}, cluster key indexes: {}", + tableId, partitionId, shadowIdxId, shadowTabletId, clusterKeyIndexes); + } batchTask.addTask(createReplicaTask); } // end for rollupReplicas } // end for rollupTablets diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java index 33474f8263c..625a3b3b131 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java @@ -354,6 +354,10 @@ public class ColumnDef { return visible; } + public int getClusterKeyId() { + return this.clusterKeyId; + } + public void setClusterKeyId(int clusterKeyId) { this.clusterKeyId = clusterKeyId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 865489a113e..d3f37b632ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -421,9 +421,6 @@ public class CreateTableStmt extends DdlStmt implements NotFallbackInParser { keysDesc.analyze(columnDefs); if (!CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames())) { - if (Config.isCloudMode()) { - throw new AnalysisException("Cluster key is not supported in cloud mode"); - } if (!enableUniqueKeyMergeOnWrite) { throw new AnalysisException("Cluster keys only support unique keys table which enabled " + PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE); @@ -503,7 +500,7 @@ public class CreateTableStmt extends DdlStmt implements NotFallbackInParser { columnDef.getType().getPrimitiveType() + " column can't support aggregation " + columnDef.getAggregateType()); } - if (columnDef.isKey()) { + if (columnDef.isKey() || columnDef.getClusterKeyId() != -1) { throw new AnalysisException(columnDef.getType().getPrimitiveType() + " can only be used in the non-key column of the duplicate table at present."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java index e7359657ef2..0076ce74de3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java @@ -34,7 +34,6 @@ public class KeysDesc implements Writable { private KeysType type; private List<String> keysColumnNames; private List<String> clusterKeysColumnNames; - private List<Integer> clusterKeysColumnIds = null; public KeysDesc() { this.type = KeysType.AGG_KEYS; @@ -51,12 +50,6 @@ public class KeysDesc implements Writable { 
this.clusterKeysColumnNames = clusterKeyColumnNames; } - public KeysDesc(KeysType type, List<String> keysColumnNames, List<String> clusterKeyColumnNames, - List<Integer> clusterKeysColumnIds) { - this(type, keysColumnNames, clusterKeyColumnNames); - this.clusterKeysColumnIds = clusterKeysColumnIds; - } - public KeysType getKeysType() { return type; } @@ -69,10 +62,6 @@ public class KeysDesc implements Writable { return clusterKeysColumnNames; } - public List<Integer> getClusterKeysColumnIds() { - return clusterKeysColumnIds; - } - public boolean containsCol(String colName) { return keysColumnNames.contains(colName); } @@ -90,17 +79,6 @@ public class KeysDesc implements Writable { throw new AnalysisException("The number of key columns should be less than the number of columns."); } - if (clusterKeysColumnNames != null) { - if (Config.isCloudMode()) { - throw new AnalysisException("Cluster key is not supported in cloud mode"); - } - if (type != KeysType.UNIQUE_KEYS) { - throw new AnalysisException("Cluster keys only support unique keys table."); - } - clusterKeysColumnIds = Lists.newArrayList(); - analyzeClusterKeys(cols); - } - for (int i = 0; i < keysColumnNames.size(); ++i) { String name = cols.get(i).getName(); if (!keysColumnNames.get(i).equalsIgnoreCase(name)) { @@ -135,39 +113,48 @@ public class KeysDesc implements Writable { } if (clusterKeysColumnNames != null) { - int minKeySize = keysColumnNames.size() < clusterKeysColumnNames.size() ? keysColumnNames.size() - : clusterKeysColumnNames.size(); - boolean sameKey = true; - for (int i = 0; i < minKeySize; ++i) { - if (!keysColumnNames.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) { - sameKey = false; - break; - } - } - if (sameKey) { - throw new AnalysisException("Unique keys and cluster keys should be different."); - } + analyzeClusterKeys(cols); } } private void analyzeClusterKeys(List<ColumnDef> cols) throws AnalysisException { - for (int i = 0; i < clusterKeysColumnNames.size(); ++i) { + if (Config.isCloudMode()) { + throw new AnalysisException("Cluster key is not supported in cloud mode"); + } + if (type != KeysType.UNIQUE_KEYS) { + throw new AnalysisException("Cluster keys only support unique keys table"); + } + // check that cluster keys is not duplicated + for (int i = 0; i < clusterKeysColumnNames.size(); i++) { String name = clusterKeysColumnNames.get(i); - // check if key is duplicate for (int j = 0; j < i; j++) { if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) { throw new AnalysisException("Duplicate cluster key column[" + name + "]."); } } - // check if key exists and generate key column ids + } + // check that cluster keys is not equal to primary keys + int minKeySize = Math.min(keysColumnNames.size(), clusterKeysColumnNames.size()); + boolean sameKey = true; + for (int i = 0; i < minKeySize; i++) { + if (!keysColumnNames.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) { + sameKey = false; + break; + } + } + if (sameKey) { + throw new AnalysisException("Unique keys and cluster keys should be different."); + } + // check that cluster key column exists + for (int i = 0; i < clusterKeysColumnNames.size(); i++) { + String name = clusterKeysColumnNames.get(i); for (int j = 0; j < cols.size(); j++) { if (cols.get(j).getName().equalsIgnoreCase(name)) { - cols.get(j).setClusterKeyId(clusterKeysColumnIds.size()); - clusterKeysColumnIds.add(j); + cols.get(j).setClusterKeyId(i); break; } if (j == cols.size() - 1) { - throw new AnalysisException("Key cluster column[" + name + "] doesn't exist."); + throw 
new AnalysisException("Cluster key column[" + name + "] doesn't exist."); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 13a6d3a8051..27ad19e1762 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -100,6 +100,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Table.Cell; import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -1239,6 +1240,10 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId()); List<Index> indexes = restoredIdx.getId() == localTbl.getBaseIndexId() ? localTbl.getCopiedIndexes() : null; + List<Integer> clusterKeyIndexes = null; + if (indexMeta.getIndexId() == localTbl.getBaseIndexId() || localTbl.isShadowIndex(indexMeta.getIndexId())) { + clusterKeyIndexes = OlapTable.getClusterKeyIndexes(indexMeta.getSchema()); + } for (Tablet restoreTablet : restoredIdx.getTablets()) { TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(), restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD); @@ -1282,6 +1287,12 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { LOG.info("set base tablet {} for replica {} in restore job {}, tablet id={}", baseTablet.first, restoreReplica.getId(), jobId, restoreTablet.getId()); } + if (!CollectionUtils.isEmpty(clusterKeyIndexes)) { + task.setClusterKeyIndexes(clusterKeyIndexes); + LOG.info("table: {}, partition: {}, index: {}, tablet: {}, cluster key indexes: {}", + localTbl.getId(), restorePart.getId(), restoredIdx.getId(), restoreTablet.getId(), + clusterKeyIndexes); + } batchTask.addTask(task); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 20737d9a035..9728a9e4154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -110,6 +110,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -3131,6 +3132,16 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc } } + public static List<Integer> getClusterKeyIndexes(List<Column> columns) { + Map<Integer, Integer> clusterKeyIndexes = new TreeMap<>(); + for (Column column : columns) { + if (column.isClusterKey()) { + clusterKeyIndexes.put(column.getClusterKeyId(), column.getUniqueId()); + } + } + return clusterKeyIndexes.isEmpty() ? 
null : new ArrayList<>(clusterKeyIndexes.values()); + } + public long getVisibleVersionTime() { return tableAttributes.getVisibleVersionTime(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index c1c58f7b898..78044f2190d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -101,8 +101,7 @@ public class CloudInternalCatalog extends InternalCatalog { String storagePolicy, IdGeneratorBuffer idGeneratorBuffer, BinlogConfig binlogConfig, - boolean isStorageMediumSpecified, - List<Integer> clusterKeyIndexes) + boolean isStorageMediumSpecified) throws DdlException { // create base index first. Preconditions.checkArgument(tbl.getBaseIndexId() != -1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 77fe701f204..03c33a21e94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1770,8 +1770,7 @@ public class InternalCatalog implements CatalogIf<Database> { singlePartitionDesc.isInMemory(), singlePartitionDesc.getTabletType(), storagePolicy, idGeneratorBuffer, - binlogConfig, dataProperty.isStorageMediumSpecified(), null); - // TODO cluster key ids + binlogConfig, dataProperty.isStorageMediumSpecified()); // check again olapTable = db.getOlapTableOrDdlException(tableName); @@ -2086,8 +2085,7 @@ public class InternalCatalog implements CatalogIf<Database> { String storagePolicy, IdGeneratorBuffer idGeneratorBuffer, BinlogConfig binlogConfig, - boolean isStorageMediumSpecified, - List<Integer> clusterKeyIndexes) + boolean isStorageMediumSpecified) throws DdlException { // create base index first. Preconditions.checkArgument(tbl.getBaseIndexId() != -1); @@ -2145,6 +2143,11 @@ public class InternalCatalog implements CatalogIf<Database> { short shortKeyColumnCount = indexMeta.getShortKeyColumnCount(); TStorageType storageType = indexMeta.getStorageType(); List<Column> schema = indexMeta.getSchema(); + List<Integer> clusterKeyIndexes = null; + if (indexId == tbl.getBaseIndexId()) { + // only base and shadow index need cluster key indexes + clusterKeyIndexes = OlapTable.getClusterKeyIndexes(schema); + } KeysType keysType = indexMeta.getKeysType(); List<Index> indexes = indexId == tbl.getBaseIndexId() ? tbl.getCopiedIndexes() : null; int totalTaskNum = index.getTablets().size() * totalReplicaNum; @@ -2176,7 +2179,11 @@ public class InternalCatalog implements CatalogIf<Database> { task.setStorageFormat(tbl.getStorageFormat()); task.setInvertedIndexFileStorageFormat(tbl.getInvertedIndexFileStorageFormat()); - task.setClusterKeyIndexes(clusterKeyIndexes); + if (!CollectionUtils.isEmpty(clusterKeyIndexes)) { + task.setClusterKeyIndexes(clusterKeyIndexes); + LOG.info("table: {}, partition: {}, index: {}, tablet: {}, cluster key indexes: {}", + tbl.getId(), partitionId, indexId, tabletId, clusterKeyIndexes); + } batchTask.addTask(task); // add to AgentTaskQueue for handling finish report. 
// not for resending task @@ -2649,8 +2656,8 @@ public class InternalCatalog implements CatalogIf<Database> { olapTable.setRowStorePageSize(rowStorePageSize); // check data sort properties - int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnIds()) ? keysDesc.keysColumnSize() : - keysDesc.getClusterKeysColumnIds().size(); + int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames()) ? keysDesc.keysColumnSize() : + keysDesc.getClusterKeysColumnNames().size(); DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType, keyColumnSize, storageFormat); olapTable.setDataSortInfo(dataSortInfo); @@ -2662,6 +2669,10 @@ public class InternalCatalog implements CatalogIf<Database> { } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } + if (enableUniqueKeyMergeOnWrite && !enableLightSchemaChange && !CollectionUtils.isEmpty( + keysDesc.getClusterKeysColumnNames())) { + throw new DdlException("Unique merge-on-write table with cluster keys must enable light schema change"); + } } olapTable.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite); @@ -2990,18 +3001,15 @@ public class InternalCatalog implements CatalogIf<Database> { throw new DdlException(e.getMessage()); } - // analyse group commit interval ms - int groupCommitIntervalMs; try { - groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties); + int groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties); olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs); } catch (Exception e) { throw new DdlException(e.getMessage()); } - int groupCommitDataBytes; try { - groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties); + int groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties); olapTable.setGroupCommitDataBytes(groupCommitDataBytes); } catch (Exception e) { throw new DdlException(e.getMessage()); @@ -3057,8 +3065,7 @@ public class InternalCatalog implements CatalogIf<Database> { storagePolicy, idGeneratorBuffer, binlogConfigForTask, - partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(), - keysDesc.getClusterKeysColumnIds()); + partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified()); afterCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); olapTable.addPartition(partition); @@ -3142,8 +3149,7 @@ public class InternalCatalog implements CatalogIf<Database> { partitionInfo.getTabletType(entry.getValue()), partionStoragePolicy, idGeneratorBuffer, binlogConfigForTask, - dataProperty.isStorageMediumSpecified(), - keysDesc.getClusterKeysColumnIds()); + dataProperty.isStorageMediumSpecified()); olapTable.addPartition(partition); olapTable.getPartitionInfo().getDataProperty(partition.getId()) .setStoragePolicy(partionStoragePolicy); @@ -3566,14 +3572,6 @@ public class InternalCatalog implements CatalogIf<Database> { Env.getCurrentInvertedIndex().deleteTablet(tabletId); } }; - Map<Integer, Integer> clusterKeyMap = new TreeMap<>(); - for (int i = 0; i < olapTable.getBaseSchema().size(); i++) { - Column column = olapTable.getBaseSchema().get(i); - if (column.getClusterKeyId() != -1) { - clusterKeyMap.put(column.getClusterKeyId(), i); - } - } - List<Integer> clusterKeyIdxes = clusterKeyMap.values().stream().collect(Collectors.toList()); try { long bufferSize = IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl, origPartitions.values()); IdGeneratorBuffer idGeneratorBuffer = @@ -3609,8 +3607,7 
@@ public class InternalCatalog implements CatalogIf<Database> { copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(), idGeneratorBuffer, binlogConfig, - copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified(), - clusterKeyIdxes); + copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified()); newPartitions.add(newPartition); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index f7702a49554..a4a5273e8ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -96,6 +96,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -964,10 +965,19 @@ public class ReportHandler extends Daemon { objectPool, olapTable.rowStorePageSize(), olapTable.variantEnableFlattenNested()); - createReplicaTask.setIsRecoverTask(true); createReplicaTask.setInvertedIndexFileStorageFormat(olapTable .getInvertedIndexFileStorageFormat()); + if (indexId == olapTable.getBaseIndexId() || olapTable.isShadowIndex(indexId)) { + List<Integer> clusterKeyIndexes = OlapTable.getClusterKeyIndexes( + indexMeta.getSchema()); + if (!CollectionUtils.isEmpty(clusterKeyIndexes)) { + createReplicaTask.setClusterKeyIndexes(clusterKeyIndexes); + LOG.info("table: {}, partition: {}, index: {}, tablet: {}, " + + "cluster key indexes: {}", tableId, partitionId, indexId, + tabletId, clusterKeyIndexes); + } + } createReplicaBatchTask.addTask(createReplicaTask); } else { // just set this replica as bad diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java index 730c6f115a3..04ce3786bb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java @@ -131,7 +131,6 @@ public class CreateTableInfo { private boolean isExternal = false; private String clusterName = null; private List<String> clusterKeysColumnNames = null; - private List<Integer> clusterKeysColumnIds = null; private PartitionTableInfo partitionTableInfo; // get when validate /** @@ -424,9 +423,6 @@ public class CreateTableInfo { validateKeyColumns(); if (!clusterKeysColumnNames.isEmpty()) { - if (Config.isCloudMode()) { - throw new AnalysisException("Cluster key is not supported in cloud mode"); - } if (!isEnableMergeOnWrite) { throw new AnalysisException( "Cluster keys only support unique keys table which enabled " @@ -736,50 +732,6 @@ public class CreateTableInfo { "The number of key columns should be less than the number of columns."); } - if (!clusterKeysColumnNames.isEmpty()) { - if (Config.isCloudMode()) { - throw new AnalysisException("Cluster key is not supported in cloud mode"); - } - if (keysType != KeysType.UNIQUE_KEYS) { - throw new AnalysisException("Cluster keys only support unique keys table."); - } - clusterKeysColumnIds = 
Lists.newArrayList(); - for (int i = 0; i < clusterKeysColumnNames.size(); ++i) { - String name = clusterKeysColumnNames.get(i); - // check if key is duplicate - for (int j = 0; j < i; j++) { - if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) { - throw new AnalysisException("Duplicate cluster key column[" + name + "]."); - } - } - // check if key exists and generate key column ids - for (int j = 0; j < columns.size(); j++) { - if (columns.get(j).getName().equalsIgnoreCase(name)) { - columns.get(j).setClusterKeyId(clusterKeysColumnIds.size()); - clusterKeysColumnIds.add(j); - break; - } - if (j == columns.size() - 1) { - throw new AnalysisException( - "Key cluster column[" + name + "] doesn't exist."); - } - } - } - - int minKeySize = keys.size() < clusterKeysColumnNames.size() ? keys.size() - : clusterKeysColumnNames.size(); - boolean sameKey = true; - for (int i = 0; i < minKeySize; ++i) { - if (!keys.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) { - sameKey = false; - break; - } - } - if (sameKey) { - throw new AnalysisException("Unique keys and cluster keys should be different."); - } - } - for (int i = 0; i < keys.size(); ++i) { String name = columns.get(i).getName(); if (!keys.get(i).equalsIgnoreCase(name)) { @@ -815,6 +767,50 @@ public class CreateTableInfo { } } } + + if (!clusterKeysColumnNames.isEmpty()) { + // the same code as KeysDesc#analyzeClusterKeys + if (Config.isCloudMode()) { + throw new AnalysisException("Cluster key is not supported in cloud mode"); + } + if (keysType != KeysType.UNIQUE_KEYS) { + throw new AnalysisException("Cluster keys only support unique keys table"); + } + // check that cluster keys is not duplicated + for (int i = 0; i < clusterKeysColumnNames.size(); i++) { + String name = clusterKeysColumnNames.get(i); + for (int j = 0; j < i; j++) { + if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) { + throw new AnalysisException("Duplicate cluster key column[" + name + "]."); + } + } + } + // check that cluster keys is not equal to primary keys + int minKeySize = Math.min(keys.size(), clusterKeysColumnNames.size()); + boolean sameKey = true; + for (int i = 0; i < minKeySize; ++i) { + if (!keys.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) { + sameKey = false; + break; + } + } + if (sameKey) { + throw new AnalysisException("Unique keys and cluster keys should be different."); + } + // check that cluster key column exists + for (int i = 0; i < clusterKeysColumnNames.size(); ++i) { + String name = clusterKeysColumnNames.get(i); + for (int j = 0; j < columns.size(); j++) { + if (columns.get(j).getName().equalsIgnoreCase(name)) { + columns.get(j).setClusterKeyId(i); + break; + } + if (j == columns.size() - 1) { + throw new AnalysisException("Cluster key column[" + name + "] doesn't exist."); + } + } + } + } } /** @@ -858,7 +854,7 @@ public class CreateTableInfo { return new CreateTableStmt(ifNotExists, isExternal, new TableName(ctlName, dbName, tableName), catalogColumns, catalogIndexes, engineName, - new KeysDesc(keysType, keys, clusterKeysColumnNames, clusterKeysColumnIds), + new KeysDesc(keysType, keys, clusterKeysColumnNames), partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties, comment, addRollups, null); } diff --git a/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out b/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out new file mode 100644 index 00000000000..50028960ab1 --- /dev/null +++ b/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out @@ 
-0,0 +1,365 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_original -- +11 28 38 +10 29 39 + +-- !select_add_c4 -- +11 28 38 \N +10 29 39 \N +13 27 36 40 +12 26 37 40 + +-- !select_add_c5 -- +11 \N 28 38 \N +10 \N 29 39 \N +13 \N 27 36 40 +12 \N 26 37 40 +15 50 20 34 40 +14 50 20 35 40 + +-- !select_add_c6 -- +11 \N 28 \N 38 \N +10 \N 29 \N 39 \N +13 \N 27 \N 36 40 +12 \N 26 \N 37 40 +15 50 20 \N 34 40 +14 50 20 \N 35 40 +17 50 20 60 32 40 +16 50 20 60 33 40 + +-- !select_add_k2 -- +11 \N \N 28 \N 38 \N +10 \N \N 29 \N 39 \N +13 \N \N 27 \N 36 40 +12 \N \N 26 \N 37 40 +15 \N 50 20 \N 34 40 +14 \N 50 20 \N 35 40 +17 \N 50 20 60 32 40 +16 \N 50 20 60 33 40 +19 200 \N 20 \N 30 \N +18 200 \N 20 \N 31 \N + +-- !select_drop_c4 -- +11 \N \N 28 \N 38 +10 \N \N 29 \N 39 +13 \N \N 27 \N 36 +12 \N \N 26 \N 37 +15 \N 50 20 \N 34 +14 \N 50 20 \N 35 +17 \N 50 20 60 32 +16 \N 50 20 60 33 +19 200 \N 20 \N 30 +18 200 \N 20 \N 31 +119 200 \N 20 \N 30 +118 200 \N 20 \N 31 + +-- !select_drop_c5 -- +11 \N 28 \N 38 +10 \N 29 \N 39 +13 \N 27 \N 36 +12 \N 26 \N 37 +15 \N 20 \N 34 +14 \N 20 \N 35 +17 \N 20 60 32 +16 \N 20 60 33 +19 200 20 \N 30 +18 200 20 \N 31 +119 200 20 \N 30 +118 200 20 \N 31 +117 200 20 \N 32 +116 200 20 \N 33 + +-- !select_drop_c6 -- +11 \N 28 38 +10 \N 29 39 +13 \N 27 36 +12 \N 26 37 +15 \N 20 34 +14 \N 20 35 +17 \N 20 32 +16 \N 20 33 +19 200 20 30 +18 200 20 31 +119 200 20 30 +118 200 20 31 +117 200 20 32 +116 200 20 33 +115 200 25 34 +114 200 24 35 + +-- !select_reorder -- +11 \N 38 28 +10 \N 39 29 +13 \N 36 27 +12 \N 37 26 +15 \N 34 20 +14 \N 35 20 +17 \N 32 20 +16 \N 33 20 +19 200 30 20 +18 200 31 20 +119 200 30 20 +118 200 31 20 +117 200 32 20 +116 200 33 20 +115 200 34 25 +114 200 35 24 +113 200 36 23 +112 200 37 22 + +-- !select_modify_k2 -- +11 \N 38 28 +10 \N 39 29 +13 \N 36 27 +12 \N 37 26 +15 \N 34 20 +14 \N 35 20 +17 \N 32 20 +16 \N 33 20 +19 200 30 20 +18 200 31 20 +119 200 30 20 +118 200 31 20 +117 200 32 20 +116 200 33 20 +115 200 34 25 +114 200 35 24 +113 200 36 23 +112 200 37 22 +111 200 38 21 +110 200 39 20 + +-- !select_create_mv_base -- +11 \N 38 28 +10 \N 39 29 +13 \N 36 27 +12 \N 37 26 +15 \N 34 20 +14 \N 35 20 +17 \N 32 20 +16 \N 33 20 +19 200 30 20 +18 200 31 20 +119 200 30 20 +118 200 31 20 +117 200 32 20 +116 200 33 20 +115 200 34 25 +114 200 35 24 +113 200 36 23 +112 200 37 22 +111 200 38 21 +110 200 39 20 +211 200 38 21 +210 200 39 20 + +-- !select_create_mv_mv -- +10 39 +11 38 +12 37 +13 36 +14 35 +15 34 +16 33 +17 32 +18 31 +19 30 +118 31 +119 30 +116 33 +117 32 +114 35 +115 34 +112 37 +113 36 +110 39 +111 38 +210 39 +211 38 + +-- !select_create_rollup_base -- +11 \N 38 28 +10 \N 39 29 +13 \N 36 27 +12 \N 37 26 +15 \N 34 20 +14 \N 35 20 +17 \N 32 20 +16 \N 33 20 +19 200 30 20 +18 200 31 20 +119 200 30 20 +118 200 31 20 +117 200 32 20 +116 200 33 20 +115 200 34 25 +114 200 35 24 +113 200 36 23 +112 200 37 22 +111 200 38 21 +110 200 39 20 +211 200 38 21 +210 200 39 20 +311 200 38 21 +310 200 39 20 + +-- !select_create_rollup_roll -- +\N 10 29 +\N 11 28 +\N 12 26 +\N 13 27 +\N 14 20 +\N 15 20 +\N 16 20 +\N 17 20 +200 18 20 +200 19 20 +200 118 20 +200 119 20 +200 116 20 +200 117 20 +200 114 24 +200 115 25 +200 112 22 +200 113 23 +200 110 20 +200 111 21 +200 210 20 +200 211 21 +200 310 20 +200 311 21 + +-- !select_add_partition -- +10011 200 38 21 +10010 200 39 20 +11 \N 38 28 +10 \N 39 29 +13 \N 36 27 +12 \N 37 26 +15 \N 34 20 +14 \N 35 20 +17 \N 32 20 +16 \N 33 20 +19 200 30 20 +18 200 31 20 +119 200 
30 20 +118 200 31 20 +117 200 32 20 +116 200 33 20 +115 200 34 25 +114 200 35 24 +113 200 36 23 +112 200 37 22 +111 200 38 21 +110 200 39 20 +211 200 38 21 +210 200 39 20 +311 200 38 21 +310 200 39 20 + +-- !select_truncate -- +13 \N 36 27 +12 \N 37 26 +11 \N 38 28 +10 \N 39 29 + +-- !select_rollup_base -- +12 22 31 41 51 +11 21 32 42 52 + +-- !select_rollup_roll -- +21 11 42 32 +22 12 41 31 + +-- !select_rollup_base_sc -- +12 22 31 41 51 +11 21 32 42 52 + +-- !select_rollup_roll_sc -- +21 11 42 32 +22 12 41 31 + +-- !select_rollup_base_sc1 -- +12 22 31 41 51 +11 21 32 42 52 +14 24 33 43 53 +13 23 34 44 54 + +-- !select_rollup_roll_sc1 -- +21 11 42 32 +22 12 41 31 +23 13 44 34 +24 14 43 33 + +-- !select_restore_base2 -- +12 22 31 41 51 +11 21 32 42 52 +14 24 33 43 53 +13 23 34 44 54 +16 26 33 43 53 +15 25 34 44 54 + +-- !select_restore_roll2 -- +21 11 42 32 +22 12 41 31 +23 13 44 34 +24 14 43 33 +25 15 44 34 +26 16 43 33 + +-- !select_restore_base -- +12 22 31 41 51 +11 21 32 42 52 +14 24 33 43 53 +13 23 34 44 54 + +-- !select_restore_roll -- +21 11 42 32 +22 12 41 31 +23 13 44 34 +24 14 43 33 + +-- !select_restore_base1 -- +12 22 31 41 51 +11 21 32 42 52 +14 24 33 43 53 +13 23 34 44 54 +18 28 33 43 53 +17 27 34 44 54 + +-- !select_restore_roll1 -- +21 11 42 32 +22 12 41 31 +23 13 44 34 +24 14 43 33 +27 17 44 34 +28 18 43 33 + +-- !select_restore_base2 -- +12 22 31 41 51 +11 21 32 42 52 +14 24 33 43 53 +13 23 34 44 54 + +-- !select_restore_roll2 -- +21 11 42 32 +22 12 41 31 +23 13 44 34 +24 14 43 33 + +-- !select_restore_base3 -- +12 22 31 41 51 +11 21 32 42 52 +14 24 33 43 53 +13 23 34 44 54 +18 28 33 43 53 +17 27 34 44 54 + +-- !select_restore_roll4 -- +21 11 42 32 +22 12 41 31 +23 13 44 34 +24 14 43 33 +27 17 44 34 +28 18 43 33 + diff --git a/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy b/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy index 8cd7cb6d198..0abb7d8f1a9 100644 --- a/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy @@ -81,7 +81,7 @@ suite("test_create_table") { "enable_unique_key_merge_on_write" = "true" ); """ - exception "Key cluster column[c_addresses] doesn't exist" + exception "Cluster key column[c_addresses] doesn't exist" } // mow unique table with duplicate cluster keys diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy index 9abee82f7c0..37c96e79a6b 100644 --- a/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy @@ -48,7 +48,7 @@ suite("test_schema_change") { `min_dwell_time` INT DEFAULT "99999" COMMENT "用户最小停留时间") UNIQUE KEY(`user_id`, `date`, `city`, `age`, `sex`) CLUSTER BY(`cost`, `comment`) - DISTRIBUTED BY HASH(`user_id`) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1", "enable_unique_key_merge_on_write" = "true" ); @@ -237,12 +237,12 @@ suite("test_schema_change") { } // 5. 
modify column order should success (Temporarily throw exception) - test { + /*test { sql """ alter table ${tableName} ORDER BY (`user_id`, `date`, `city`, `age`, `sex`, `max_dwell_time`, `comment`, `min_dwell_time`, `last_visit_date_not_null`, `cost`, `score`, `last_update_date`); """ exception "Can not modify column order in Unique data model table" - } + }*/ /*assertTrue(getAlterTableState(), "alter column order should success"); { sql """ INSERT INTO ${tableName} diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy new file mode 100644 index 00000000000..840badb6310 --- /dev/null +++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_schema_change_ck") { + def db = "regression_test_unique_with_mow_c_p0" + def tableName = "test_schema_change_ck" + + def getAlterTableState = { + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + time 600 + } + return true + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + test { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `c1` int(11) NULL, + `c2` int(11) NULL, + `c3` int(11) NULL + ) unique KEY(`c1`) + cluster by(`c3`, `c2`) + DISTRIBUTED BY HASH(`c1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true", + "light_schema_change" = "false" + ); + """ + exception "Unique merge-on-write table with cluster keys must enable light schema change" + } + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `c1` int(11) NULL, + `c2` int(11) NULL, + `c3` int(11) NULL + ) unique KEY(`c1`) + cluster by(`c3`, `c2`) + PARTITION BY RANGE(`c1`) + ( + PARTITION `p_10000` VALUES [("0"), ("10000")) + ) + DISTRIBUTED BY HASH(`c1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + + sql """ INSERT INTO ${tableName} VALUES (11, 28, 38), (10, 29, 39) """ + qt_select_original """select * from ${tableName}""" + + /****** add value column ******/ + // after cluster key + sql """ alter table ${tableName} ADD column c4 int(11) after c3; """ + assertTrue(getAlterTableState(), "add column should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, c4) VALUES (13, 27, 36, 40), (12, 26, 37, 40) """ + qt_select_add_c4 """select * from ${tableName}""" + + // before cluster key + sql """ alter table ${tableName} ADD column c5 int(11) after c1; """ + assertTrue(getAlterTableState(), "add column should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, c4, c5) VALUES (15, 20, 34, 40, 50), 
(14, 20, 35, 40, 50) """ + qt_select_add_c5 """select * from ${tableName}""" + + // in the middle of cluster key + sql """ alter table ${tableName} ADD column c6 int(11) after c2; """ + assertTrue(getAlterTableState(), "add column should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, c4, c5, c6) VALUES (17, 20, 32, 40, 50, 60), (16, 20, 33, 40, 50, 60) """ + qt_select_add_c6 """select * from ${tableName}""" + + /****** add key column ******/ + sql """ alter table ${tableName} ADD column k2 int(11) key after c1; """ + assertTrue(getAlterTableState(), "add column should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (19, 20, 30, 200), (18, 20, 31, 200) """ + qt_select_add_k2 """select * from ${tableName}""" + + /****** TODO add cluster key column is not supported ******/ + + /****** drop value column ******/ + sql """ alter table ${tableName} drop column c4; """ + assertTrue(getAlterTableState(), "drop column should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (119, 20, 30, 200), (118, 20, 31, 200) """ + qt_select_drop_c4 """select * from ${tableName}""" + + sql """ alter table ${tableName} drop column c5; """ + assertTrue(getAlterTableState(), "drop column should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (117, 20, 32, 200), (116, 20, 33, 200) """ + qt_select_drop_c5 """select * from ${tableName}""" + + sql """ alter table ${tableName} drop column c6; """ + assertTrue(getAlterTableState(), "drop column should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (115, 25, 34, 200), (114, 24, 35, 200) """ + qt_select_drop_c6 """select * from ${tableName}""" + + /****** drop key column ******/ + test { + sql """ alter table ${tableName} drop column k2; """ + exception "Can not drop key column in Unique data model table" + } + + /****** TODO does not support drop cluster key ******/ + test { + sql """ alter table ${tableName} drop column c3; """ + exception "Can not drop cluster key column in Unique data model table" + } + + /****** reorder ******/ + sql """ alter table ${tableName} order by(c1, k2, c3, c2); """ + assertTrue(getAlterTableState(), "reorder should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (113, 23, 36, 200), (112, 22, 37, 200) """ + qt_select_reorder """select * from ${tableName}""" + + /****** modify key column data type ******/ + sql """ alter table ${tableName} modify column k2 BIGINT key; """ + assertTrue(getAlterTableState(), "modify should success") + sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (111, 21, 38, 200), (110, 20, 39, 200) """ + qt_select_modify_k2 """select * from ${tableName}""" + + /****** TODO does not support modify cluster key column data type ******/ + test { + sql """ alter table ${tableName} modify column c2 BIGINT; """ + exception "Can not modify cluster key column" + } + + /****** create mv ******/ + def mv_name = "k2_c3" + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}""" + createMV """ create materialized view ${mv_name} as select c1, c3 from ${tableName}; """ + sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (211, 21, 38, 200), (210, 20, 39, 200) """ + qt_select_create_mv_base """select * from ${tableName}""" + qt_select_create_mv_mv """select c1, c3 from ${tableName}""" + + /****** create rollup ******/ + sql """ alter table ${tableName} ADD ROLLUP r1(k2, c1, c2); """ + waitForSchemaChangeDone { + sql """show alter table rollup where tablename='${tableName}' order by createtime desc limit 1""" + time 600 + } 
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (311, 21, 38, 200), (310, 20, 39, 200) """ + qt_select_create_rollup_base """select * from ${tableName}""" + qt_select_create_rollup_roll """select k2, c1, c2 from ${tableName}""" + + /****** add partition ******/ + sql "ALTER TABLE ${tableName} ADD PARTITION p_20000 VALUES [('10000'), ('20000'));" + for (int i = 0; i < 10; i++) { + List<List<Object>> partitions = sql "show partitions from ${tableName};" + logger.info("partitions: ${partitions}") + if (partitions.size() < 2 && i < 10) { + sleep(50) + continue + } + assertEquals(partitions.size(), 2) + } + sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (10011, 21, 38, 200), (10010, 20, 39, 200) """ + qt_select_add_partition """select * from ${tableName}""" + + /****** one sql contain multi column changes ******/ + + /****** truncate table ******/ + sql """ TRUNCATE TABLE ${tableName} """ + sql """ INSERT INTO ${tableName}(c1, c2, c3) VALUES (11, 28, 38), (10, 29, 39), (12, 26, 37), (13, 27, 36) """ + qt_select_truncate """select * from ${tableName}""" + + /****** create table with rollup ******/ + tableName = tableName + "_rollup" + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(11) NULL, + `k2` int(11) NULL, + `c3` int(11) NULL, + `c4` int(11) NULL, + `c5` int(11) NULL + ) unique KEY(`k1`, `k2`) + cluster by(`c4`, `c5`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + ROLLUP ( + r1 (k2, k1, c4, c3) + ) + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + sql """ INSERT INTO ${tableName} VALUES (11, 21, 32, 42, 52), (12, 22, 31, 41, 51); """ + qt_select_rollup_base """select * from ${tableName};""" + qt_select_rollup_roll """select k2, k1, c4, c3 from ${tableName};""" + + /****** specify index, not base index ******/ + sql """ ALTER TABLE ${tableName} ORDER BY(k2, k1, c3, c4) from r1; """ + assertTrue(getAlterTableState(), "reorder rollup should success") + qt_select_rollup_base_sc """select * from ${tableName};""" + qt_select_rollup_roll_sc """select k2, k1, c4, c3 from ${tableName};""" + sql """ INSERT INTO ${tableName} VALUES (13, 23, 34, 44, 54), (14, 24, 33, 43, 53); """ + qt_select_rollup_base_sc1 """select * from ${tableName};""" + qt_select_rollup_roll_sc1 """select k2, k1, c4, c3 from ${tableName};""" + + /****** backup restore ******/ + if (!isCloudMode()) { + def repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + def backup = tableName + "_bak" + def syncer = getSyncer() + syncer.createS3Repository(repoName) + def result = sql """ show tablets from ${tableName}; """ + logger.info("tablets 0: ${result}") + + // backup + sql """ BACKUP SNAPSHOT ${context.dbName}.${backup} TO ${repoName} ON (${tableName}) properties("type"="full"); """ + syncer.waitSnapshotFinish() + def snapshot = syncer.getSnapshotTimestamp(repoName, backup) + assertTrue(snapshot != null) + sql """ INSERT INTO ${tableName} VALUES (15, 25, 34, 44, 54), (16, 26, 33, 43, 53); """ + qt_select_restore_base2 """select * from ${tableName};""" + qt_select_restore_roll2 """select k2, k1, c4, c3 from ${tableName};""" + + // restore + logger.info(""" RESTORE SNAPSHOT ${context.dbName}.${backup} FROM `${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" = "${snapshot}","replication_num" = "1" ) """) + sql """ RESTORE SNAPSHOT ${context.dbName}.${backup} FROM `${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" = "${snapshot}","replication_num" = "1" ) """ + 
syncer.waitAllRestoreFinish(context.dbName) + result = sql """ show tablets from ${tableName}; """ + logger.info("tablets 1: ${result}") + qt_select_restore_base """select * from ${tableName};""" + qt_select_restore_roll """select k2, k1, c4, c3 from ${tableName};""" + sql """ INSERT INTO ${tableName} VALUES (17, 27, 34, 44, 54), (18, 28, 33, 43, 53); """ + qt_select_restore_base1 """select * from ${tableName};""" + qt_select_restore_roll1 """select k2, k1, c4, c3 from ${tableName};""" + + // restore + sql """ drop table ${tableName}; """ + sql """ RESTORE SNAPSHOT ${context.dbName}.${backup} FROM `${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" = "${snapshot}","replication_num" = "1" ) """ + syncer.waitAllRestoreFinish(context.dbName) + result = sql """ show tablets from ${tableName}; """ + logger.info("tablets 2: ${result}") + qt_select_restore_base2 """select * from ${tableName};""" + qt_select_restore_roll2 """select k2, k1, c4, c3 from ${tableName};""" + sql """ INSERT INTO ${tableName} VALUES (17, 27, 34, 44, 54), (18, 28, 33, 43, 53); """ + qt_select_restore_base3 """select * from ${tableName};""" + qt_select_restore_roll4 """select k2, k1, c4, c3 from ${tableName};""" + + sql "DROP REPOSITORY `${repoName}`" + } + +}