This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 461e6494860 [fix](ccr) handle large binlog (#30435)
461e6494860 is described below

commit 461e649486018755d145f156941bd802f0af7fa5
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Sun Jan 28 17:52:39 2024 +0800

    [fix](ccr) handle large binlog (#30435)
---
 .../src/main/java/org/apache/doris/common/Config.java      |  5 +++++
 .../main/java/org/apache/doris/binlog/BinlogManager.java   | 14 +++++++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bc849e24bd5..1231312cf5d 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2145,6 +2145,11 @@ public class Config extends ConfigBase {
             "Whether to enable binlog feature"})
     public static boolean enable_feature_binlog = false;
 
+    @ConfField(mutable = false, masterOnly = false, varType = VariableAnnotation.EXPERIMENTAL, description = {
+        "设置 binlog 消息最字节长度",
+        "Set the maximum byte length of binlog message"})
+    public static int max_binlog_messsage_size = 1024 * 1024 * 1024;
+
     @ConfField(mutable = true, masterOnly = true, description = {
             "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
             "Whether to disable creating catalog with WITH RESOURCE statement."})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b819d8444eb..6ac3ba3b3a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -457,10 +457,22 @@ public class BinlogManager {
         int length = dis.readInt();
         byte[] data = new byte[length];
         dis.readFully(data);
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        Boolean isLargeBinlog = length > 8 * 1024 * 1024;
+        if (isLargeBinlog) {
+            LOG.info("a large binlog length {}", length);
+        }
+
+        TMemoryInputTransport transport = new TMemoryInputTransport();
+        transport.getConfiguration().setMaxMessageSize(Config.max_binlog_messsage_size);
+        transport.reset(data);
+
         TBinaryProtocol protocol = new TBinaryProtocol(transport);
         TBinlog binlog = new TBinlog();
         binlog.read(protocol);
+
+        if (isLargeBinlog) {
+            LOG.info("a large binlog length {} type {}", length, binlog.type);
+        }
         return binlog;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to