This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 44133312747 [chore](task) log the thrift message size if the broken pipe is occurred (#49492) 44133312747 is described below commit 441333127473aea54513337db1bbe40ac85ec863 Author: walter <maoch...@selectdb.com> AuthorDate: Wed Mar 26 14:20:42 2025 +0800 [chore](task) log the thrift message size if the broken pipe is occurred (#49492) --- .../java/org/apache/doris/task/AgentBatchTask.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index 0bea093d81b..328e4ba233d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -20,6 +20,7 @@ package org.apache.doris.task; import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.ThriftUtils; import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.Backend; @@ -63,6 +64,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /* * This class group tasks by backend @@ -169,6 +171,7 @@ public class AgentBatchTask implements Runnable { TNetworkAddress address = null; boolean ok = false; String errMsg = ""; + List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>(); try { Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); if (backend == null || !backend.isAlive()) { @@ -180,7 +183,6 @@ public class AgentBatchTask implements Runnable { String host = FeConstants.runningUnitTest ? "127.0.0.1" : backend.getHost(); address = new TNetworkAddress(host, backend.getBePort()); client = ClientPool.backendPool.borrowObject(address); - List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>(); for (AgentTask task : tasks) { agentTaskRequests.add(toAgentTaskRequest(task)); if (agentTaskRequests.size() >= batchSize) { @@ -193,6 +195,21 @@ public class AgentBatchTask implements Runnable { } catch (Exception e) { LOG.warn("task exec error. backend[{}]", backendId, e); errMsg = String.format("task exec error: %s. backend[%d]", e.getMessage(), backendId); + if (!agentTaskRequests.isEmpty() && errMsg.contains("Broken pipe")) { + // Log the task binary message size and the max task type, to help debug the + // large thrift message size issue. + List<Pair<TTaskType, Long>> taskTypeAndSize = agentTaskRequests.stream() + .map(req -> Pair.of(req.getTaskType(), ThriftUtils.getBinaryMessageSize(req))) + .collect(Collectors.toList()); + Pair<TTaskType, Long> maxTaskTypeAndSize = taskTypeAndSize.stream() + .max((p1, p2) -> Long.compare(p1.value(), p2.value())) + .orElse(null); // taskTypeAndSize is not empty + TTaskType maxType = maxTaskTypeAndSize.first; + long maxSize = maxTaskTypeAndSize.second; + long totalSize = taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum); + LOG.warn("submit {} tasks to backend[{}], total size: {}, max task type: {}, size: {}. msg: {}", + agentTaskRequests.size(), backendId, totalSize, maxType, maxSize, e.getMessage()); + } } finally { if (ok) { ClientPool.backendPool.returnObject(address, client); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org