This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 08fe22cb0c [improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255) 08fe22cb0c is described below commit 08fe22cb0c1c16aaaf23b69d167fdab397c3742e Author: Jack Drogon <jack.xsuper...@gmail.com> AuthorDate: Wed Jun 28 11:10:12 2023 +0800 [improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255) Signed-off-by: Jack Drogon <jack.xsuper...@gmail.com> --- .../java/org/apache/doris/backup/BackupJob.java | 23 +++++++++++++-- .../org/apache/doris/backup/BackupJobInfo.java | 6 +++- .../org/apache/doris/journal/JournalEntity.java | 6 ++++ .../java/org/apache/doris/persist/BarrierLog.java | 34 ++++++++++++++++++++++ .../java/org/apache/doris/persist/EditLog.java | 17 ++++++++--- .../org/apache/doris/persist/OperationType.java | 2 ++ .../org/apache/doris/backup/BackupHandlerTest.java | 2 +- 7 files changed, 82 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 61eda11a79..565efda2d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -75,6 +75,7 @@ import java.util.stream.Collectors; public class BackupJob extends AbstractJob { private static final Logger LOG = LogManager.getLogger(BackupJob.class); + private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:"; public enum BackupJobState { PENDING, // Job is newly created. Send snapshot tasks and save copied meta info, then transfer to SNAPSHOTING @@ -110,7 +111,7 @@ public class BackupJob extends AbstractJob { // save the local file path of meta info and job info file private String localMetaInfoFilePath = null; private String localJobInfoFilePath = null; - // backup properties + // backup properties && table commit seq with table id private Map<String, String> properties = Maps.newHashMap(); private byte[] metaInfoBytes = null; @@ -431,6 +432,12 @@ public class BackupJob extends AbstractJob { private void prepareSnapshotTaskForOlapTableWithoutLock(OlapTable olapTable, TableRef backupTableRef, AgentBatchTask batchTask) { + // Add barrier editolog for barrier commit seq + long commitSeq = env.getEditLog().logBarrier(); + // format as "table:{tableId}" + String tableKey = String.format("%s%d", TABLE_COMMIT_SEQ_PREFIX, olapTable.getId()); + properties.put(tableKey, String.valueOf(commitSeq)); + // check backup table again if (backupTableRef.getPartitionNames() != null) { for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) { @@ -680,8 +687,20 @@ public class BackupJob extends AbstractJob { metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); // 3. save job info file + Map<Long, Long> tableCommitSeqMap = Maps.newHashMap(); + // iterate properties, convert key, value from string to long + // key is "${TABLE_COMMIT_SEQ_PREFIX}{tableId}", only need tableId to long + for (Map.Entry<String, String> entry : properties.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(TABLE_COMMIT_SEQ_PREFIX)) { + long tableId = Long.parseLong(key.substring(TABLE_COMMIT_SEQ_PREFIX.length())); + long commitSeq = Long.parseLong(value); + tableCommitSeqMap.put(tableId, commitSeq); + } + } jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId, - getContent(), backupMeta, snapshotInfos); + getContent(), backupMeta, snapshotInfos, tableCommitSeqMap); LOG.debug("job info: {}. {}", jobInfo, this); File jobInfoFile = new File(jobDir, Repository.PREFIX_JOB_INFO + createTimeStr); if (!jobInfoFile.createNewFile()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java index 4457740440..fff2b755c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java @@ -99,6 +99,9 @@ public class BackupJobInfo implements Writable { @SerializedName("tablet_snapshot_path_map") public Map<Long, String> tabletSnapshotPathMap = Maps.newHashMap(); + @SerializedName("table_commit_seq_map") + public Map<Long, Long> tableCommitSeqMap; + public static class ExtraInfo { public static class NetworkAddrss { @SerializedName("ip") @@ -575,7 +578,7 @@ public class BackupJobInfo implements Writable { public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId, BackupContent content, BackupMeta backupMeta, - Map<Long, SnapshotInfo> snapshotInfos) { + Map<Long, SnapshotInfo> snapshotInfos, Map<Long, Long> tableCommitSeqMap) { BackupJobInfo jobInfo = new BackupJobInfo(); jobInfo.backupTime = backupTime; @@ -584,6 +587,7 @@ public class BackupJobInfo implements Writable { jobInfo.dbId = dbId; jobInfo.metaVersion = FeConstants.meta_version; jobInfo.content = content; + jobInfo.tableCommitSeqMap = tableCommitSeqMap; Collection<Table> tbls = backupMeta.getTables().values(); // tbls diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 39b804eb40..2a29b18c85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -69,6 +69,7 @@ import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.AnalyzeDeletionLog; import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; +import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.BatchDropInfo; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.BatchRemoveTransactionsOperation; @@ -829,6 +830,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_BARRIER: { + data = new BarrierLog(); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java new file mode 100644 index 0000000000..e44cbb2f31 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; + +import java.io.DataOutput; +import java.io.IOException; + +public class BarrierLog implements Writable { + public BarrierLog() { + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, ""); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 4244664d43..7be7253f1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -1037,6 +1037,11 @@ public class EditLog { env.replayGcBinlog(binlogGcInfo); break; } + case OperationType.OP_BARRIER: { + // the log only for barrier commit seq, not need to replay + LOG.info("replay barrier"); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1800,11 +1805,15 @@ public class EditLog { logEdit(OperationType.OP_DELETE_ANALYSIS_TASK, log); } - public void logAlterDatabaseProperty(AlterDatabasePropertyInfo log) { - logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log); + public long logAlterDatabaseProperty(AlterDatabasePropertyInfo log) { + return logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log); + } + + public long logGcBinlog(BinlogGcInfo log) { + return logEdit(OperationType.OP_GC_BINLOG, log); } - public void logGcBinlog(BinlogGcInfo log) { - logEdit(OperationType.OP_GC_BINLOG, log); + public long logBarrier() { + return logEdit(OperationType.OP_BARRIER, new BarrierLog()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 674374f917..451a3eb7aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -308,6 +308,8 @@ public class OperationType { public static final short OP_GC_BINLOG = 435; + public static final short OP_BARRIER = 436; + /** * Get opcode name by op code. diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java index 50d39e0ab6..97e689b697 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java @@ -228,7 +228,7 @@ public class BackupHandlerTest { BackupJobInfo info = BackupJobInfo.fromCatalog(System.currentTimeMillis(), "ss2", CatalogMocker.TEST_DB_NAME, CatalogMocker.TEST_DB_ID, BackupStmt.BackupContent.ALL, - backupMeta, snapshotInfos); + backupMeta, snapshotInfos, null); infos.add(info); return Status.OK; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org