This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new f12253ce96f branch-2.1: [enhancement](backup) handle dropped tables and partitions during backup #52935 (#55005)
f12253ce96f is described below
commit f12253ce96fe8f806acd2ffddd8e3efa74a3b90b
Author: yulihua <[email protected]>
AuthorDate: Tue Sep 9 10:51:11 2025 +0800
branch-2.1: [enhancement](backup) handle dropped tables and partitions during backup #52935 (#55005)
---
.../java/org/apache/doris/backup/BackupJob.java | 173 ++++++++++++-
.../java/org/apache/doris/backup/BackupMeta.java | 9 +
.../org/apache/doris/backup/BackupJobTest.java | 270 ++++++++++++++++++++-
.../org/apache/doris/common/util/UnitTestUtil.java | 18 +-
4 files changed, 449 insertions(+), 21 deletions(-)
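
At a high level, this change lets a running backup tolerate tables and partitions that are dropped before their snapshot tasks finish: when a TABLET_MISSING status comes back, the affected table or partition ID is recorded, and the backup metadata and snapshot infos are filtered against those records before the meta files are written. A minimal, self-contained sketch of that bookkeeping, using hypothetical names such as DroppedObjectTracker and SnapshotRecord rather than the actual Doris classes shown in the diff below, might look like this:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class DroppedObjectTracker {
        // Table IDs observed as dropped while snapshot tasks were in flight.
        private final Set<Long> droppedTables = ConcurrentHashMap.newKeySet();
        // Dropped partition IDs, grouped by the owning table ID.
        private final Map<Long, Set<Long>> droppedPartitionsByTable = new ConcurrentHashMap<>();

        public void markTableDropped(long tableId) {
            droppedTables.add(tableId);
        }

        public void markPartitionDropped(long tableId, long partitionId) {
            droppedPartitionsByTable
                    .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
                    .add(partitionId);
        }

        // Keep only snapshot records whose table and partition were not dropped.
        public Map<Long, SnapshotRecord> filter(Map<Long, SnapshotRecord> snapshots) {
            Map<Long, SnapshotRecord> kept = new HashMap<>();
            for (Map.Entry<Long, SnapshotRecord> e : snapshots.entrySet()) {
                SnapshotRecord r = e.getValue();
                boolean tableDropped = droppedTables.contains(r.tableId);
                boolean partitionDropped = droppedPartitionsByTable
                        .getOrDefault(r.tableId, Collections.emptySet())
                        .contains(r.partitionId);
                if (!tableDropped && !partitionDropped) {
                    kept.put(e.getKey(), r);
                }
            }
            return kept;
        }

        // Hypothetical stand-in for Doris's SnapshotInfo, used only in this sketch.
        public static class SnapshotRecord {
            final long tableId;
            final long partitionId;

            public SnapshotRecord(long tableId, long partitionId) {
                this.tableId = tableId;
                this.partitionId = partitionId;
            }
        }
    }

The actual change applies the same idea inside BackupJob, as the diff shows: the dropped IDs are also persisted by write()/readFields() and cleared once the meta files are saved.
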
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index ca77bb3de23..51ceb3eca1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -78,6 +78,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -125,6 +127,12 @@ public class BackupJob extends AbstractJob {
// backup properties && table commit seq with table id
private Map<String, String> properties = Maps.newHashMap();
+ // Record table IDs that were dropped during backup
+ private Set<Long> droppedTables = ConcurrentHashMap.newKeySet();
+
+ // Record partition IDs that were dropped during backup (tableId -> set of partitionIds)
+ private Map<Long, Set<Long>> droppedPartitionsByTable = Maps.newConcurrentMap();
+
private long commitSeq = 0;
public BackupJob() {
@@ -228,6 +236,39 @@ public class BackupJob extends AbstractJob {
return true;
}
+ private boolean handleTabletMissing(SnapshotTask task) {
+ LOG.info("handleTabletMissing task: {}", task);
+ Table table = env.getInternalCatalog().getTableByTableId(task.getTableId());
+ if (table == null) {
+ // Table was dropped (including cases where database was dropped)
+ droppedTables.add(task.getTableId());
+ LOG.info("table {} marked as dropped during backup. {}", task.getTableId(), this);
+ return true;
+ }
+
+ if (!(table instanceof OlapTable)) {
+ return false;
+ }
+
+ OlapTable olapTable = (OlapTable) table;
+ olapTable.readLock();
+ try {
+ Partition partition = olapTable.getPartition(task.getPartitionId());
+ if (partition == null) {
+ // Partition was dropped or truncated (partition ID changed)
+ droppedPartitionsByTable.computeIfAbsent(task.getTableId(), k -> ConcurrentHashMap.newKeySet())
+ .add(task.getPartitionId());
+ LOG.info("partition {} from table {} marked as dropped during backup (dropped or truncated). {}",
+ task.getPartitionId(), task.getTableId(), this);
+ return true;
+ }
+
+ // If partition still exists, tablet missing is caused by other reasons
+ return false;
+ } finally {
+ olapTable.readUnlock();
+ }
+ }
public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
Preconditions.checkState(task.getJobId() == jobId);
@@ -242,11 +283,21 @@ public class BackupJob extends AbstractJob {
cancelInternal();
}
- if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING
- && !tryNewTabletSnapshotTask(task)) {
- status = new Status(ErrCode.NOT_FOUND,
- "make snapshot failed, failed to ge tablet, table will be dropped or truncated");
- cancelInternal();
+ if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING) {
+ if (handleTabletMissing(task)) {
+ // Successfully handled drop case, remove from task queue
+ taskProgress.remove(task.getSignature());
+ taskErrMsg.remove(task.getSignature());
+ Long oldValue = unfinishedTaskIds.remove(task.getSignature());
+ return oldValue != null;
+ } else {
+ // Not caused by drop, follow original logic
+ if (!tryNewTabletSnapshotTask(task)) {
+ status = new Status(ErrCode.NOT_FOUND,
+ "make snapshot failed, failed to get tablet, table will be dropped or truncated");
+ cancelInternal();
+ }
+ }
}
if (request.getTaskStatus().getStatusCode() == TStatusCode.NOT_IMPLEMENTED_ERROR) {
@@ -491,13 +542,18 @@ public class BackupJob extends AbstractJob {
List<Table> copiedTables = Lists.newArrayList();
List<Resource> copiedResources = Lists.newArrayList();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+ // Track if we have any valid tables for backup
+ boolean hasValidTables = false;
for (TableRef tableRef : tableRefs) {
String tblName = tableRef.getName().getTbl();
Table tbl = db.getTableNullable(tblName);
if (tbl == null) {
- status = new Status(ErrCode.NOT_FOUND, "table " + tblName + " does not exist");
- return;
+ // Table was dropped, skip it and continue with other tables
+ LOG.info("table {} does not exist, it was dropped during backup preparation, skip it. {}",
+ tblName, this);
+ continue;
}
+ hasValidTables = true;
tbl.readLock();
try {
switch (tbl.getType()) {
@@ -538,7 +594,11 @@ public class BackupJob extends AbstractJob {
tbl.readUnlock();
}
}
-
+ // If no valid tables found, cancel the job
+ if (!hasValidTables) {
+ status = new Status(ErrCode.NOT_FOUND, "no valid tables found for backup");
+ return;
+ }
// Limit the max num of tablets involved in a backup job, to avoid OOM.
if (unfinishedTaskIds.size() > Config.max_backup_tablets_per_job) {
String msg = String.format("the num involved tablets %d exceeds the limit %d, "
@@ -829,6 +889,43 @@ public class BackupJob extends AbstractJob {
}
}
+ private void cleanupDroppedTablesAndPartitions() {
+ if (backupMeta == null) {
+ return;
+ }
+
+ // Remove dropped partitions first (before removing tables)
+ for (Map.Entry<Long, Set<Long>> entry : droppedPartitionsByTable.entrySet()) {
+ Long tableId = entry.getKey();
+ Set<Long> droppedPartitionIds = entry.getValue();
+
+ Table table = backupMeta.getTable(tableId);
+ if (table instanceof OlapTable) {
+ OlapTable olapTable = (OlapTable) table;
+
+ // Directly get partitions by ID instead of iterating all partitions
+ for (Long droppedPartitionId : droppedPartitionIds) {
+ Partition partition = olapTable.getPartition(droppedPartitionId);
+ if (partition != null) {
+ LOG.info("remove dropped partition {} from table {} (id: {}) in backup meta. {}",
+ partition.getName(), table.getName(), tableId, this);
+ olapTable.dropPartitionAndReserveTablet(partition.getName());
+ }
+ }
+ }
+ }
+
+ // Remove dropped tables after processing partitions
+ for (Long tableId : droppedTables) {
+ Table removedTable = backupMeta.getTable(tableId);
+ if (removedTable != null) {
+ LOG.info("remove dropped table {} (id: {}) from backup meta. {}",
+ removedTable.getName(), tableId, this);
+ backupMeta.removeTable(tableId);
+ }
+ }
+ }
+
private void saveMetaInfo(boolean replay) {
String createTimeStr = TimeUtils.longToTimeString(createTime, TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
@@ -850,7 +947,10 @@ public class BackupJob extends AbstractJob {
return;
}
- // 2. save meta info file
+ // 2. Clean up dropped tables and partitions from backup metadata
+ cleanupDroppedTablesAndPartitions();
+
+ // 3. save meta info file
File metaInfoFile = new File(jobDir, Repository.FILE_META_INFO);
if (!metaInfoFile.createNewFile()) {
status = new Status(ErrCode.COMMON_ERROR,
@@ -860,7 +960,7 @@ public class BackupJob extends AbstractJob {
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
- // 3. save job info file
+ // 4. save job info file
Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
// iterate properties, convert key, value from string to long
// key is "${TABLE_COMMIT_SEQ_PREFIX}{tableId}", only need tableId to long
@@ -873,8 +973,21 @@ public class BackupJob extends AbstractJob {
tableCommitSeqMap.put(tableId, commitSeq);
}
}
+ // Filter out snapshot infos for dropped tables and partitions
+ Map<Long, SnapshotInfo> filteredSnapshotInfos = Maps.newHashMap();
+ for (Map.Entry<Long, SnapshotInfo> entry : snapshotInfos.entrySet()) {
+ SnapshotInfo info = entry.getValue();
+ boolean isDroppedTable = droppedTables.contains(info.getTblId());
+ boolean isDroppedPartition = droppedPartitionsByTable.getOrDefault(info.getTblId(),
+ Collections.emptySet()).contains(info.getPartitionId());
+
+ if (!isDroppedTable && !isDroppedPartition) {
+ filteredSnapshotInfos.put(entry.getKey(), info);
+ }
+ }
+
jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId,
- getContent(), backupMeta, snapshotInfos, tableCommitSeqMap);
+ getContent(), backupMeta, filteredSnapshotInfos, tableCommitSeqMap);
if (LOG.isDebugEnabled()) {
LOG.debug("job info: {}. {}", jobInfo, this);
}
@@ -907,6 +1020,10 @@ public class BackupJob extends AbstractJob {
snapshotInfos.clear();
+ // Clean up temporary records to reduce editlog size
+ droppedPartitionsByTable.clear();
+ droppedTables.clear();
+
// log
env.getEditLog().logBackupJob(this);
LOG.info("finished to save meta the backup job info file to local.[{}], [{}] {}",
@@ -1193,6 +1310,22 @@ public class BackupJob extends AbstractJob {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
+
+ // write dropped tables
+ out.writeInt(droppedTables.size());
+ for (Long tableId : droppedTables) {
+ out.writeLong(tableId);
+ }
+
+ // write dropped partitions
+ out.writeInt(droppedPartitionsByTable.size());
+ for (Map.Entry<Long, Set<Long>> entry : droppedPartitionsByTable.entrySet()) {
+ out.writeLong(entry.getKey());
+ out.writeInt(entry.getValue().size());
+ for (Long partitionId : entry.getValue()) {
+ out.writeLong(partitionId);
+ }
+ }
}
public void readFields(DataInput in) throws IOException {
@@ -1267,6 +1400,24 @@ public class BackupJob extends AbstractJob {
if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) {
commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ));
}
+
+ // read dropped tables
+ size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ droppedTables.add(in.readLong());
+ }
+
+ // read dropped partitions
+ size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ long tableId = in.readLong();
+ int partitionSize = in.readInt();
+ Set<Long> partitionIds = ConcurrentHashMap.newKeySet();
+ for (int j = 0; j < partitionSize; j++) {
+ partitionIds.add(in.readLong());
+ }
+ droppedPartitionsByTable.put(tableId, partitionIds);
+ }
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index bef51db8d48..c2df6027328 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -84,6 +84,15 @@ public class BackupMeta implements Writable {
return tblIdMap.get(tblId);
}
+ public boolean removeTable(Long tableId) {
+ Table removedTable = tblIdMap.remove(tableId);
+ if (removedTable != null) {
+ tblNameMap.remove(removedTable.getName());
+ return true;
+ }
+ return false;
+ }
+
public static BackupMeta fromFile(String filePath, int metaVersion) throws IOException {
return fromInputStream(new FileInputStream(filePath), metaVersion);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
index e2058221e44..62109320069 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
@@ -26,6 +26,8 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.jmockit.Deencapsulation;
@@ -58,6 +60,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
@@ -72,6 +76,7 @@ public class BackupJobTest {
private BackupJob job;
private Database db;
+ private OlapTable table2;
private long dbId = 1;
private long tblId = 2;
@@ -81,6 +86,12 @@ public class BackupJobTest {
private long backendId = 10000;
private long version = 6;
+ private long tblId2 = 3;
+ private long partId2 = 4;
+ private long idxId2 = 5;
+ private long tabletId2 = 6;
+ private String table2Name = "testTable2";
+
private long repoId = 20000;
private AtomicLong id = new AtomicLong(50000);
@@ -150,6 +161,10 @@ public class BackupJobTest {
Deencapsulation.setField(env, "backupHandler", backupHandler);
db = UnitTestUtil.createDb(dbId, tblId, partId, idxId, tabletId, backendId, version);
+
+ // Create second table in setUp to avoid Env initialization issues
+ table2 = UnitTestUtil.createTable(db, tblId2, table2Name, partId2, idxId2, tabletId2, backendId, version);
+
catalog = Deencapsulation.newInstance(InternalCatalog.class);
new Expectations(env) {
{
@@ -161,13 +176,26 @@ public class BackupJobTest {
minTimes = 0;
result = db;
+ catalog.getTableByTableId(anyLong);
+ minTimes = 0;
+ result = new Delegate<Table>() {
+ public Table getTableByTableId(Long tableId) {
+ // Check if table exists in the database
+ return db.getTableNullable(tableId);
+ }
+ };
+
Env.getCurrentEnvJournalVersion();
minTimes = 0;
result = FeConstants.meta_version;
env.getNextId();
minTimes = 0;
- result = id.getAndIncrement();
+ result = new Delegate<Long>() {
+ public Long getNextId() {
+ return id.getAndIncrement();
+ }
+ };
env.getEditLog();
minTimes = 0;
@@ -207,6 +235,7 @@ public class BackupJobTest {
}
};
+ // Only include first table to ensure other tests are not affected
List<TableRef> tableRefs = Lists.newArrayList();
tableRefs.add(new TableRef(
new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
@@ -215,9 +244,20 @@ public class BackupJobTest {
env, repo.getId(), 0);
}
+ /**
+ * Test normal backup job execution flow
+ *
+ * Scenario: Backup a single table with all content
+ * Expected Results:
+ * 1. Job should progress through all states: PENDING -> SNAPSHOTING -> UPLOAD_SNAPSHOT -> UPLOADING -> SAVE_META -> UPLOAD_INFO -> FINISHED
+ * 2. Backup meta should contain the correct table information
+ * 3. Snapshot and upload tasks should be created and executed successfully
+ * 4. Meta files should be saved and uploaded correctly
+ * 5. Job should complete successfully with OK status
+ */
@Test
public void testRunNormal() {
- // 1.pending
+ // 1. pending
Assert.assertEquals(BackupJobState.PENDING, job.getState());
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
@@ -338,9 +378,18 @@ public class BackupJobTest {
Assert.assertEquals(BackupJobState.FINISHED, job.getState());
}
+ /**
+ * Test backup job execution with non-existent table
+ *
+ * Scenario: Attempt to backup a table that does not exist
+ * Expected Results:
+ * 1. Job should fail with NOT_FOUND error code
+ * 2. Job state should be CANCELLED
+ * 3. No backup tasks should be created
+ */
@Test
public void testRunAbnormal() {
- // 1.pending
+ // 1. pending
AgentTaskQueue.clearAllTasks();
List<TableRef> tableRefs = Lists.newArrayList();
@@ -353,4 +402,219 @@ public class BackupJobTest {
Assert.assertEquals(Status.ErrCode.NOT_FOUND, job.getStatus().getErrCode());
Assert.assertEquals(BackupJobState.CANCELLED, job.getState());
}
+
+ /**
+ * Test backup job execution with mixed existing and non-existent tables
+ *
+ * Scenario: Backup two tables - one existing table and one non-existent table
+ * Expected Results:
+ * 1. Job should succeed and proceed to SNAPSHOTING state
+ * 2. Backup meta should only contain the existing table
+ * 3. Only snapshot tasks for the existing table should be created
+ * 4. Non-existent table should be skipped without causing job failure
+ */
+ @Test
+ public void testRunAbnormalWithMixedTables() {
+ // Test backup two tables: one normal table and one non-existent table
+ // Verify backup succeeds, backs up the normal table, and skips the non-existent table
+ AgentTaskQueue.clearAllTasks();
+
+ List<TableRef> tableRefs = Lists.newArrayList();
+ // Add normal table
+ tableRefs.add(new TableRef(
+ new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
+ null));
+ // Add non-existent table
+ tableRefs.add(
+ new TableRef(new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, "unknown_tbl"),
+ null));
+
+ job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
+ env, repo.getId(), 0);
+
+ // 1. pending
+ Assert.assertEquals(BackupJobState.PENDING, job.getState());
+ job.run();
+ Assert.assertEquals(Status.OK, job.getStatus());
+ Assert.assertEquals(BackupJobState.SNAPSHOTING, job.getState());
+
+ // Verify backup meta only contains the normal table
+ BackupMeta backupMeta = job.getBackupMeta();
+ Assert.assertEquals(1, backupMeta.getTables().size());
+ OlapTable backupTbl = (OlapTable) backupMeta.getTable(UnitTestUtil.TABLE_NAME);
+ Assert.assertNotNull(backupTbl);
+ Assert.assertNull(backupMeta.getTable("unknown_tbl"));
+
+ // Verify only snapshot tasks for the normal table are created
+ Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
+ AgentTask task = AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 1);
+ Assert.assertTrue(task instanceof SnapshotTask);
+ SnapshotTask snapshotTask = (SnapshotTask) task;
+ Assert.assertEquals(tblId, snapshotTask.getTableId());
+ Assert.assertEquals(dbId, snapshotTask.getDbId());
+ Assert.assertEquals(partId, snapshotTask.getPartitionId());
+ Assert.assertEquals(idxId, snapshotTask.getIndexId());
+ Assert.assertEquals(tabletId, snapshotTask.getTabletId());
+ }
+
+ /**
+ * Test backup job execution when a table is dropped during SNAPSHOTING phase
+ *
+ * Scenario: Start backup with two normal tables, then drop one table during SNAPSHOTING phase
+ * Expected Results:
+ * 1. Job should start with two tables and create snapshot tasks for both
+ * 2. When one table is dropped during SNAPSHOTING, the dropped table should be marked as dropped
+ * 3. Backup should continue successfully with only the remaining table
+ * 4. Final backup meta should only contain the non-dropped table
+ * 5. Job should complete successfully with FINISHED state
+ */
+ @Test
+ public void testRunWithTableDroppedDuringSnapshoting() {
+ try {
+ AgentTaskQueue.clearAllTasks();
+
+ List<TableRef> tableRefs = Lists.newArrayList();
+ tableRefs.add(new TableRef(
+ new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
+ null));
+ tableRefs.add(new TableRef(
+ new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, table2Name),
+ null));
+
+ job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
+ env, repo.getId(), 0);
+
+ // 1. pending - should create snapshot tasks for both tables
+ Assert.assertEquals(BackupJobState.PENDING, job.getState());
+ job.run();
+ Assert.assertEquals(Status.OK, job.getStatus());
+ Assert.assertEquals(BackupJobState.SNAPSHOTING, job.getState());
+
+ // Verify backup meta contains both tables initially
+ BackupMeta backupMeta = job.getBackupMeta();
+ Assert.assertEquals(2, backupMeta.getTables().size());
+ Assert.assertNotNull(backupMeta.getTable(UnitTestUtil.TABLE_NAME));
+ Assert.assertNotNull(backupMeta.getTable(table2Name));
+
+ // Verify snapshot tasks are created for both tables
+ Assert.assertEquals(2, AgentTaskQueue.getTaskNum());
+
+ // 2. Simulate dropping the second table during SNAPSHOTING phase
+ db.unregisterTable(table2Name);
+
+ // 3. Finish snapshot tasks
+ SnapshotTask taskForDroppedTable = null;
+ SnapshotTask taskForExistingTable = null;
+ long taskTabletId1 = AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 2).getTabletId();
+ if (taskTabletId1 == tabletId) {
+ taskForExistingTable = (SnapshotTask) AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 2);
+ taskForDroppedTable = (SnapshotTask) AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 1);
+ } else {
+ taskForDroppedTable = (SnapshotTask) AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 2);
+ taskForExistingTable = (SnapshotTask) AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 1);
+ }
+
+ TBackend tBackend = new TBackend("", 0, 1);
+
+ // Finish task for dropped table
+ TStatus taskStatusMissing = new TStatus(TStatusCode.TABLET_MISSING);
+ taskStatusMissing.setErrorMsgs(Lists.newArrayList("Tablet missing"));
+ TFinishTaskRequest requestMissing = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
+ taskForDroppedTable.getSignature(), taskStatusMissing);
+ Assert.assertTrue(job.finishTabletSnapshotTask(taskForDroppedTable, requestMissing));
+
+ // Finish task for existing table
+ String snapshotPath = "/path/to/snapshot";
+ List<String> snapshotFiles = Lists.newArrayList("1.dat", "1.idx", "1.hdr");
+ TStatus taskStatusOK = new TStatus(TStatusCode.OK);
+ TFinishTaskRequest requestOK = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
+ taskForExistingTable.getSignature(), taskStatusOK);
+ requestOK.setSnapshotFiles(snapshotFiles);
+ requestOK.setSnapshotPath(snapshotPath);
+ Assert.assertTrue(job.finishTabletSnapshotTask(taskForExistingTable, requestOK));
+
+ // 4. Continue the backup process
+ job.run();
+ Assert.assertEquals(Status.OK, job.getStatus());
+ Assert.assertEquals(BackupJobState.UPLOAD_SNAPSHOT, job.getState());
+
+ AgentTaskQueue.clearAllTasks();
+ job.run(); // UPLOAD_SNAPSHOT -> UPLOADING
+ Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
+ UploadTask upTask = (UploadTask) AgentTaskQueue.getTask(backendId, TTaskType.UPLOAD, id.get() - 1);
+
+ // Finish upload task
+ Map<Long, List<String>> tabletFileMap = Maps.newHashMap();
+ List<String> tabletFiles = Lists.newArrayList();
+ tabletFiles.add("1.dat.4f158689243a3d6030352fec3cfd3798");
+ tabletFiles.add("1.idx.4f158689243a3d6030352fec3cfd3798");
+ tabletFiles.add("1.hdr.4f158689243a3d6030352fec3cfd3798");
+ tabletFileMap.put(taskForExistingTable.getTabletId(), tabletFiles);
+ TFinishTaskRequest requestUpload = new TFinishTaskRequest(tBackend, TTaskType.UPLOAD,
+ upTask.getSignature(), taskStatusOK);
+ requestUpload.setTabletFiles(tabletFileMap);
+ Assert.assertTrue(job.finishSnapshotUploadTask(upTask, requestUpload));
+
+ job.run(); // UPLOADING -> SAVE_META
+ Assert.assertEquals(BackupJobState.SAVE_META, job.getState());
+
+ job.run(); // SAVE_META -> UPLOAD_INFO
+ Assert.assertEquals(BackupJobState.UPLOAD_INFO, job.getState());
+
+ job.run(); // UPLOAD_INFO -> FINISHED
+ Assert.assertEquals(BackupJobState.FINISHED, job.getState());
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } finally {
+ // Clean up: re-register the second table if it was removed
+ if (db.getTableNullable(table2Name) == null && table2 != null) {
+ db.registerTable(table2);
+ }
+ }
+ }
+
+ /**
+ * Test backup job serialization and deserialization
+ *
+ * Scenario: Write backup job to file and read it back
+ * Expected Results:
+ * 1. Backup job should be successfully written to file
+ * 2. Backup job should be successfully read from file
+ * 3. All job properties should be preserved during serialization/deserialization
+ * 4. Temporary files should be cleaned up
+ */
+ @Test
+ public void testSerialization() throws IOException, AnalysisException {
+ // 1. Write objects to file
+ final Path path = Files.createTempFile("backupJob", "tmp");
+ DataOutputStream out = new DataOutputStream(Files.newOutputStream(path));
+
+ List<TableRef> tableRefs = Lists.newArrayList();
+ tableRefs.add(
+ new TableRef(new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
+ null));
+ job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
+ env, repo.getId(), 123);
+
+ job.write(out);
+ out.flush();
+ out.close();
+
+ // 2. Read objects from file
+ DataInputStream in = new DataInputStream(Files.newInputStream(path));
+
+ BackupJob job2 = BackupJob.read(in);
+
+ Assert.assertEquals(job.getJobId(), job2.getJobId());
+ Assert.assertEquals(job.getDbId(), job2.getDbId());
+ Assert.assertEquals(job.getCreateTime(), job2.getCreateTime());
+ Assert.assertEquals(job.getType(), job2.getType());
+ Assert.assertEquals(job.getCommitSeq(), job2.getCommitSeq());
+
+ // 3. delete files
+ in.close();
+ Files.delete(path);
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
index 9963479f6b8..0c85a2e7655 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
@@ -66,8 +66,14 @@ public class UnitTestUtil {
public static Database createDb(long dbId, long tableId, long partitionId, long indexId,
long tabletId, long backendId, long version) {
- // Catalog.getCurrentInvertedIndex().clear();
+ Database db = new Database(dbId, DB_NAME);
+ createTable(db, tableId, TABLE_NAME, partitionId, indexId, tabletId, backendId, version);
+
+ return db;
+ }
+ public static OlapTable createTable(Database db, long tableId, String tableName, long partitionId, long indexId,
+ long tabletId, long backendId, long version) {
// replica
long replicaId = 0;
Replica replica1 = new Replica(replicaId, backendId, ReplicaState.NORMAL, version, 0);
@@ -79,7 +85,7 @@ public class UnitTestUtil {
// index
MaterializedIndex index = new MaterializedIndex(indexId, IndexState.NORMAL);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0, TStorageMedium.HDD);
+ TabletMeta tabletMeta = new TabletMeta(db.getId(), tableId, partitionId, indexId, 0, TStorageMedium.HDD);
index.addTablet(tablet, tabletMeta);
tablet.addReplica(replica1);
@@ -115,17 +121,15 @@ public class UnitTestUtil {
partitionInfo.setIsInMemory(partitionId, false);
partitionInfo.setIsMutable(partitionId, true);
partitionInfo.setTabletType(partitionId, TTabletType.TABLET_TYPE_DISK);
- OlapTable table = new OlapTable(tableId, TABLE_NAME, columns,
+ OlapTable table = new OlapTable(tableId, tableName, columns,
KeysType.AGG_KEYS, partitionInfo, distributionInfo);
Deencapsulation.setField(table, "baseIndexId", indexId);
table.addPartition(partition);
- table.setIndexMeta(indexId, TABLE_NAME, columns, 0, SCHEMA_HASH, (short) 1, TStorageType.COLUMN,
+ table.setIndexMeta(indexId, tableName, columns, 0, SCHEMA_HASH, (short) 1, TStorageType.COLUMN,
KeysType.AGG_KEYS);
- // db
- Database db = new Database(dbId, DB_NAME);
db.registerTable(table);
- return db;
+ return table;
}
public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort) {
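
For reference, the refactored UnitTestUtil above separates table creation from database creation, so a test can register additional tables in the same mock database, as BackupJobTest does in its setUp(). A rough usage sketch, assuming the same mock backend setup as BackupJobTest and using arbitrary example IDs and the hypothetical name "extraTable":

    import org.apache.doris.catalog.Database;
    import org.apache.doris.catalog.OlapTable;
    import org.apache.doris.common.util.UnitTestUtil;
    import org.junit.Assert;
    import org.junit.Test;

    public class ExtraTableSetupSketch {
        @Test
        public void addSecondTableToMockDb() {
            // Build the usual single-table database, then register one more table in it.
            Database db = UnitTestUtil.createDb(1L, 2L, 3L, 4L, 5L, 10000L, 6L);
            OlapTable extra = UnitTestUtil.createTable(db, 20L, "extraTable", 21L, 22L, 23L, 10000L, 6L);
            Assert.assertNotNull(extra);
            Assert.assertNotNull(db.getTableNullable("extraTable"));
        }
    }
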
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]