This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch backup_support_large_meta_file_to_3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d382f96e937519460a094f47285e29bf686a1a9a
Author: w41ter <[email protected]>
AuthorDate: Tue Sep 2 12:25:31 2025 +0000

    [fix](backup) Support backup meta/job info exceeding 2GB
---
 .../java/org/apache/doris/common/GZIPUtils.java    | 21 ++++++++
 .../org/apache/doris/analysis/RestoreStmt.java     | 12 +++--
 .../org/apache/doris/backup/BackupHandler.java     | 12 +----
 .../java/org/apache/doris/backup/BackupJob.java    | 16 ++----
 .../org/apache/doris/backup/BackupJobInfo.java     |  8 +++
 .../java/org/apache/doris/backup/BackupMeta.java   |  2 +-
 .../java/org/apache/doris/backup/Snapshot.java     | 62 ++++++++++++++++------
 .../apache/doris/service/FrontendServiceImpl.java  | 52 +++++++++++++-----
 8 files changed, 129 insertions(+), 56 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
index 7408e2888cc..4500c76f638 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
@@ -21,7 +21,10 @@ import org.apache.commons.io.IOUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -39,10 +42,28 @@ public class GZIPUtils {
         return bytesStream.toByteArray();
     }
 
+    public static byte[] compress(File file) throws IOException {
+        ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+        try (FileInputStream fileInputStream = new FileInputStream(file);
+                GZIPOutputStream gzipStream = new 
GZIPOutputStream(bytesStream)) {
+
+            byte[] buffer = new byte[8192]; // 8KB buffer
+            int bytesRead;
+            while ((bytesRead = fileInputStream.read(buffer)) != -1) {
+                gzipStream.write(buffer, 0, bytesRead);
+            }
+        }
+        return bytesStream.toByteArray();
+    }
+
     public static byte[] decompress(byte[] data) throws IOException {
         ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
         try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
             return IOUtils.toByteArray(gzipStream);
         }
     }
+
+    public static InputStream lazyDecompress(byte[] data) throws IOException {
+        return new GZIPInputStream(new ByteArrayInputStream(data));
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index 5f141837565..1deb2b48db9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.backup.BackupJobInfo;
+import org.apache.doris.backup.BackupMeta;
 import org.apache.doris.backup.Repository;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.common.AnalysisException;
@@ -58,8 +60,8 @@ public class RestoreStmt extends AbstractBackupStmt 
implements NotFallbackInPars
     private boolean isCleanPartitions = false;
     private boolean isAtomicRestore = false;
     private boolean isForceReplace = false;
-    private byte[] meta = null;
-    private byte[] jobInfo = null;
+    private BackupMeta meta = null;
+    private BackupJobInfo jobInfo = null;
 
     public RestoreStmt(LabelName labelName, String repoName, 
AbstractBackupTableRefClause restoreTableRefClause,
             Map<String, String> properties) {
@@ -67,7 +69,7 @@ public class RestoreStmt extends AbstractBackupStmt 
implements NotFallbackInPars
     }
 
     public RestoreStmt(LabelName labelName, String repoName, 
AbstractBackupTableRefClause restoreTableRefClause,
-            Map<String, String> properties, byte[] meta, byte[] jobInfo) {
+            Map<String, String> properties, BackupMeta meta, BackupJobInfo 
jobInfo) {
         super(labelName, repoName, restoreTableRefClause, properties);
         this.meta = meta;
         this.jobInfo = jobInfo;
@@ -101,11 +103,11 @@ public class RestoreStmt extends AbstractBackupStmt 
implements NotFallbackInPars
         return isLocal;
     }
 
-    public byte[] getMeta() {
+    public BackupMeta getMeta() {
         return meta;
     }
 
-    public byte[] getJobInfo() {
+    public BackupJobInfo getJobInfo() {
         return jobInfo;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 188e07af5a2..040a18ed796 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -513,9 +513,7 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
     private void restore(Repository repository, Database db, RestoreStmt stmt) 
throws DdlException {
         BackupJobInfo jobInfo;
         if (stmt.isLocal()) {
-            String jobInfoString = new String(stmt.getJobInfo());
-            jobInfo = BackupJobInfo.genFromJson(jobInfoString);
-
+            jobInfo = stmt.getJobInfo();
             if (jobInfo.extraInfo == null) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, 
"Invalid job extra info empty");
             }
@@ -551,13 +549,7 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
                 metaVersion = jobInfo.metaVersion;
             }
 
-            BackupMeta backupMeta;
-            try {
-                backupMeta = BackupMeta.fromBytes(stmt.getMeta(), metaVersion);
-            } catch (IOException e) {
-                LOG.warn("read backup meta failed, current meta version {}", 
Env.getCurrentEnvJournalVersion(), e);
-                throw new DdlException("read backup meta failed", e);
-            }
+            BackupMeta backupMeta = stmt.getMeta();
             String backupTimestamp = TimeUtils.longToTimeString(
                     jobInfo.getBackupTime(), 
TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
             restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 3b9c177297b..00835ed6203 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -1057,20 +1057,12 @@ public class BackupJob extends AbstractJob implements 
GsonPostProcessable {
         // Avoid loading expired meta.
         long expiredAt = createTime + timeoutMs;
         if (System.currentTimeMillis() >= expiredAt) {
-            return new Snapshot(label, new byte[0], new byte[0], expiredAt, 
commitSeq);
+            return new Snapshot(label, null, null, expiredAt, commitSeq);
         }
 
-        try {
-            File metaInfoFile = new File(localMetaInfoFilePath);
-            File jobInfoFile = new File(localJobInfoFilePath);
-            byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
-            byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
-            return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, 
commitSeq);
-        } catch (IOException e) {
-            LOG.warn("failed to load meta info and job info file, meta info 
file {}, job info file {}: ",
-                    localMetaInfoFilePath, localJobInfoFilePath, e);
-            return null;
-        }
+        File metaInfoFile = new File(localMetaInfoFilePath);
+        File jobInfoFile = new File(localJobInfoFilePath);
+        return new Snapshot(label, metaInfoFile, jobInfoFile, expiredAt, 
commitSeq);
     }
 
     public synchronized List<String> getInfo() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index 554a21c4408..7e77426a82a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -57,6 +57,8 @@ import java.io.DataOutput;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -763,6 +765,12 @@ public class BackupJobInfo implements Writable, 
GsonPostProcessable {
         return jobInfo;
     }
 
+    public static BackupJobInfo fromInputStream(InputStream inputStream) 
throws IOException {
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+            return GsonUtils.GSON.fromJson(reader, BackupJobInfo.class);
+        }
+    }
+
     public void writeToFile(File jobInfoFile) throws FileNotFoundException {
         PrintWriter printWriter = new PrintWriter(jobInfoFile);
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index 0f1a043bdad..ae8dc178e00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -94,7 +94,7 @@ public class BackupMeta implements Writable, 
GsonPostProcessable {
         return fromInputStream(new ByteArrayInputStream(bytes), metaVersion);
     }
 
-    protected static BackupMeta fromInputStream(InputStream stream, int 
metaVersion) throws IOException {
+    public static BackupMeta fromInputStream(InputStream stream, int 
metaVersion) throws IOException {
         MetaContext metaContext = new MetaContext();
         metaContext.setMetaVersion(metaVersion);
         metaContext.setThreadLocalInfo();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
index a9f734dbc99..2fc3ca6d146 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -17,19 +17,22 @@
 
 package org.apache.doris.backup;
 
-import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.common.GZIPUtils;
+import org.apache.doris.common.Pair;
 
 import com.google.gson.annotations.SerializedName;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
 public class Snapshot {
     @SerializedName(value = "label")
     private String label = null;
 
-    @SerializedName(value = "meta")
-    private byte[] meta = null;
+    private File meta = null;
 
-    @SerializedName(value = "jobInfo")
-    private byte[] jobInfo = null;
+    private File jobInfo = null;
 
     @SerializedName(value = "expired_at")
     private long expiredAt = 0;
@@ -40,7 +43,7 @@ public class Snapshot {
     public Snapshot() {
     }
 
-    public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, 
long commitSeq) {
+    public Snapshot(String label, File meta, File jobInfo, long expiredAt, 
long commitSeq) {
         this.label = label;
         this.meta = meta;
         this.jobInfo = jobInfo;
@@ -48,12 +51,45 @@ public class Snapshot {
         this.commitSeq = commitSeq;
     }
 
-    public byte[] getMeta() {
-        return meta;
+    public static Pair<BackupMeta, BackupJobInfo> readFromBytes(byte[] meta, 
byte[] jobInfo) throws IOException {
+        BackupJobInfo backupJobInfo = BackupJobInfo.genFromJson(new 
String(jobInfo));
+        BackupMeta backupMeta = BackupMeta.fromBytes(meta, 
backupJobInfo.metaVersion);
+        return Pair.of(backupMeta, backupJobInfo);
+    }
+
+    public static Pair<BackupMeta, BackupJobInfo> 
readFromCompressedBytes(byte[] meta, byte[] jobInfo)
+            throws IOException {
+        BackupJobInfo backupJobInfo = 
BackupJobInfo.fromInputStream(GZIPUtils.lazyDecompress(jobInfo));
+        BackupMeta backupMeta = 
BackupMeta.fromInputStream(GZIPUtils.lazyDecompress(meta), 
backupJobInfo.metaVersion);
+        return Pair.of(backupMeta, backupJobInfo);
+    }
+
+    public static boolean isCompressed(byte[] meta, byte[] jobInfo) {
+        return GZIPUtils.isGZIPCompressed(jobInfo) || 
GZIPUtils.isGZIPCompressed(meta);
+    }
+
+    public long getMetaSize() {
+        return meta != null ? meta.length() : 0;
+    }
+
+    public long getJobInfoSize() {
+        return jobInfo != null ? jobInfo.length() : 0;
     }
 
-    public byte[] getJobInfo() {
-        return jobInfo;
+    public byte[] getCompressedMeta() throws IOException {
+        return GZIPUtils.compress(meta);
+    }
+
+    public byte[] getCompressedJobInfo() throws IOException {
+        return GZIPUtils.compress(jobInfo);
+    }
+
+    public byte[] getMeta() throws IOException {
+        return Files.readAllBytes(meta.toPath());
+    }
+
+    public byte[] getJobInfo() throws IOException {
+        return Files.readAllBytes(jobInfo.toPath());
     }
 
     public long getExpiredAt() {
@@ -68,16 +104,10 @@ public class Snapshot {
         return commitSeq;
     }
 
-    public String toJson() {
-        return GsonUtils.GSON.toJson(this);
-    }
-
     @Override
     public String toString() {
         return "Snapshot{"
                 + "label='" + label + '\''
-                + ", meta=" + meta
-                + ", jobInfo=" + jobInfo
                 + ", expiredAt=" + expiredAt
                 + ", commitSeq=" + commitSeq
                 + '}';
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f3140d490c1..957df9f106a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -28,6 +28,8 @@ import org.apache.doris.analysis.SetType;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.backup.BackupJobInfo;
+import org.apache.doris.backup.BackupMeta;
 import org.apache.doris.backup.Snapshot;
 import org.apache.doris.binlog.BinlogLagInfo;
 import org.apache.doris.catalog.AutoIncrementGenerator;
@@ -61,7 +63,6 @@ import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
-import org.apache.doris.common.GZIPUtils;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
@@ -3056,24 +3057,38 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_EXPIRED);
             result.getStatus().addToErrorMsgs(String.format("snapshot %s is 
expired", label));
         } else {
-            byte[] meta = snapshot.getMeta();
-            byte[] jobInfo = snapshot.getJobInfo();
+            long metaSize = snapshot.getMetaSize();
+            long jobInfoSize = snapshot.getJobInfoSize();
+            long snapshotSize = snapshot.getMetaSize() + 
snapshot.getJobInfoSize();
+            if (metaSize + jobInfoSize >= Integer.MAX_VALUE) {
+                String msg = String.format(
+                        "Snapshot %s is too large (%d bytes > 2GB). Please 
enable compression to continue.",
+                        label, snapshotSize);
+                LOG.warn("get snapshot failed: {}", msg);
+                result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
+                result.getStatus().addToErrorMsgs(msg);
+                return result;
+            }
+
             long expiredAt = snapshot.getExpiredAt();
             long commitSeq = snapshot.getCommitSeq();
 
             LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info 
size: {}, "
-                    + "expired at: {}, commit seq: {}", label, meta.length, 
jobInfo.length, expiredAt, commitSeq);
+                    + "expired at: {}, commit seq: {}", label, metaSize, 
jobInfoSize, expiredAt, commitSeq);
             if (request.isEnableCompress()) {
-                meta = GZIPUtils.compress(meta);
-                jobInfo = GZIPUtils.compress(jobInfo);
+                byte[] meta = snapshot.getCompressedMeta();
+                byte[] jobInfo = snapshot.getCompressedJobInfo();
+                result.setMeta(meta);
+                result.setJobInfo(jobInfo);
                 result.setCompressed(true);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("get snapshot info with compress, snapshot: {}, 
compressed meta "
                             + "size {}, compressed job info size {}", label, 
meta.length, jobInfo.length);
                 }
+            } else {
+                result.setMeta(snapshot.getMeta());
+                result.setJobInfo(snapshot.getJobInfo());
             }
-            result.setMeta(meta);
-            result.setJobInfo(jobInfo);
             result.setExpiredAt(expiredAt);
             result.setCommitSeq(commitSeq);
         }
@@ -3186,6 +3201,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             }
         }
 
+        BackupMeta backupMeta;
+        BackupJobInfo backupJobInfo;
         byte[] meta = request.getMeta();
         byte[] jobInfo = request.getJobInfo();
         if (Config.enable_restore_snapshot_rpc_compression && 
request.isCompressed()) {
@@ -3194,18 +3211,29 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                         meta.length, jobInfo.length);
             }
             try {
-                meta = GZIPUtils.decompress(meta);
-                jobInfo = GZIPUtils.decompress(jobInfo);
+                Pair<BackupMeta, BackupJobInfo> pair = 
Snapshot.readFromCompressedBytes(meta, jobInfo);
+                backupMeta = pair.first;
+                backupJobInfo = pair.second;
             } catch (Exception e) {
                 LOG.warn("decompress meta and job info failed", e);
                 throw new UserException("decompress meta and job info failed", 
e);
             }
-        } else if (GZIPUtils.isGZIPCompressed(jobInfo) || 
GZIPUtils.isGZIPCompressed(meta)) {
+        } else if (Snapshot.isCompressed(meta, jobInfo)) {
             throw new UserException("The request is compressed, but the config 
"
                     + "`enable_restore_snapshot_rpc_compressed` is not 
enabled.");
+        } else {
+            try {
+                Pair<BackupMeta, BackupJobInfo> pair = 
Snapshot.readFromBytes(meta, jobInfo);
+                backupMeta = pair.first;
+                backupJobInfo = pair.second;
+            } catch (Exception e) {
+                LOG.warn("deserialize meta and job info failed", e);
+                throw new UserException("deserialize meta and job info 
failed", e);
+            }
         }
 
-        RestoreStmt restoreStmt = new RestoreStmt(label, repoName, 
restoreTableRefClause, properties, meta, jobInfo);
+        RestoreStmt restoreStmt = new RestoreStmt(
+                label, repoName, restoreTableRefClause, properties, 
backupMeta, backupJobInfo);
         restoreStmt.setIsBeingSynced();
         LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt);
         try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to