This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e2e6a0dd83 [Feature](load) Support mutable property for partition (#16036) e2e6a0dd83 is described below commit e2e6a0dd8387801f9a9c951a710bc29277db318f Author: zhengshengjun <zhengsheng...@apache.org> AuthorDate: Sat Feb 18 23:09:34 2023 +0800 [Feature](load) Support mutable property for partition (#16036) The background is described in this issue: #15723, where users used Apache Druid to satisfy such lambada requirements before. We will not make Doris dropping data not belonged to current time window automatically like Druid, which is not flexible. We demand a ability to support mutable/immutable partition, the PR works this way: 1. Support mutable property for a partition. 2. The mutable property of a partition is passed from FE to BE in a load procedure 3. If a record's partition is immutable, we mark this row as "un selected" which will not be included in computation of 'max_filter_ratio', so that data write to immutable partition will be neglected and not cause load failure. Use Example: 1. Add immutable partition or modify an partition to be immutable: - alter table test_tbl add [temporary] partition xxx values less than ('xxx') ('mutable' = 'true'); - alter table test_tbl modify partition xx set ('mutable' = 'false'); 2. Write 5 records into table, two of then belongs to immutable partition --- be/src/exec/tablet_info.cpp | 1 + be/src/exec/tablet_info.h | 1 + be/src/vec/sink/vtablet_sink.cpp | 6 +++ be/src/vec/sink/vtablet_sink.h | 1 + fe/fe-core/src/main/cup/sql_parser.cup | 12 ++--- .../main/java/org/apache/doris/alter/Alter.java | 12 ++--- .../doris/analysis/ModifyPartitionClause.java | 23 ++++++++-- .../apache/doris/analysis/SinglePartitionDesc.java | 8 ++++ .../java/org/apache/doris/backup/RestoreJob.java | 6 ++- .../apache/doris/catalog/CatalogRecycleBin.java | 18 ++++++-- .../org/apache/doris/catalog/DataProperty.java | 18 +++++++- .../java/org/apache/doris/catalog/OlapTable.java | 14 +++--- .../org/apache/doris/catalog/PartitionInfo.java | 20 +++++++-- .../doris/common/proc/PartitionsProcDir.java | 3 ++ .../org/apache/doris/common/util/NetUtils.java | 6 ++- .../apache/doris/common/util/PropertyAnalyzer.java | 7 ++- .../apache/doris/datasource/InternalCatalog.java | 29 ++++++++++-- .../doris/load/loadv2/LoadingTaskPlanner.java | 6 +++ .../apache/doris/persist/ModifyPartitionInfo.java | 1 + .../apache/doris/persist/PartitionPersistInfo.java | 12 ++++- .../org/apache/doris/planner/OlapTableSink.java | 2 + .../org/apache/doris/clone/DiskRebalanceTest.java | 2 +- .../java/org/apache/doris/clone/RebalanceTest.java | 2 +- .../org/apache/doris/common/util/UnitTestUtil.java | 1 + .../apache/doris/load/loadv2/SparkLoadJobTest.java | 4 +- .../apache/doris/planner/OlapTableSinkTest.java | 4 ++ gensrc/thrift/Descriptors.thrift | 2 + .../stream_load/test_immutable_partition.csv | 11 +++++ .../data/load_p0/stream_load/test_stream_load.out | 8 ++++ .../load_p0/stream_load/test_stream_load.groovy | 52 ++++++++++++++++++++++ 30 files changed, 253 insertions(+), 39 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index b833831b23..ca65f5c58b 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -231,6 +231,7 @@ Status VOlapTablePartitionParam::init() { const TOlapTablePartition& t_part = _t_param.partitions[i]; auto part = _obj_pool.add(new VOlapTablePartition(&_partition_block)); part->id = t_part.id; + part->is_mutable = t_part.is_mutable; if (!_is_in_partition) { if (t_part.__isset.start_keys) { diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index c1138245e1..38244d30ff 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -102,6 +102,7 @@ struct VOlapTablePartition { std::vector<BlockRow> in_keys; int64_t num_buckets = 0; std::vector<OlapTableIndexTablets> indexes; + bool is_mutable; VOlapTablePartition(vectorized::Block* partition_block) : start_key {partition_block, -1}, end_key {partition_block, -1} {} diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 5bedc35d8a..c535392a31 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -1020,6 +1020,11 @@ Status VOlapTableSink::find_tablet(RuntimeState* state, vectorized::Block* block is_continue = true; return status; } + if (!(*partition)->is_mutable) { + _number_immutable_partition_filtered_rows++; + is_continue = true; + return status; + } _partition_ids.emplace((*partition)->id); if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) { if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) { @@ -1227,6 +1232,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { state->num_rows_load_unselected(); state->set_num_rows_load_total(num_rows_load_total); state->update_num_rows_load_filtered(_number_filtered_rows); + state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows); // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index 04809a6eca..cb48057ec5 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -495,6 +495,7 @@ private: int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; int64_t _number_filtered_rows = 0; + int64_t _number_immutable_partition_filtered_rows = 0; RuntimeProfile::Counter* _input_rows_counter = nullptr; RuntimeProfile::Counter* _output_rows_counter = nullptr; diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 97784ab613..ac621dafea 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -1497,19 +1497,19 @@ alter_table_clause ::= {: RESULT = new DropPartitionClause(ifExists, partitionName, isTempPartition, force ? force : isTempPartition); :} - | KW_MODIFY KW_PARTITION ident:partitionName KW_SET LPAREN key_value_map:properties RPAREN + | KW_MODIFY opt_tmp:isTempPartition KW_PARTITION ident:partitionName KW_SET LPAREN key_value_map:properties RPAREN {: ArrayList<String> partitions = new ArrayList<String>(); partitions.add(partitionName); - RESULT = new ModifyPartitionClause(partitions, properties); + RESULT = new ModifyPartitionClause(partitions, properties, isTempPartition); :} - | KW_MODIFY KW_PARTITION LPAREN ident_list:partitions RPAREN KW_SET LPAREN key_value_map:properties RPAREN + | KW_MODIFY opt_tmp:isTempPartition KW_PARTITION LPAREN ident_list:partitions RPAREN KW_SET LPAREN key_value_map:properties RPAREN {: - RESULT = new ModifyPartitionClause(partitions, properties); + RESULT = new ModifyPartitionClause(partitions, properties, isTempPartition); :} - | KW_MODIFY KW_PARTITION LPAREN STAR RPAREN KW_SET LPAREN key_value_map:properties RPAREN + | KW_MODIFY opt_tmp:isTempPartition KW_PARTITION LPAREN STAR RPAREN KW_SET LPAREN key_value_map:properties RPAREN {: - RESULT = ModifyPartitionClause.createStarClause(properties); + RESULT = ModifyPartitionClause.createStarClause(properties, isTempPartition); :} | KW_REPLACE opt_partition_names:partitions KW_WITH opt_partition_names:tempPartitions opt_properties:properties {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 3ea07a3579..16298fff41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -249,7 +249,8 @@ public class Alter { } else { List<String> partitionNames = clause.getPartitionNames(); if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) { - modifyPartitionsProperty(db, olapTable, partitionNames, properties); + modifyPartitionsProperty(db, olapTable, partitionNames, properties, + clause.isTempPartition()); } else { needProcessOutsideTableLock = true; } @@ -490,7 +491,7 @@ public class Alter { OlapTable olapTable = (OlapTable) table; olapTable.writeLockOrDdlException(); try { - modifyPartitionsProperty(db, olapTable, partitionNames, properties); + modifyPartitionsProperty(db, olapTable, partitionNames, properties, clause.isTempPartition()); } finally { olapTable.writeUnlock(); } @@ -722,7 +723,8 @@ public class Alter { public void modifyPartitionsProperty(Database db, OlapTable olapTable, List<String> partitionNames, - Map<String, String> properties) + Map<String, String> properties, + boolean isTempPartition) throws DdlException, AnalysisException { Preconditions.checkArgument(olapTable.isWriteLockHeldByCurrentThread()); List<ModifyPartitionInfo> modifyPartitionInfos = Lists.newArrayList(); @@ -731,7 +733,7 @@ public class Alter { } for (String partitionName : partitionNames) { - Partition partition = olapTable.getPartition(partitionName); + Partition partition = olapTable.getPartition(partitionName, isTempPartition); if (partition == null) { throw new DdlException( "Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]"); @@ -757,7 +759,7 @@ public class Alter { // modify meta here PartitionInfo partitionInfo = olapTable.getPartitionInfo(); for (String partitionName : partitionNames) { - Partition partition = olapTable.getPartition(partitionName); + Partition partition = olapTable.getPartition(partitionName, isTempPartition); // 4. data property // 4.1 get old data property from partition DataProperty dataProperty = partitionInfo.getDataProperty(partition.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyPartitionClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyPartitionClause.java index 6656730419..1fcc570d73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyPartitionClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyPartitionClause.java @@ -38,6 +38,7 @@ public class ModifyPartitionClause extends AlterTableClause { private List<String> partitionNames; private Map<String, String> properties; + private boolean isTempPartition; private boolean needExpand = false; public List<String> getPartitionNames() { @@ -45,7 +46,8 @@ public class ModifyPartitionClause extends AlterTableClause { } // c'tor for non-star clause - public ModifyPartitionClause(List<String> partitionNames, Map<String, String> properties) { + public ModifyPartitionClause(List<String> partitionNames, Map<String, String> properties, + boolean isTempPartition) { super(AlterOpType.MODIFY_PARTITION); this.partitionNames = partitionNames; this.properties = properties; @@ -57,19 +59,22 @@ public class ModifyPartitionClause extends AlterTableClause { // And these 3 operations does not require table to be stable. // If other kinds of operations be added later, "needTableStable" may be changed. this.needTableStable = false; + this.isTempPartition = isTempPartition; } // c'tor for 'Modify Partition(*)' clause - private ModifyPartitionClause(Map<String, String> properties) { + private ModifyPartitionClause(Map<String, String> properties, boolean isTempPartition) { super(AlterOpType.MODIFY_PARTITION); this.partitionNames = Lists.newArrayList(); this.properties = properties; this.needExpand = true; this.needTableStable = false; + this.isTempPartition = isTempPartition; } - public static ModifyPartitionClause createStarClause(Map<String, String> properties) { - return new ModifyPartitionClause(properties); + public static ModifyPartitionClause createStarClause(Map<String, String> properties, + boolean isTempPartition) { + return new ModifyPartitionClause(properties, isTempPartition); } @Override @@ -104,6 +109,9 @@ public class ModifyPartitionClause extends AlterTableClause { // 3. tablet type PropertyAnalyzer.analyzeTabletType(properties); + + // 4. mutable + PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_MUTABLE, true); } @Override @@ -111,6 +119,10 @@ public class ModifyPartitionClause extends AlterTableClause { return this.properties; } + public boolean isTempPartition() { + return isTempPartition; + } + public boolean isNeedExpand() { return this.needExpand; } @@ -119,6 +131,9 @@ public class ModifyPartitionClause extends AlterTableClause { public String toSql() { StringBuilder sb = new StringBuilder(); sb.append("MODIFY PARTITION "); + if (isTempPartition) { + sb.append("TEMPORARY "); + } sb.append("("); if (needExpand) { sb.append("*"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SinglePartitionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SinglePartitionDesc.java index 7ffec7c1d2..f8dbdc47b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SinglePartitionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SinglePartitionDesc.java @@ -47,6 +47,7 @@ public class SinglePartitionDesc implements AllPartitionDesc { private TTabletType tabletType = TTabletType.TABLET_TYPE_DISK; private Long versionInfo; private String storagePolicy; + private boolean isMutable; public SinglePartitionDesc(boolean ifNotExists, String partName, PartitionKeyDesc partitionKeyDesc, Map<String, String> properties) { @@ -87,6 +88,10 @@ public class SinglePartitionDesc implements AllPartitionDesc { return isInMemory; } + public boolean isMutable() { + return isMutable; + } + public TTabletType getTabletType() { return tabletType; } @@ -142,6 +147,9 @@ public class SinglePartitionDesc implements AllPartitionDesc { // analyze in memory isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false); + // analyze is mutable + isMutable = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_MUTABLE, true); + tabletType = PropertyAnalyzer.analyzeTabletType(properties); if (otherProperties == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index d374487216..204f5f5dfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -824,7 +824,8 @@ public class RestoreJob extends AbstractJob { } localPartitionInfo.addPartition(restoredPart.getId(), false, remoteItem, remoteDataProperty, restoreReplicaAlloc, - remotePartitionInfo.getIsInMemory(remotePartId)); + remotePartitionInfo.getIsInMemory(remotePartId), + remotePartitionInfo.getIsMutable(remotePartId)); } localTbl.addPartition(restoredPart); } finally { @@ -1169,7 +1170,8 @@ public class RestoreJob extends AbstractJob { } localPartitionInfo.addPartition(restorePart.getId(), false, remotePartitionInfo.getItem(remotePartId), remoteDataProperty, restoreReplicaAlloc, - remotePartitionInfo.getIsInMemory(remotePartId)); + remotePartitionInfo.getIsInMemory(remotePartId), + remotePartitionInfo.getIsMutable(remotePartId)); localTbl.addPartition(restorePart); // modify tablet inverted index diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index dadb0f722e..8fcab41cff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -169,7 +169,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { public synchronized boolean recyclePartition(long dbId, long tableId, Partition partition, Range<PartitionKey> range, PartitionItem listPartitionItem, DataProperty dataProperty, ReplicaAllocation replicaAlloc, - boolean isInMemory) { + boolean isInMemory, boolean isMutable) { if (idToPartition.containsKey(partition.getId())) { LOG.error("partition[{}] already in recycle bin.", partition.getId()); return false; @@ -177,7 +177,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { // recycle partition RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition, - range, listPartitionItem, dataProperty, replicaAlloc, isInMemory); + range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable); idToRecycleTime.put(partition.getId(), System.currentTimeMillis()); idToPartition.put(partition.getId(), partitionInfo); LOG.info("recycle partition[{}-{}]", partition.getId(), partition.getName()); @@ -765,6 +765,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { partitionInfo.setDataProperty(partitionId, recoverPartitionInfo.getDataProperty()); partitionInfo.setReplicaAllocation(partitionId, recoverPartitionInfo.getReplicaAlloc()); partitionInfo.setIsInMemory(partitionId, recoverPartitionInfo.isInMemory()); + partitionInfo.setIsMutable(partitionId, recoverPartitionInfo.isMutable()); // remove from recycle bin idToPartition.remove(partitionId); @@ -808,6 +809,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { partitionInfo.setDataProperty(partitionId, recyclePartitionInfo.getDataProperty()); partitionInfo.setReplicaAllocation(partitionId, recyclePartitionInfo.getReplicaAlloc()); partitionInfo.setIsInMemory(partitionId, recyclePartitionInfo.isInMemory()); + partitionInfo.setIsMutable(partitionId, recyclePartitionInfo.isMutable()); iterator.remove(); idToRecycleTime.remove(partitionId); @@ -1192,6 +1194,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { private DataProperty dataProperty; private ReplicaAllocation replicaAlloc; private boolean isInMemory; + private boolean isMutable = true; public RecyclePartitionInfo() { // for persist @@ -1200,7 +1203,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { public RecyclePartitionInfo(long dbId, long tableId, Partition partition, Range<PartitionKey> range, PartitionItem listPartitionItem, DataProperty dataProperty, ReplicaAllocation replicaAlloc, - boolean isInMemory) { + boolean isInMemory, boolean isMutable) { this.dbId = dbId; this.tableId = tableId; this.partition = partition; @@ -1209,6 +1212,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { this.dataProperty = dataProperty; this.replicaAlloc = replicaAlloc; this.isInMemory = isInMemory; + this.isMutable = isMutable; } public long getDbId() { @@ -1243,6 +1247,10 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { return isInMemory; } + public boolean isMutable() { + return isMutable; + } + @Override public void write(DataOutput out) throws IOException { out.writeLong(dbId); @@ -1253,6 +1261,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { dataProperty.write(out); replicaAlloc.write(out); out.writeBoolean(isInMemory); + out.writeBoolean(isMutable); } public void readFields(DataInput in) throws IOException { @@ -1269,6 +1278,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { replicaAlloc = ReplicaAllocation.read(in); } isInMemory = in.readBoolean(); + if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) { + isMutable = in.readBoolean(); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java index 13b1fd6123..7a65b83bcd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java @@ -45,6 +45,8 @@ public class DataProperty implements Writable, GsonPostProcessable { private long cooldownTimeMs; @SerializedName(value = "storagePolicy") private String storagePolicy; + @SerializedName(value = "isMutable") + private boolean isMutable = true; private DataProperty() { // for persist @@ -69,9 +71,14 @@ public class DataProperty implements Writable, GsonPostProcessable { * @param storagePolicy remote storage policy for remote storage */ public DataProperty(TStorageMedium medium, long cooldown, String storagePolicy) { + this(medium, cooldown, storagePolicy, true); + } + + public DataProperty(TStorageMedium medium, long cooldown, String storagePolicy, boolean isMutable) { this.storageMedium = medium; this.cooldownTimeMs = cooldown; this.storagePolicy = storagePolicy; + this.isMutable = isMutable; } public TStorageMedium getStorageMedium() { @@ -86,6 +93,14 @@ public class DataProperty implements Writable, GsonPostProcessable { return storagePolicy; } + public boolean isMutable() { + return isMutable; + } + + public void setMutable(boolean mutable) { + isMutable = mutable; + } + public static DataProperty read(DataInput in) throws IOException { if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_108) { String json = Text.readString(in); @@ -127,7 +142,8 @@ public class DataProperty implements Writable, GsonPostProcessable { return this.storageMedium == other.storageMedium && this.cooldownTimeMs == other.cooldownTimeMs - && Strings.nullToEmpty(this.storagePolicy).equals(Strings.nullToEmpty(other.storagePolicy)); + && Strings.nullToEmpty(this.storagePolicy).equals(Strings.nullToEmpty(other.storagePolicy)) + && this.isMutable == other.isMutable; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index e570274625..face5c1185 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -749,7 +749,8 @@ public class OlapTable extends Table { new ListPartitionItem(Lists.newArrayList(new PartitionKey())), partitionInfo.getDataProperty(partition.getId()), partitionInfo.getReplicaAllocation(partition.getId()), - partitionInfo.getIsInMemory(partition.getId())); + partitionInfo.getIsInMemory(partition.getId()), + partitionInfo.getIsMutable(partition.getId())); } else if (partitionInfo.getType() == PartitionType.LIST) { // construct a dummy range @@ -768,7 +769,8 @@ public class OlapTable extends Table { partitionInfo.getItem(partition.getId()), partitionInfo.getDataProperty(partition.getId()), partitionInfo.getReplicaAllocation(partition.getId()), - partitionInfo.getIsInMemory(partition.getId())); + partitionInfo.getIsInMemory(partition.getId()), + partitionInfo.getIsMutable(partition.getId())); } } else if (!reserveTablets) { Env.getCurrentEnv().onErasePartition(partition); @@ -1298,7 +1300,8 @@ public class OlapTable extends Table { for (long partitionId : tempRangeInfo.getIdToItem(false).keySet()) { this.partitionInfo.addPartition(partitionId, true, tempRangeInfo.getItem(partitionId), tempRangeInfo.getDataProperty(partitionId), - tempRangeInfo.getReplicaAllocation(partitionId), tempRangeInfo.getIsInMemory(partitionId)); + tempRangeInfo.getReplicaAllocation(partitionId), tempRangeInfo.getIsInMemory(partitionId), + tempRangeInfo.getIsMutable(partitionId)); } } tempPartitions.unsetPartitionInfo(); @@ -1382,16 +1385,17 @@ public class OlapTable extends Table { DataProperty dataProperty = partitionInfo.getDataProperty(oldPartition.getId()); ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(oldPartition.getId()); boolean isInMemory = partitionInfo.getIsInMemory(oldPartition.getId()); + boolean isMutable = partitionInfo.getIsMutable(oldPartition.getId()); if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { PartitionItem item = partitionInfo.getItem(oldPartition.getId()); partitionInfo.dropPartition(oldPartition.getId()); partitionInfo.addPartition(newPartition.getId(), false, item, dataProperty, - replicaAlloc, isInMemory); + replicaAlloc, isInMemory, isMutable); } else { partitionInfo.dropPartition(oldPartition.getId()); - partitionInfo.addPartition(newPartition.getId(), dataProperty, replicaAlloc, isInMemory); + partitionInfo.addPartition(newPartition.getId(), dataProperty, replicaAlloc, isInMemory, isMutable); } return oldPartition; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index aca3f74326..8cadf85ccf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -156,12 +156,14 @@ public class PartitionInfo implements Writable { public void unprotectHandleNewSinglePartitionDesc(long partitionId, boolean isTemp, PartitionItem partitionItem, DataProperty dataProperty, ReplicaAllocation replicaAlloc, - boolean isInMemory) { + boolean isInMemory, boolean isMutable) { setItemInternal(partitionId, isTemp, partitionItem); idToDataProperty.put(partitionId, dataProperty); idToReplicaAllocation.put(partitionId, replicaAlloc); idToInMemory.put(partitionId, isInMemory); idToStoragePolicy.put(partitionId, ""); + //TODO + //idToMutable.put(partitionId, isMutable); } public List<Map.Entry<Long, PartitionItem>> getPartitionItemEntryList(boolean isTemp, boolean isSorted) { @@ -247,6 +249,14 @@ public class PartitionInfo implements Writable { return idToInMemory.get(partitionId); } + public boolean getIsMutable(long partitionId) { + return idToDataProperty.get(partitionId).isMutable(); + } + + public void setIsMutable(long partitionId, boolean isMutable) { + idToDataProperty.get(partitionId).setMutable(isMutable); + } + public void setIsInMemory(long partitionId, boolean isInMemory) { idToInMemory.put(partitionId, isInMemory); } @@ -271,14 +281,15 @@ public class PartitionInfo implements Writable { } public void addPartition(long partitionId, boolean isTemp, PartitionItem item, DataProperty dataProperty, - ReplicaAllocation replicaAlloc, boolean isInMemory) { - addPartition(partitionId, dataProperty, replicaAlloc, isInMemory); + ReplicaAllocation replicaAlloc, boolean isInMemory, boolean isMutable) { + addPartition(partitionId, dataProperty, replicaAlloc, isInMemory, isMutable); setItemInternal(partitionId, isTemp, item); } public void addPartition(long partitionId, DataProperty dataProperty, ReplicaAllocation replicaAlloc, - boolean isInMemory) { + boolean isInMemory, boolean isMutable) { + dataProperty.setMutable(isMutable); idToDataProperty.put(partitionId, dataProperty); idToReplicaAllocation.put(partitionId, replicaAlloc); idToInMemory.put(partitionId, isInMemory); @@ -395,6 +406,7 @@ public class PartitionInfo implements Writable { buff.append("data_property: ").append(entry.getValue().toString()).append("; "); buff.append("replica number: ").append(idToReplicaAllocation.get(entry.getKey())).append("; "); buff.append("in memory: ").append(idToInMemory.get(entry.getKey())); + buff.append("is mutable: ").append(idToDataProperty.get(entry.getKey()).isMutable()); } return buff.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index cb78140a26..422effe926 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -69,6 +69,7 @@ public class PartitionsProcDir implements ProcDirInterface { .add("State").add("PartitionKey").add("Range").add("DistributionKey") .add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime").add("RemoteStoragePolicy") .add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation") + .add("IsMutable") .build(); private Database db; @@ -301,6 +302,8 @@ public class PartitionsProcDir implements ProcDirInterface { // replica allocation partitionInfo.add(tblPartitionInfo.getReplicaAllocation(partitionId).toCreateStmt()); + partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId)); + partitionInfos.add(partitionInfo); } } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java index fe814cccc0..61f288b20a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java @@ -17,6 +17,7 @@ package org.apache.doris.common.util; +import org.apache.doris.common.AnalysisException; import org.apache.doris.system.SystemInfoService; import org.apache.commons.validator.routines.InetAddressValidator; @@ -128,12 +129,15 @@ public class NetUtils { return addr + ":" + port; } - public static SystemInfoService.HostInfo resolveHostInfoFromHostPort(String hostPort) { + public static SystemInfoService.HostInfo resolveHostInfoFromHostPort(String hostPort) throws AnalysisException { if (hostPort.charAt(0) == '[') { String[] pair = hostPort.substring(1).split("]:"); return new SystemInfoService.HostInfo(null, pair[0], Integer.valueOf(pair[1])); } else { String[] pair = hostPort.split(":"); + if (pair.length != 2) { + throw new AnalysisException("invalid host port: " + hostPort); + } return new SystemInfoService.HostInfo(null, pair[0], Integer.valueOf(pair[1])); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 7d9e26f502..45d983c709 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -122,6 +122,8 @@ public class PropertyAnalyzer { public static final String PROPERTIES_STORE_ROW_COLUMN = "store_row_column"; + public static final String PROPERTIES_MUTABLE = "mutable"; + private static final Logger LOG = LogManager.getLogger(PropertyAnalyzer.class); private static final String COMMA_SEPARATOR = ","; private static final double MAX_FPP = 0.05; @@ -226,7 +228,10 @@ public class PropertyAnalyzer { } } - return new DataProperty(storageMedium, cooldownTimestamp, newStoragePolicy); + boolean mutable = PropertyAnalyzer.analyzeBooleanProp(properties, PROPERTIES_MUTABLE, true); + properties.remove(PROPERTIES_MUTABLE); + + return new DataProperty(storageMedium, cooldownTimestamp, newStoragePolicy, mutable); } public static short analyzeShortKeyColumnCount(Map<String, String> properties) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 240ce9fbf8..abaf029dec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1361,6 +1361,26 @@ public class InternalCatalog implements CatalogIf<Database> { } Map<String, String> properties = singlePartitionDesc.getProperties(); + + /* + * sql: alter table test_tbl add partition xxx values less than ('xxx') properties('mutable' = 'false'); + * + * sql_parser.cup definition: + * AddPartitionClause: + * KW_ADD ... single_partition_desc:desc opt_distribution:distribution opt_properties:properties) + * SinglePartitionDesc: + * single_partition_desc ::= KW_PARTITION ... ident:partName KW_VALUES KW_LESS KW_THAN + * partition_key_desc:desc opt_key_value_map:properties) + * + * If there is no opt_distribution definition, the properties in SQL is ambiguous to JCUP. It can bind + * properties to AddPartitionClause(`opt_properties`) or SinglePartitionDesc(`opt_key_value_map`). + * And JCUP choose to bind to AddPartitionClause, so we should add properties of AddPartitionClause to + * SinglePartitionDesc's properties here. + */ + if (null != addPartitionClause.getProperties()) { + properties.putAll(addPartitionClause.getProperties()); + } + // partition properties should inherit table properties ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation(); if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM) && !properties.containsKey( @@ -1528,12 +1548,12 @@ public class InternalCatalog implements CatalogIf<Database> { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition); + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else if (partitionInfo.getType() == PartitionType.LIST) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, RangePartitionItem.DUMMY_ITEM, partitionInfo.getItem(partitionId), dataProperty, partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition); + isTempPartition, partitionInfo.getIsMutable(partitionId)); } Env.getCurrentEnv().getEditLog().logAddPartition(info); @@ -1570,7 +1590,7 @@ public class InternalCatalog implements CatalogIf<Database> { } partitionInfo.unprotectHandleNewSinglePartitionDesc(partition.getId(), info.isTempPartition(), - partitionItem, info.getDataProperty(), info.getReplicaAlloc(), info.isInMemory()); + partitionItem, info.getDataProperty(), info.getReplicaAlloc(), info.isInMemory(), info.isMutable()); if (!Env.isCheckpointThread()) { // add to inverted index @@ -1988,6 +2008,8 @@ public class InternalCatalog implements CatalogIf<Database> { PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA, false); olapTable.setIsDynamicSchema(isDynamicSchema); + boolean isMutable = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_MUTABLE, true); + // set storage policy String storagePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties); Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy); @@ -2019,6 +2041,7 @@ public class InternalCatalog implements CatalogIf<Database> { partitionInfo.setReplicaAllocation(partitionId, replicaAlloc); partitionInfo.setIsInMemory(partitionId, isInMemory); partitionInfo.setTabletType(partitionId, tabletType); + partitionInfo.setIsMutable(partitionId, isMutable); } // check colocation properties diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index d6af2a7699..3f42179023 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -215,6 +215,12 @@ public class LoadingTaskPlanner { Set<Long> specifiedPartitionIds = Sets.newHashSet(); for (BrokerFileGroup brokerFileGroup : fileGroups) { if (brokerFileGroup.getPartitionIds() != null) { + for (long partitionId : brokerFileGroup.getPartitionIds()) { + if (!table.getPartitionInfo().getIsMutable(partitionId)) { + throw new LoadException("Can't load data to immutable partition, table: " + + table.getName() + ", partition: " + table.getPartition(partitionId)); + } + } specifiedPartitionIds.addAll(brokerFileGroup.getPartitionIds()); } // all file group in fileGroups should have same partitions, so only need to get partition ids diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyPartitionInfo.java index 7c68e8323d..b7363ce6b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyPartitionInfo.java @@ -47,6 +47,7 @@ public class ModifyPartitionInfo implements Writable { private short replicationNum; @SerializedName(value = "isInMemory") private boolean isInMemory; + @SerializedName(value = "replicaAlloc") private ReplicaAllocation replicaAlloc; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/PartitionPersistInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/PartitionPersistInfo.java index 8146540861..4c49e4515d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/PartitionPersistInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/PartitionPersistInfo.java @@ -45,13 +45,14 @@ public class PartitionPersistInfo implements Writable { private ReplicaAllocation replicaAlloc; private boolean isInMemory = false; private boolean isTempPartition = false; + private boolean isMutable = true; public PartitionPersistInfo() { } public PartitionPersistInfo(long dbId, long tableId, Partition partition, Range<PartitionKey> range, PartitionItem listPartitionItem, DataProperty dataProperty, ReplicaAllocation replicaAlloc, - boolean isInMemory, boolean isTempPartition) { + boolean isInMemory, boolean isTempPartition, boolean isMutable) { this.dbId = dbId; this.tableId = tableId; this.partition = partition; @@ -63,6 +64,7 @@ public class PartitionPersistInfo implements Writable { this.replicaAlloc = replicaAlloc; this.isInMemory = isInMemory; this.isTempPartition = isTempPartition; + this.isMutable = isMutable; } public Long getDbId() { @@ -97,6 +99,10 @@ public class PartitionPersistInfo implements Writable { return isInMemory; } + public boolean isMutable() { + return isMutable; + } + public boolean isTempPartition() { return isTempPartition; } @@ -112,6 +118,7 @@ public class PartitionPersistInfo implements Writable { replicaAlloc.write(out); out.writeBoolean(isInMemory); out.writeBoolean(isTempPartition); + out.writeBoolean(isMutable); } public void readFields(DataInput in) throws IOException { @@ -130,6 +137,9 @@ public class PartitionPersistInfo implements Writable { isInMemory = in.readBoolean(); isTempPartition = in.readBoolean(); + if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) { + isMutable = in.readBoolean(); + } } public boolean equals(Object obj) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index a3e354384e..8551891376 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -282,6 +282,7 @@ public class OlapTableSink extends DataSink { index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList())))); tPartition.setNumBuckets(index.getTablets().size()); } + tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId)); partitionParam.addToPartitions(tPartition); DistributionInfo distInfo = partition.getDistributionInfo(); @@ -306,6 +307,7 @@ public class OlapTableSink extends DataSink { TOlapTablePartition tPartition = new TOlapTablePartition(); tPartition.setId(partition.getId()); + tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partition.getId())); // No lowerBound and upperBound for this range for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList( diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java index bc0dd1ef8d..3f85acd216 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java @@ -153,7 +153,7 @@ public class DiskRebalanceTest { Partition partition = new Partition(id, "p" + idx, index, new HashDistributionInfo()); olapTable.addPartition(partition); olapTable.getPartitionInfo().addPartition(id, new DataProperty(TStorageMedium.HDD), - ReplicaAllocation.DEFAULT_ALLOCATION, false); + ReplicaAllocation.DEFAULT_ALLOCATION, false, true); }); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java index eefc2716b4..07a1cd012e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java @@ -190,7 +190,7 @@ public class RebalanceTest { Partition partition = new Partition(id, "p" + idx, index, new HashDistributionInfo()); olapTable.addPartition(partition); olapTable.getPartitionInfo().addPartition(id, new DataProperty(TStorageMedium.HDD), - ReplicaAllocation.DEFAULT_ALLOCATION, false); + ReplicaAllocation.DEFAULT_ALLOCATION, false, true); }); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java index 4460fcb31f..b3ae0afe8b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java @@ -112,6 +112,7 @@ public class UnitTestUtil { partitionInfo.setDataProperty(partitionId, new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM)); partitionInfo.setReplicaAllocation(partitionId, new ReplicaAllocation((short) 3)); partitionInfo.setIsInMemory(partitionId, false); + partitionInfo.setIsMutable(partitionId, true); partitionInfo.setTabletType(partitionId, TTabletType.TABLET_TYPE_DISK); OlapTable table = new OlapTable(tableId, TABLE_NAME, columns, KeysType.AGG_KEYS, partitionInfo, distributionInfo); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index de0dfbedfa..e3916cfb18 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -24,6 +24,7 @@ import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; @@ -337,7 +338,8 @@ public class SparkLoadJobTest { long fileSize = 6L; filePathToSize.put(filePath, fileSize); PartitionInfo partitionInfo = new RangePartitionInfo(); - partitionInfo.addPartition(partitionId, null, new ReplicaAllocation((short) 1), false); + partitionInfo.addPartition(partitionId, new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM), + new ReplicaAllocation((short) 1), false, true); new Expectations() { { diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index 146a3b4d93..fd37ca436c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.ListPartitionInfo; import org.apache.doris.catalog.MaterializedIndex; @@ -102,6 +103,9 @@ public class OlapTableSinkTest { } }; + dstTable.getPartitionInfo().setDataProperty(partition.getId(), + new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM)); + dstTable.getPartitionInfo().setIsMutable(partition.getId(), true); OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L), false); sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false); sink.complete(); diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 6d3c2e93c0..9cc204cea2 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -162,6 +162,8 @@ struct TOlapTablePartition { 6: optional list<Exprs.TExprNode> start_keys 7: optional list<Exprs.TExprNode> end_keys 8: optional list<list<Exprs.TExprNode>> in_keys + + 9: optional bool is_mutable = true } struct TOlapTablePartitionParam { diff --git a/regression-test/data/load_p0/stream_load/test_immutable_partition.csv b/regression-test/data/load_p0/stream_load/test_immutable_partition.csv new file mode 100644 index 0000000000..d6f18559b6 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_immutable_partition.csv @@ -0,0 +1,11 @@ +-2 -50 1 \N 44 +2 -51 1 2 \N +11 3 5 6 19 +13 3 5 8 21 +25 8 9 2 4 +33 9 8 2 1 +38 1 9 2 5 +33 10 9 2 7 +89 199 2 3 -1 +1000 3 9 8 1 +11 10 9 8 1 diff --git a/regression-test/data/load_p0/stream_load/test_stream_load.out b/regression-test/data/load_p0/stream_load/test_stream_load.out index 85a8835f51..72bc763d81 100644 --- a/regression-test/data/load_p0/stream_load/test_stream_load.out +++ b/regression-test/data/load_p0/stream_load/test_stream_load.out @@ -69,3 +69,11 @@ 7 [1, 2, 3, 4, 5] \N \N \N \N \N \N \N \N \N 8 [1, 2, 3, 4, 5] \N \N \N \N \N [NULL] \N [NULL] \N +-- !sql1 -- +-2 -50 1 \N 44 +2 -51 1 2 \N +25 8 9 2 4 +33 10 9 2 7 +33 9 8 2 1 +38 1 9 2 5 + diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy index e93daf8e22..a04d25622e 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy @@ -672,5 +672,57 @@ suite("test_stream_load", "p0") { } } sql "sync" + + // test immutable partition success + def tableName9 = "test_immutable_partition" + sql """ DROP TABLE IF EXISTS ${tableName9} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName9} ( + `k1` bigint(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) SUM NULL, + `v2` tinyint(4) REPLACE NULL, + `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION partition_a VALUES [("-9223372036854775808"), ("10")), + PARTITION partition_b VALUES [("10"), ("20")), + PARTITION partition_c VALUES [("20"), ("30")), + PARTITION partition_d VALUES [("30"), ("40"))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + sql """ALTER TABLE ${tableName9} ADD PARTITION partition_e VALUES less than ('3000') properties ('mutable' = 'false')""" + sql """ALTER TABLE ${tableName9} MODIFY PARTITION partition_b set ('mutable' = 'false')""" + + streamLoad { + table "${tableName9}" + + set 'column_separator', '\t' + set 'columns', 'k1, k2, v1, v2, v3' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d, partition_e' + set 'strict_mode', 'true' + + file 'test_immutable_partition.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(11, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(5, json.NumberUnselectedRows) + } + } + + sql "sync" + order_qt_sql1 "select * from ${tableName9} order by k1, k2" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org