This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new e49d73614fb [improve](backup) Reduce backup/restore job log size by compress (#42954) e49d73614fb is described below commit e49d73614fb4e1a19ad6f7c6bb7a189a170f07a6 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Nov 6 17:19:57 2024 +0800 [improve](backup) Reduce backup/restore job log size by compress (#42954) Cherry-picked from #42459 Co-authored-by: walter <w41te...@gmail.com> --- .../main/java/org/apache/doris/common/Config.java | 17 +++++++++++ .../java/org/apache/doris/backup/AbstractJob.java | 27 ++++++++++++----- .../java/org/apache/doris/backup/BackupJob.java | 32 +++++++++++++++++++- .../java/org/apache/doris/backup/RestoreJob.java | 34 +++++++++++++++++++++- .../org/apache/doris/persist/gson/GsonUtils.java | 30 +++++++++++++++++++ 5 files changed, 131 insertions(+), 9 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c91a487466b..4211b42e5c0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1556,12 +1556,29 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static String multi_partition_name_prefix = "p_"; + /** * Control the max num of backup/restore job per db */ @ConfField(mutable = true, masterOnly = true) public static int max_backup_restore_job_num_per_db = 10; + /** + * An internal config to reduce the restore job size during serialization by compression. + * + * WARNING: Once this option is enabled and a restore is performed, the FE version cannot be rolled back. + */ + @ConfField(mutable = false) + public static boolean restore_job_compressed_serialization = false; + + /** + * An internal config to reduce the backup job size during serialization by compression.
+ * + * WARNING: Once this option is enabled and a backup is performed, the FE version cannot be rolled back. + */ + @ConfField(mutable = false) + public static boolean backup_job_compressed_serialization = false; + /** * Control the max num of tablets per backup job involved. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java index 77ec0ae26f4..3a00c974f9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java @@ -18,6 +18,7 @@ package org.apache.doris.backup; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; @@ -41,9 +42,10 @@ import java.util.Map; * 2. isDone() method is used to check whether we can submit the next job. */ public abstract class AbstractJob implements Writable { + public static final String COMPRESSED_JOB_ID = "COMPRESSED"; public enum JobType { - BACKUP, RESTORE + BACKUP, RESTORE, BACKUP_COMPRESSED, RESTORE_COMPRESSED } @SerializedName("t") @@ -174,10 +176,10 @@ public abstract class AbstractJob implements Writable { if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { AbstractJob job = null; JobType type = JobType.valueOf(Text.readString(in)); - if (type == JobType.BACKUP) { - job = new BackupJob(); - } else if (type == JobType.RESTORE) { - job = new RestoreJob(); + if (type == JobType.BACKUP || type == JobType.BACKUP_COMPRESSED) { + job = new BackupJob(type); + } else if (type == JobType.RESTORE || type == JobType.RESTORE_COMPRESSED) { + job = new RestoreJob(type); } else { throw new IOException("Unknown job type: " + type.name()); } @@ -186,7 +188,12 @@ public abstract class AbstractJob implements Writable { job.readFields(in); return job; } else { - return 
GsonUtils.GSON.fromJson(Text.readString(in), AbstractJob.class); + String json = Text.readString(in); + if (COMPRESSED_JOB_ID.equals(json)) { + return GsonUtils.fromJsonCompressed(in, AbstractJob.class); + } else { + return GsonUtils.GSON.fromJson(json, AbstractJob.class); + } } } @@ -203,7 +210,13 @@ public abstract class AbstractJob implements Writable { count++; } - Text.writeString(out, GsonUtils.GSON.toJson(this)); + if ((type == JobType.BACKUP && Config.backup_job_compressed_serialization) + || (type == JobType.RESTORE && Config.restore_job_compressed_serialization)) { + Text.writeString(out, COMPRESSED_JOB_ID); + GsonUtils.toJsonCompressed(out, this); + } else { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } } @Deprecated diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index b14fae1ed3f..dbc7bb08fd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -63,7 +63,9 @@ import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.nio.file.FileVisitOption; @@ -75,6 +77,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; public class BackupJob extends AbstractJob { @@ -134,6 +137,11 @@ public class BackupJob extends AbstractJob { super(JobType.BACKUP); } + public BackupJob(JobType jobType) { + super(jobType); + assert jobType == JobType.BACKUP || jobType == JobType.BACKUP_COMPRESSED; + } + public BackupJob(String label, long dbId, String dbName, List<TableRef> tableRefs, long timeoutMs, BackupContent content, Env env, long repoId) {
super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId); @@ -1050,13 +1058,35 @@ public class BackupJob extends AbstractJob { job.readFields(in); return job; } else { - return GsonUtils.GSON.fromJson(Text.readString(in), BackupJob.class); + String json = Text.readString(in); + if (AbstractJob.COMPRESSED_JOB_ID.equals(json)) { + return GsonUtils.fromJsonCompressed(in, BackupJob.class); + } else { + return GsonUtils.GSON.fromJson(json, BackupJob.class); + } } } public void readFields(DataInput in) throws IOException { super.readFields(in); + if (type == JobType.BACKUP_COMPRESSED) { + type = JobType.BACKUP; + + Text text = new Text(); + text.readFields(in); + + ByteArrayInputStream byteStream = new ByteArrayInputStream(text.getBytes(), 0, text.getLength()); + try (GZIPInputStream gzipStream = new GZIPInputStream(byteStream)) { + try (DataInputStream stream = new DataInputStream(gzipStream)) { + readOthers(stream); + } + } + } else { + readOthers(in); + } + } + public void readOthers(DataInput in) throws IOException { // table refs int size = in.readInt(); tableRefs = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index cf43afc6597..1b992cf353a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -104,7 +104,9 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -112,6 +114,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; public class RestoreJob extends AbstractJob implements GsonPostProcessable
{ private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA; @@ -216,6 +219,10 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { super(JobType.RESTORE); } + public RestoreJob(JobType jobType) { + super(jobType); + } + public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, @@ -2522,7 +2529,12 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { job.readFields(in); return job; } else { - return GsonUtils.GSON.fromJson(Text.readString(in), RestoreJob.class); + String json = Text.readString(in); + if (AbstractJob.COMPRESSED_JOB_ID.equals(json)) { + return GsonUtils.fromJsonCompressed(in, RestoreJob.class); + } else { + return GsonUtils.GSON.fromJson(json, RestoreJob.class); + } } } @@ -2530,7 +2542,27 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { @Override public void readFields(DataInput in) throws IOException { super.readFields(in); + if (type == JobType.RESTORE_COMPRESSED) { + type = JobType.RESTORE; + + Text text = new Text(); + text.readFields(in); + if (LOG.isDebugEnabled() || text.getLength() > (100 << 20)) { + LOG.info("read restore job compressed size {}", text.getLength()); + } + + ByteArrayInputStream bytesStream = new ByteArrayInputStream(text.getBytes(), 0, text.getLength()); + try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) { + try (DataInputStream stream = new DataInputStream(gzipStream)) { + readOthers(stream); + } + } + } else { + readOthers(in); + } + } + private void readOthers(DataInput in) throws IOException { backupTimestamp = Text.readString(in); jobInfo = BackupJobInfo.read(in); allowLoad = in.readBoolean(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index ae2fff46835..ee6f2f74eac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -126,6 +126,7 @@ import org.apache.doris.cloud.load.CloudBrokerLoadJob; import org.apache.doris.cloud.load.CopyJob; import org.apache.doris.common.Config; import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.io.Text; import org.apache.doris.common.util.RangeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalDatabase; @@ -246,8 +247,13 @@ import com.google.gson.stream.JsonWriter; import org.apache.commons.lang3.reflect.TypeUtils; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; @@ -257,6 +263,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; /* * Some utilities about Gson.
@@ -978,4 +986,26 @@ public class GsonUtils { } } + public static void toJsonCompressed(DataOutput out, Object src) throws IOException { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream)) { + try (OutputStreamWriter writer = new OutputStreamWriter(gzipStream, "UTF-8")) { + GsonUtils.GSON.toJson(src, writer); + } + } + Text text = new Text(byteStream.toByteArray()); + text.write(out); + } + + public static <T> T fromJsonCompressed(DataInput in, Class<T> clazz) throws IOException { + Text text = new Text(); + text.readFields(in); + + ByteArrayInputStream byteStream = new ByteArrayInputStream(text.getBytes(), 0, text.getLength()); + try (GZIPInputStream gzipStream = new GZIPInputStream(byteStream)) { + try (InputStreamReader reader = new InputStreamReader(gzipStream, "UTF-8")) { + return GsonUtils.GSON.fromJson(reader, clazz); + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org