This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new af9529a [Dynamic Partition] Support for automatically adding partitions af9529a is described below commit af9529a2071415830447ff8d1a417c8f542b8aed Author: WingC <1018957...@qq.com> AuthorDate: Fri Jan 3 09:45:04 2020 -0600 [Dynamic Partition] Support for automatically adding partitions In some scenarios, when a user creates an OLAP table that is range-partitioned by time, the user needs to periodically add and remove partitions to ensure that the data is valid. As a result, adding and removing partitions dynamically can be very useful for users. --- fe/src/main/cup/sql_parser.cup | 9 +- fe/src/main/java/org/apache/doris/alter/Alter.java | 4 + .../apache/doris/alter/SchemaChangeHandler.java | 4 + .../analysis/ModifyTablePropertiesClause.java | 7 +- .../doris/analysis/ShowDynamicPartitionStmt.java | 87 ++++ .../java/org/apache/doris/catalog/Catalog.java | 72 ++- .../doris/catalog/DynamicPartitionProperty.java | 82 ++++ .../java/org/apache/doris/catalog/OlapTable.java | 35 ++ .../org/apache/doris/catalog/TableProperty.java | 89 ++++ .../doris/clone/DynamicPartitionScheduler.java | 292 +++++++++++++ .../main/java/org/apache/doris/common/Config.java | 12 + .../java/org/apache/doris/common/ErrorCode.java | 20 +- .../org/apache/doris/common/FeMetaVersion.java | 6 +- .../doris/common/util/DynamicPartitionUtil.java | 267 ++++++++++++ .../org/apache/doris/journal/JournalEntity.java | 6 + .../java/org/apache/doris/persist/EditLog.java | 11 +- .../doris/persist/ModifyDynamicPartitionInfo.java | 66 +++ .../org/apache/doris/persist/OperationType.java | 3 + .../java/org/apache/doris/qe/ShowExecutor.java | 49 +++ fe/src/main/jflex/sql_scanner.flex | 1 + .../apache/doris/alter/SchemaChangeJobV2Test.java | 95 +++- .../doris/catalog/DynamicPartitionTableTest.java | 482 +++++++++++++++++++++ .../java/org/apache/doris/catalog/FakeEditLog.java | 6 + .../apache/doris/catalog/TablePropertyTest.java | 74 ++++ 
.../persist/ModifyDynamicPartitionInfoTest.java | 68 +++ 25 files changed, 1836 insertions(+), 11 deletions(-) diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 9cc533a..9593dc9 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -196,7 +196,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CURRENT, KW_CURRENT_USER, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_TIME, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE, - KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE, + KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE, KW_ELSE, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXISTS, KW_EXPORT, KW_EXTERNAL, KW_EXTRACT, KW_FALSE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FILE, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_GLOBAL, KW_GRANT, KW_GRANTS, KW_GROUP, @@ -1939,6 +1939,11 @@ show_param ::= {: RESULT = new ShowDbStmt(parser.wild, parser.where); :} + /* Dynamic Partition */ + | KW_DYNAMIC KW_PARTITION KW_TABLES opt_db:db + {: + RESULT = new ShowDynamicPartitionStmt(db); + :} /* Columns */ | opt_full KW_COLUMNS from_or_in table_name:table opt_db:db opt_wild_where {: @@ -4341,6 +4346,8 @@ keyword ::= {: RESULT = id; :} | KW_STOP:id {: RESULT = id; :} + | KW_DYNAMIC:id + {: RESULT = id; :} ; // Identifier that contain keyword diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java index 062ad68..2767106 100644 --- 
a/fe/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/src/main/java/org/apache/doris/alter/Alter.java @@ -56,6 +56,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.qe.ConnectContext; @@ -298,8 +299,10 @@ public class Alter { Preconditions.checkState(alterClauses.size() == 1); AlterClause alterClause = alterClauses.get(0); if (alterClause instanceof DropPartitionClause) { + DynamicPartitionUtil.checkAlterAllowed(olapTable); Catalog.getInstance().dropPartition(db, olapTable, ((DropPartitionClause) alterClause)); } else if (alterClause instanceof ModifyPartitionClause) { + DynamicPartitionUtil.checkAlterAllowed(olapTable); Catalog.getInstance().modifyPartition(db, olapTable, ((ModifyPartitionClause) alterClause)); } else { hasAddPartition = true; @@ -316,6 +319,7 @@ public class Alter { Preconditions.checkState(alterClauses.size() == 1); AlterClause alterClause = alterClauses.get(0); if (alterClause instanceof AddPartitionClause) { + DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName)); Catalog.getInstance().addPartition(db, tableName, (AddPartitionClause) alterClause); } else { Preconditions.checkState(false); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 152b71a..92edff6 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -60,6 +60,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DynamicPartitionUtil; import 
org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; @@ -1341,6 +1342,9 @@ public class SchemaChangeHandler extends AlterHandler { */ sendClearAlterTask(db, olapTable); return; + } else if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) { + Catalog.getCurrentCatalog().modifyTableDynamicPartition(db, olapTable, properties); + return; } } diff --git a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index c7678dd..e9b3018 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -17,8 +17,10 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.TableProperty; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; @@ -39,7 +41,8 @@ public class ModifyTablePropertiesClause extends AlterTableClause { throw new AnalysisException("Properties is not set"); } - if (properties.size() != 1) { + if (properties.size() != 1 + && !TableProperty.isSamePrefixProperties(properties, TableProperty.DYNAMIC_PARTITION_PROPERTY_PREFIX)) { throw new AnalysisException("Can only set one table property at a time"); } @@ -71,6 +74,8 @@ public class ModifyTablePropertiesClause extends AlterTableClause { throw new AnalysisException( "Property " + PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT + " should be v2"); } + } else if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) { + // do nothing, dynamic properties will be analyzed in SchemaChangeHandler.process } else { throw new AnalysisException("Unknown table property: " + 
properties.keySet()); } diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowDynamicPartitionStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowDynamicPartitionStmt.java new file mode 100644 index 0000000..30bfaa4 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/analysis/ShowDynamicPartitionStmt.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import com.google.common.base.Strings; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.qe.ShowResultSetMetaData; + +public class ShowDynamicPartitionStmt extends ShowStmt { + private String db; + private static final ShowResultSetMetaData SHOW_DYNAMIC_PARTITION_META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("TableName", ScalarType.createVarchar(20))) + .addColumn(new Column("Enable", ScalarType.createVarchar(20))) + .addColumn(new Column("TimeUnit", ScalarType.createVarchar(20))) + .addColumn(new Column("End", ScalarType.createVarchar(20))) + .addColumn(new Column("Prefix", ScalarType.createVarchar(20))) + .addColumn(new Column("Buckets", ScalarType.createVarchar(20))) + .addColumn(new Column("LastUpdateTime", ScalarType.createVarchar(20))) + .addColumn(new Column("LastSchedulerTime", ScalarType.createVarchar(20))) + .addColumn(new Column("State", ScalarType.createVarchar(20))) + .addColumn(new Column("Msg", ScalarType.createVarchar(20))) + .build(); + + ShowDynamicPartitionStmt(String db) { + this.db = db; + } + + public String getDb() { + return db; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (Strings.isNullOrEmpty(db)) { + db = analyzer.getDefaultDb(); + if (Strings.isNullOrEmpty(db)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + } else { + db = ClusterNamespace.getFullName(analyzer.getClusterName(), db); + } + + // we do not check db privs here. because user may not have any db privs, + // but if it has privs of tbls inside this db,it should be allowed to see this db. 
+ } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("SHOW DYNAMIC PARTITION TABLES"); + if (!Strings.isNullOrEmpty(db)) { + sb.append(" FROM ").append(db); + } + return sb.toString(); + } + + @Override + public String toString() { + return toSql(); + } + + @Override + public ShowResultSetMetaData getMetaData() { + return SHOW_DYNAMIC_PARTITION_META_DATA; + } +} \ No newline at end of file diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 08a06ed..704193d 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -85,6 +85,7 @@ import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.clone.ColocateTableBalancer; +import org.apache.doris.clone.DynamicPartitionScheduler; import org.apache.doris.clone.TabletChecker; import org.apache.doris.clone.TabletScheduler; import org.apache.doris.clone.TabletSchedulerStat; @@ -104,12 +105,14 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.KuduUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.QueryableReentrantLock; import org.apache.doris.common.util.SmallFileMgr; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.consistency.ConsistencyChecker; import org.apache.doris.deploy.DeployManager; @@ -154,6 +157,7 @@ import org.apache.doris.persist.DatabaseInfo; import 
org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo; import org.apache.doris.persist.DropPartitionInfo; +import org.apache.doris.persist.ModifyDynamicPartitionInfo; import org.apache.doris.persist.EditLog; import org.apache.doris.persist.ModifyPartitionInfo; import org.apache.doris.persist.PartitionPersistInfo; @@ -370,6 +374,8 @@ public class Catalog { private SmallFileMgr smallFileMgr; + private DynamicPartitionScheduler dynamicPartitionScheduler; + public List<Frontend> getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -418,6 +424,10 @@ public class Catalog { return metaReplayState; } + public DynamicPartitionScheduler getDynamicPartitionScheduler() { + return this.dynamicPartitionScheduler; + } + private static class SingletonHolder { private static final Catalog INSTANCE = new Catalog(); } @@ -491,6 +501,9 @@ public class Catalog { this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager); this.smallFileMgr = new SmallFileMgr(); + + this.dynamicPartitionScheduler = new DynamicPartitionScheduler("DynamicPartitionScheduler", + Config.dynamic_partition_check_interval_seconds * 1000L); } public static void destroyCheckpoint() { @@ -1178,6 +1191,8 @@ public class Catalog { // start routine load scheduler routineLoadScheduler.start(); routineLoadTaskScheduler.start(); + // start dynamic partition task + dynamicPartitionScheduler.start(); } // start threads that should running on all FE @@ -3090,6 +3105,7 @@ public class Catalog { } public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause clause) throws DdlException { + DynamicPartitionUtil.checkAlterAllowed(olapTable); Preconditions.checkArgument(db.isWriteLockHeldByCurrentThread()); String partitionName = clause.getPartitionName(); @@ -3387,6 +3403,9 @@ public class Catalog { } partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId); } else { + if 
(DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) { + throw new DdlException("Only support dynamic partition properties on range partition table"); + } long partitionId = getNextId(); // use table name as single partition name partitionNameToId.put(tableName, partitionId); @@ -3556,6 +3575,8 @@ public class Catalog { PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_HDD_DATA_PROPERTY); PropertyAnalyzer.analyzeReplicationNum(properties, FeConstants.default_replication_num); + DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties); + if (properties != null && !properties.isEmpty()) { // here, all properties should be checked throw new DdlException("Unknown properties: " + properties); @@ -3588,7 +3609,7 @@ public class Catalog { if (!db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists())) { ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exists"); } - + // we have added these index to memory, only need to persist here if (getColocateTableIndex().isColocateTable(tableId)) { GroupId groupId = getColocateTableIndex().getGroup(tableId); @@ -3596,8 +3617,11 @@ public class Catalog { ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq); editLog.logColocateAddTable(info); } - LOG.info("successfully create table[{};{}]", tableName, tableId); + // register or remove table from DynamicPartition after table created + DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable); + dynamicPartitionScheduler.createOrUpdateRuntimeInfo( + tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime()); } catch (DdlException e) { for (Long tabletId : tabletIdSet) { Catalog.getCurrentInvertedIndex().deleteTablet(tabletId); @@ -3879,6 +3903,11 @@ public class Catalog { sb.append(colocateTable).append("\""); } + // 6. 
dynamic partition + if (olapTable.dynamicPartitionExists()) { + sb.append(olapTable.getTableProperty().getDynamicPartitionProperty().toString()); + } + sb.append("\n)"); } else if (table.getType() == TableType.MYSQL) { MysqlTable mysqlTable = (MysqlTable) table; @@ -4033,6 +4062,7 @@ public class Catalog { } } } // end for partitions + DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable); } } } @@ -5057,6 +5087,42 @@ public class Catalog { throw new DdlException("not implmented"); } + public void modifyTableDynamicPartition(Database db, OlapTable table, Map<String, String> properties) throws DdlException { + TableProperty tableProperty = table.getTableProperty(); + if (tableProperty == null) { + DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(table, properties); + } else { + Map<String, String> analyzedDynamicPartition = DynamicPartitionUtil.analyzeDynamicPartition(properties); + tableProperty.modifyTableProperties(analyzedDynamicPartition); + } + + DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), table); + dynamicPartitionScheduler.createOrUpdateRuntimeInfo( + table.getName(), DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime()); + ModifyDynamicPartitionInfo info = new ModifyDynamicPartitionInfo(db.getId(), table.getId(), table.getTableProperty().getProperties()); + editLog.logDynamicPartition(info); + } + + public void replayModifyTableDynamicPartition(ModifyDynamicPartitionInfo info) { + long dbId = info.getDbId(); + long tableId = info.getTableId(); + Map<String, String> properties = info.getProperties(); + + Database db = getDb(dbId); + db.writeLock(); + try { + OlapTable olapTable = (OlapTable) db.getTable(tableId); + TableProperty tableProperty = olapTable.getTableProperty(); + if (tableProperty == null) { + olapTable.setTableProperty(new TableProperty(properties).buildDynamicProperty()); + } else { + tableProperty.modifyTableProperties(properties); + } + } finally { + 
db.writeUnlock(); + } + } + /* * used for handling AlterClusterStmt * (for client is the ALTER CLUSTER command). @@ -6193,7 +6259,7 @@ public class Catalog { throw new DdlException("Table " + tbl.getName() + " is not random distributed"); } TableInfo tableInfo = TableInfo.createForModifyDistribution(db.getId(), tbl.getId()); - editLog.logModifyDitrubutionType(tableInfo); + editLog.logModifyDistributionType(tableInfo); LOG.info("finished to modify distribution type of table: " + tbl.getName()); } finally { db.writeUnlock(); diff --git a/fe/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java b/fe/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java new file mode 100644 index 0000000..7906cae --- /dev/null +++ b/fe/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.catalog; + +import java.util.Map; + +public class DynamicPartitionProperty{ + public static final String TIME_UNIT = "dynamic_partition.time_unit"; + public static final String END = "dynamic_partition.end"; + public static final String PREFIX = "dynamic_partition.prefix"; + public static final String BUCKETS = "dynamic_partition.buckets"; + public static final String ENABLE = "dynamic_partition.enable"; + + private boolean exist; + + private boolean enable; + private String timeUnit; + private int end; + private String prefix; + private int buckets; + + DynamicPartitionProperty(Map<String ,String> properties) { + if (properties != null && !properties.isEmpty()) { + this.exist = true; + this.enable = Boolean.parseBoolean(properties.get(ENABLE)); + this.timeUnit = properties.get(TIME_UNIT); + this.end = Integer.parseInt(properties.get(END)); + this.prefix = properties.get(PREFIX); + this.buckets = Integer.parseInt(properties.get(BUCKETS)); + } else { + this.exist = false; + } + } + + public boolean isExist() { + return exist; + } + + public String getTimeUnit() { + return timeUnit; + } + + public int getEnd() { + return end; + } + + public String getPrefix() { + return prefix; + } + + public int getBuckets() { + return buckets; + } + + public boolean getEnable() { + return enable; + } + + @Override + public String toString() { + return ",\n\"" + ENABLE + "\" = \"" + enable + "\"" + + ",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\"" + + ",\n\"" + END + "\" = \"" + end + "\"" + + ",\n\"" + PREFIX + "\" = \"" + prefix + "\"" + + ",\n\"" + BUCKETS + "\" = \"" + buckets + "\""; + } +} diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java index ea2b3d9..459d023 100644 --- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -123,6 +123,8 @@ public class OlapTable extends Table { // The init value is 
-1, which means there is not partition and index at all. private long baseIndexId = -1; + private TableProperty tableProperty; + public OlapTable() { // for persist super(TableType.OLAP); @@ -144,6 +146,8 @@ public class OlapTable extends Table { this.colocateGroup = null; this.indexes = null; + + this.tableProperty = null; } public OlapTable(long id, String tableName, List<Column> baseSchema, KeysType keysType, @@ -177,11 +181,28 @@ public class OlapTable extends Table { this.bfFpp = 0; this.colocateGroup = null; + if (indexes == null) { this.indexes = null; } else { this.indexes = indexes; } + + this.tableProperty = null; + } + + public void setTableProperty(TableProperty tableProperty) { + this.tableProperty = tableProperty; + } + + public TableProperty getTableProperty() { + return this.tableProperty; + } + + public boolean dynamicPartitionExists() { + return tableProperty != null + && tableProperty.getDynamicPartitionProperty() != null + && tableProperty.getDynamicPartitionProperty().isExist(); } public void setBaseIndexId(long baseIndexId) { @@ -876,6 +897,14 @@ public class OlapTable extends Table { } else { out.writeBoolean(false); } + + //dynamicProperties + if (tableProperty == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + tableProperty.write(out); + } } public void readFields(DataInput in) throws IOException { @@ -976,6 +1005,12 @@ public class OlapTable extends Table { this.indexes = TableIndexes.read(in); } } + // dynamic partition + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_71) { + if (in.readBoolean()) { + tableProperty = TableProperty.read(in); + } + } } public boolean equals(Table table) { diff --git a/fe/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/src/main/java/org/apache/doris/catalog/TableProperty.java new file mode 100644 index 0000000..ed9d966 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -0,0 +1,89 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import com.google.gson.annotations.SerializedName; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** TableProperty contains additional information about OlapTable + * TableProperty includes properties to persistent the additional information + * Different properties is recognized by prefix such as dynamic_partition + * If there is different type properties is added.Write a method such as buildDynamicProperty to build it. 
+ */ +public class TableProperty implements Writable { + public static final String DYNAMIC_PARTITION_PROPERTY_PREFIX = "dynamic_partition"; + + @SerializedName(value = "properties") + private Map<String, String> properties; + + private DynamicPartitionProperty dynamicPartitionProperty; + + public TableProperty(Map<String, String> properties) { + this.properties = properties; + } + + public static boolean isSamePrefixProperties(Map<String, String> properties, String prefix) { + for (String value : properties.keySet()) { + if (!value.startsWith(prefix)) { + return false; + } + } + return true; + } + + public TableProperty buildDynamicProperty() { + HashMap<String, String> dynamicPartitionProperties = new HashMap<>(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + if (entry.getKey().startsWith(DYNAMIC_PARTITION_PROPERTY_PREFIX)) { + dynamicPartitionProperties.put(entry.getKey(), entry.getValue()); + } + } + dynamicPartitionProperty = new DynamicPartitionProperty(dynamicPartitionProperties); + return this; + } + + void modifyTableProperties(Map<String, String> modifyProperties) { + properties.putAll(modifyProperties); + buildDynamicProperty(); + } + + public Map<String, String> getProperties() { + return properties; + } + + public DynamicPartitionProperty getDynamicPartitionProperty() { + return dynamicPartitionProperty; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static TableProperty read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), TableProperty.class).buildDynamicProperty(); + } +} diff --git a/fe/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java new file mode 100644 index 0000000..19b87d3 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -0,0 +1,292 @@ +// Licensed to 
the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import org.apache.doris.analysis.AddPartitionClause; +import org.apache.doris.analysis.DistributionDesc; +import org.apache.doris.analysis.HashDistributionDesc; +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.SingleRangePartitionDesc; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DynamicPartitionProperty; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableProperty; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.Pair; +import 
org.apache.doris.common.util.DynamicPartitionUtil; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.common.util.TimeUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +
/**
 * Master daemon that periodically pre-creates range partitions for OLAP tables which carry
 * dynamic partition properties.
 *
 * <p>Tables are tracked as (dbId, tableId) pairs. On every round, and only while
 * {@code Config.dynamic_partition_enable} is true, each tracked table gets the partitions for
 * offsets {@code 0..end} of its configured time unit that do not exist yet. The round interval
 * is the {@code intervalMs} handed to the constructor (the original author's note says it is
 * driven by {@code Config.dynamic_partition_check_interval_seconds}).
 *
 * <p>Per-table runtime information (scheduler time, state, message) is kept in memory, keyed by
 * table name, and can be read with {@link #getRuntimeInfo(String, String)}.
 */
public class DynamicPartitionScheduler extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(DynamicPartitionScheduler.class);
    public static final String LAST_SCHEDULER_TIME = "lastSchedulerTime";
    public static final String LAST_UPDATE_TIME = "lastUpdateTime";
    public static final String DYNAMIC_PARTITION_STATE = "dynamicPartitionState";
    public static final String MSG = "msg";

    // placeholder reported for every runtime value that has not been set yet
    private static final String DEFAULT_RUNTIME_VALUE = "N/A";

    // table name -> (runtime key -> value)
    private final Map<String, Map<String, String>> runtimeInfos = Maps.newConcurrentMap();
    // (dbId, tableId) of every table this scheduler currently looks after
    private final Set<Pair<Long, Long>> dynamicPartitionTableInfo = Sets.newConcurrentHashSet();
    // false until the catalog has been scanned once for dynamic partition tables
    private boolean initialize;

    public enum State {
        NORMAL,
        ERROR
    }

    /**
     * @param name       daemon name, forwarded to {@link MasterDaemon}
     * @param intervalMs run interval in milliseconds, forwarded to {@link MasterDaemon}
     */
    public DynamicPartitionScheduler(String name, long intervalMs) {
        super(name, intervalMs);
        this.initialize = false;
    }

    /** Starts tracking the table identified by {@code dbId}/{@code tableId}. */
    public void registerDynamicPartitionTable(Long dbId, Long tableId) {
        dynamicPartitionTableInfo.add(new Pair<>(dbId, tableId));
    }

    /** Stops tracking the table identified by {@code dbId}/{@code tableId}. */
    public void removeDynamicPartitionTable(Long dbId, Long tableId) {
        dynamicPartitionTableInfo.remove(new Pair<>(dbId, tableId));
    }

    /**
     * Returns the runtime value stored under {@code key} for {@code tableName}.
     *
     * @return the stored value; for a table without runtime info the default of that key
     *         (see {@link #createDefaultRuntimeInfo()}); {@code "N/A"} for an unknown key
     */
    public String getRuntimeInfo(String tableName, String key) {
        Map<String, String> tableRuntimeInfo = runtimeInfos.get(tableName);
        if (tableRuntimeInfo == null) {
            // unknown table: answer from a throw-away default map, nothing is registered
            tableRuntimeInfo = createDefaultRuntimeInfo();
        }
        return tableRuntimeInfo.getOrDefault(key, DEFAULT_RUNTIME_VALUE);
    }

    /** Forgets all runtime info recorded for {@code tableName}. */
    public void removeRuntimeInfo(String tableName) {
        runtimeInfos.remove(tableName);
    }

    /**
     * Stores {@code value} under {@code key} for {@code tableName}, creating the table's runtime
     * info with default values first when it does not exist yet.
     */
    public void createOrUpdateRuntimeInfo(String tableName, String key, String value) {
        // computeIfAbsent makes "create if missing" a single atomic step on the concurrent map,
        // so two callers racing on the first write cannot overwrite each other's default map
        runtimeInfos.computeIfAbsent(tableName, name -> createDefaultRuntimeInfo()).put(key, value);
    }

    /** Builds a fresh runtime info map: state NORMAL, every other known key set to "N/A". */
    private Map<String, String> createDefaultRuntimeInfo() {
        Map<String, String> defaultRuntimeInfo = Maps.newConcurrentMap();
        defaultRuntimeInfo.put(LAST_UPDATE_TIME, DEFAULT_RUNTIME_VALUE);
        defaultRuntimeInfo.put(LAST_SCHEDULER_TIME, DEFAULT_RUNTIME_VALUE);
        defaultRuntimeInfo.put(DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
        defaultRuntimeInfo.put(MSG, DEFAULT_RUNTIME_VALUE);
        return defaultRuntimeInfo;
    }

    /**
     * One scheduling round: for every tracked table, works out under the database read lock which
     * partitions are missing, then adds them after the lock has been released.
     *
     * <p>Tables whose database or table is gone, or whose dynamic partition property is missing
     * or disabled, are dropped from the tracked set.
     */
    private void dynamicAddPartition() {
        Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfo.iterator();
        while (iterator.hasNext()) {
            Pair<Long, Long> tableInfo = iterator.next();
            Long dbId = tableInfo.first;
            Long tableId = tableInfo.second;
            Database db = Catalog.getInstance().getDb(dbId);
            if (db == null) {
                iterator.remove();
                continue;
            }
            String tableName;
            List<AddPartitionClause> addPartitionClauses = new ArrayList<>();
            db.readLock();
            try {
                // Only OlapTable has DynamicPartitionProperty; the instanceof test also covers
                // a table that no longer exists (null)
                Table table = db.getTable(tableId);
                if (!(table instanceof OlapTable)) {
                    iterator.remove();
                    continue;
                }
                OlapTable olapTable = (OlapTable) table;
                if (!olapTable.dynamicPartitionExists()
                        || !olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()) {
                    iterator.remove();
                    continue;
                }

                if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
                    String errorMsg = "Table[" + olapTable.getName() + "]'s state is not NORMAL."
                            + "Do not allow doing dynamic add partition. table state=" + olapTable.getState();
                    recordFailedMsg(olapTable.getName(), errorMsg);
                    LOG.info(errorMsg);
                    continue;
                }

                // scheduler time should be recorded even if no partition is added
                createOrUpdateRuntimeInfo(olapTable.getName(), LAST_SCHEDULER_TIME, TimeUtils.getCurrentFormatTime());

                // Determine the partition column type:
                // if column type is Date, the partition name is formatted as yyyyMMdd
                // if column type is DateTime, the partition name is formatted as yyyyMMddHHmmss
                RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
                Column partitionColumn = rangePartitionInfo.getPartitionColumns().get(0);
                String partitionFormat;
                try {
                    partitionFormat = DynamicPartitionUtil.getPartitionFormat(partitionColumn);
                } catch (DdlException e) {
                    recordFailedMsg(olapTable.getName(), e.getMessage());
                    continue;
                }

                // every border of this round is computed from the same "now"
                Calendar calendar = Calendar.getInstance();
                TableProperty tableProperty = olapTable.getTableProperty();
                DynamicPartitionProperty dynamicPartitionProperty = tableProperty.getDynamicPartitionProperty();

                for (int i = 0; i <= dynamicPartitionProperty.getEnd(); i++) {
                    // partition i covers [now + i units, now + (i + 1) units)
                    String dynamicPartitionPrefix = dynamicPartitionProperty.getPrefix();
                    String prevBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
                            i, (Calendar) calendar.clone(), partitionFormat);
                    String partitionName = dynamicPartitionPrefix
                            + DynamicPartitionUtil.getFormattedPartitionName(prevBorder);
                    String nextBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
                            i + 1, (Calendar) calendar.clone(), partitionFormat);
                    PartitionValue lowerValue = new PartitionValue(prevBorder);
                    PartitionValue upperValue = new PartitionValue(nextBorder);

                    Range<PartitionKey> addPartitionKeyRange;
                    try {
                        PartitionKey lowerBound = PartitionKey.createPartitionKey(
                                Collections.singletonList(lowerValue), Collections.singletonList(partitionColumn));
                        PartitionKey upperBound = PartitionKey.createPartitionKey(
                                Collections.singletonList(upperValue), Collections.singletonList(partitionColumn));
                        addPartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
                    } catch (AnalysisException e) {
                        // keys.size is always equal to column.size, cannot reach this exception
                        LOG.error("Keys size is not equl to column size.", e);
                        continue;
                    }

                    // continue if partition already exists
                    // only support single column partition now
                    boolean isPartitionExists = false;
                    for (Range<PartitionKey> partitionKeyRange : rangePartitionInfo.getIdToRange().values()) {
                        try {
                            RangePartitionInfo.checkRangeIntersect(partitionKeyRange, addPartitionKeyRange);
                        } catch (DdlException e) {
                            isPartitionExists = true;
                            if (addPartitionKeyRange.equals(partitionKeyRange)) {
                                // exactly the partition we wanted is already there: healthy
                                clearFailedMsg(olapTable.getName());
                            } else {
                                // a different, overlapping partition is in the way
                                recordFailedMsg(olapTable.getName(), e.getMessage());
                            }
                            break;
                        }
                    }
                    if (isPartitionExists) {
                        continue;
                    }

                    // construct partition desc
                    PartitionKeyDesc partitionKeyDesc = new PartitionKeyDesc(
                            Collections.singletonList(lowerValue), Collections.singletonList(upperValue));
                    HashMap<String, String> partitionProperties = new HashMap<>(1);
                    partitionProperties.put("replication_num",
                            String.valueOf(DynamicPartitionUtil.estimateReplicateNum(olapTable)));
                    SingleRangePartitionDesc rangePartitionDesc = new SingleRangePartitionDesc(true, partitionName,
                            partitionKeyDesc, partitionProperties);

                    // construct distribution desc
                    HashDistributionInfo hashDistributionInfo =
                            (HashDistributionInfo) olapTable.getDefaultDistributionInfo();
                    List<String> distColumnNames = new ArrayList<>();
                    for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
                        distColumnNames.add(distributionColumn.getName());
                    }
                    DistributionDesc distributionDesc =
                            new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);

                    // add partition according to partition desc and distribution desc
                    addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null));
                }
                tableName = olapTable.getName();
            } finally {
                db.readUnlock();
            }
            // the read lock is released before the partitions are actually added
            for (AddPartitionClause addPartitionClause : addPartitionClauses) {
                try {
                    Catalog.getCurrentCatalog().addPartition(db, tableName, addPartitionClause);
                    clearFailedMsg(tableName);
                } catch (DdlException e) {
                    recordFailedMsg(tableName, e.getMessage());
                }
            }
        }
    }

    /** Logs the failure and marks {@code tableName} as ERROR with {@code msg} as its message. */
    private void recordFailedMsg(String tableName, String msg) {
        LOG.warn("dynamic add partition failed: " + msg);
        createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
        createOrUpdateRuntimeInfo(tableName, MSG, msg);
    }

    /** Marks {@code tableName} as NORMAL and resets its message to "N/A". */
    private void clearFailedMsg(String tableName) {
        createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
        createOrUpdateRuntimeInfo(tableName, MSG, DEFAULT_RUNTIME_VALUE);
    }

    /** Scans every database once and registers all tables that are dynamic partition tables. */
    private void initDynamicPartitionTable() {
        for (Long dbId : Catalog.getInstance().getDbIds()) {
            Database db = Catalog.getInstance().getDb(dbId);
            if (db == null) {
                continue;
            }
            db.readLock();
            try {
                // iterate the Database already held: looking it up a second time could return
                // null if the database was dropped in the meantime
                for (Table table : db.getTables()) {
                    if (DynamicPartitionUtil.isDynamicPartitionTable(table)) {
                        registerDynamicPartitionTable(db.getId(), table.getId());
                    }
                }
            } finally {
                db.readUnlock();
            }
        }
        initialize = true;
    }

    @Override
    protected void runAfterCatalogReady() {
        if (!initialize) {
            // check Dynamic Partition tables only when FE start
            initDynamicPartitionTable();
        }
        if (Config.dynamic_partition_enable) {
            dynamicAddPartition();
        }
    }
}
\ No newline at end of file diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 753c5de..3aa4b95 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++
b/fe/src/main/java/org/apache/doris/common/Config.java @@ -945,5 +945,17 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static boolean disable_cluster_feature = true; + + /* + * Decide how often to check dynamic partition + */ + @ConfField(mutable = true, masterOnly = true) + public static int dynamic_partition_check_interval_seconds = 600; + + /* + * If set to true, dynamic partition feature will open + */ + @ConfField(mutable = true, masterOnly = true) + public static boolean dynamic_partition_enable = false; } diff --git a/fe/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/src/main/java/org/apache/doris/common/ErrorCode.java index 9b2ab9f..21a7204 100644 --- a/fe/src/main/java/org/apache/doris/common/ErrorCode.java +++ b/fe/src/main/java/org/apache/doris/common/ErrorCode.java @@ -214,7 +214,25 @@ public enum ErrorCode { "Colocate tables distribution columns must have the same data type: %s should be %s"), ERR_COLOCATE_NOT_COLOCATE_TABLE(5064, new byte[] { '4', '2', '0', '0', '0' }, "Table %s is not a colocated table"), - ERR_INVALID_OPERATION(5065, new byte[] { '4', '2', '0', '0', '0' }, "Operation %s is invalid"); + ERR_INVALID_OPERATION(5065, new byte[] { '4', '2', '0', '0', '0' }, "Operation %s is invalid"), + ERROR_DYNAMIC_PARTITION_TIME_UNIT(5065, new byte[] {'4', '2', '0', '0', '0'}, + "Unsupported time unit %s. 
Expect DAY WEEK MONTH."), + ERROR_DYNAMIC_PARTITION_END_ZERO(5066, new byte[] {'4', '2', '0', '0', '0'}, + "Dynamic partition end must greater than 0"), + ERROR_DYNAMIC_PARTITION_END_FORMAT(5066, new byte[] {'4', '2', '0', '0', '0'}, + "Invalid dynamic partition end %s"), + ERROR_DYNAMIC_PARTITION_END_EMPTY(5066, new byte[] {'4', '2', '0', '0', '0'}, + "Dynamic partition end is empty"), + ERROR_DYNAMIC_PARTITION_BUCKETS_ZERO(5067, new byte[] {'4', '2', '0', '0', '0'}, + "Dynamic partition buckets must greater than 0"), + ERROR_DYNAMIC_PARTITION_BUCKETS_FORMAT(5067, new byte[] {'4', '2', '0', '0', '0'}, + "Invalid dynamic partition buckets %s"), + ERROR_DYNAMIC_PARTITION_BUCKETS_EMPTY(5066, new byte[] {'4', '2', '0', '0', '0'}, + "Dynamic partition buckets is empty"), + ERROR_DYNAMIC_PARTITION_ENABLE(5068, new byte[] {'4', '2', '0', '0', '0'}, + "Invalid dynamic partition enable: %s. Expected true or false"), + ERROR_DYNAMIC_PARTITION_PREFIX(5069, new byte[] {'4', '2', '0', '0', '0'}, + "Invalid dynamic partition prefix: %s."); ErrorCode(int code, byte[] sqlState, String errorMsg) { this.code = code; diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 834062e..2a9539c 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -147,10 +147,12 @@ public final class FeMetaVersion { public static final int VERSION_67 = 67; // for es table context public static final int VERSION_68 = 68; - // modofy password checking logic + // modify password checking logic public static final int VERSION_69 = 69; // for indexes public static final int VERSION_70 = 70; + // dynamic partition + public static final int VERSION_71 = 71; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_70; + public static final int VERSION_CURRENT 
= VERSION_71; } diff --git a/fe/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java new file mode 100644 index 0000000..83f5097 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.common.util; + +import com.google.common.base.Strings; +import org.apache.doris.analysis.TimestampArithmeticExpr.TimeUnit; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DynamicPartitionProperty; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableProperty; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeNameFormat; + +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; +
/**
 * Static helpers around the dynamic partition table properties: validating the values a user
 * supplied, moving them into an analysed property map, deciding whether a table is a dynamic
 * partition table, and the date arithmetic used to name and bound the partitions.
 */
public class DynamicPartitionUtil {
    private static final String TIMESTAMP_FORMAT = "yyyyMMdd";
    private static final String DATE_FORMAT = "yyyy-MM-dd";
    private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /** Reports a DDL error unless {@code timeUnit} is DAY, WEEK or MONTH (case-insensitive). */
    public static void checkTimeUnit(String timeUnit) throws DdlException {
        boolean supported = !Strings.isNullOrEmpty(timeUnit)
                && (timeUnit.equalsIgnoreCase(TimeUnit.DAY.toString())
                || timeUnit.equalsIgnoreCase(TimeUnit.WEEK.toString())
                || timeUnit.equalsIgnoreCase(TimeUnit.MONTH.toString()));
        if (!supported) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_TIME_UNIT, timeUnit);
        }
    }

    /** Reports a DDL error when {@code prefix} is rejected by the partition name check. */
    private static void checkPrefix(String prefix) throws DdlException {
        try {
            FeNameFormat.checkPartitionName(prefix);
        } catch (AnalysisException e) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_PREFIX, prefix);
        }
    }

    /** Reports a DDL error unless {@code end} is a non-empty, positive integer literal. */
    private static void checkEnd(String end) throws DdlException {
        if (Strings.isNullOrEmpty(end)) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_END_EMPTY);
        }
        int parsedEnd;
        try {
            parsedEnd = Integer.parseInt(end);
        } catch (NumberFormatException e) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_END_FORMAT, end);
            return;
        }
        if (parsedEnd <= 0) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_END_ZERO, end);
        }
    }

    /** Reports a DDL error unless {@code buckets} is a non-empty, positive integer literal. */
    private static void checkBuckets(String buckets) throws DdlException {
        if (Strings.isNullOrEmpty(buckets)) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_BUCKETS_EMPTY);
        }
        int parsedBuckets;
        try {
            parsedBuckets = Integer.parseInt(buckets);
        } catch (NumberFormatException e) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_BUCKETS_FORMAT, buckets);
            return;
        }
        if (parsedBuckets <= 0) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_BUCKETS_ZERO, buckets);
        }
    }

    /** Reports a DDL error unless {@code enable} is "true" or "false" (case-insensitive). */
    private static void checkEnable(String enable) throws DdlException {
        boolean isBooleanLiteral = !Strings.isNullOrEmpty(enable)
                && (Boolean.TRUE.toString().equalsIgnoreCase(enable)
                || Boolean.FALSE.toString().equalsIgnoreCase(enable));
        if (!isBooleanLiteral) {
            ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_ENABLE, enable);
        }
    }

    /** @return true when {@code properties} contains at least one dynamic partition key */
    public static boolean checkDynamicPartitionPropertiesExist(Map<String, String> properties) {
        if (properties == null) {
            return false;
        }
        if (properties.containsKey(DynamicPartitionProperty.TIME_UNIT)) {
            return true;
        }
        if (properties.containsKey(DynamicPartitionProperty.END)) {
            return true;
        }
        if (properties.containsKey(DynamicPartitionProperty.PREFIX)) {
            return true;
        }
        if (properties.containsKey(DynamicPartitionProperty.BUCKETS)) {
            return true;
        }
        return properties.containsKey(DynamicPartitionProperty.ENABLE);
    }

    /**
     * Checks that dynamic partition properties are usable on a table with {@code partitionInfo}.
     *
     * @return false when {@code properties} is null or empty, true otherwise
     * @throws DdlException when the table is not single-column range partitioned, or when some
     *                      but not all of the five dynamic partition properties are given
     */
    public static boolean checkInputDynamicPartitionProperties(Map<String, String> properties,
            PartitionInfo partitionInfo) throws DdlException {
        if (properties == null || properties.isEmpty()) {
            return false;
        }
        if (partitionInfo.getType() != PartitionType.RANGE || partitionInfo.isMultiColumnPartition()) {
            throw new DdlException("Dynamic partition only support single-column range partition");
        }
        String timeUnit = properties.get(DynamicPartitionProperty.TIME_UNIT);
        String prefix = properties.get(DynamicPartitionProperty.PREFIX);
        String end = properties.get(DynamicPartitionProperty.END);
        String buckets = properties.get(DynamicPartitionProperty.BUCKETS);
        String enable = properties.get(DynamicPartitionProperty.ENABLE);

        boolean noneAssigned = Strings.isNullOrEmpty(enable)
                && Strings.isNullOrEmpty(timeUnit)
                && Strings.isNullOrEmpty(prefix)
                && Strings.isNullOrEmpty(end)
                && Strings.isNullOrEmpty(buckets);
        if (noneAssigned) {
            return true;
        }

        // at least one property was given, so all five are mandatory
        if (Strings.isNullOrEmpty(enable)) {
            throw new DdlException("Must assign dynamic_partition.enable properties");
        }
        if (Strings.isNullOrEmpty(timeUnit)) {
            throw new DdlException("Must assign dynamic_partition.time_unit properties");
        }
        if (Strings.isNullOrEmpty(prefix)) {
            throw new DdlException("Must assign dynamic_partition.prefix properties");
        }
        if (Strings.isNullOrEmpty(end)) {
            throw new DdlException("Must assign dynamic_partition.end properties");
        }
        if (Strings.isNullOrEmpty(buckets)) {
            throw new DdlException("Must assign dynamic_partition.buckets properties");
        }
        return true;
    }

    /**
     * Registers the table with the dynamic partition scheduler when its dynamic partition
     * property is enabled, removes it when disabled; does nothing without such a property.
     */
    public static void registerOrRemoveDynamicPartitionTable(long dbId, OlapTable olapTable) {
        if (olapTable.getTableProperty() == null
                || olapTable.getTableProperty().getDynamicPartitionProperty() == null) {
            return;
        }
        boolean enabled = olapTable.getTableProperty().getDynamicPartitionProperty().getEnable();
        if (enabled) {
            Catalog.getCurrentCatalog().getDynamicPartitionScheduler()
                    .registerDynamicPartitionTable(dbId, olapTable.getId());
        } else {
            Catalog.getCurrentCatalog().getDynamicPartitionScheduler()
                    .removeDynamicPartitionTable(dbId, olapTable.getId());
        }
    }

    /**
     * Validates every dynamic partition property present in {@code properties} and moves it out
     * of that map into the returned one. A property that fails validation stays in
     * {@code properties} because the check throws before it is removed.
     */
    public static Map<String, String> analyzeDynamicPartition(Map<String, String> properties) throws DdlException {
        // properties should not be empty, check properties before call this function
        Map<String, String> analyzed = new HashMap<>();
        if (properties.containsKey(DynamicPartitionProperty.TIME_UNIT)) {
            String unit = properties.get(DynamicPartitionProperty.TIME_UNIT);
            checkTimeUnit(unit);
            analyzed.put(DynamicPartitionProperty.TIME_UNIT, unit);
            properties.remove(DynamicPartitionProperty.TIME_UNIT);
        }
        if (properties.containsKey(DynamicPartitionProperty.PREFIX)) {
            String namePrefix = properties.get(DynamicPartitionProperty.PREFIX);
            checkPrefix(namePrefix);
            analyzed.put(DynamicPartitionProperty.PREFIX, namePrefix);
            properties.remove(DynamicPartitionProperty.PREFIX);
        }
        if (properties.containsKey(DynamicPartitionProperty.END)) {
            String endOffset = properties.get(DynamicPartitionProperty.END);
            checkEnd(endOffset);
            analyzed.put(DynamicPartitionProperty.END, endOffset);
            properties.remove(DynamicPartitionProperty.END);
        }
        if (properties.containsKey(DynamicPartitionProperty.BUCKETS)) {
            String bucketNum = properties.get(DynamicPartitionProperty.BUCKETS);
            checkBuckets(bucketNum);
            analyzed.put(DynamicPartitionProperty.BUCKETS, bucketNum);
            properties.remove(DynamicPartitionProperty.BUCKETS);
        }
        if (properties.containsKey(DynamicPartitionProperty.ENABLE)) {
            String enableFlag = properties.get(DynamicPartitionProperty.ENABLE);
            checkEnable(enableFlag);
            analyzed.put(DynamicPartitionProperty.ENABLE, enableFlag);
            properties.remove(DynamicPartitionProperty.ENABLE);
        }
        return analyzed;
    }

    /** @throws DdlException when the table has an existing, enabled dynamic partition property */
    public static void checkAlterAllowed(OlapTable olapTable) throws DdlException {
        TableProperty tableProperty = olapTable.getTableProperty();
        if (tableProperty == null) {
            return;
        }
        if (tableProperty.getDynamicPartitionProperty().isExist()
                && tableProperty.getDynamicPartitionProperty().getEnable()) {
            throw new DdlException("Cannot modify partition on a Dynamic Partition Table, set `dynamic_partition.enable` to false firstly.");
        }
    }

    /**
     * @return true when {@code table} is a range partitioned OlapTable with exactly one
     *         partition column and an existing, enabled dynamic partition property
     */
    public static boolean isDynamicPartitionTable(Table table) {
        if (!(table instanceof OlapTable)) {
            return false;
        }
        if (!(((OlapTable) table).getPartitionInfo().getType().equals(PartitionType.RANGE))) {
            return false;
        }
        RangePartitionInfo rangeInfo = (RangePartitionInfo) ((OlapTable) table).getPartitionInfo();
        TableProperty property = ((OlapTable) table).getTableProperty();
        if (property == null || !property.getDynamicPartitionProperty().isExist()) {
            return false;
        }

        if (rangeInfo.getPartitionColumns().size() != 1) {
            return false;
        }
        return property.getDynamicPartitionProperty().getEnable();
    }

    /**
     * properties should be checked before call this method
     */
    public static void checkAndSetDynamicPartitionProperty(OlapTable olapTable, Map<String, String> properties)
            throws DdlException {
        if (!DynamicPartitionUtil.checkInputDynamicPartitionProperties(properties, olapTable.getPartitionInfo())) {
            return;
        }
        Map<String, String> analyzed = DynamicPartitionUtil.analyzeDynamicPartition(properties);
        olapTable.setTableProperty(new TableProperty(analyzed).buildDynamicProperty());
    }

    /**
     * Picks the date pattern used for partition borders from the type of the partition column.
     *
     * @throws DdlException for any column type other than DATE, DATETIME or an integer type
     */
    public static String getPartitionFormat(Column column) throws DdlException {
        if (column.getDataType().equals(PrimitiveType.DATE)) {
            return DATE_FORMAT;
        }
        if (column.getDataType().equals(PrimitiveType.DATETIME)) {
            return DATETIME_FORMAT;
        }
        if (PrimitiveType.getIntegerTypes().contains(column.getDataType())) {
            // TODO: For Integer Type, only support format it as yyyyMMdd now
            return TIMESTAMP_FORMAT;
        }
        throw new DdlException("Dynamic Partition Only Support DATE, DATETIME and INTEGER Type Now.");
    }

    /** Strips every '-', ':' and ' ' from {@code name}. */
    public static String getFormattedPartitionName(String name) {
        String withoutDashes = name.replace("-", "");
        String withoutColons = withoutDashes.replace(":", "");
        return withoutColons.replace(" ", "");
    }

    /**
     * Moves {@code calendar} by {@code offset} units and formats the resulting time.
     *
     * @param timeUnit DAY or WEEK (case-insensitive); any other value is treated as MONTH
     * @param calendar modified in place by this call
     * @param format   a {@link SimpleDateFormat} pattern
     */
    public static String getPartitionRange(String timeUnit, int offset, Calendar calendar, String format) {
        int calendarField;
        if (timeUnit.equalsIgnoreCase(TimeUnit.DAY.toString())) {
            calendarField = Calendar.DAY_OF_MONTH;
        } else if (timeUnit.equalsIgnoreCase(TimeUnit.WEEK.toString())) {
            calendarField = Calendar.WEEK_OF_MONTH;
        } else {
            calendarField = Calendar.MONTH;
        }
        calendar.add(calendarField, offset);
        return new SimpleDateFormat(format).format(calendar.getTime());
    }

    /**
     * @return the replication num of the partition with the largest id, or 3 when the table has
     *         no partition with an id above 0
     */
    public static int estimateReplicateNum(OlapTable table) {
        int replicationNum = 3;
        long largestId = 0;
        for (Partition candidate : table.getPartitions()) {
            if (candidate.getId() <= largestId) {
                continue;
            }
            largestId = candidate.getId();
            replicationNum = table.getPartitionInfo().getReplicationNum(candidate.getId());
        }
        return replicationNum;
    }
}
\ No newline at end of file diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java index 4282468..ddffb73 100644 --- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -54,6 +54,7 @@ import org.apache.doris.persist.DatabaseInfo; import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo; import org.apache.doris.persist.DropPartitionInfo; +import org.apache.doris.persist.ModifyDynamicPartitionInfo; import org.apache.doris.persist.HbPackage; import org.apache.doris.persist.ModifyPartitionInfo; import org.apache.doris.persist.OperationType; @@ -497,6 +498,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_DYNAMIC_PARTITION: { + data = ModifyDynamicPartitionInfo.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index 3d6c016..2e26de1 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -697,6 +697,11 @@
public class EditLog { catalog.replayConvertDistributionType(tableInfo); break; } + case OperationType.OP_DYNAMIC_PARTITION: { + ModifyDynamicPartitionInfo modifyDynamicPartitionInfo = (ModifyDynamicPartitionInfo) journal.getData(); + catalog.replayModifyTableDynamicPartition(modifyDynamicPartitionInfo); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1195,7 +1200,11 @@ public class EditLog { logEdit(OperationType.OP_ALTER_JOB_V2, alterJob); } - public void logModifyDitrubutionType(TableInfo tableInfo) { + public void logModifyDistributionType(TableInfo tableInfo) { logEdit(OperationType.OP_MODIFY_DISTRIBUTION_TYPE, tableInfo); } + + public void logDynamicPartition(ModifyDynamicPartitionInfo info) { + logEdit(OperationType.OP_DYNAMIC_PARTITION, info); + } } diff --git a/fe/src/main/java/org/apache/doris/persist/ModifyDynamicPartitionInfo.java b/fe/src/main/java/org/apache/doris/persist/ModifyDynamicPartitionInfo.java new file mode 100644 index 0000000..90051be --- /dev/null +++ b/fe/src/main/java/org/apache/doris/persist/ModifyDynamicPartitionInfo.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.persist; + +import com.google.gson.annotations.SerializedName; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class ModifyDynamicPartitionInfo implements Writable { + + @SerializedName(value = "dbId") + private long dbId; + @SerializedName(value = "tableId") + private long tableId; + @SerializedName(value = "properties") + private Map<String, String> properties = new HashMap<>(); + + public ModifyDynamicPartitionInfo(long dbId, long tableId, Map<String, String> properties) { + this.dbId = dbId; + this.tableId = tableId; + this.properties = properties; + } + + public long getDbId() { + return dbId; + } + + public long getTableId() { + return tableId; + } + + public Map<String, String> getProperties() { + return properties; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static ModifyDynamicPartitionInfo read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), ModifyDynamicPartitionInfo.class); + } +} diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java index f6a8352..0e117a0 100644 --- a/fe/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java @@ -153,4 +153,7 @@ public class OperationType { // small files 251~260 public static final short OP_CREATE_SMALL_FILE = 251; public static final short OP_DROP_SMALL_FILE = 252; + + // dynamic partition 261~265 + public static final short OP_DYNAMIC_PARTITION = 261; } diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java index bc2ce5c..3f5f3ec 100644 --- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -35,6 +35,7 @@ import org.apache.doris.analysis.ShowCreateTableStmt; import org.apache.doris.analysis.ShowDataStmt; import org.apache.doris.analysis.ShowDbStmt; import org.apache.doris.analysis.ShowDeleteStmt; +import org.apache.doris.analysis.ShowDynamicPartitionStmt; import org.apache.doris.analysis.ShowEnginesStmt; import org.apache.doris.analysis.ShowExportStmt; import org.apache.doris.analysis.ShowFrontendsStmt; @@ -69,6 +70,7 @@ import org.apache.doris.catalog.AggregateFunction; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DynamicPartitionProperty; import org.apache.doris.catalog.Function; import org.apache.doris.catalog.Index; import org.apache.doris.catalog.MaterializedIndex; @@ -83,6 +85,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.Type; import org.apache.doris.catalog.View; +import org.apache.doris.clone.DynamicPartitionScheduler; import org.apache.doris.cluster.BaseParam; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; @@ -238,6 +241,8 @@ public class ShowExecutor { handleAdminShowConfig(); } else if (stmt instanceof ShowSmallFilesStmt) { handleShowSmallFiles(); + } else if (stmt instanceof ShowDynamicPartitionStmt) { + handleShowDynamicPartition(); } else if (stmt instanceof ShowIndexStmt) { handleShowIndex(); } else { @@ -1438,6 +1443,50 @@ public class ShowExecutor { resultSet = new ShowResultSet(showStmt.getMetaData(), results); } + private void handleShowDynamicPartition() { + ShowDynamicPartitionStmt showDynamicPartitionStmt = (ShowDynamicPartitionStmt) stmt; + List<List<String>> rows = 
Lists.newArrayList(); + Database db = ctx.getCatalog().getDb(showDynamicPartitionStmt.getDb()); + if (db != null) { + db.readLock(); + try { + for (Table tbl : db.getTables()) { + if (!(tbl instanceof OlapTable)) { + continue; + } + + DynamicPartitionScheduler dynamicPartitionScheduler = Catalog.getCurrentCatalog().getDynamicPartitionScheduler(); + OlapTable olapTable = (OlapTable) tbl; + if (!olapTable.dynamicPartitionExists()) { + dynamicPartitionScheduler.removeRuntimeInfo(olapTable.getName()); + continue; + } + // check tbl privs + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), + db.getFullName(), olapTable.getName(), + PrivPredicate.SHOW)) { + continue; + } + DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty(); + String tableName = olapTable.getName(); + rows.add(Lists.newArrayList( + tableName, + String.valueOf(dynamicPartitionProperty.getEnable()), + dynamicPartitionProperty.getTimeUnit().toUpperCase(), + String.valueOf(dynamicPartitionProperty.getEnd()), + dynamicPartitionProperty.getPrefix(), + String.valueOf(dynamicPartitionProperty.getBuckets()), + dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME), + dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.LAST_SCHEDULER_TIME), + dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.DYNAMIC_PARTITION_STATE), + dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.MSG))); + } + } finally { + db.readUnlock(); + } + resultSet = new ShowResultSet(showDynamicPartitionStmt.getMetaData(), rows); + } + } } diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex index 56a9368..53322e7 100644 --- a/fe/src/main/jflex/sql_scanner.flex +++ b/fe/src/main/jflex/sql_scanner.flex @@ -160,6 +160,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("distinctpcsa", new 
Integer(SqlParserSymbols.KW_DISTINCTPCSA)); keywordMap.put("distributed", new Integer(SqlParserSymbols.KW_DISTRIBUTED)); keywordMap.put("distribution", new Integer(SqlParserSymbols.KW_DISTRIBUTION)); + keywordMap.put("dynamic", new Integer(SqlParserSymbols.KW_DYNAMIC)); keywordMap.put("buckets", new Integer(SqlParserSymbols.KW_BUCKETS)); keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV)); keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE)); diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index 15ce167..6d6705e 100644 --- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -27,11 +27,14 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.ColumnDef; import org.apache.doris.analysis.ColumnDef.DefaultValue; import org.apache.doris.analysis.ColumnPosition; +import org.apache.doris.analysis.ModifyTablePropertiesClause; import org.apache.doris.analysis.TypeDef; +import org.apache.doris.backup.CatalogMocker; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DynamicPartitionProperty; import org.apache.doris.catalog.FakeCatalog; import org.apache.doris.catalog.FakeEditLog; import org.apache.doris.catalog.MaterializedIndex; @@ -46,6 +49,7 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; @@ -58,10 +62,13 @@ import org.apache.doris.transaction.GlobalTransactionMgr; import 
org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,6 +87,9 @@ public class SchemaChangeJobV2Test { false, AggregateType.MAX, false, new DefaultValue(true, "1"), ""); private static AddColumnClause addColumnClause = new AddColumnClause(newCol, new ColumnPosition("v"), null, null); + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, AnalysisException { @@ -161,7 +171,7 @@ public class SchemaChangeJobV2Test { Assert.assertEquals(2, testPartition.getMaterializedIndices(IndexExtState.ALL).size()); Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size()); Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.SHADOW).size()); - + // runWaitingTxnJob schemaChangeHandler.runAfterCatalogReady(); Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState()); @@ -187,9 +197,90 @@ public class SchemaChangeJobV2Test { shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(), shadowReplica.getRowCount()); } } - + schemaChangeHandler.runAfterCatalogReady(); Assert.assertEquals(JobState.FINISHED, schemaChangeJob.getJobState()); } + @Test + public void testModifyDynamicPartitionNormal() throws UserException { + FakeCatalog.setCatalog(masterCatalog); + SchemaChangeHandler schemaChangeHandler = Catalog.getInstance().getSchemaChangeHandler(); + ArrayList<AlterClause> alterClauses = new ArrayList<>(); + Map<String, String> properties = new HashMap<>(); + properties.put(DynamicPartitionProperty.ENABLE, "true"); + 
properties.put(DynamicPartitionProperty.TIME_UNIT, "day"); + properties.put(DynamicPartitionProperty.END, "3"); + properties.put(DynamicPartitionProperty.PREFIX, "p"); + properties.put(DynamicPartitionProperty.BUCKETS, "30"); + alterClauses.add(new ModifyTablePropertiesClause(properties)); + Database db = CatalogMocker.mockDb(); + OlapTable olapTable = (OlapTable) db.getTable(CatalogMocker.TEST_TBL2_ID); + schemaChangeHandler.process(alterClauses, "default_cluster", db, olapTable); + Assert.assertTrue(olapTable.getTableProperty().getDynamicPartitionProperty().isExist()); + Assert.assertTrue(olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()); + Assert.assertEquals("day", olapTable.getTableProperty().getDynamicPartitionProperty().getTimeUnit()); + Assert.assertEquals(3, olapTable.getTableProperty().getDynamicPartitionProperty().getEnd()); + Assert.assertEquals("p", olapTable.getTableProperty().getDynamicPartitionProperty().getPrefix()); + Assert.assertEquals(30, olapTable.getTableProperty().getDynamicPartitionProperty().getBuckets()); + + + // set dynamic_partition.enable = false + ArrayList<AlterClause> tmpAlterClauses = new ArrayList<>(); + properties.put(DynamicPartitionProperty.ENABLE, "false"); + tmpAlterClauses.add(new ModifyTablePropertiesClause(properties)); + schemaChangeHandler.process(tmpAlterClauses, "default_cluster", db, olapTable); + Assert.assertFalse(olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()); + // set dynamic_partition.time_unit = week + tmpAlterClauses = new ArrayList<>(); + properties.put(DynamicPartitionProperty.TIME_UNIT, "week"); + tmpAlterClauses.add(new ModifyTablePropertiesClause(properties)); + schemaChangeHandler.process(tmpAlterClauses, "default_cluster", db, olapTable); + Assert.assertEquals("week", olapTable.getTableProperty().getDynamicPartitionProperty().getTimeUnit()); + // set dynamic_partition.end = 10 + tmpAlterClauses = new ArrayList<>(); + 
properties.put(DynamicPartitionProperty.END, "10"); + tmpAlterClauses.add(new ModifyTablePropertiesClause(properties)); + schemaChangeHandler.process(tmpAlterClauses, "default_cluster", db, olapTable); + Assert.assertEquals(10, olapTable.getTableProperty().getDynamicPartitionProperty().getEnd()); + // set dynamic_partition.prefix = p1 + tmpAlterClauses = new ArrayList<>(); + properties.put(DynamicPartitionProperty.PREFIX, "p1"); + tmpAlterClauses.add(new ModifyTablePropertiesClause(properties)); + schemaChangeHandler.process(tmpAlterClauses, "default_cluster", db, olapTable); + Assert.assertEquals("p1", olapTable.getTableProperty().getDynamicPartitionProperty().getPrefix()); + // set dynamic_partition.buckets = 3 + tmpAlterClauses = new ArrayList<>(); + properties.put(DynamicPartitionProperty.BUCKETS, "3"); + tmpAlterClauses.add(new ModifyTablePropertiesClause(properties)); + schemaChangeHandler.process(tmpAlterClauses, "default_cluster", db, olapTable); + Assert.assertEquals(3, olapTable.getTableProperty().getDynamicPartitionProperty().getBuckets()); + } + + public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue, String missPropertyKey) + throws UserException { + FakeCatalog.setCatalog(masterCatalog); + SchemaChangeHandler schemaChangeHandler = Catalog.getInstance().getSchemaChangeHandler(); + ArrayList<AlterClause> alterClauses = new ArrayList<>(); + Map<String, String> properties = new HashMap<>(); + properties.put(propertyKey, propertyValue); + alterClauses.add(new ModifyTablePropertiesClause(properties)); + + Database db = CatalogMocker.mockDb(); + OlapTable olapTable = (OlapTable) db.getTable(CatalogMocker.TEST_TBL2_ID); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage(String.format("Must assign %s properties", missPropertyKey)); + + schemaChangeHandler.process(alterClauses, "default_cluster", db, olapTable); + } + + @Test + public void testModifyDynamicPartitionWithoutTableProperty() throws 
UserException { + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false", DynamicPartitionProperty.TIME_UNIT); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day", DynamicPartitionProperty.ENABLE); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3", DynamicPartitionProperty.ENABLE); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE); + modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE); + } } diff --git a/fe/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java new file mode 100644 index 0000000..9f98a41 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -0,0 +1,482 @@ +package org.apache.doris.catalog; + +import com.google.common.collect.Lists; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.ColumnDef; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.HashDistributionDesc; +import org.apache.doris.analysis.KeysDesc; +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.RangePartitionDesc; +import org.apache.doris.analysis.SingleRangePartitionDesc; +import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TypeDef; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.EditLog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.SystemInfoService; 
+import org.apache.doris.task.AgentBatchTask; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class DynamicPartitionTableTest { + private TableName dbTableName; + private String dbName = "testDb"; + private String tableName = "testTable"; + private String clusterName = "default"; + private List<Long> beIds = Lists.newArrayList(); + private List<String> columnNames = Lists.newArrayList(); + private List<ColumnDef> columnDefs = Lists.newArrayList(); + + private Catalog catalog = Catalog.getInstance(); + private Database db = new Database(); + private Analyzer analyzer; + + private Map<String, String> properties; + private List<SingleRangePartitionDesc> singleRangePartitionDescs; + + @Injectable + ConnectContext connectContext; + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Before + public void setUp() throws Exception { + dbTableName = new TableName(dbName, tableName); + + beIds.add(1L); + beIds.add(2L); + beIds.add(3L); + + columnNames.add("key1"); + columnNames.add("key2"); + columnNames.add("key3"); + + columnDefs.add(new ColumnDef("key1", new TypeDef(ScalarType.createType(PrimitiveType.INT)))); + columnDefs.add(new ColumnDef("key2", new TypeDef(ScalarType.createType(PrimitiveType.INT)))); + columnDefs.add(new ColumnDef("key3", new TypeDef(ScalarType.createVarchar(10)))); + + analyzer = new Analyzer(catalog, connectContext); + + properties = new HashMap<>(); + properties.put(DynamicPartitionProperty.ENABLE, "true"); + properties.put(DynamicPartitionProperty.PREFIX, "p"); + properties.put(DynamicPartitionProperty.TIME_UNIT, "day"); + properties.put(DynamicPartitionProperty.END, "3"); + properties.put(DynamicPartitionProperty.BUCKETS, "30"); + + 
singleRangePartitionDescs = new LinkedList<>(); + singleRangePartitionDescs.add(new SingleRangePartitionDesc(false, "p1", + new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("-128"))), null)); + + new MockUp<AgentBatchTask>() { + @Mock + void run() { + return; + } + }; + + new MockUp<CountDownLatch>() { + @Mock + boolean await(long timeout, TimeUnit unit) { + return true; + } + }; + + new Expectations(analyzer, catalog) {{ + analyzer.getClusterName(); + minTimes = 0; + result = clusterName; + }}; + + dbTableName.analyze(analyzer); + } + + @Test + public void testNormal(@Injectable SystemInfoService systemInfoService, + @Injectable PaloAuth paloAuth, + @Injectable EditLog editLog) throws UserException { + new Expectations(catalog) { + { + catalog.getDb(dbTableName.getDb()); + minTimes = 0; + result = db; + + Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + minTimes = 0; + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + minTimes = 0; + result = beIds; + + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + minTimes = 0; + result = true; + + catalog.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), + new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs), + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, ""); + stmt.analyze(analyzer); + + catalog.createTable(stmt); + } + + @Test + public void testMissEnable(@Injectable SystemInfoService systemInfoService, + @Injectable PaloAuth paloAuth, + @Injectable EditLog editLog) throws UserException { + new Expectations(catalog) { + { + catalog.getDb(dbTableName.getDb()); + minTimes = 0; + result = db; + + 
Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + minTimes = 0; + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + minTimes = 0; + result = beIds; + + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + minTimes = 0; + result = true; + + catalog.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + + properties.remove(DynamicPartitionProperty.ENABLE); + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), + new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs), + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, ""); + stmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Must assign dynamic_partition.enable properties"); + + catalog.createTable(stmt); + } + + @Test + public void testMissPrefix(@Injectable SystemInfoService systemInfoService, + @Injectable PaloAuth paloAuth, + @Injectable EditLog editLog) throws UserException { + new Expectations(catalog) { + { + catalog.getDb(dbTableName.getDb()); + minTimes = 0; + result = db; + + Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + minTimes = 0; + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + minTimes = 0; + result = beIds; + + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + minTimes = 0; + result = true; + + catalog.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + + properties.remove(DynamicPartitionProperty.PREFIX); + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap", + 
new KeysDesc(KeysType.AGG_KEYS, columnNames), + new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs), + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, ""); + stmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Must assign dynamic_partition.prefix properties"); + + catalog.createTable(stmt); + } + + @Test + public void testMissTimeUnit(@Injectable SystemInfoService systemInfoService, + @Injectable PaloAuth paloAuth, + @Injectable EditLog editLog) throws UserException { + new Expectations(catalog) { + { + catalog.getDb(dbTableName.getDb()); + minTimes = 0; + result = db; + + Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + minTimes = 0; + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + minTimes = 0; + result = beIds; + + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + minTimes = 0; + result = true; + + catalog.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + + properties.remove(DynamicPartitionProperty.TIME_UNIT); + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), + new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs), + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, ""); + stmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Must assign dynamic_partition.time_unit properties"); + + catalog.createTable(stmt); + } + + @Test + public void testMissEnd(@Injectable SystemInfoService systemInfoService, + @Injectable PaloAuth paloAuth, + @Injectable EditLog editLog) throws UserException { + new Expectations(catalog) { + { + catalog.getDb(dbTableName.getDb()); + minTimes = 0; + 
result = db; + + Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + minTimes = 0; + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + minTimes = 0; + result = beIds; + + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + minTimes = 0; + result = true; + + catalog.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + + properties.remove(DynamicPartitionProperty.END); + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), + new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs), + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, ""); + stmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Must assign dynamic_partition.end properties"); + + catalog.createTable(stmt); + } + + @Test + public void testMissBuckets(@Injectable SystemInfoService systemInfoService, + @Injectable PaloAuth paloAuth, + @Injectable EditLog editLog) throws UserException { + new Expectations(catalog) { + { + catalog.getDb(dbTableName.getDb()); + minTimes = 0; + result = db; + + Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + minTimes = 0; + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + minTimes = 0; + result = beIds; + + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + minTimes = 0; + result = true; + + catalog.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + + properties.remove(DynamicPartitionProperty.BUCKETS); + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, 
columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), + new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs), + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, ""); + stmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Must assign dynamic_partition.buckets properties"); + + catalog.createTable(stmt); + } + + @Test + public void testNotAllowed(@Injectable SystemInfoService systemInfoService, + @Injectable PaloAuth paloAuth, + @Injectable EditLog editLog) throws UserException { + new Expectations(catalog) { + { + catalog.getDb(dbTableName.getDb()); + minTimes = 0; + result = db; + + Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + minTimes = 0; + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + minTimes = 0; + result = beIds; + + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + minTimes = 0; + result = true; + + catalog.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, ""); + stmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Only support dynamic partition properties on range partition table"); + + catalog.createTable(stmt); + } + + @Test + public void testNotAllowedInMultiPartitions(@Injectable SystemInfoService systemInfoService, + @Injectable PaloAuth paloAuth, + @Injectable EditLog editLog) throws UserException { + new Expectations(catalog) { + { + catalog.getDb(dbTableName.getDb()); + minTimes = 0; + result = db; + + Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = 
systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + minTimes = 0; + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + minTimes = 0; + result = beIds; + + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + minTimes = 0; + result = true; + + catalog.getEditLog(); + minTimes = 0; + result = editLog; + } + }; + + List<SingleRangePartitionDesc> rangePartitionDescs = new LinkedList<>(); + rangePartitionDescs.add(new SingleRangePartitionDesc(false, "p1", + new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("-128"), new PartitionValue("100"))), null)); + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), + new RangePartitionDesc(Lists.newArrayList("key1", "key2"), singleRangePartitionDescs), + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, ""); + stmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Dynamic partition only support single-column range partition"); + + catalog.createTable(stmt); + } +} \ No newline at end of file diff --git a/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java index 4d45afc..3c179f1 100644 --- a/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java +++ b/fe/src/test/java/org/apache/doris/catalog/FakeEditLog.java @@ -22,6 +22,7 @@ import org.apache.doris.alter.RollupJob; import org.apache.doris.alter.SchemaChangeJob; import org.apache.doris.cluster.Cluster; import org.apache.doris.persist.EditLog; +import org.apache.doris.persist.ModifyDynamicPartitionInfo; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.transaction.TransactionState; @@ -97,6 +98,11 @@ public class FakeEditLog extends MockUp<EditLog> { } + @Mock + public void 
logDynamicPartition(ModifyDynamicPartitionInfo info) { + + } + public TransactionState getTransaction(long transactionId) { return allTransactionState.get(transactionId); } diff --git a/fe/src/test/java/org/apache/doris/catalog/TablePropertyTest.java b/fe/src/test/java/org/apache/doris/catalog/TablePropertyTest.java new file mode 100644 index 0000000..ca69bd5 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/catalog/TablePropertyTest.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashMap; + +public class TablePropertyTest { + private static String fileName = "./TablePropertyTest"; + + @After + public void tearDown() { + File file = new File(fileName); + file.delete(); + } + + @Test + public void testNormal() throws IOException { + // 1. 
Write objects to file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + HashMap<String, String> properties = new HashMap<>(); + properties.put(DynamicPartitionProperty.ENABLE, "true"); + properties.put(DynamicPartitionProperty.TIME_UNIT, "day"); + properties.put(DynamicPartitionProperty.END, "3"); + properties.put(DynamicPartitionProperty.PREFIX, "p"); + properties.put(DynamicPartitionProperty.BUCKETS, "30"); + properties.put("otherProperty", "unknownProperty"); + TableProperty tableProperty = new TableProperty(properties); + tableProperty.write(out); + out.flush(); + out.close(); + + // 2. Read objects from file + DataInputStream in = new DataInputStream(new FileInputStream(file)); + TableProperty readTableProperty = TableProperty.read(in); + DynamicPartitionProperty readDynamicPartitionProperty = readTableProperty.getDynamicPartitionProperty(); + DynamicPartitionProperty dynamicPartitionProperty = new DynamicPartitionProperty(properties); + Assert.assertEquals(readTableProperty.getProperties(), properties); + Assert.assertEquals(readDynamicPartitionProperty.getEnable(), dynamicPartitionProperty.getEnable()); + Assert.assertEquals(readDynamicPartitionProperty.getBuckets(), dynamicPartitionProperty.getBuckets()); + Assert.assertEquals(readDynamicPartitionProperty.getPrefix(), dynamicPartitionProperty.getPrefix()); + Assert.assertEquals(readDynamicPartitionProperty.getEnd(), dynamicPartitionProperty.getEnd()); + Assert.assertEquals(readDynamicPartitionProperty.getTimeUnit(), dynamicPartitionProperty.getTimeUnit()); + in.close(); + } +} diff --git a/fe/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java b/fe/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java new file mode 100644 index 0000000..27406e2 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java @@ -0,0 +1,68 @@ +// Licensed to the 
Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.catalog.DynamicPartitionProperty; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashMap; + +public class ModifyDynamicPartitionInfoTest { + private String fileName = "./ModifyDynamicPartitionInfoTest"; + + @After + public void tearDown() { + File file = new File(fileName); + file.delete(); + } + + @Test + public void testNormal() throws IOException { + // 1. 
Write objects to file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + HashMap<String, String> properties = new HashMap<>(); + properties.put(DynamicPartitionProperty.ENABLE, "true"); + properties.put(DynamicPartitionProperty.TIME_UNIT, "day"); + properties.put(DynamicPartitionProperty.END, "3"); + properties.put(DynamicPartitionProperty.PREFIX, "p"); + properties.put(DynamicPartitionProperty.BUCKETS, "30"); + ModifyDynamicPartitionInfo modifyDynamicPartitionInfo = new ModifyDynamicPartitionInfo(100L, 200L, properties); + modifyDynamicPartitionInfo.write(out); + out.flush(); + out.close(); + + // 2. Read objects from file + DataInputStream in = new DataInputStream(new FileInputStream(file)); + ModifyDynamicPartitionInfo readModifyDynamicPartitionInfo = ModifyDynamicPartitionInfo.read(in); + Assert.assertEquals(readModifyDynamicPartitionInfo.getDbId(), 100L); + Assert.assertEquals(readModifyDynamicPartitionInfo.getTableId(), 200L); + Assert.assertEquals(readModifyDynamicPartitionInfo.getProperties(), properties); + in.close(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org