This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5658c34e5f5 [feat](restore) Support compressed snapshot meta and job info #43516 (#43570)
5658c34e5f5 is described below

commit 5658c34e5f5f687c4dc362bef9c8f6ef0b1237d2
Author: walter <w41te...@gmail.com>
AuthorDate: Mon Nov 11 20:34:24 2024 +0800

    [feat](restore) Support compressed snapshot meta and job info #43516 (#43570)
    
    cherry pick from #43516
---
 .../main/java/org/apache/doris/common/Config.java  |  9 ++++
 .../java/org/apache/doris/common/GZIPUtils.java    | 48 ++++++++++++++++++++++
 .../apache/doris/service/FrontendServiceImpl.java  | 45 +++++++++++++++++---
 gensrc/thrift/FrontendService.thrift               |  3 ++
 4 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 59f4b33aff4..c3733d8b3be 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1587,6 +1587,15 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false)
     public static boolean backup_job_compressed_serialization = false;
 
+    /**
+     * An internal config that indicates whether to enable restore snapshot RPC compression.
+     *
+     * The CCR syncer depends on this config to decide whether to compress the meta and job
+     * info of the restore snapshot request.
+     */
+    @ConfField(mutable = false)
+    public static boolean enable_restore_snapshot_rpc_compression = true;
+
     /**
      * Control the max num of tablets per backup job involved.
      */
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
new file mode 100644
index 00000000000..7408e2888cc
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class GZIPUtils {
+    public static boolean isGZIPCompressed(byte[] data) {
+        // From RFC 1952: every gzip member starts with the magic bytes ID1 = 0x1F, ID2 = 0x8B.
+        return data.length >= 2 && data[0] == (byte) 0x1F && data[1] == (byte) 0x8B;
+    }
+
+    public static byte[] compress(byte[] data) throws IOException {
+        ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+        try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {
+            gzipStream.write(data);
+        }
+        return bytesStream.toByteArray();
+    }
+
+    public static byte[] decompress(byte[] data) throws IOException {
+        ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
+        try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
+            return IOUtils.toByteArray(gzipStream);
+        }
+    }
+}
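
For reference, a minimal round-trip sketch of the new helper (assuming the GZIPUtils class above and commons-io are on the classpath; the payload below is made up for illustration):

    import org.apache.doris.common.GZIPUtils;

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class GZIPUtilsExample {
        public static void main(String[] args) throws Exception {
            // A made-up payload standing in for the snapshot meta bytes.
            byte[] meta = "snapshot meta payload".getBytes(StandardCharsets.UTF_8);

            // Compress, then verify the gzip magic bytes that isGZIPCompressed() checks for.
            byte[] compressed = GZIPUtils.compress(meta);
            System.out.println("compressed? " + GZIPUtils.isGZIPCompressed(compressed)); // true

            // Decompress and confirm the round trip restores the original bytes.
            byte[] restored = GZIPUtils.decompress(compressed);
            System.out.println("round trip ok? " + Arrays.equals(meta, restored)); // true
        }
    }
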
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 31aef1ebb7c..ba49d6da8e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -54,6 +54,7 @@ import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.GZIPUtils;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
@@ -271,6 +272,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -2875,7 +2877,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     // getSnapshotImpl
     private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp)
-            throws UserException {
+            throws UserException, IOException {
         // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name,
         // snapshot_type
         if (!request.isSetUser()) {
@@ -2916,10 +2918,22 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
             result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
         } else {
-            result.setMeta(snapshot.getMeta());
-            result.setJobInfo(snapshot.getJobInfo());
+            byte[] meta = snapshot.getMeta();
+            byte[] jobInfo = snapshot.getJobInfo();
+
             LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info 
size: {}",
-                    label, snapshot.getMeta().length, 
snapshot.getJobInfo().length);
+                    label, meta.length, jobInfo.length);
+            if (request.isEnableCompress()) {
+                meta = GZIPUtils.compress(meta);
+                jobInfo = GZIPUtils.compress(jobInfo);
+                result.setCompressed(true);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("get snapshot info with compress, snapshot: {}, 
compressed meta "
+                            + "size {}, compressed job info size {}", label, 
meta.length, jobInfo.length);
+                }
+            }
+            result.setMeta(meta);
+            result.setJobInfo(jobInfo);
         }
 
         return result;
@@ -3026,8 +3040,27 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 restoreTableRefClause = new AbstractBackupTableRefClause(isExclude, tableRefs);
             }
         }
-        RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, request.getMeta(),
-                request.getJobInfo());
+
+        byte[] meta = request.getMeta();
+        byte[] jobInfo = request.getJobInfo();
+        if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("decompress meta and job info, compressed meta size 
{}, compressed job info size {}",
+                        meta.length, jobInfo.length);
+            }
+            try {
+                meta = GZIPUtils.decompress(meta);
+                jobInfo = GZIPUtils.decompress(jobInfo);
+            } catch (Exception e) {
+                LOG.warn("decompress meta and job info failed", e);
+                throw new UserException("decompress meta and job info failed", e);
+            }
+        } else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) {
+            throw new UserException("The request is compressed, but the config "
+                    + "`enable_restore_snapshot_rpc_compression` is not enabled.");
+        }
+
+        RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo);
         restoreStmt.setIsBeingSynced();
         LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt);
         try {
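
The restore path above only decompresses when both the request's compressed flag and the new FE config are set, and it rejects payloads that already carry the gzip magic bytes when the config is disabled. A tool that reads the meta or job info back could mirror the same fallback detection before use; the sketch below is illustrative only (the class and method names are made up, not part of the patch):

    // Illustrative only: the class and method names here are made up.
    public class SnapshotPayloadUtil {
        // Decompress only when the payload actually starts with the gzip magic bytes,
        // mirroring the FE's GZIPUtils.isGZIPCompressed() fallback check.
        public static byte[] maybeDecompress(byte[] data) throws java.io.IOException {
            return org.apache.doris.common.GZIPUtils.isGZIPCompressed(data)
                    ? org.apache.doris.common.GZIPUtils.decompress(data)
                    : data;
        }
    }
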
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 1039baccda3..77e49e6c1fa 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1235,6 +1235,7 @@ struct TGetSnapshotRequest {
     7: optional string label_name
     8: optional string snapshot_name
     9: optional TSnapshotType snapshot_type
+    10: optional bool enable_compress;
 }
 
 struct TGetSnapshotResult {
@@ -1242,6 +1243,7 @@ struct TGetSnapshotResult {
     2: optional binary meta
     3: optional binary job_info
     4: optional Types.TNetworkAddress master_address
+    5: optional bool compressed;
 }
 
 struct TTableRef {
@@ -1265,6 +1267,7 @@ struct TRestoreSnapshotRequest {
     13: optional bool clean_tables
     14: optional bool clean_partitions
     15: optional bool atomic_restore
+    16: optional bool compressed;
 }
 
 struct TRestoreSnapshotResult {
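
With these fields, a client that wants compression sets enable_compress on TGetSnapshotRequest, and when restoring it gzips the meta and job info itself and marks the request as compressed. A rough sketch, under the assumption that the Thrift-generated Java classes live in org.apache.doris.thrift and expose the usual setters derived from the field names (setMeta, setJobInfo, setCompressed); those method names are assumptions, not verified against the generated code:

    import org.apache.doris.common.GZIPUtils;
    import org.apache.doris.thrift.TRestoreSnapshotRequest;

    import java.io.IOException;

    public class CompressedRestoreRequestSketch {
        // Builds a restore request whose meta and job info are gzip-compressed,
        // matching what FrontendServiceImpl expects when `compressed` is set.
        public static TRestoreSnapshotRequest build(byte[] meta, byte[] jobInfo) throws IOException {
            TRestoreSnapshotRequest request = new TRestoreSnapshotRequest();
            request.setMeta(GZIPUtils.compress(meta));       // gzip the snapshot meta
            request.setJobInfo(GZIPUtils.compress(jobInfo)); // gzip the backup job info
            request.setCompressed(true);                     // field 16: tells the FE to decompress
            return request;
        }
    }

Note that the FE only decompresses when enable_restore_snapshot_rpc_compression is also enabled on the receiving side; this commit defaults that config to true.
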

