This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 94b97261382 [fix](ccr) handle large binlog and download failure 
(#30429)
94b97261382 is described below

commit 94b972613821e5498fab4abc9bf430b33b983a2f
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat Jan 27 16:09:32 2024 +0800

    [fix](ccr) handle large binlog and download failure (#30429)
---
 be/src/agent/task_worker_pool.cpp                          |  4 ++--
 .../src/main/java/org/apache/doris/common/Config.java      |  5 +++++
 .../main/java/org/apache/doris/binlog/BinlogManager.java   | 14 +++++++++++++-
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 9716a65865c..1e16af1b8e9 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -850,8 +850,8 @@ void TaskWorkerPool::_download_worker_thread_callback() {
         auto status = Status::OK();
         if (download_request.__isset.remote_tablet_snapshots) {
             SnapshotLoader loader(_env, download_request.job_id, 
agent_task_req.signature);
-            
loader.remote_http_download(download_request.remote_tablet_snapshots,
-                                        &downloaded_tablet_ids);
+            status = 
loader.remote_http_download(download_request.remote_tablet_snapshots,
+                                                 &downloaded_tablet_ids);
         } else {
             std::unique_ptr<SnapshotLoader> loader = 
std::make_unique<SnapshotLoader>(
                     _env, download_request.job_id, agent_task_req.signature,
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 36dcfc1f8d0..2da574eb844 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2106,6 +2106,11 @@ public class Config extends ConfigBase {
             "Whether to enable binlog feature"})
     public static boolean enable_feature_binlog = false;
 
+    @ConfField(mutable = false, masterOnly = false, expType = 
ExperimentalType.EXPERIMENTAL, description = {
+        "设置 binlog 消息最字节长度",
+        "Set the maximum byte length of binlog message"})
+    public static int max_binlog_messsage_size = 1024 * 1024 * 1024;
+
     @ConfField(mutable = true, masterOnly = true, description = {
             "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
             "Whether to disable creating catalog with WITH RESOURCE 
statement."})
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b819d8444eb..6ac3ba3b3a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -457,10 +457,22 @@ public class BinlogManager {
         int length = dis.readInt();
         byte[] data = new byte[length];
         dis.readFully(data);
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        Boolean isLargeBinlog = length > 8 * 1024 * 1024;
+        if (isLargeBinlog) {
+            LOG.info("a large binlog length {}", length);
+        }
+
+        TMemoryInputTransport transport = new TMemoryInputTransport();
+        
transport.getConfiguration().setMaxMessageSize(Config.max_binlog_messsage_size);
+        transport.reset(data);
+
         TBinaryProtocol protocol = new TBinaryProtocol(transport);
         TBinlog binlog = new TBinlog();
         binlog.read(protocol);
+
+        if (isLargeBinlog) {
+            LOG.info("a large binlog length {} type {}", length, binlog.type);
+        }
         return binlog;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to