This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3e1e8db173 [fix](exec) fix thread token shutdown (#14418) 3e1e8db173 is described below commit 3e1e8db173bc1eb36a4ffa84d2d011ff8e1f8721 Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Sun Nov 20 00:04:48 2022 +0800 [fix](exec) fix thread token shutdown (#14418) Fix Thread pool token was shut down error. This is because when there are more than 1 fragment of a query on one BE, the thread token maybe reset incorrectly, causing thread token shutdown earlier. cherry-pick from master Introduced from #13021 --- be/src/runtime/fragment_mgr.cpp | 11 ++++++----- be/src/runtime/plan_fragment_executor.cpp | 4 +++- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 5 +++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 57d2a05f1e..29cdd17320 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -255,8 +255,9 @@ Status FragmentExecState::execute() { CgroupsMgr::apply_system_cgroup(); opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment"); Status status = _executor.open(); - WARN_IF_ERROR(status, strings::Substitute("Got error while opening fragment $0", - print_id(_fragment_instance_id))); + WARN_IF_ERROR(status, + strings::Substitute("Got error while opening fragment $0, query id: $1", + print_id(_fragment_instance_id), print_id(_query_id))); _executor.close(); if (!status.ok()) { @@ -403,7 +404,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil << apache::thrift::ThriftDebugString(params).c_str(); if (!exec_status.ok()) { LOG(WARNING) << "report error status: " << exec_status.to_string() - << " to coordinator: " << _coord_addr; + << " to coordinator: " << _coord_addr << ", query id: " << print_id(_query_id) + << ", instance id: " << print_id(_fragment_instance_id); } try { try { @@ -630,7 +632,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi BackendOptions::get_localhost()); } fragments_ctx = search->second; - _set_scan_concurrency(params, fragments_ctx.get()); } else { // This may be a first fragment request of the query. // Create the query fragments context. @@ -689,7 +690,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi << print_id(fragments_ctx->query_id) << " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES); } else { - // Already has a query fragmentscontext, use it + // Already has a query fragments context, use it fragments_ctx = search->second; } } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 8209c82e83..3a9206f1fa 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -626,7 +626,9 @@ void PlanFragmentExecutor::update_status(const Status& new_status) { void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { LOG_INFO("PlanFragmentExecutor::cancel") .tag("query_id", _query_id) - .tag("instance_id", _runtime_state->fragment_instance_id()); + .tag("instance_id", _runtime_state->fragment_instance_id()) + .tag("reason", reason) + .tag("error message", msg); DCHECK(_prepared); _cancel_reason = reason; _cancel_msg = msg; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 035ed03cfe..a89aa3ef42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1695,8 +1695,9 @@ public class Coordinator { // and returned_all_results_ is true. // (UpdateStatus() initiates cancellation, if it hasn't already been initiated) if (!(returnedAllResults && status.isCancelled()) && !status.ok()) { - LOG.warn("one instance report fail, query_id={} instance_id={}", - DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId())); + LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}", + DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()), + status.getErrorMsg()); updateStatus(status, params.getFragmentInstanceId()); } if (execState.done) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org