This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 74fab5c5be4 [feat](restore) Support compressed snapshot meta and job info (#43516) 74fab5c5be4 is described below commit 74fab5c5be487e0fdda83a725e96b307986dd8fb Author: walter <w41te...@gmail.com> AuthorDate: Fri Nov 8 21:22:15 2024 +0800 [feat](restore) Support compressed snapshot meta and job info (#43516) Related PR: https://github.com/selectdb/ccr-syncer/pull/223 The backup meta and snapshot info can be large and might exceed the thrift max message size limit. This PR compresses the backup meta and job info in both the `GetSnapshot` and `RestoreSnapshot` methods to avoid this. To keep compatibility, the field `compressed` is added to `TGetSnapshotResult` to indicate whether the meta and job info are compressed; the config `enable_restore_snapshot_rpc_compression` indicates whether the FE supports reading compressed meta and job info. The ccr-syncer should read this config before issuing a restore snapshot request with compressed meta and job info. ### Release note Support compressing large backup meta and snapshot job info to avoid exceeding the thrift server message size limit. 
--- .../main/java/org/apache/doris/common/Config.java | 9 +++++ .../java/org/apache/doris/common/GZIPUtils.java | 46 ++++++++++++++++++++++ .../apache/doris/service/FrontendServiceImpl.java | 45 ++++++++++++++++++--- gensrc/thrift/FrontendService.thrift | 3 ++ 4 files changed, 97 insertions(+), 6 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index e48e4304eaf..93b2dfce26c 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1596,6 +1596,15 @@ public class Config extends ConfigBase { @ConfField(mutable = false) public static boolean backup_job_compressed_serialization = false; + /** + * A internal config, to indicate whether to enable the restore snapshot rpc compression. + * + * The ccr syncer will depends this config to decide whether to compress the meta and job + * info of the restore snapshot request. + */ + @ConfField(mutable = false) + public static boolean enable_restore_snapshot_rpc_compression = true; + /** * Control the max num of tablets per backup job involved. */ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java new file mode 100644 index 00000000000..8c9011bcbad --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class GZIPUtils { + public static boolean isGZIPCompressed(byte[] data) { + // From RFC 1952: 3.2. Members with a deflate compressed data stream (ID1 = 8, ID2 = 8) + return data.length >= 2 && data[0] == (byte) 0x1F && data[1] == (byte) 0x8B; + } + + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bytesStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) { + gzipStream.write(data); + } + return bytesStream.toByteArray(); + } + + public static byte[] decompress(byte[] data) throws IOException { + ByteArrayInputStream bytesStream = new ByteArrayInputStream(data); + try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) { + return gzipStream.readAllBytes(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 346f1a66008..ba91825711c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -55,6 +55,7 @@ import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import 
org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.GZIPUtils; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; @@ -272,6 +273,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -2890,7 +2892,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // getSnapshotImpl private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp) - throws UserException { + throws UserException, IOException { // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, // snapshot_type if (!request.isSetUser()) { @@ -2931,10 +2933,22 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label)); } else { - result.setMeta(snapshot.getMeta()); - result.setJobInfo(snapshot.getJobInfo()); + byte[] meta = snapshot.getMeta(); + byte[] jobInfo = snapshot.getJobInfo(); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}", - label, snapshot.getMeta().length, snapshot.getJobInfo().length); + label, meta.length, jobInfo.length); + if (request.isEnableCompress()) { + meta = GZIPUtils.compress(meta); + jobInfo = GZIPUtils.compress(jobInfo); + result.setCompressed(true); + if (LOG.isDebugEnabled()) { + LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta " + + "size {}, compressed job info size {}", label, meta.length, jobInfo.length); + } + } + result.setMeta(meta); + result.setJobInfo(jobInfo); } return result; @@ -3041,8 +3055,27 @@ public class FrontendServiceImpl implements FrontendService.Iface { 
restoreTableRefClause = new AbstractBackupTableRefClause(isExclude, tableRefs); } } - RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, request.getMeta(), - request.getJobInfo()); + + byte[] meta = request.getMeta(); + byte[] jobInfo = request.getJobInfo(); + if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) { + if (LOG.isDebugEnabled()) { + LOG.debug("decompress meta and job info, compressed meta size {}, compressed job info size {}", + meta.length, jobInfo.length); + } + try { + meta = GZIPUtils.decompress(meta); + jobInfo = GZIPUtils.decompress(jobInfo); + } catch (Exception e) { + LOG.warn("decompress meta and job info failed", e); + throw new UserException("decompress meta and job info failed", e); + } + } else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) { + throw new UserException("The request is compressed, but the config " + + "`enable_restore_snapshot_rpc_compressed` is not enabled."); + } + + RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo); restoreStmt.setIsBeingSynced(); LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt); try { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5efe26590cd..b3c75be8182 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1237,6 +1237,7 @@ struct TGetSnapshotRequest { 7: optional string label_name 8: optional string snapshot_name 9: optional TSnapshotType snapshot_type + 10: optional bool enable_compress; } struct TGetSnapshotResult { @@ -1244,6 +1245,7 @@ struct TGetSnapshotResult { 2: optional binary meta 3: optional binary job_info 4: optional Types.TNetworkAddress master_address + 5: optional bool compressed; } struct TTableRef { @@ -1267,6 +1269,7 @@ struct TRestoreSnapshotRequest { 13: optional bool clean_tables 14: optional bool clean_partitions 15: 
optional bool atomic_restore + 16: optional bool compressed; } struct TRestoreSnapshotResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org