This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 44133312747 [chore](task) log the thrift message size if a broken pipe occurs (#49492)
44133312747 is described below

commit 441333127473aea54513337db1bbe40ac85ec863
Author: walter <maoch...@selectdb.com>
AuthorDate: Wed Mar 26 14:20:42 2025 +0800

    [chore](task) log the thrift message size if a broken pipe occurs (#49492)
---
 .../java/org/apache/doris/task/AgentBatchTask.java    | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 0bea093d81b..328e4ba233d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -20,6 +20,7 @@ package org.apache.doris.task;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThriftUtils;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.system.Backend;
@@ -63,6 +64,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /*
  * This class group tasks by backend
@@ -169,6 +171,7 @@ public class AgentBatchTask implements Runnable {
             TNetworkAddress address = null;
             boolean ok = false;
             String errMsg = "";
+            List<TAgentTaskRequest> agentTaskRequests = new 
LinkedList<TAgentTaskRequest>();
             try {
                 Backend backend = 
Env.getCurrentSystemInfo().getBackend(backendId);
                 if (backend == null || !backend.isAlive()) {
@@ -180,7 +183,6 @@ public class AgentBatchTask implements Runnable {
                 String host = FeConstants.runningUnitTest ? "127.0.0.1" : 
backend.getHost();
                 address = new TNetworkAddress(host, backend.getBePort());
                 client = ClientPool.backendPool.borrowObject(address);
-                List<TAgentTaskRequest> agentTaskRequests = new 
LinkedList<TAgentTaskRequest>();
                 for (AgentTask task : tasks) {
                     agentTaskRequests.add(toAgentTaskRequest(task));
                     if (agentTaskRequests.size() >= batchSize) {
@@ -193,6 +195,21 @@ public class AgentBatchTask implements Runnable {
             } catch (Exception e) {
                 LOG.warn("task exec error. backend[{}]", backendId, e);
                 errMsg = String.format("task exec error: %s. backend[%d]", 
e.getMessage(), backendId);
+                if (!agentTaskRequests.isEmpty() && errMsg.contains("Broken 
pipe")) {
+                    // Log the task binary message size and the max task type, 
to help debug the
+                    // large thrift message size issue.
+                    List<Pair<TTaskType, Long>> taskTypeAndSize = 
agentTaskRequests.stream()
+                            .map(req -> Pair.of(req.getTaskType(), 
ThriftUtils.getBinaryMessageSize(req)))
+                            .collect(Collectors.toList());
+                    Pair<TTaskType, Long> maxTaskTypeAndSize = 
taskTypeAndSize.stream()
+                            .max((p1, p2) -> Long.compare(p1.value(), 
p2.value()))
+                            .orElse(null);  // taskTypeAndSize is not empty
+                    TTaskType maxType = maxTaskTypeAndSize.first;
+                    long maxSize = maxTaskTypeAndSize.second;
+                    long totalSize = 
taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum);
+                    LOG.warn("submit {} tasks to backend[{}], total size: {}, 
max task type: {}, size: {}. msg: {}",
+                            agentTaskRequests.size(), backendId, totalSize, 
maxType, maxSize, e.getMessage());
+                }
             } finally {
                 if (ok) {
                     ClientPool.backendPool.returnObject(address, client);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to