This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e1e8db173 [fix](exec) fix thread token shutdown (#14418)
3e1e8db173 is described below

commit 3e1e8db173bc1eb36a4ffa84d2d011ff8e1f8721
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Sun Nov 20 00:04:48 2022 +0800

    [fix](exec) fix thread token shutdown (#14418)
    
    Fix the "Thread pool token was shut down" error.
    This happens because, when a query has more than one fragment on the same BE,
    the thread token may be reset incorrectly, causing the thread token to be shut
    down prematurely.
    cherry-pick from master
    Introduced by #13021
---
 be/src/runtime/fragment_mgr.cpp                               | 11 ++++++-----
 be/src/runtime/plan_fragment_executor.cpp                     |  4 +++-
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java |  5 +++--
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 57d2a05f1e..29cdd17320 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -255,8 +255,9 @@ Status FragmentExecState::execute() {
         CgroupsMgr::apply_system_cgroup();
         opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start 
executing Fragment");
         Status status = _executor.open();
-        WARN_IF_ERROR(status, strings::Substitute("Got error while opening 
fragment $0",
-                                                  
print_id(_fragment_instance_id)));
+        WARN_IF_ERROR(status,
+                      strings::Substitute("Got error while opening fragment 
$0, query id: $1",
+                                          print_id(_fragment_instance_id), 
print_id(_query_id)));
 
         _executor.close();
         if (!status.ok()) {
@@ -403,7 +404,8 @@ void FragmentExecState::coordinator_callback(const Status& 
status, RuntimeProfil
                << apache::thrift::ThriftDebugString(params).c_str();
     if (!exec_status.ok()) {
         LOG(WARNING) << "report error status: " << exec_status.to_string()
-                     << " to coordinator: " << _coord_addr;
+                     << " to coordinator: " << _coord_addr << ", query id: " 
<< print_id(_query_id)
+                     << ", instance id: " << print_id(_fragment_instance_id);
     }
     try {
         try {
@@ -630,7 +632,6 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
                     BackendOptions::get_localhost());
         }
         fragments_ctx = search->second;
-        _set_scan_concurrency(params, fragments_ctx.get());
     } else {
         // This may be a first fragment request of the query.
         // Create the query fragments context.
@@ -689,7 +690,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
                           << print_id(fragments_ctx->query_id)
                           << " limit: " << PrettyPrinter::print(bytes_limit, 
TUnit::BYTES);
             } else {
-                // Already has a query fragmentscontext, use it
+                // Already has a query fragments context, use it
                 fragments_ctx = search->second;
             }
         }
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 8209c82e83..3a9206f1fa 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -626,7 +626,9 @@ void PlanFragmentExecutor::update_status(const Status& 
new_status) {
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, 
const std::string& msg) {
     LOG_INFO("PlanFragmentExecutor::cancel")
             .tag("query_id", _query_id)
-            .tag("instance_id", _runtime_state->fragment_instance_id());
+            .tag("instance_id", _runtime_state->fragment_instance_id())
+            .tag("reason", reason)
+            .tag("error message", msg);
     DCHECK(_prepared);
     _cancel_reason = reason;
     _cancel_msg = msg;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 035ed03cfe..a89aa3ef42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1695,8 +1695,9 @@ public class Coordinator {
         // and returned_all_results_ is true.
         // (UpdateStatus() initiates cancellation, if it hasn't already been 
initiated)
         if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
-            LOG.warn("one instance report fail, query_id={} instance_id={}",
-                    DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()));
+            LOG.warn("one instance report fail, query_id={} instance_id={}, 
error message: {}",
+                    DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
+                    status.getErrorMsg());
             updateStatus(status, params.getFragmentInstanceId());
         }
         if (execState.done) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to