This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 5dda61b410c [fix](backup) Load backup meta and job info bytes from disk #43276 (#43519)
5dda61b410c is described below

commit 5dda61b410cdde1dc7495dee9a95eeafe6f6fb25
Author: walter <w41te...@gmail.com>
AuthorDate: Mon Nov 11 14:08:35 2024 +0800

    [fix](backup) Load backup meta and job info bytes from disk #43276 (#43519)
    
    cherry pick from #43276
---
 .../java/org/apache/doris/backup/AbstractJob.java  |  2 +
 .../org/apache/doris/backup/BackupHandler.java     | 50 +++++++++++++++++++---
 .../java/org/apache/doris/backup/BackupJob.java    | 45 ++++++++++++-------
 .../java/org/apache/doris/backup/RestoreJob.java   |  5 +++
 .../apache/doris/service/FrontendServiceImpl.java  |  7 ++-
 5 files changed, 85 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
index 4e2c3fd1990..f22598dd86b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -155,6 +155,8 @@ public abstract class AbstractJob implements Writable {
 
     public abstract boolean isCancelled();
 
+    public abstract boolean isFinished();
+
     public abstract Status updateRepo(Repository repo);
 
     public static AbstractJob read(DataInput in) throws IOException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index d9b7659cfc1..49190acce1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -109,10 +109,10 @@ public class BackupHandler extends MasterDaemon implements Writable {
 
     private Env env;
 
-    // map to store backup info, key is label name, value is Pair<meta, info>, 
meta && info is bytes
-    // this map not present in persist && only in fe master memory
+    // map to store backup info, key is label name, value is the BackupJob
+    // this map not present in persist && only in fe memory
     // one table only keep one snapshot info, only keep last
-    private final Map<String, Snapshot> localSnapshots = new HashMap<>();
+    private final Map<String, BackupJob> localSnapshots = new HashMap<>();
     private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();
 
     public BackupHandler() {
@@ -167,6 +167,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
                 return false;
             }
         }
+
         isInit = true;
         return true;
     }
@@ -544,11 +545,15 @@ public class BackupHandler extends MasterDaemon implements Writable {
             return;
         }
 
+        List<String> removedLabels = Lists.newArrayList();
         jobLock.lock();
         try {
             Deque<AbstractJob> jobs = 
dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
             while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
-                jobs.removeFirst();
+                AbstractJob removedJob = jobs.removeFirst();
+                if (removedJob instanceof BackupJob && ((BackupJob) 
removedJob).isLocalSnapshot()) {
+                    removedLabels.add(removedJob.getLabel());
+                }
             }
             AbstractJob lastJob = jobs.peekLast();
 
@@ -561,6 +566,17 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
         } finally {
             jobLock.unlock();
         }
+
+        if (job.isFinished() && job instanceof BackupJob) {
+            // Save snapshot to local repo, when reload backupHandler from 
image.
+            BackupJob backupJob = (BackupJob) job;
+            if (backupJob.isLocalSnapshot()) {
+                addSnapshot(backupJob.getLabel(), backupJob);
+            }
+        }
+        for (String label : removedLabels) {
+            removeSnapshot(label);
+        }
     }
 
     private List<AbstractJob> getAllCurrentJobs() {
@@ -799,22 +815,42 @@ public class BackupHandler extends MasterDaemon implements Writable {
         return false;
     }
 
-    public void addSnapshot(String labelName, Snapshot snapshot) {
+    public void addSnapshot(String labelName, BackupJob backupJob) {
+        assert backupJob.isFinished();
+
+        LOG.info("add snapshot {} to local repo", labelName);
         localSnapshotsLock.writeLock().lock();
         try {
-            localSnapshots.put(labelName, snapshot);
+            localSnapshots.put(labelName, backupJob);
+        } finally {
+            localSnapshotsLock.writeLock().unlock();
+        }
+    }
+
+    public void removeSnapshot(String labelName) {
+        LOG.info("remove snapshot {} from local repo", labelName);
+        localSnapshotsLock.writeLock().lock();
+        try {
+            localSnapshots.remove(labelName);
         } finally {
             localSnapshotsLock.writeLock().unlock();
         }
     }
 
     public Snapshot getSnapshot(String labelName) {
+        BackupJob backupJob;
         localSnapshotsLock.readLock().lock();
         try {
-            return localSnapshots.get(labelName);
+            backupJob = localSnapshots.get(labelName);
         } finally {
             localSnapshotsLock.readLock().unlock();
         }
+
+        if (backupJob == null) {
+            return null;
+        }
+
+        return backupJob.getSnapshot();
     }
 
     public static BackupHandler read(DataInput in) throws IOException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index b24fb9fe7fd..9e932d6f8fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -123,9 +123,6 @@ public class BackupJob extends AbstractJob {
     // backup properties && table commit seq with table id
     private Map<String, String> properties = Maps.newHashMap();
 
-    private byte[] metaInfoBytes = null;
-    private byte[] jobInfoBytes = null;
-
     public BackupJob() {
         super(JobType.BACKUP);
     }
@@ -337,11 +334,7 @@ public class BackupJob extends AbstractJob {
 
     @Override
     public synchronized void replayRun() {
-        LOG.info("replay run backup job: {}", this);
-        if (state == BackupJobState.FINISHED && repoId == 
Repository.KEEP_ON_LOCAL_REPO_ID) {
-            Snapshot snapshot = new Snapshot(label, metaInfoBytes, 
jobInfoBytes);
-            env.getBackupHandler().addSnapshot(label, snapshot);
-        }
+        // nothing to do
     }
 
     @Override
@@ -359,6 +352,11 @@ public class BackupJob extends AbstractJob {
         return state == BackupJobState.CANCELLED;
     }
 
+    @Override
+    public boolean isFinished() {
+        return state == BackupJobState.FINISHED;
+    }
+
     @Override
     public synchronized Status updateRepo(Repository repo) {
         this.repo = repo;
@@ -844,8 +842,6 @@ public class BackupJob extends AbstractJob {
             }
             backupMeta.writeToFile(metaInfoFile);
             localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
-            // read meta info to metaInfoBytes
-            metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
 
             // 3. save job info file
             Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
@@ -872,8 +868,6 @@ public class BackupJob extends AbstractJob {
             }
             jobInfo.writeToFile(jobInfoFile);
             localJobInfoFilePath = jobInfoFile.getAbsolutePath();
-            // read job info to jobInfoBytes
-            jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
         } catch (Exception e) {
             status = new Status(ErrCode.COMMON_ERROR, "failed to save meta 
info and job info file: " + e.getMessage());
             return;
@@ -927,7 +921,6 @@ public class BackupJob extends AbstractJob {
             }
         }
 
-
         finishedTime = System.currentTimeMillis();
         state = BackupJobState.FINISHED;
 
@@ -936,8 +929,7 @@ public class BackupJob extends AbstractJob {
         LOG.info("job is finished. {}", this);
 
         if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
-            Snapshot snapshot = new Snapshot(label, metaInfoBytes, 
jobInfoBytes);
-            env.getBackupHandler().addSnapshot(label, snapshot);
+            env.getBackupHandler().addSnapshot(label, this);
             return;
         }
     }
@@ -1030,6 +1022,29 @@ public class BackupJob extends AbstractJob {
         LOG.info("finished to cancel backup job. current state: {}. {}", 
curState.name(), this);
     }
 
+    public boolean isLocalSnapshot() {
+        return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
+    }
+
+    // read meta and job info bytes from disk, and return the snapshot
+    public synchronized Snapshot getSnapshot() {
+        if (state != BackupJobState.FINISHED || repoId != 
Repository.KEEP_ON_LOCAL_REPO_ID) {
+            return null;
+        }
+
+        try {
+            File metaInfoFile = new File(localMetaInfoFilePath);
+            File jobInfoFile = new File(localJobInfoFilePath);
+            byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
+            byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
+            return new Snapshot(label, metaInfoBytes, jobInfoBytes);
+        } catch (IOException e) {
+            LOG.warn("failed to load meta info and job info file, meta info 
file {}, job info file {}: ",
+                    localMetaInfoFilePath, localJobInfoFilePath, e);
+            return null;
+        }
+    }
+
     public synchronized List<String> getInfo() {
         List<String> info = Lists.newArrayList();
         info.add(String.valueOf(jobId));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 5a3b569a7cb..1f13a2970d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -379,6 +379,11 @@ public class RestoreJob extends AbstractJob {
         return state == RestoreJobState.CANCELLED;
     }
 
+    @Override
+    public boolean isFinished() {
+        return state == RestoreJobState.FINISHED;
+    }
+
     @Override
     public synchronized Status updateRepo(Repository repo) {
         this.repo = repo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d37e54deba8..d1aefdec5a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3018,15 +3018,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
 
         // Step 3: get snapshot
+        String label = request.getLabelName();
         TGetSnapshotResult result = new TGetSnapshotResult();
         result.setStatus(new TStatus(TStatusCode.OK));
-        Snapshot snapshot = 
Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
+        Snapshot snapshot = 
Env.getCurrentEnv().getBackupHandler().getSnapshot(label);
         if (snapshot == null) {
             result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
-            result.getStatus().addToErrorMsgs("snapshot not exist");
+            result.getStatus().addToErrorMsgs(String.format("snapshot %s not 
exist", label));
         } else {
             result.setMeta(snapshot.getMeta());
             result.setJobInfo(snapshot.getJobInfo());
+            LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info 
size: {}",
+                    label, snapshot.getMeta().length, 
snapshot.getJobInfo().length);
         }
 
         return result;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to