This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 2fb8e00907e branch-2.1: [chore](task) log the thrift message size if the broken pipe is occurred #49492 (#49509)
2fb8e00907e is described below

commit 2fb8e00907e193995d4c05f8ab700358ffd67688
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 28 09:57:53 2025 +0800

    branch-2.1: [chore](task) log the thrift message size if the broken pipe is occurred #49492 (#49509)
    
    Cherry-picked from #49492
    
    Co-authored-by: walter <maoch...@selectdb.com>
---
 .../java/org/apache/doris/task/AgentBatchTask.java    | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index ebfdb28a16b..0045e05ccba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -20,6 +20,7 @@ package org.apache.doris.task;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThriftUtils;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
@@ -60,6 +61,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /*
  * This class group tasks by backend
@@ -166,6 +168,7 @@ public class AgentBatchTask implements Runnable {
             TNetworkAddress address = null;
             boolean ok = false;
             String errMsg = "";
+            List<TAgentTaskRequest> agentTaskRequests = new 
LinkedList<TAgentTaskRequest>();
             try {
                 Backend backend = 
Env.getCurrentSystemInfo().getBackend(backendId);
                 if (backend == null || !backend.isAlive()) {
@@ -177,7 +180,6 @@ public class AgentBatchTask implements Runnable {
                 String host = FeConstants.runningUnitTest ? "127.0.0.1" : 
backend.getHost();
                 address = new TNetworkAddress(host, backend.getBePort());
                 client = ClientPool.backendPool.borrowObject(address);
-                List<TAgentTaskRequest> agentTaskRequests = new 
LinkedList<TAgentTaskRequest>();
                 for (AgentTask task : tasks) {
                     agentTaskRequests.add(toAgentTaskRequest(task));
                     if (agentTaskRequests.size() >= batchSize) {
@@ -190,6 +192,21 @@ public class AgentBatchTask implements Runnable {
             } catch (Exception e) {
                 LOG.warn("task exec error. backend[{}]", backendId, e);
                 errMsg = String.format("task exec error: %s. backend[%d]", 
e.getMessage(), backendId);
+                if (!agentTaskRequests.isEmpty() && errMsg.contains("Broken 
pipe")) {
+                    // Log the task binary message size and the max task type, 
to help debug the
+                    // large thrift message size issue.
+                    List<Pair<TTaskType, Long>> taskTypeAndSize = 
agentTaskRequests.stream()
+                            .map(req -> Pair.of(req.getTaskType(), 
ThriftUtils.getBinaryMessageSize(req)))
+                            .collect(Collectors.toList());
+                    Pair<TTaskType, Long> maxTaskTypeAndSize = 
taskTypeAndSize.stream()
+                            .max((p1, p2) -> Long.compare(p1.value(), 
p2.value()))
+                            .orElse(null);  // taskTypeAndSize is not empty
+                    TTaskType maxType = maxTaskTypeAndSize.first;
+                    long maxSize = maxTaskTypeAndSize.second;
+                    long totalSize = 
taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum);
+                    LOG.warn("submit {} tasks to backend[{}], total size: {}, 
max task type: {}, size: {}. msg: {}",
+                            agentTaskRequests.size(), backendId, totalSize, 
maxType, maxSize, e.getMessage());
+                }
             } finally {
                 if (ok) {
                     ClientPool.backendPool.returnObject(address, client);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to