This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f12253ce96f branch-2.1: [enhancement](backup) handle dropped tables 
and partitions during backup #52935 (#55005)
f12253ce96f is described below

commit f12253ce96fe8f806acd2ffddd8e3efa74a3b90b
Author: yulihua <[email protected]>
AuthorDate: Tue Sep 9 10:51:11 2025 +0800

    branch-2.1: [enhancement](backup) handle dropped tables and partitions 
during backup #52935 (#55005)
---
 .../java/org/apache/doris/backup/BackupJob.java    | 173 ++++++++++++-
 .../java/org/apache/doris/backup/BackupMeta.java   |   9 +
 .../org/apache/doris/backup/BackupJobTest.java     | 270 ++++++++++++++++++++-
 .../org/apache/doris/common/util/UnitTestUtil.java |  18 +-
 4 files changed, 449 insertions(+), 21 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index ca77bb3de23..51ceb3eca1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -78,6 +78,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
@@ -125,6 +127,12 @@ public class BackupJob extends AbstractJob {
     // backup properties && table commit seq with table id
     private Map<String, String> properties = Maps.newHashMap();
 
+    // Record table IDs that were dropped during backup
+    private Set<Long> droppedTables = ConcurrentHashMap.newKeySet();
+
+    // Record partition IDs that were dropped during backup (tableId -> set of 
partitionIds)
+    private Map<Long, Set<Long>> droppedPartitionsByTable = 
Maps.newConcurrentMap();
+
     private long commitSeq = 0;
 
     public BackupJob() {
@@ -228,6 +236,39 @@ public class BackupJob extends AbstractJob {
         return true;
     }
 
+    private boolean handleTabletMissing(SnapshotTask task) {
+        LOG.info("handleTabletMissing task: {}", task);
+        Table table = 
env.getInternalCatalog().getTableByTableId(task.getTableId());
+        if (table == null) {
+            // Table was dropped (including cases where database was dropped)
+            droppedTables.add(task.getTableId());
+            LOG.info("table {} marked as dropped during backup. {}", 
task.getTableId(), this);
+            return true;
+        }
+
+        if (!(table instanceof OlapTable)) {
+            return false;
+        }
+
+        OlapTable olapTable = (OlapTable) table;
+        olapTable.readLock();
+        try {
+            Partition partition = 
olapTable.getPartition(task.getPartitionId());
+            if (partition == null) {
+                // Partition was dropped or truncated (partition ID changed)
+                droppedPartitionsByTable.computeIfAbsent(task.getTableId(), k 
-> ConcurrentHashMap.newKeySet())
+                                       .add(task.getPartitionId());
+                LOG.info("partition {} from table {} marked as dropped during 
backup (dropped or truncated). {}",
+                        task.getPartitionId(), task.getTableId(), this);
+                return true;
+            }
+
+            // If partition still exists, tablet missing is caused by other 
reasons
+            return false;
+        } finally {
+            olapTable.readUnlock();
+        }
+    }
 
     public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, 
TFinishTaskRequest request) {
         Preconditions.checkState(task.getJobId() == jobId);
@@ -242,11 +283,21 @@ public class BackupJob extends AbstractJob {
                 cancelInternal();
             }
 
-            if (request.getTaskStatus().getStatusCode() == 
TStatusCode.TABLET_MISSING
-                    && !tryNewTabletSnapshotTask(task)) {
-                status = new Status(ErrCode.NOT_FOUND,
-                        "make snapshot failed, failed to ge tablet, table will 
be dropped or truncated");
-                cancelInternal();
+            if (request.getTaskStatus().getStatusCode() == 
TStatusCode.TABLET_MISSING) {
+                if (handleTabletMissing(task)) {
+                    // Successfully handled drop case, remove from task queue
+                    taskProgress.remove(task.getSignature());
+                    taskErrMsg.remove(task.getSignature());
+                    Long oldValue = 
unfinishedTaskIds.remove(task.getSignature());
+                    return oldValue != null;
+                } else {
+                    // Not caused by drop, follow original logic
+                    if (!tryNewTabletSnapshotTask(task)) {
+                        status = new Status(ErrCode.NOT_FOUND,
+                                "make snapshot failed, failed to get tablet, 
table will be dropped or truncated");
+                        cancelInternal();
+                    }
+                }
             }
 
             if (request.getTaskStatus().getStatusCode() == 
TStatusCode.NOT_IMPLEMENTED_ERROR) {
@@ -491,13 +542,18 @@ public class BackupJob extends AbstractJob {
         List<Table> copiedTables = Lists.newArrayList();
         List<Resource> copiedResources = Lists.newArrayList();
         AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+        // Track if we have any valid tables for backup
+        boolean hasValidTables = false;
         for (TableRef tableRef : tableRefs) {
             String tblName = tableRef.getName().getTbl();
             Table tbl = db.getTableNullable(tblName);
             if (tbl == null) {
-                status = new Status(ErrCode.NOT_FOUND, "table " + tblName + " 
does not exist");
-                return;
+                // Table was dropped, skip it and continue with other tables
+                LOG.info("table {} does not exist, it was dropped during 
backup preparation, skip it. {}",
+                        tblName, this);
+                continue;
             }
+            hasValidTables = true;
             tbl.readLock();
             try {
                 switch (tbl.getType()) {
@@ -538,7 +594,11 @@ public class BackupJob extends AbstractJob {
                 tbl.readUnlock();
             }
         }
-
+        // If no valid tables found, cancel the job
+        if (!hasValidTables) {
+            status = new Status(ErrCode.NOT_FOUND, "no valid tables found for 
backup");
+            return;
+        }
         // Limit the max num of tablets involved in a backup job, to avoid OOM.
         if (unfinishedTaskIds.size() > Config.max_backup_tablets_per_job) {
             String msg = String.format("the num involved tablets %d exceeds 
the limit %d, "
@@ -829,6 +889,43 @@ public class BackupJob extends AbstractJob {
         }
     }
 
+    private void cleanupDroppedTablesAndPartitions() {
+        if (backupMeta == null) {
+            return;
+        }
+
+        // Remove dropped partitions first (before removing tables)
+        for (Map.Entry<Long, Set<Long>> entry : 
droppedPartitionsByTable.entrySet()) {
+            Long tableId = entry.getKey();
+            Set<Long> droppedPartitionIds = entry.getValue();
+
+            Table table = backupMeta.getTable(tableId);
+            if (table instanceof OlapTable) {
+                OlapTable olapTable = (OlapTable) table;
+
+                // Directly get partitions by ID instead of iterating all 
partitions
+                for (Long droppedPartitionId : droppedPartitionIds) {
+                    Partition partition = 
olapTable.getPartition(droppedPartitionId);
+                    if (partition != null) {
+                        LOG.info("remove dropped partition {} from table {} 
(id: {}) in backup meta. {}",
+                                partition.getName(), table.getName(), tableId, 
this);
+                        
olapTable.dropPartitionAndReserveTablet(partition.getName());
+                    }
+                }
+            }
+        }
+
+        // Remove dropped tables after processing partitions
+        for (Long tableId : droppedTables) {
+            Table removedTable = backupMeta.getTable(tableId);
+            if (removedTable != null) {
+                LOG.info("remove dropped table {} (id: {}) from backup meta. 
{}",
+                        removedTable.getName(), tableId, this);
+                backupMeta.removeTable(tableId);
+            }
+        }
+    }
+
     private void saveMetaInfo(boolean replay) {
         String createTimeStr = TimeUtils.longToTimeString(createTime,
                 TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
@@ -850,7 +947,10 @@ public class BackupJob extends AbstractJob {
                 return;
             }
 
-            // 2. save meta info file
+            // 2. Clean up dropped tables and partitions from backup metadata
+            cleanupDroppedTablesAndPartitions();
+
+            // 3. save meta info file
             File metaInfoFile = new File(jobDir, Repository.FILE_META_INFO);
             if (!metaInfoFile.createNewFile()) {
                 status = new Status(ErrCode.COMMON_ERROR,
@@ -860,7 +960,7 @@ public class BackupJob extends AbstractJob {
             backupMeta.writeToFile(metaInfoFile);
             localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
 
-            // 3. save job info file
+            // 4. save job info file
             Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
             // iterate properties, convert key, value from string to long
             // key is "${TABLE_COMMIT_SEQ_PREFIX}{tableId}", only need tableId 
to long
@@ -873,8 +973,21 @@ public class BackupJob extends AbstractJob {
                     tableCommitSeqMap.put(tableId, commitSeq);
                 }
             }
+            // Filter out snapshot infos for dropped tables and partitions
+            Map<Long, SnapshotInfo> filteredSnapshotInfos = Maps.newHashMap();
+            for (Map.Entry<Long, SnapshotInfo> entry : 
snapshotInfos.entrySet()) {
+                SnapshotInfo info = entry.getValue();
+                boolean isDroppedTable = 
droppedTables.contains(info.getTblId());
+                boolean isDroppedPartition = 
droppedPartitionsByTable.getOrDefault(info.getTblId(),
+                        
Collections.emptySet()).contains(info.getPartitionId());
+
+                if (!isDroppedTable && !isDroppedPartition) {
+                    filteredSnapshotInfos.put(entry.getKey(), info);
+                }
+            }
+
             jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, 
dbId,
-                    getContent(), backupMeta, snapshotInfos, 
tableCommitSeqMap);
+                    getContent(), backupMeta, filteredSnapshotInfos, 
tableCommitSeqMap);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("job info: {}. {}", jobInfo, this);
             }
@@ -907,6 +1020,10 @@ public class BackupJob extends AbstractJob {
 
         snapshotInfos.clear();
 
+        // Clean up temporary records to reduce editlog size
+        droppedPartitionsByTable.clear();
+        droppedTables.clear();
+
         // log
         env.getEditLog().logBackupJob(this);
         LOG.info("finished to save meta the backup job info file to 
local.[{}], [{}] {}",
@@ -1193,6 +1310,22 @@ public class BackupJob extends AbstractJob {
             Text.writeString(out, entry.getKey());
             Text.writeString(out, entry.getValue());
         }
+
+        // write dropped tables
+        out.writeInt(droppedTables.size());
+        for (Long tableId : droppedTables) {
+            out.writeLong(tableId);
+        }
+
+        // write dropped partitions
+        out.writeInt(droppedPartitionsByTable.size());
+        for (Map.Entry<Long, Set<Long>> entry : 
droppedPartitionsByTable.entrySet()) {
+            out.writeLong(entry.getKey());
+            out.writeInt(entry.getValue().size());
+            for (Long partitionId : entry.getValue()) {
+                out.writeLong(partitionId);
+            }
+        }
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -1267,6 +1400,24 @@ public class BackupJob extends AbstractJob {
         if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) {
             commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ));
         }
+
+        // read dropped tables
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            droppedTables.add(in.readLong());
+        }
+
+        // read dropped partitions
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            long tableId = in.readLong();
+            int partitionSize = in.readInt();
+            Set<Long> partitionIds = ConcurrentHashMap.newKeySet();
+            for (int j = 0; j < partitionSize; j++) {
+                partitionIds.add(in.readLong());
+            }
+            droppedPartitionsByTable.put(tableId, partitionIds);
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index bef51db8d48..c2df6027328 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -84,6 +84,15 @@ public class BackupMeta implements Writable {
         return tblIdMap.get(tblId);
     }
 
+    public boolean removeTable(Long tableId) {
+        Table removedTable = tblIdMap.remove(tableId);
+        if (removedTable != null) {
+            tblNameMap.remove(removedTable.getName());
+            return true;
+        }
+        return false;
+    }
+
     public static BackupMeta fromFile(String filePath, int metaVersion) throws 
IOException {
         return fromInputStream(new FileInputStream(filePath), metaVersion);
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
index e2058221e44..62109320069 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
@@ -26,6 +26,8 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.jmockit.Deencapsulation;
@@ -58,6 +60,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.FileVisitOption;
@@ -72,6 +76,7 @@ public class BackupJobTest {
 
     private BackupJob job;
     private Database db;
+    private OlapTable table2;
 
     private long dbId = 1;
     private long tblId = 2;
@@ -81,6 +86,12 @@ public class BackupJobTest {
     private long backendId = 10000;
     private long version = 6;
 
+    private long tblId2 = 3;
+    private long partId2 = 4;
+    private long idxId2 = 5;
+    private long tabletId2 = 6;
+    private String table2Name = "testTable2";
+
     private long repoId = 20000;
     private AtomicLong id = new AtomicLong(50000);
 
@@ -150,6 +161,10 @@ public class BackupJobTest {
         Deencapsulation.setField(env, "backupHandler", backupHandler);
 
         db = UnitTestUtil.createDb(dbId, tblId, partId, idxId, tabletId, 
backendId, version);
+
+        // Create second table in setUp to avoid Env initialization issues
+        table2 = UnitTestUtil.createTable(db, tblId2, table2Name, partId2, 
idxId2, tabletId2, backendId, version);
+
         catalog = Deencapsulation.newInstance(InternalCatalog.class);
         new Expectations(env) {
             {
@@ -161,13 +176,26 @@ public class BackupJobTest {
                 minTimes = 0;
                 result = db;
 
+                catalog.getTableByTableId(anyLong);
+                minTimes = 0;
+                result = new Delegate<Table>() {
+                    public Table getTableByTableId(Long tableId) {
+                        // Check if table exists in the database
+                        return db.getTableNullable(tableId);
+                    }
+                };
+
                 Env.getCurrentEnvJournalVersion();
                 minTimes = 0;
                 result = FeConstants.meta_version;
 
                 env.getNextId();
                 minTimes = 0;
-                result = id.getAndIncrement();
+                result = new Delegate<Long>() {
+                    public Long getNextId() {
+                        return id.getAndIncrement();
+                    }
+                };
 
                 env.getEditLog();
                 minTimes = 0;
@@ -207,6 +235,7 @@ public class BackupJobTest {
             }
         };
 
+        // Only include first table to ensure other tests are not affected
         List<TableRef> tableRefs = Lists.newArrayList();
         tableRefs.add(new TableRef(
                 new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, 
UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
@@ -215,9 +244,20 @@ public class BackupJobTest {
                 env, repo.getId(), 0);
     }
 
+    /**
+     * Test normal backup job execution flow
+     *
+     * Scenario: Backup a single table with all content
+     * Expected Results:
+     * 1. Job should progress through all states: PENDING -> SNAPSHOTING -> 
UPLOAD_SNAPSHOT -> UPLOADING -> SAVE_META -> UPLOAD_INFO -> FINISHED
+     * 2. Backup meta should contain the correct table information
+     * 3. Snapshot and upload tasks should be created and executed successfully
+     * 4. Meta files should be saved and uploaded correctly
+     * 5. Job should complete successfully with OK status
+     */
     @Test
     public void testRunNormal() {
-        // 1.pending
+        // 1. pending
         Assert.assertEquals(BackupJobState.PENDING, job.getState());
         job.run();
         Assert.assertEquals(Status.OK, job.getStatus());
@@ -338,9 +378,18 @@ public class BackupJobTest {
         Assert.assertEquals(BackupJobState.FINISHED, job.getState());
     }
 
+    /**
+     * Test backup job execution with non-existent table
+     *
+     * Scenario: Attempt to backup a table that does not exist
+     * Expected Results:
+     * 1. Job should fail with NOT_FOUND error code
+     * 2. Job state should be CANCELLED
+     * 3. No backup tasks should be created
+     */
     @Test
     public void testRunAbnormal() {
-        // 1.pending
+        // 1. pending
         AgentTaskQueue.clearAllTasks();
 
         List<TableRef> tableRefs = Lists.newArrayList();
@@ -353,4 +402,219 @@ public class BackupJobTest {
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, 
job.getStatus().getErrCode());
         Assert.assertEquals(BackupJobState.CANCELLED, job.getState());
     }
+
+    /**
+     * Test backup job execution with mixed existing and non-existent tables
+     *
+     * Scenario: Backup two tables - one existing table and one non-existent 
table
+     * Expected Results:
+     * 1. Job should succeed and proceed to SNAPSHOTING state
+     * 2. Backup meta should only contain the existing table
+     * 3. Only snapshot tasks for the existing table should be created
+     * 4. Non-existent table should be skipped without causing job failure
+     */
+    @Test
+    public void testRunAbnormalWithMixedTables() {
+        // Test backup two tables: one normal table and one non-existent table
+        // Verify backup succeeds, backs up the normal table, and skips the 
non-existent table
+        AgentTaskQueue.clearAllTasks();
+
+        List<TableRef> tableRefs = Lists.newArrayList();
+        // Add normal table
+        tableRefs.add(new TableRef(
+                new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, 
UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
+                null));
+        // Add non-existent table
+        tableRefs.add(
+                new TableRef(new 
TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, 
"unknown_tbl"),
+                        null));
+
+        job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 
13600 * 1000, BackupStmt.BackupContent.ALL,
+                env, repo.getId(), 0);
+
+        // 1. pending
+        Assert.assertEquals(BackupJobState.PENDING, job.getState());
+        job.run();
+        Assert.assertEquals(Status.OK, job.getStatus());
+        Assert.assertEquals(BackupJobState.SNAPSHOTING, job.getState());
+
+        // Verify backup meta only contains the normal table
+        BackupMeta backupMeta = job.getBackupMeta();
+        Assert.assertEquals(1, backupMeta.getTables().size());
+        OlapTable backupTbl = (OlapTable) 
backupMeta.getTable(UnitTestUtil.TABLE_NAME);
+        Assert.assertNotNull(backupTbl);
+        Assert.assertNull(backupMeta.getTable("unknown_tbl"));
+
+        // Verify only snapshot tasks for the normal table are created
+        Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
+        AgentTask task = AgentTaskQueue.getTask(backendId, 
TTaskType.MAKE_SNAPSHOT, id.get() - 1);
+        Assert.assertTrue(task instanceof SnapshotTask);
+        SnapshotTask snapshotTask = (SnapshotTask) task;
+        Assert.assertEquals(tblId, snapshotTask.getTableId());
+        Assert.assertEquals(dbId, snapshotTask.getDbId());
+        Assert.assertEquals(partId, snapshotTask.getPartitionId());
+        Assert.assertEquals(idxId, snapshotTask.getIndexId());
+        Assert.assertEquals(tabletId, snapshotTask.getTabletId());
+    }
+
+    /**
+     * Test backup job execution when a table is dropped during SNAPSHOTING 
phase
+     *
+     * Scenario: Start backup with two normal tables, then drop one table 
during SNAPSHOTING phase
+     * Expected Results:
+     * 1. Job should start with two tables and create snapshot tasks for both
+     * 2. When one table is dropped during SNAPSHOTING, the dropped table 
should be marked as dropped
+     * 3. Backup should continue successfully with only the remaining table
+     * 4. Final backup meta should only contain the non-dropped table
+     * 5. Job should complete successfully with FINISHED state
+     */
+    @Test
+    public void testRunWithTableDroppedDuringSnapshoting() {
+        try {
+            AgentTaskQueue.clearAllTasks();
+
+            List<TableRef> tableRefs = Lists.newArrayList();
+            tableRefs.add(new TableRef(
+                    new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, 
UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
+                    null));
+            tableRefs.add(new TableRef(
+                    new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, 
UnitTestUtil.DB_NAME, table2Name),
+                    null));
+
+            job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, 
tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
+                    env, repo.getId(), 0);
+
+            // 1. pending - should create snapshot tasks for both tables
+            Assert.assertEquals(BackupJobState.PENDING, job.getState());
+            job.run();
+            Assert.assertEquals(Status.OK, job.getStatus());
+            Assert.assertEquals(BackupJobState.SNAPSHOTING, job.getState());
+
+            // Verify backup meta contains both tables initially
+            BackupMeta backupMeta = job.getBackupMeta();
+            Assert.assertEquals(2, backupMeta.getTables().size());
+            Assert.assertNotNull(backupMeta.getTable(UnitTestUtil.TABLE_NAME));
+            Assert.assertNotNull(backupMeta.getTable(table2Name));
+
+            // Verify snapshot tasks are created for both tables
+            Assert.assertEquals(2, AgentTaskQueue.getTaskNum());
+
+            // 2. Simulate dropping the second table during SNAPSHOTING phase
+            db.unregisterTable(table2Name);
+
+            // 3. Finish snapshot tasks
+            SnapshotTask taskForDroppedTable = null;
+            SnapshotTask taskForExistingTable = null;
+            long taskTabletId1 = AgentTaskQueue.getTask(backendId, 
TTaskType.MAKE_SNAPSHOT, id.get() - 2).getTabletId();
+            if (taskTabletId1 == tabletId) {
+                taskForExistingTable = (SnapshotTask) 
AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 2);
+                taskForDroppedTable = (SnapshotTask) 
AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 1);
+            } else {
+                taskForDroppedTable = (SnapshotTask) 
AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 2);
+                taskForExistingTable = (SnapshotTask) 
AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, id.get() - 1);
+            }
+
+            TBackend tBackend = new TBackend("", 0, 1);
+
+            // Finish task for dropped table
+            TStatus taskStatusMissing = new 
TStatus(TStatusCode.TABLET_MISSING);
+            taskStatusMissing.setErrorMsgs(Lists.newArrayList("Tablet 
missing"));
+            TFinishTaskRequest requestMissing = new 
TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
+                    taskForDroppedTable.getSignature(), taskStatusMissing);
+            
Assert.assertTrue(job.finishTabletSnapshotTask(taskForDroppedTable, 
requestMissing));
+
+            // Finish task for existing table
+            String snapshotPath = "/path/to/snapshot";
+            List<String> snapshotFiles = Lists.newArrayList("1.dat", "1.idx", 
"1.hdr");
+            TStatus taskStatusOK = new TStatus(TStatusCode.OK);
+            TFinishTaskRequest requestOK = new TFinishTaskRequest(tBackend, 
TTaskType.MAKE_SNAPSHOT,
+                    taskForExistingTable.getSignature(), taskStatusOK);
+            requestOK.setSnapshotFiles(snapshotFiles);
+            requestOK.setSnapshotPath(snapshotPath);
+            
Assert.assertTrue(job.finishTabletSnapshotTask(taskForExistingTable, 
requestOK));
+
+            // 4. Continue the backup process
+            job.run();
+            Assert.assertEquals(Status.OK, job.getStatus());
+            Assert.assertEquals(BackupJobState.UPLOAD_SNAPSHOT, 
job.getState());
+
+            AgentTaskQueue.clearAllTasks();
+            job.run(); // UPLOAD_SNAPSHOT -> UPLOADING
+            Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
+            UploadTask upTask = (UploadTask) AgentTaskQueue.getTask(backendId, 
TTaskType.UPLOAD, id.get() - 1);
+
+            // Finish upload task
+            Map<Long, List<String>> tabletFileMap = Maps.newHashMap();
+            List<String> tabletFiles = Lists.newArrayList();
+            tabletFiles.add("1.dat.4f158689243a3d6030352fec3cfd3798");
+            tabletFiles.add("1.idx.4f158689243a3d6030352fec3cfd3798");
+            tabletFiles.add("1.hdr.4f158689243a3d6030352fec3cfd3798");
+            tabletFileMap.put(taskForExistingTable.getTabletId(), tabletFiles);
+            TFinishTaskRequest requestUpload = new 
TFinishTaskRequest(tBackend, TTaskType.UPLOAD,
+                    upTask.getSignature(), taskStatusOK);
+            requestUpload.setTabletFiles(tabletFileMap);
+            Assert.assertTrue(job.finishSnapshotUploadTask(upTask, 
requestUpload));
+
+            job.run(); // UPLOADING -> SAVE_META
+            Assert.assertEquals(BackupJobState.SAVE_META, job.getState());
+
+            job.run(); // SAVE_META -> UPLOAD_INFO
+            Assert.assertEquals(BackupJobState.UPLOAD_INFO, job.getState());
+
+            job.run(); // UPLOAD_INFO -> FINISHED
+            Assert.assertEquals(BackupJobState.FINISHED, job.getState());
+
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        } finally {
+            // Clean up: re-register the second table if it was removed
+            if (db.getTableNullable(table2Name) == null && table2 != null) {
+                db.registerTable(table2);
+            }
+        }
+    }
+
+    /**
+     * Test backup job serialization and deserialization
+     *
+     * Scenario: Write backup job to file and read it back
+     * Expected Results:
+     * 1. Backup job should be successfully written to file
+     * 2. Backup job should be successfully read from file
+     * 3. All job properties should be preserved during 
serialization/deserialization
+     * 4. Temporary files should be cleaned up
+     */
+    @Test
+    public void testSerialization() throws IOException, AnalysisException {
+        // 1. Write objects to file
+        final Path path = Files.createTempFile("backupJob", "tmp");
+        DataOutputStream out = new 
DataOutputStream(Files.newOutputStream(path));
+
+        List<TableRef> tableRefs = Lists.newArrayList();
+        tableRefs.add(
+                new TableRef(new 
TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, 
UnitTestUtil.TABLE_NAME),
+                        null));
+        job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 
13600 * 1000, BackupStmt.BackupContent.ALL,
+            env, repo.getId(), 123);
+
+        job.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(Files.newInputStream(path));
+
+        BackupJob job2 = BackupJob.read(in);
+
+        Assert.assertEquals(job.getJobId(), job2.getJobId());
+        Assert.assertEquals(job.getDbId(), job2.getDbId());
+        Assert.assertEquals(job.getCreateTime(), job2.getCreateTime());
+        Assert.assertEquals(job.getType(), job2.getType());
+        Assert.assertEquals(job.getCommitSeq(), job2.getCommitSeq());
+
+        // 3. delete files
+        in.close();
+        Files.delete(path);
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
index 9963479f6b8..0c85a2e7655 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
@@ -66,8 +66,14 @@ public class UnitTestUtil {
 
     public static Database createDb(long dbId, long tableId, long partitionId, 
long indexId,
                                     long tabletId, long backendId, long 
version) {
-        // Catalog.getCurrentInvertedIndex().clear();
+        Database db = new Database(dbId, DB_NAME);
+        createTable(db, tableId, TABLE_NAME, partitionId, indexId, tabletId, 
backendId, version);
+
+        return db;
+    }
 
+    public static OlapTable createTable(Database db, long tableId, String 
tableName, long partitionId, long indexId,
+                                        long tabletId, long backendId, long 
version) {
         // replica
         long replicaId = 0;
         Replica replica1 = new Replica(replicaId, backendId, 
ReplicaState.NORMAL, version, 0);
@@ -79,7 +85,7 @@ public class UnitTestUtil {
 
         // index
         MaterializedIndex index = new MaterializedIndex(indexId, 
IndexState.NORMAL);
-        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD);
+        TabletMeta tabletMeta = new TabletMeta(db.getId(), tableId, 
partitionId, indexId, 0, TStorageMedium.HDD);
         index.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica1);
@@ -115,17 +121,15 @@ public class UnitTestUtil {
         partitionInfo.setIsInMemory(partitionId, false);
         partitionInfo.setIsMutable(partitionId, true);
         partitionInfo.setTabletType(partitionId, TTabletType.TABLET_TYPE_DISK);
-        OlapTable table = new OlapTable(tableId, TABLE_NAME, columns,
+        OlapTable table = new OlapTable(tableId, tableName, columns,
                                         KeysType.AGG_KEYS, partitionInfo, 
distributionInfo);
         Deencapsulation.setField(table, "baseIndexId", indexId);
         table.addPartition(partition);
-        table.setIndexMeta(indexId, TABLE_NAME, columns, 0, SCHEMA_HASH, 
(short) 1, TStorageType.COLUMN,
+        table.setIndexMeta(indexId, tableName, columns, 0, SCHEMA_HASH, 
(short) 1, TStorageType.COLUMN,
                 KeysType.AGG_KEYS);
 
-        // db
-        Database db = new Database(dbId, DB_NAME);
         db.registerTable(table);
-        return db;
+        return table;
     }
 
     public static Backend createBackend(long id, String host, int heartPort, 
int bePort, int httpPort) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to