This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 5dda61b410c [fix](backup) Load backup meta and job info bytes from disk #43276 (#43519) 5dda61b410c is described below commit 5dda61b410cdde1dc7495dee9a95eeafe6f6fb25 Author: walter <w41te...@gmail.com> AuthorDate: Mon Nov 11 14:08:35 2024 +0800 [fix](backup) Load backup meta and job info bytes from disk #43276 (#43519) cherry pick from #43276 --- .../java/org/apache/doris/backup/AbstractJob.java | 2 + .../org/apache/doris/backup/BackupHandler.java | 50 +++++++++++++++++++--- .../java/org/apache/doris/backup/BackupJob.java | 45 ++++++++++++------- .../java/org/apache/doris/backup/RestoreJob.java | 5 +++ .../apache/doris/service/FrontendServiceImpl.java | 7 ++- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java index 4e2c3fd1990..f22598dd86b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java @@ -155,6 +155,8 @@ public abstract class AbstractJob implements Writable { public abstract boolean isCancelled(); + public abstract boolean isFinished(); + public abstract Status updateRepo(Repository repo); public static AbstractJob read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index d9b7659cfc1..49190acce1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -109,10 +109,10 @@ public class BackupHandler extends MasterDaemon implements Writable { private Env env; - // map to store backup info, key is label name, value is Pair<meta, info>, meta && info is bytes - // this map not present in persist && only in fe master memory + // map to store backup info, key is label name, value is the BackupJob + // this map not present in persist && only in fe memory // one table only keep one snapshot info, only keep last - private final Map<String, Snapshot> localSnapshots = new HashMap<>(); + private final Map<String, BackupJob> localSnapshots = new HashMap<>(); private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock(); public BackupHandler() { @@ -167,6 +167,7 @@ public class BackupHandler extends MasterDaemon implements Writable { return false; } } + isInit = true; return true; } @@ -544,11 +545,15 @@ public class BackupHandler extends MasterDaemon implements Writable { return; } + List<String> removedLabels = Lists.newArrayList(); jobLock.lock(); try { Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList()); while (jobs.size() >= Config.max_backup_restore_job_num_per_db) { - jobs.removeFirst(); + AbstractJob removedJob = jobs.removeFirst(); + if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) { + removedLabels.add(removedJob.getLabel()); + } } AbstractJob lastJob = jobs.peekLast(); @@ -561,6 +566,17 @@ public class BackupHandler extends MasterDaemon implements Writable { } finally { jobLock.unlock(); } + + if (job.isFinished() && job instanceof BackupJob) { + // Save snapshot to local repo, when reload backupHandler from image. + BackupJob backupJob = (BackupJob) job; + if (backupJob.isLocalSnapshot()) { + addSnapshot(backupJob.getLabel(), backupJob); + } + } + for (String label : removedLabels) { + removeSnapshot(label); + } } private List<AbstractJob> getAllCurrentJobs() { @@ -799,22 +815,42 @@ public class BackupHandler extends MasterDaemon implements Writable { return false; } - public void addSnapshot(String labelName, Snapshot snapshot) { + public void addSnapshot(String labelName, BackupJob backupJob) { + assert backupJob.isFinished(); + + LOG.info("add snapshot {} to local repo", labelName); localSnapshotsLock.writeLock().lock(); try { - localSnapshots.put(labelName, snapshot); + localSnapshots.put(labelName, backupJob); + } finally { + localSnapshotsLock.writeLock().unlock(); + } + } + + public void removeSnapshot(String labelName) { + LOG.info("remove snapshot {} from local repo", labelName); + localSnapshotsLock.writeLock().lock(); + try { + localSnapshots.remove(labelName); } finally { localSnapshotsLock.writeLock().unlock(); } } public Snapshot getSnapshot(String labelName) { + BackupJob backupJob; localSnapshotsLock.readLock().lock(); try { - return localSnapshots.get(labelName); + backupJob = localSnapshots.get(labelName); } finally { localSnapshotsLock.readLock().unlock(); } + + if (backupJob == null) { + return null; + } + + return backupJob.getSnapshot(); } public static BackupHandler read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index b24fb9fe7fd..9e932d6f8fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -123,9 +123,6 @@ public class BackupJob extends AbstractJob { // backup properties && table commit seq with table id private Map<String, String> properties = Maps.newHashMap(); - private byte[] metaInfoBytes = null; - private byte[] jobInfoBytes = null; - public BackupJob() { super(JobType.BACKUP); } @@ -337,11 +334,7 @@ public class BackupJob extends AbstractJob { @Override public synchronized void replayRun() { - LOG.info("replay run backup job: {}", this); - if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { - Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); - env.getBackupHandler().addSnapshot(label, snapshot); - } + // nothing to do } @Override @@ -359,6 +352,11 @@ public class BackupJob extends AbstractJob { return state == BackupJobState.CANCELLED; } + @Override + public boolean isFinished() { + return state == BackupJobState.FINISHED; + } + @Override public synchronized Status updateRepo(Repository repo) { this.repo = repo; @@ -844,8 +842,6 @@ public class BackupJob extends AbstractJob { } backupMeta.writeToFile(metaInfoFile); localMetaInfoFilePath = metaInfoFile.getAbsolutePath(); - // read meta info to metaInfoBytes - metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); // 3. save job info file Map<Long, Long> tableCommitSeqMap = Maps.newHashMap(); @@ -872,8 +868,6 @@ public class BackupJob extends AbstractJob { } jobInfo.writeToFile(jobInfoFile); localJobInfoFilePath = jobInfoFile.getAbsolutePath(); - // read job info to jobInfoBytes - jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); } catch (Exception e) { status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage()); return; @@ -927,7 +921,6 @@ public class BackupJob extends AbstractJob { } } - finishedTime = System.currentTimeMillis(); state = BackupJobState.FINISHED; @@ -936,8 +929,7 @@ public class BackupJob extends AbstractJob { LOG.info("job is finished. {}", this); if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { - Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); - env.getBackupHandler().addSnapshot(label, snapshot); + env.getBackupHandler().addSnapshot(label, this); return; } } @@ -1030,6 +1022,29 @@ public class BackupJob extends AbstractJob { LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this); } + public boolean isLocalSnapshot() { + return repoId == Repository.KEEP_ON_LOCAL_REPO_ID; + } + + // read meta and job info bytes from disk, and return the snapshot + public synchronized Snapshot getSnapshot() { + if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) { + return null; + } + + try { + File metaInfoFile = new File(localMetaInfoFilePath); + File jobInfoFile = new File(localJobInfoFilePath); + byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); + byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); + return new Snapshot(label, metaInfoBytes, jobInfoBytes); + } catch (IOException e) { + LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ", + localMetaInfoFilePath, localJobInfoFilePath, e); + return null; + } + } + public synchronized List<String> getInfo() { List<String> info = Lists.newArrayList(); info.add(String.valueOf(jobId)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 5a3b569a7cb..1f13a2970d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -379,6 +379,11 @@ public class RestoreJob extends AbstractJob { return state == RestoreJobState.CANCELLED; } + @Override + public boolean isFinished() { + return state == RestoreJobState.FINISHED; + } + @Override public synchronized Status updateRepo(Repository repo) { this.repo = repo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index d37e54deba8..d1aefdec5a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3018,15 +3018,18 @@ public class FrontendServiceImpl implements FrontendService.Iface { } // Step 3: get snapshot + String label = request.getLabelName(); TGetSnapshotResult result = new TGetSnapshotResult(); result.setStatus(new TStatus(TStatusCode.OK)); - Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName()); + Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label); if (snapshot == null) { result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); - result.getStatus().addToErrorMsgs("snapshot not exist"); + result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label)); } else { result.setMeta(snapshot.getMeta()); result.setJobInfo(snapshot.getJobInfo()); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}", + label, snapshot.getMeta().length, snapshot.getJobInfo().length); } return result; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org