This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 2168f0932f4 [feature](restore) Support clean_tables/clean_partitions properties for restore job #39028 (#39366) 2168f0932f4 is described below commit 2168f0932f415fd454037d79bf36e97fe0f7d689 Author: walter <w41te...@gmail.com> AuthorDate: Thu Aug 15 14:16:02 2024 +0800 [feature](restore) Support clean_tables/clean_partitions properties for restore job #39028 (#39366) cherry pick from #39028 --- .../org/apache/doris/analysis/RestoreStmt.java | 104 +++++++------ .../org/apache/doris/backup/BackupHandler.java | 4 +- .../java/org/apache/doris/backup/RestoreJob.java | 97 +++++++++++- .../java/org/apache/doris/catalog/Partition.java | 4 +- .../apache/doris/datasource/InternalCatalog.java | 112 +++++++++----- .../apache/doris/service/FrontendServiceImpl.java | 12 ++ .../org/apache/doris/backup/RestoreJobTest.java | 2 +- gensrc/thrift/FrontendService.thrift | 2 + .../test_backup_restore_clean_restore.groovy | 162 +++++++++++++++++++++ 9 files changed, 393 insertions(+), 106 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index 394b50d8a02..fe66f0ee4cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -35,13 +35,15 @@ import java.util.Set; public class RestoreStmt extends AbstractBackupStmt { private static final String PROP_ALLOW_LOAD = "allow_load"; - private static final String PROP_REPLICATION_NUM = "replication_num"; private static final String PROP_BACKUP_TIMESTAMP = "backup_timestamp"; private static final String PROP_META_VERSION = "meta_version"; - private static final String PROP_RESERVE_REPLICA = "reserve_replica"; - private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable"; private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED; + public static final String PROP_RESERVE_REPLICA = "reserve_replica"; + public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable"; + public static final String PROP_CLEAN_TABLES = "clean_tables"; + public static final String PROP_CLEAN_PARTITIONS = "clean_partitions"; + private boolean allowLoad = false; private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; private String backupTimestamp = null; @@ -50,16 +52,18 @@ public class RestoreStmt extends AbstractBackupStmt { private boolean reserveDynamicPartitionEnable = false; private boolean isLocal = false; private boolean isBeingSynced = false; + private boolean isCleanTables = false; + private boolean isCleanPartitions = false; private byte[] meta = null; private byte[] jobInfo = null; public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause, - Map<String, String> properties) { + Map<String, String> properties) { super(labelName, repoName, restoreTableRefClause, properties); } public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause, - Map<String, String> properties, byte[] meta, byte[] jobInfo) { + Map<String, String> properties, byte[] meta, byte[] jobInfo) { super(labelName, repoName, restoreTableRefClause, properties); this.meta = meta; this.jobInfo = jobInfo; @@ -109,6 +113,14 @@ public class RestoreStmt extends AbstractBackupStmt { return isBeingSynced; } + public boolean isCleanTables() { + return isCleanTables; + } + + public boolean isCleanPartitions() { + return isCleanPartitions; + } + @Override public void analyze(Analyzer analyzer) throws UserException { if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) { @@ -138,17 +150,7 @@ public class RestoreStmt extends AbstractBackupStmt { Map<String, String> copiedProperties = Maps.newHashMap(properties); // allow load - if (copiedProperties.containsKey(PROP_ALLOW_LOAD)) { - if (copiedProperties.get(PROP_ALLOW_LOAD).equalsIgnoreCase("true")) { - allowLoad = true; - } else if (copiedProperties.get(PROP_ALLOW_LOAD).equalsIgnoreCase("false")) { - allowLoad = false; - } else { - ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, - "Invalid allow load value: " + copiedProperties.get(PROP_ALLOW_LOAD)); - } - copiedProperties.remove(PROP_ALLOW_LOAD); - } + allowLoad = eatBooleanProperty(copiedProperties, PROP_ALLOW_LOAD, allowLoad); // replication num this.replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(copiedProperties, ""); @@ -156,34 +158,16 @@ public class RestoreStmt extends AbstractBackupStmt { this.replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; } // reserve replica - if (copiedProperties.containsKey(PROP_RESERVE_REPLICA)) { - if (copiedProperties.get(PROP_RESERVE_REPLICA).equalsIgnoreCase("true")) { - reserveReplica = true; - } else if (copiedProperties.get(PROP_RESERVE_REPLICA).equalsIgnoreCase("false")) { - reserveReplica = false; - } else { - ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, - "Invalid reserve_replica value: " + copiedProperties.get(PROP_RESERVE_REPLICA)); - } - // force set reserveReplica to false, do not keep the origin allocation - if (reserveReplica && !Config.force_olap_table_replication_allocation.isEmpty()) { - reserveReplica = false; - } - copiedProperties.remove(PROP_RESERVE_REPLICA); + reserveReplica = eatBooleanProperty(copiedProperties, PROP_RESERVE_REPLICA, reserveReplica); + // force set reserveReplica to false, do not keep the origin allocation + if (reserveReplica && !Config.force_olap_table_replication_allocation.isEmpty()) { + reserveReplica = false; } + // reserve dynamic partition enable - if (copiedProperties.containsKey(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE)) { - if (copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE).equalsIgnoreCase("true")) { - reserveDynamicPartitionEnable = true; - } else if (copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE).equalsIgnoreCase("false")) { - reserveDynamicPartitionEnable = false; - } else { - ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, - "Invalid reserve dynamic partition enable value: " - + copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE)); - } - copiedProperties.remove(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE); - } + reserveDynamicPartitionEnable = eatBooleanProperty( + copiedProperties, PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, reserveDynamicPartitionEnable); + // backup timestamp if (copiedProperties.containsKey(PROP_BACKUP_TIMESTAMP)) { backupTimestamp = copiedProperties.get(PROP_BACKUP_TIMESTAMP); @@ -207,17 +191,13 @@ public class RestoreStmt extends AbstractBackupStmt { } // is being synced - if (copiedProperties.containsKey(PROP_IS_BEING_SYNCED)) { - if (copiedProperties.get(PROP_IS_BEING_SYNCED).equalsIgnoreCase("true")) { - isBeingSynced = true; - } else if (copiedProperties.get(PROP_IS_BEING_SYNCED).equalsIgnoreCase("false")) { - isBeingSynced = false; - } else { - ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, - "Invalid is being synced value: " + copiedProperties.get(PROP_IS_BEING_SYNCED)); - } - copiedProperties.remove(PROP_IS_BEING_SYNCED); - } + isBeingSynced = eatBooleanProperty(copiedProperties, PROP_IS_BEING_SYNCED, isBeingSynced); + + // is clean tables + isCleanTables = eatBooleanProperty(copiedProperties, PROP_CLEAN_TABLES, isCleanTables); + + // is clean partitions + isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions); if (!copiedProperties.isEmpty()) { ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, @@ -243,4 +223,22 @@ public class RestoreStmt extends AbstractBackupStmt { sb.append("\n)"); return sb.toString(); } + + private boolean eatBooleanProperty(Map<String, String> copiedProperties, String name, boolean defaultValue) + throws AnalysisException { + boolean retval = defaultValue; + if (copiedProperties.containsKey(name)) { + String value = copiedProperties.get(name); + if (value.equalsIgnoreCase("true")) { + retval = true; + } else if (value.equalsIgnoreCase("false")) { + retval = false; + } else { + ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, + "Invalid boolean property " + name + " value: " + value); + } + copiedProperties.remove(name); + } + return retval; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 77213a35b42..144b4e49360 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -457,12 +457,12 @@ public class BackupHandler extends MasterDaemon implements Writable { db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), - env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); + stmt.isCleanTables(), stmt.isCleanPartitions(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); } else { restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), - stmt.isBeingSynced(), env, repository.getId()); + stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), env, repository.getId()); } env.getEditLog().logRestoreJob(restoreJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 1b6b4b47072..c76ce530141 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -18,6 +18,7 @@ package org.apache.doris.backup; import org.apache.doris.analysis.BackupStmt.BackupContent; +import org.apache.doris.analysis.RestoreStmt; import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo; import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo; import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo; @@ -63,6 +64,7 @@ import org.apache.doris.common.util.DbUtil; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.property.S3ClientBEProperties; import org.apache.doris.resource.Tag; import org.apache.doris.task.AgentBatchTask; @@ -106,9 +108,12 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class RestoreJob extends AbstractJob { - private static final String PROP_RESERVE_REPLICA = "reserve_replica"; - private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable"; + private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA; + private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = + RestoreStmt.PROP_RESERVE_DYNAMIC_PARTITION_ENABLE; private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED; + private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES; + private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS; private static final Logger LOG = LogManager.getLogger(RestoreJob.class); @@ -173,6 +178,11 @@ public class RestoreJob extends AbstractJob { private boolean isBeingSynced = false; + // Whether to delete existing tables that are not involved in the restore. + private boolean isCleanTables = false; + // Whether to delete existing partitions that are not involved in the restore. + private boolean isCleanPartitions = false; + // restore properties private Map<String, String> properties = Maps.newHashMap(); @@ -182,7 +192,8 @@ public class RestoreJob extends AbstractJob { public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, - boolean reserveDynamicPartitionEnable, boolean isBeingSynced, Env env, long repoId) { + boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, + boolean isCleanPartitions, Env env, long repoId) { super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId); this.backupTimestamp = backupTs; this.jobInfo = jobInfo; @@ -197,16 +208,21 @@ public class RestoreJob extends AbstractJob { } this.reserveDynamicPartitionEnable = reserveDynamicPartitionEnable; this.isBeingSynced = isBeingSynced; + this.isCleanTables = isCleanTables; + this.isCleanPartitions = isCleanPartitions; properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica)); properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable)); properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced)); + properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables)); + properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions)); } public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, - boolean reserveDynamicPartitionEnable, boolean isBeingSynced, Env env, long repoId, BackupMeta backupMeta) { + boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, + boolean isCleanPartitions, Env env, long repoId, BackupMeta backupMeta) { this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica, - reserveDynamicPartitionEnable, isBeingSynced, env, repoId); + reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, env, repoId); this.backupMeta = backupMeta; } @@ -842,7 +858,9 @@ public class RestoreJob extends AbstractJob { } if (ok) { - LOG.debug("finished to create all restored replcias. {}", this); + if (LOG.isDebugEnabled()) { + LOG.debug("finished to create all restored replicas. {}", this); + } // add restored partitions. // table should be in State RESTORE, so no other partitions can be // added to or removed from this table during the restore process. @@ -1414,7 +1432,7 @@ public class RestoreJob extends AbstractJob { return; } - Tablet tablet = idx.getTablet(info.getTabletId()); + Tablet tablet = idx.getTablet(info.getTabletId()); if (tablet == null) { status = new Status(ErrCode.NOT_FOUND, "tablet " + info.getTabletId() + " does not exist in restored table " @@ -1557,7 +1575,7 @@ public class RestoreJob extends AbstractJob { return; } - Tablet tablet = idx.getTablet(info.getTabletId()); + Tablet tablet = idx.getTablet(info.getTabletId()); if (tablet == null) { status = new Status(ErrCode.NOT_FOUND, "tablet " + info.getTabletId() + " does not exist in restored table " @@ -1761,6 +1779,14 @@ public class RestoreJob extends AbstractJob { } } + // Drop the exists but non-restored table/partitions. + if (isCleanTables || isCleanPartitions) { + Status st = dropAllNonRestoredTableAndPartitions(db); + if (!st.ok()) { + return st; + } + } + if (!isReplay) { restoredPartitions.clear(); restoredTbls.clear(); @@ -1783,6 +1809,59 @@ public class RestoreJob extends AbstractJob { return Status.OK; } + private Status dropAllNonRestoredTableAndPartitions(Database db) { + try { + for (Table table : db.getTables()) { + long tableId = table.getId(); + String tableName = table.getName(); + TableType tableType = table.getType(); + BackupOlapTableInfo backupTableInfo = jobInfo.backupOlapTableObjects.get(tableName); + if (tableType != TableType.OLAP && tableType != TableType.ODBC && tableType != TableType.VIEW) { + continue; + } + if (tableType == TableType.OLAP && backupTableInfo != null) { + // drop the non restored partitions. + dropNonRestoredPartitions(db, (OlapTable) table, backupTableInfo); + } else if (isCleanTables) { + // otherwise drop the entire table. + LOG.info("drop non restored table {}({}). {}", tableName, tableId, this); + boolean isForceDrop = false; // move this table into recyclebin. + env.getInternalCatalog().dropTableWithoutCheck(db, table, isForceDrop); + } + } + return Status.OK; + } catch (Exception e) { + LOG.warn("drop all non restored table and partitions failed. {}", this, e); + return new Status(ErrCode.COMMON_ERROR, e.getMessage()); + } + } + + private void dropNonRestoredPartitions( + Database db, OlapTable table, BackupOlapTableInfo backupTableInfo) throws DdlException { + if (!isCleanPartitions || !table.writeLockIfExist()) { + return; + } + + try { + long tableId = table.getId(); + String tableName = table.getQualifiedName(); + InternalCatalog catalog = env.getInternalCatalog(); + for (String partitionName : table.getPartitionNames()) { + if (backupTableInfo.containsPart(partitionName)) { + continue; + } + + LOG.info("drop non restored partition {} of table {}({}). {}", + partitionName, tableName, tableId, this); + boolean isTempPartition = false; + boolean isForceDrop = false; // move this partition into recyclebin. + catalog.dropPartitionWithoutCheck(db, table, partitionName, isTempPartition, isForceDrop); + } + } finally { + table.writeUnlock(); + } + } + private void releaseSnapshots() { if (snapshotInfos.isEmpty()) { return; @@ -2184,6 +2263,8 @@ public class RestoreJob extends AbstractJob { reserveReplica = Boolean.parseBoolean(properties.get(PROP_RESERVE_REPLICA)); reserveDynamicPartitionEnable = Boolean.parseBoolean(properties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE)); isBeingSynced = Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED)); + isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES)); + isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index a970d1798de..d8ed1b1ff06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -148,8 +148,8 @@ public class Partition extends MetaObject implements Writable { public void updateVersionForRestore(long visibleVersion) { this.setVisibleVersion(visibleVersion); this.nextVersion = this.visibleVersion + 1; - LOG.info("update partition {} version for restore: visible: {}, next: {}", - name, visibleVersion, nextVersion); + LOG.info("update partition {}({}) version for restore: visible: {}, next: {}", + name, id, visibleVersion, nextVersion); } public void updateVisibleVersion(long visibleVersion) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 60cda366e77..7a123e50fad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -898,35 +898,62 @@ public class InternalCatalog implements CatalogIf<Database> { + " please use \"DROP table FORCE\"."); } } - table.writeLock(); - long recycleTime = 0; - try { - if (table instanceof OlapTable && !stmt.isForceDrop()) { - OlapTable olapTable = (OlapTable) table; - if ((olapTable.getState() != OlapTableState.NORMAL)) { - throw new DdlException("The table [" + tableName + "]'s state is " + olapTable.getState() - + ", cannot be dropped." + " please cancel the operation on olap table firstly." - + " If you want to forcibly drop(cannot be recovered)," - + " please use \"DROP table FORCE\"."); - } - } - unprotectDropTable(db, table, stmt.isForceDrop(), false, 0); - if (!stmt.isForceDrop()) { - recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(table.getId()); + + if (table instanceof OlapTable && !stmt.isForceDrop()) { + OlapTable olapTable = (OlapTable) table; + if ((olapTable.getState() != OlapTableState.NORMAL)) { + throw new DdlException("The table [" + tableName + "]'s state is " + olapTable.getState() + + ", cannot be dropped." + " please cancel the operation on olap table firstly." + + " If you want to forcibly drop(cannot be recovered)," + + " please use \"DROP table FORCE\"."); } - } finally { - table.writeUnlock(); } - DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, stmt.isForceDrop(), recycleTime); - Env.getCurrentEnv().getEditLog().logDropTable(info); - Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), - db.getId(), table.getId()); + + dropTableInternal(db, table, stmt.isForceDrop()); + } catch (UserException e) { + throw new DdlException(e.getMessage(), e.getMysqlErrorCode()); } finally { db.writeUnlock(); } LOG.info("finished dropping table: {} from db: {}, is force: {}", tableName, dbName, stmt.isForceDrop()); } + // drop table without any check. + public void dropTableWithoutCheck(Database db, Table table, boolean forceDrop) throws DdlException { + if (!db.writeLockIfExist()) { + return; + } + try { + LOG.info("drop table {} without check, force: {}", table.getQualifiedName(), forceDrop); + dropTableInternal(db, table, forceDrop); + } catch (Exception e) { + LOG.warn("drop table without check", e); + throw e; + } finally { + db.writeUnlock(); + } + } + + // Drop a table, the db lock must hold. + private void dropTableInternal(Database db, Table table, boolean forceDrop) throws DdlException { + table.writeLock(); + String tableName = table.getName(); + long recycleTime = 0; + try { + unprotectDropTable(db, table, forceDrop, false, 0); + if (!forceDrop) { + recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(table.getId()); + } + } finally { + table.writeUnlock(); + } + + DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, forceDrop, recycleTime); + Env.getCurrentEnv().getEditLog().logDropTable(info); + Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), + db.getId(), table.getId()); + } + public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay, long recycleTime) { if (table.getType() == TableType.ELASTICSEARCH) { @@ -1755,6 +1782,7 @@ public class InternalCatalog implements CatalogIf<Database> { String partitionName = clause.getPartitionName(); boolean isTempPartition = clause.isTempPartition(); + boolean isForceDrop = clause.isForceDrop(); olapTable.checkNormalStateForAlter(); if (!olapTable.checkPartitionNameExist(partitionName, isTempPartition)) { @@ -1771,27 +1799,31 @@ public class InternalCatalog implements CatalogIf<Database> { throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table"); } - // drop + if (!isTempPartition && !isForceDrop) { + Partition partition = olapTable.getPartition(partitionName); + if (partition != null && Env.getCurrentGlobalTransactionMgr() + .existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) { + throw new DdlException( + "There are still some transactions in the COMMITTED state waiting to be completed." + + " The partition [" + partitionName + + "] cannot be dropped. If you want to forcibly drop(cannot be recovered)," + + " please use \"DROP partition FORCE\"."); + } + } + + dropPartitionWithoutCheck(db, olapTable, partitionName, isTempPartition, isForceDrop); + } + + // drop partition without any check, the caller should hold the table write lock. + public void dropPartitionWithoutCheck(Database db, OlapTable olapTable, String partitionName, + boolean isTempPartition, boolean isForceDrop) throws DdlException { Partition partition = null; - long recycleTime = 0; + long recycleTime = -1; if (isTempPartition) { olapTable.dropTempPartition(partitionName, true); } else { - if (!clause.isForceDrop()) { - partition = olapTable.getPartition(partitionName); - if (partition != null) { - if (Env.getCurrentEnv().getGlobalTransactionMgr() - .existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) { - throw new DdlException( - "There are still some transactions in the COMMITTED state waiting to be completed." - + " The partition [" + partitionName - + "] cannot be dropped. If you want to forcibly drop(cannot be recovered)," - + " please use \"DROP partition FORCE\"."); - } - } - } - olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop()); - if (!clause.isForceDrop() && partition != null) { + partition = olapTable.dropPartition(db.getId(), partitionName, isForceDrop); + if (!isForceDrop && partition != null) { recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(partition.getId()); } } @@ -1799,11 +1831,11 @@ public class InternalCatalog implements CatalogIf<Database> { // log long partitionId = partition == null ? -1L : partition.getId(); DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionId, partitionName, - isTempPartition, clause.isForceDrop(), recycleTime); + isTempPartition, isForceDrop, recycleTime); Env.getCurrentEnv().getEditLog().logDropPartition(info); LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp : {}, is force : {}", - partitionName, olapTable.getId(), olapTable.getName(), isTempPartition, clause.isForceDrop()); + partitionName, olapTable.getId(), olapTable.getName(), isTempPartition, isForceDrop); } public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index a38921221fd..cbbd49f69a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2906,6 +2906,18 @@ public class FrontendServiceImpl implements FrontendService.Iface { LabelName label = new LabelName(request.getDb(), request.getLabelName()); String repoName = request.getRepoName(); Map<String, String> properties = request.getProperties(); + + // Restore requires that all properties are known, so the old version of FE will not be able + // to recognize the properties of the new version. Therefore, request parameters are used here + // instead of directly putting them in properties to avoid compatibility issues of cross-version + // synchronization. + if (request.isCleanPartitions()) { + properties.put(RestoreStmt.PROP_CLEAN_PARTITIONS, "true"); + } + if (request.isCleanTables()) { + properties.put(RestoreStmt.PROP_CLEAN_TABLES, "true"); + } + AbstractBackupTableRefClause restoreTableRefClause = null; if (request.isSetTableRefs()) { List<TableRef> tableRefs = new ArrayList<>(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index 5f426aa3311..839d11e8199 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -251,7 +251,7 @@ public class RestoreJobTest { db.dropTable(expectedRestoreTbl.getName()); job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false, - new ReplicaAllocation((short) 3), 100000, -1, false, false, false, env, repo.getId()); + new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, env, repo.getId()); List<Table> tbls = Lists.newArrayList(); List<Resource> resources = Lists.newArrayList(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 75a6537ee85..9e43d35f8f1 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1102,6 +1102,8 @@ struct TRestoreSnapshotRequest { 10: optional map<string, string> properties 11: optional binary meta 12: optional binary job_info + 13: optional bool clean_tables + 14: optional bool clean_partitions } struct TRestoreSnapshotResult { diff --git a/regression-test/suites/backup_restore/test_backup_restore_clean_restore.groovy b/regression-test/suites/backup_restore/test_backup_restore_clean_restore.groovy new file mode 100644 index 00000000000..c80bc0d0060 --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_clean_restore.groovy @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_restore_clean_restore", "backup_restore") { + String suiteName = "test_backup_restore_clean_restore" + String dbName = "${suiteName}_db" + String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + String snapshotName = "${suiteName}_snapshot" + String tableNamePrefix = "${suiteName}_tables" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + + String tableName1 = "${tableNamePrefix}_1" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}" + sql """ + CREATE TABLE ${dbName}.${tableName1} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("10"), + PARTITION `p3` VALUES LESS THAN ("20") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + + def numRows = 20 + List<String> values = [] + for (int j = 0; j < numRows; ++j) { + values.add("(${j}, ${j})") + } + sql "INSERT INTO ${dbName}.${tableName1} VALUES ${values.join(",")}" + def result = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(result.size(), numRows); + + String tableName2 = "${tableNamePrefix}_2" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName2}" + sql """ + CREATE TABLE ${dbName}.${tableName2} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("10"), + PARTITION `p3` VALUES LESS THAN ("20") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + + sql "INSERT INTO ${dbName}.${tableName2} VALUES ${values.join(",")}" + result = sql "SELECT * FROM ${dbName}.${tableName2}" + assertEquals(result.size(), numRows); + + String tableName3 = "${tableNamePrefix}_3" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName3}" + sql """ + CREATE TABLE ${dbName}.${tableName3} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("10"), + PARTITION `p3` VALUES LESS THAN ("20") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + + sql "INSERT INTO ${dbName}.${tableName3} VALUES ${values.join(",")}" + result = sql "SELECT * FROM ${dbName}.${tableName3}" + assertEquals(result.size(), numRows); + + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + """ + + while (!syncer.checkSnapshotFinish(dbName)) { + Thread.sleep(3000) + } + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // restore table1, partition 3 of table2 + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( + `${tableName1}`, + `${tableName2}` PARTITION (`p3`) + ) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "clean_tables" = "true", + "clean_partitions" = "true" + ) + """ + + while (!syncer.checkAllRestoreFinish(dbName)) { + Thread.sleep(3000) + } + + // all data of table 1 must exists + result = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(result.size(), numRows); + + // only data in p3 of table 2 exists + result = sql "SELECT * FROM ${dbName}.${tableName2}" + assertEquals(result.size(), numRows-10) + + // table3 are dropped + result = sql """ + SHOW TABLE STATUS FROM ${dbName} LIKE "${tableName3}" + """ + assertEquals(result.size(), 0) + + sql "DROP TABLE ${dbName}.${tableName1} FORCE" + sql "DROP TABLE ${dbName}.${tableName2} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org