This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5658c34e5f5 [feat](restore) Support compressed snapshot meta and job info #43516 (#43570) 5658c34e5f5 is described below commit 5658c34e5f5f687c4dc362bef9c8f6ef0b1237d2 Author: walter <w41te...@gmail.com> AuthorDate: Mon Nov 11 20:34:24 2024 +0800 [feat](restore) Support compressed snapshot meta and job info #43516 (#43570) cherry pick from #43516 --- .../main/java/org/apache/doris/common/Config.java | 9 ++++ .../java/org/apache/doris/common/GZIPUtils.java | 48 ++++++++++++++++++++++ .../apache/doris/service/FrontendServiceImpl.java | 45 +++++++++++++++++--- gensrc/thrift/FrontendService.thrift | 3 ++ 4 files changed, 99 insertions(+), 6 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 59f4b33aff4..c3733d8b3be 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1587,6 +1587,15 @@ public class Config extends ConfigBase { @ConfField(mutable = false) public static boolean backup_job_compressed_serialization = false; + /** + * An internal config, to indicate whether to enable the restore snapshot rpc compression. + * + * The ccr syncer will depend on this config to decide whether to compress the meta and job + * info of the restore snapshot request. + */ + @ConfField(mutable = false) + public static boolean enable_restore_snapshot_rpc_compression = true; + /** * Control the max num of tablets per backup job involved. 
*/ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java new file mode 100644 index 00000000000..7408e2888cc --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import org.apache.commons.io.IOUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class GZIPUtils { + public static boolean isGZIPCompressed(byte[] data) { + // From RFC 1952: 3.2. 
Members with a deflate compressed data stream (ID1 = 0x1F, ID2 = 0x8B) + return data.length >= 2 && data[0] == (byte) 0x1F && data[1] == (byte) 0x8B; + } + + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bytesStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) { + gzipStream.write(data); + } + return bytesStream.toByteArray(); + } + + public static byte[] decompress(byte[] data) throws IOException { + ByteArrayInputStream bytesStream = new ByteArrayInputStream(data); + try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) { + return IOUtils.toByteArray(gzipStream); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 31aef1ebb7c..ba49d6da8e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -54,6 +54,7 @@ import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.GZIPUtils; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; @@ -271,6 +272,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -2875,7 +2877,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // getSnapshotImpl private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp) - throws UserException { + throws UserException, IOException { // Step 1: Check 
all required arg: user, passwd, db, label_name, snapshot_name, // snapshot_type if (!request.isSetUser()) { @@ -2916,10 +2918,22 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label)); } else { - result.setMeta(snapshot.getMeta()); - result.setJobInfo(snapshot.getJobInfo()); + byte[] meta = snapshot.getMeta(); + byte[] jobInfo = snapshot.getJobInfo(); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}", - label, snapshot.getMeta().length, snapshot.getJobInfo().length); + label, meta.length, jobInfo.length); + if (request.isEnableCompress()) { + meta = GZIPUtils.compress(meta); + jobInfo = GZIPUtils.compress(jobInfo); + result.setCompressed(true); + if (LOG.isDebugEnabled()) { + LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta " + + "size {}, compressed job info size {}", label, meta.length, jobInfo.length); + } + } + result.setMeta(meta); + result.setJobInfo(jobInfo); } return result; @@ -3026,8 +3040,27 @@ public class FrontendServiceImpl implements FrontendService.Iface { restoreTableRefClause = new AbstractBackupTableRefClause(isExclude, tableRefs); } } - RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, request.getMeta(), - request.getJobInfo()); + + byte[] meta = request.getMeta(); + byte[] jobInfo = request.getJobInfo(); + if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) { + if (LOG.isDebugEnabled()) { + LOG.debug("decompress meta and job info, compressed meta size {}, compressed job info size {}", + meta.length, jobInfo.length); + } + try { + meta = GZIPUtils.decompress(meta); + jobInfo = GZIPUtils.decompress(jobInfo); + } catch (Exception e) { + LOG.warn("decompress meta and job info failed", e); + throw new UserException("decompress meta and job info failed", e); + } + } 
else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) { + throw new UserException("The request is compressed, but the config " + + "`enable_restore_snapshot_rpc_compressed` is not enabled."); + } + + RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo); restoreStmt.setIsBeingSynced(); LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt); try { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 1039baccda3..77e49e6c1fa 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1235,6 +1235,7 @@ struct TGetSnapshotRequest { 7: optional string label_name 8: optional string snapshot_name 9: optional TSnapshotType snapshot_type + 10: optional bool enable_compress; } struct TGetSnapshotResult { @@ -1242,6 +1243,7 @@ struct TGetSnapshotResult { 2: optional binary meta 3: optional binary job_info 4: optional Types.TNetworkAddress master_address + 5: optional bool compressed; } struct TTableRef { @@ -1265,6 +1267,7 @@ struct TRestoreSnapshotRequest { 13: optional bool clean_tables 14: optional bool clean_partitions 15: optional bool atomic_restore + 16: optional bool compressed; } struct TRestoreSnapshotResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org