This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 150d8bb60fc [branch-2.0](pick 27738) Warning log to trace send fragment #27738 (#27760) 150d8bb60fc is described below commit 150d8bb60fcc06323f2a416764d162ebb72d0fb5 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Thu Nov 30 08:40:09 2023 +0800 [branch-2.0](pick 27738) Warning log to trace send fragment #27738 (#27760) --- be/src/service/internal_service.cpp | 28 ++++++++++++++++++++-- .../main/java/org/apache/doris/qe/Coordinator.java | 27 ++++++++++++++++----- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 4ac98dbf25f..df88be44ba7 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -458,10 +458,22 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req uint32_t len = ser_request.size(); RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); } + const auto& fragment_list = t_request.paramsList; + MonotonicStopWatch timer; + timer.start(); for (const TExecPlanFragmentParams& params : t_request.paramsList) { RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); } + + timer.stop(); + double cost_secs = static_cast<double>(timer.elapsed_time()) / 1000000000ULL; + if (cost_secs > 5) { + LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it costs too much", + fragment_list.size(), print_id(fragment_list.front().params.query_id), + cost_secs); + } + return Status::OK(); } else if (version == PFragmentRequestVersion::VERSION_3) { TPipelineFragmentParamsList t_request; @@ -471,9 +483,21 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); } - for (const TPipelineFragmentParams& params : t_request.params_list) { - 
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + const auto& fragment_list = t_request.params_list; + MonotonicStopWatch timer; + timer.start(); + + for (const TPipelineFragmentParams& fragment : fragment_list) { + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment)); + } + + timer.stop(); + double cost_secs = static_cast<double>(timer.elapsed_time()) / 1000000000ULL; + if (cost_secs > 5) { + LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it costs too much", + fragment_list.size(), print_id(fragment_list.front().query_id), cost_secs); } + return Status::OK(); } else { return Status::InternalError("invalid version"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index af8eb518e9d..cf3175bc72d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -875,8 +875,13 @@ public class Coordinator implements CoordInterface { long leftTimeMs, String operation) throws RpcException, UserException { if (leftTimeMs <= 0) { - throw new UserException("timeout before waiting for " + operation + " RPC. 
Elapse(sec): " + ( - (System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout())); + long elapsed = (System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout(); + String msg = String.format( + "timeout before waiting {} rpc, query timeout: {}, already elapsed:{}, left for this:{}", + operation, queryOptions.getExecutionTimeout(), elapsed, leftTimeMs); + + LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg); + throw new UserException(msg); } long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms); @@ -904,7 +909,10 @@ public class Coordinator implements CoordInterface { code = TStatusCode.INTERNAL_ERROR; } catch (TimeoutException e) { exception = e; - errMsg = "timeout when waiting for " + operation + " RPC. Wait(sec): " + timeoutMs / 1000; + errMsg = String.format( + "timeout when waiting for {} rpc, query timeout {}, left timeout for this operation: {}", + operation, queryOptions.getExecutionTimeout(), timeoutMs / 10000); + LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg); code = TStatusCode.TIMEOUT; } @@ -942,8 +950,12 @@ public class Coordinator implements CoordInterface { Future<PExecPlanFragmentResult>>> futures, long leftTimeMs, String operation) throws RpcException, UserException { if (leftTimeMs <= 0) { - throw new UserException("timeout before waiting for " + operation + " RPC. 
Elapse(sec): " + ( - (System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.query_timeout)); + long elapsed = (System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout(); + String msg = String.format( + "timeout before waiting {} rpc, query timeout: {}, already elapsed:{}, left for this:{}", + operation, queryOptions.getExecutionTimeout(), elapsed, leftTimeMs); + LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg); + throw new UserException(msg); } long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms); @@ -971,7 +983,10 @@ public class Coordinator implements CoordInterface { code = TStatusCode.INTERNAL_ERROR; } catch (TimeoutException e) { exception = e; - errMsg = "timeout when waiting for " + operation + " RPC. Wait(sec): " + timeoutMs / 1000; + errMsg = String.format( + "timeout when waiting for {} rpc, query timeout {}, left timeout for this operation: {}", + operation, queryOptions.getExecutionTimeout(), timeoutMs / 10000); + LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg); code = TStatusCode.TIMEOUT; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org