This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 461e6494860 [fix](ccr) handle large binlog (#30435) 461e6494860 is described below commit 461e649486018755d145f156941bd802f0af7fa5 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Sun Jan 28 17:52:39 2024 +0800 [fix](ccr) handle large binlog (#30435) --- .../src/main/java/org/apache/doris/common/Config.java | 5 +++++ .../main/java/org/apache/doris/binlog/BinlogManager.java | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index bc849e24bd5..1231312cf5d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2145,6 +2145,11 @@ public class Config extends ConfigBase { "Whether to enable binlog feature"}) public static boolean enable_feature_binlog = false; + @ConfField(mutable = false, masterOnly = false, varType = VariableAnnotation.EXPERIMENTAL, description = { + "设置 binlog 消息最字节长度", + "Set the maximum byte length of binlog message"}) + public static int max_binlog_messsage_size = 1024 * 1024 * 1024; + @ConfField(mutable = true, masterOnly = true, description = { "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。", "Whether to disable creating catalog with WITH RESOURCE statement."}) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index b819d8444eb..6ac3ba3b3a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -457,10 +457,22 @@ public class BinlogManager { int length = dis.readInt(); byte[] data = new byte[length]; dis.readFully(data); - TMemoryInputTransport transport = new TMemoryInputTransport(data); + Boolean isLargeBinlog = length > 8 * 1024 * 1024; + if (isLargeBinlog) { + LOG.info("a large binlog length {}", length); + } + + TMemoryInputTransport transport = new TMemoryInputTransport(); + transport.getConfiguration().setMaxMessageSize(Config.max_binlog_messsage_size); + transport.reset(data); + TBinaryProtocol protocol = new TBinaryProtocol(transport); TBinlog binlog = new TBinlog(); binlog.read(protocol); + + if (isLargeBinlog) { + LOG.info("a large binlog length {} type {}", length, binlog.type); + } return binlog; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org