This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e49d73614fb [improve](backup) Reduce backup/restore job log size by 
compress (#42954)
e49d73614fb is described below

commit e49d73614fb4e1a19ad6f7c6bb7a189a170f07a6
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 6 17:19:57 2024 +0800

    [improve](backup) Reduce backup/restore job log size by compress (#42954)
    
    Cherry-picked from #42459
    
    Co-authored-by: walter <w41te...@gmail.com>
---
 .../main/java/org/apache/doris/common/Config.java  | 17 +++++++++++
 .../java/org/apache/doris/backup/AbstractJob.java  | 27 ++++++++++++-----
 .../java/org/apache/doris/backup/BackupJob.java    | 32 +++++++++++++++++++-
 .../java/org/apache/doris/backup/RestoreJob.java   | 34 +++++++++++++++++++++-
 .../org/apache/doris/persist/gson/GsonUtils.java   | 30 +++++++++++++++++++
 5 files changed, 131 insertions(+), 9 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c91a487466b..4211b42e5c0 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1556,12 +1556,29 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static String multi_partition_name_prefix = "p_";
+
     /**
      * Control the max num of backup/restore job per db
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int max_backup_restore_job_num_per_db = 10;
 
+    /**
+     * An internal config to reduce the restore job size during serialization by compression.
+     *
+     * WARNING: Once this option is enabled and a restore is performed, the FE 
version cannot be rolled back.
+     */
+    @ConfField(mutable = false)
+    public static boolean restore_job_compressed_serialization = false;
+
+    /**
+     * An internal config to reduce the backup job size during serialization by compression.
+     *
+     * WARNING: Once this option is enabled and a backup is performed, the FE 
version cannot be rolled back.
+     */
+    @ConfField(mutable = false)
+    public static boolean backup_job_compressed_serialization = false;
+
     /**
      * Control the max num of tablets per backup job involved.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
index 77ec0ae26f4..3a00c974f9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -18,6 +18,7 @@
 package org.apache.doris.backup;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
@@ -41,9 +42,10 @@ import java.util.Map;
  * 2. isDone() method is used to check whether we can submit the next job.
  */
 public abstract class AbstractJob implements Writable {
+    public static final String COMPRESSED_JOB_ID = "COMPRESSED";
 
     public enum JobType {
-        BACKUP, RESTORE
+        BACKUP, RESTORE, BACKUP_COMPRESSED, RESTORE_COMPRESSED
     }
 
     @SerializedName("t")
@@ -174,10 +176,10 @@ public abstract class AbstractJob implements Writable {
         if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
             AbstractJob job = null;
             JobType type = JobType.valueOf(Text.readString(in));
-            if (type == JobType.BACKUP) {
-                job = new BackupJob();
-            } else if (type == JobType.RESTORE) {
-                job = new RestoreJob();
+            if (type == JobType.BACKUP || type == JobType.BACKUP_COMPRESSED) {
+                job = new BackupJob(type);
+            } else if (type == JobType.RESTORE || type == 
JobType.RESTORE_COMPRESSED) {
+                job = new RestoreJob(type);
             } else {
                 throw new IOException("Unknown job type: " + type.name());
             }
@@ -186,7 +188,12 @@ public abstract class AbstractJob implements Writable {
             job.readFields(in);
             return job;
         } else {
-            return GsonUtils.GSON.fromJson(Text.readString(in), 
AbstractJob.class);
+            String json = Text.readString(in);
+            if (COMPRESSED_JOB_ID.equals(json)) {
+                return GsonUtils.fromJsonCompressed(in, AbstractJob.class);
+            } else {
+                return GsonUtils.GSON.fromJson(json, AbstractJob.class);
+            }
         }
     }
 
@@ -203,7 +210,13 @@ public abstract class AbstractJob implements Writable {
             count++;
         }
 
-        Text.writeString(out, GsonUtils.GSON.toJson(this));
+        if ((type == JobType.BACKUP && 
Config.backup_job_compressed_serialization)
+                || (type == JobType.RESTORE && 
Config.restore_job_compressed_serialization)) {
+            Text.writeString(out, COMPRESSED_JOB_ID);
+            GsonUtils.toJsonCompressed(out, this);
+        } else {
+            Text.writeString(out, GsonUtils.GSON.toJson(this));
+        }
     }
 
     @Deprecated
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index b14fae1ed3f..dbc7bb08fd4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -63,7 +63,9 @@ import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.FileVisitOption;
@@ -75,6 +77,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
 
 
 public class BackupJob extends AbstractJob {
@@ -134,6 +137,11 @@ public class BackupJob extends AbstractJob {
         super(JobType.BACKUP);
     }
 
+    public BackupJob(JobType jobType) {
+        super(jobType);
+        assert jobType == JobType.BACKUP || jobType == 
JobType.BACKUP_COMPRESSED;
+    }
+
     public BackupJob(String label, long dbId, String dbName, List<TableRef> 
tableRefs, long timeoutMs,
                      BackupContent content, Env env, long repoId) {
         super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
@@ -1050,13 +1058,35 @@ public class BackupJob extends AbstractJob {
             job.readFields(in);
             return job;
         } else {
-            return GsonUtils.GSON.fromJson(Text.readString(in), 
BackupJob.class);
+            String json = Text.readString(in);
+            if (AbstractJob.COMPRESSED_JOB_ID.equals(json)) {
+                return GsonUtils.fromJsonCompressed(in, BackupJob.class);
+            } else {
+                return GsonUtils.GSON.fromJson(json, BackupJob.class);
+            }
         }
     }
 
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
+        if (type == JobType.BACKUP_COMPRESSED) {
+            type = JobType.BACKUP;
+
+            Text text = new Text();
+            text.readFields(in);
+
+            ByteArrayInputStream byteStream = new 
ByteArrayInputStream(text.getBytes());
+            try (GZIPInputStream gzipStream = new GZIPInputStream(byteStream)) 
{
+                try (DataInputStream stream = new DataInputStream(gzipStream)) 
{
+                    readOthers(stream);
+                }
+            }
+        } else {
+            readOthers(in);
+        }
+    }
 
+    public void readOthers(DataInput in) throws IOException {
         // table refs
         int size = in.readInt();
         tableRefs = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index cf43afc6597..1b992cf353a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -104,7 +104,9 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -112,6 +114,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
 
 public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     private static final String PROP_RESERVE_REPLICA = 
RestoreStmt.PROP_RESERVE_REPLICA;
@@ -216,6 +219,10 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         super(JobType.RESTORE);
     }
 
+    public RestoreJob(JobType jobType) {
+        super(jobType);
+    }
+
     public RestoreJob(String label, String backupTs, long dbId, String dbName, 
BackupJobInfo jobInfo, boolean allowLoad,
             ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, 
boolean reserveReplica,
             boolean reserveDynamicPartitionEnable, boolean isBeingSynced, 
boolean isCleanTables,
@@ -2522,7 +2529,12 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
             job.readFields(in);
             return job;
         } else {
-            return GsonUtils.GSON.fromJson(Text.readString(in), 
RestoreJob.class);
+            String json = Text.readString(in);
+            if (AbstractJob.COMPRESSED_JOB_ID.equals(json)) {
+                return GsonUtils.fromJsonCompressed(in, RestoreJob.class);
+            } else {
+                return GsonUtils.GSON.fromJson(json, RestoreJob.class);
+            }
         }
     }
 
@@ -2530,7 +2542,27 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
     @Override
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
+        if (type == JobType.RESTORE_COMPRESSED) {
+            type = JobType.RESTORE;
+
+            Text text = new Text();
+            text.readFields(in);
+            if (LOG.isDebugEnabled() || text.getLength() > (100 << 20)) {
+                LOG.info("read restore job compressed size {}", 
text.getLength());
+            }
+
+            ByteArrayInputStream bytesStream = new 
ByteArrayInputStream(text.getBytes());
+            try (GZIPInputStream gzipStream = new 
GZIPInputStream(bytesStream)) {
+                try (DataInputStream stream = new DataInputStream(gzipStream)) 
{
+                    readOthers(stream);
+                }
+            }
+        } else {
+            readOthers(in);
+        }
+    }
 
+    private void readOthers(DataInput in) throws IOException {
         backupTimestamp = Text.readString(in);
         jobInfo = BackupJobInfo.read(in);
         allowLoad = in.readBoolean();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index ae2fff46835..ee6f2f74eac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -126,6 +126,7 @@ import org.apache.doris.cloud.load.CloudBrokerLoadJob;
 import org.apache.doris.cloud.load.CopyJob;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalDatabase;
@@ -246,8 +247,13 @@ import com.google.gson.stream.JsonWriter;
 import org.apache.commons.lang3.reflect.TypeUtils;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
@@ -257,6 +263,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 /*
  * Some utilities about Gson.
@@ -978,4 +986,26 @@ public class GsonUtils {
         }
     }
 
+    public static void toJsonCompressed(DataOutput out, Object src) throws 
IOException {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        try (GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream)) {
+            try (OutputStreamWriter writer = new 
OutputStreamWriter(gzipStream)) {
+                GsonUtils.GSON.toJson(src, writer);
+            }
+        }
+        Text text = new Text(byteStream.toByteArray());
+        text.write(out);
+    }
+
+    public static <T> T fromJsonCompressed(DataInput in, Class<T> clazz) 
throws IOException {
+        Text text = new Text();
+        text.readFields(in);
+
+        ByteArrayInputStream byteStream = new 
ByteArrayInputStream(text.getBytes());
+        try (GZIPInputStream gzipStream = new GZIPInputStream(byteStream)) {
+            try (InputStreamReader reader = new InputStreamReader(gzipStream)) 
{
+                return GsonUtils.GSON.fromJson(reader, clazz);
+            }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to