This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 2fb8e00907e branch-2.1: [chore](task) log the thrift message size if the broken pipe is occurred #49492 (#49509)
2fb8e00907e is described below

commit 2fb8e00907e193995d4c05f8ab700358ffd67688
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 28 09:57:53 2025 +0800

    branch-2.1: [chore](task) log the thrift message size if the broken pipe is occurred #49492 (#49509)

    Cherry-picked from #49492

    Co-authored-by: walter <maoch...@selectdb.com>
---
 .../java/org/apache/doris/task/AgentBatchTask.java | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index ebfdb28a16b..0045e05ccba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -20,6 +20,7 @@ package org.apache.doris.task;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThriftUtils;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
@@ -60,6 +61,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /*
  * This class group tasks by backend
@@ -166,6 +168,7 @@ public class AgentBatchTask implements Runnable {
             TNetworkAddress address = null;
             boolean ok = false;
             String errMsg = "";
+            List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
             try {
                 Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
                 if (backend == null || !backend.isAlive()) {
@@ -177,7 +180,6 @@ public class AgentBatchTask implements Runnable {
                 String host = FeConstants.runningUnitTest ? "127.0.0.1" : backend.getHost();
                 address = new TNetworkAddress(host, backend.getBePort());
                 client = ClientPool.backendPool.borrowObject(address);
-                List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
                 for (AgentTask task : tasks) {
                     agentTaskRequests.add(toAgentTaskRequest(task));
                     if (agentTaskRequests.size() >= batchSize) {
@@ -190,6 +192,21 @@ public class AgentBatchTask implements Runnable {
             } catch (Exception e) {
                 LOG.warn("task exec error. backend[{}]", backendId, e);
                 errMsg = String.format("task exec error: %s. backend[%d]", e.getMessage(), backendId);
+                if (!agentTaskRequests.isEmpty() && errMsg.contains("Broken pipe")) {
+                    // Log the task binary message size and the max task type, to help debug the
+                    // large thrift message size issue.
+                    List<Pair<TTaskType, Long>> taskTypeAndSize = agentTaskRequests.stream()
+                            .map(req -> Pair.of(req.getTaskType(), ThriftUtils.getBinaryMessageSize(req)))
+                            .collect(Collectors.toList());
+                    Pair<TTaskType, Long> maxTaskTypeAndSize = taskTypeAndSize.stream()
+                            .max((p1, p2) -> Long.compare(p1.value(), p2.value()))
+                            .orElse(null); // taskTypeAndSize is not empty
+                    TTaskType maxType = maxTaskTypeAndSize.first;
+                    long maxSize = maxTaskTypeAndSize.second;
+                    long totalSize = taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum);
+                    LOG.warn("submit {} tasks to backend[{}], total size: {}, max task type: {}, size: {}. msg: {}",
+                            agentTaskRequests.size(), backendId, totalSize, maxType, maxSize, e.getMessage());
+                }
             } finally {
                 if (ok) {
                     ClientPool.backendPool.returnObject(address, client);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org