This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 53ea2a0a1d1 [fix](coordinator) fix NereidsCoordinator can not interrupt query in fe (#44795)
53ea2a0a1d1 is described below

commit 53ea2a0a1d18900a3ad5bca8d1b1d5fe1a98111c
Author: 924060929 <lanhuaj...@selectdb.com>
AuthorDate: Mon Dec 2 16:40:38 2024 +0800

    [fix](coordinator) fix NereidsCoordinator can not interrupt query in fe (#44795)
    
    Fix NereidsCoordinator being unable to interrupt a query in the FE: when a
    fragment reports a failure, the QueryProcessor should set the query status
    to failed.
---
 .../org/apache/doris/qe/AbstractJobProcessor.java  | 118 +++++++++++++++++++++
 .../java/org/apache/doris/qe/JobProcessor.java     |   5 +-
 .../org/apache/doris/qe/NereidsCoordinator.java    |   5 +-
 .../org/apache/doris/qe/runtime/LoadProcessor.java |  74 ++-----------
 .../doris/qe/runtime/PipelineExecutionTask.java    |   2 +-
 .../qe/runtime/PipelineExecutionTaskBuilder.java   |   2 +-
 .../apache/doris/qe/runtime/QueryProcessor.java    |  18 ++--
 7 files changed, 142 insertions(+), 82 deletions(-)
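The core of the change is a new template-method base class, AbstractJobProcessor, which hoists the report-handling logic that previously lived only in LoadProcessor, so QueryProcessor now inherits the failure handling as well. A simplified sketch of the resulting shape (class and method names match the patch below; fields, logging, and the eos/cancelled special case are abridged):

    public abstract class AbstractJobProcessor implements JobProcessor {
        protected final CoordinatorContext coordinatorContext;
        // ... other fields abridged ...

        @Override
        public final void updateFragmentExecStatus(TReportExecStatusParams params) {
            SingleFragmentPipelineTask fragmentTask = backendFragmentTasks.get().get(
                    new BackendFragmentId(params.getBackendId(), params.getFragmentId()));
            if (fragmentTask == null || !fragmentTask.processReportExecStatus(params)) {
                return;
            }
            Status status = new Status(params.status);
            if (!status.ok()) {
                // the key fix: every processor, including QueryProcessor, now
                // records the failure, so the FE can interrupt the query
                coordinatorContext.updateStatusIfOk(status);
            }
            doProcessReportExecStatus(params, fragmentTask);
        }

        // subclasses supply the job-specific half of the template
        protected abstract void doProcessReportExecStatus(
                TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask);
    }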

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
new file mode 100644
index 00000000000..2858de25d57
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.qe.runtime.BackendFragmentId;
+import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask;
+import org.apache.doris.qe.runtime.PipelineExecutionTask;
+import org.apache.doris.qe.runtime.SingleFragmentPipelineTask;
+import org.apache.doris.thrift.TReportExecStatusParams;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+
+/** AbstractJobProcessor */
+public abstract class AbstractJobProcessor implements JobProcessor {
+    private final Logger logger = LogManager.getLogger(getClass());
+
+    protected final CoordinatorContext coordinatorContext;
+    protected volatile Optional<PipelineExecutionTask> executionTask;
+    protected volatile Optional<Map<BackendFragmentId, SingleFragmentPipelineTask>> backendFragmentTasks;
+
+    public AbstractJobProcessor(CoordinatorContext coordinatorContext) {
+        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
+        this.executionTask = Optional.empty();
+        this.backendFragmentTasks = Optional.empty();
+    }
+
+    protected abstract void doProcessReportExecStatus(
+            TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask);
+
+    @Override
+    public final void setPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {
+        Preconditions.checkArgument(pipelineExecutionTask != null, "sqlPipelineTask can not be null");
+
+        this.executionTask = Optional.of(pipelineExecutionTask);
+        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
+                = buildBackendFragmentTasks(pipelineExecutionTask);
+        this.backendFragmentTasks = Optional.of(backendFragmentTasks);
+
+        afterSetPipelineExecutionTask(pipelineExecutionTask);
+    }
+
+    protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {}
+
+    @Override
+    public final void updateFragmentExecStatus(TReportExecStatusParams params) {
+        SingleFragmentPipelineTask fragmentTask = backendFragmentTasks.get().get(
+                new BackendFragmentId(params.getBackendId(), params.getFragmentId()));
+        if (fragmentTask == null || !fragmentTask.processReportExecStatus(params)) {
+            return;
+        }
+
+        TUniqueId queryId = coordinatorContext.queryId;
+        Status status = new Status(params.status);
+        // for now, abort the query if we see any error except if the error is cancelled
+        // and returned_all_results_ is true.
+        // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
+        if (!status.ok()) {
+            if (coordinatorContext.isEos() && status.isCancelled()) {
+                logger.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
+                                + " is reporting failed status {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(),
+                        status.toString());
+            } else {
+                logger.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
+                                + " error message: {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(), status.toString());
+                coordinatorContext.updateStatusIfOk(status);
+            }
+        }
+        doProcessReportExecStatus(params, fragmentTask);
+    }
+
+    private Map<BackendFragmentId, SingleFragmentPipelineTask> buildBackendFragmentTasks(
+            PipelineExecutionTask executionTask) {
+        ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
+                = ImmutableMap.builder();
+        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : executionTask.getChildrenTasks().entrySet()) {
+            Long backendId = backendTask.getKey();
+            for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : backendTask.getValue()
+                    .getChildrenTasks().entrySet()) {
+                Integer fragmentId = fragmentIdToTask.getKey();
+                SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue();
+                backendFragmentTasks.put(new BackendFragmentId(backendId, fragmentId), fragmentTask);
+            }
+        }
+        return backendFragmentTasks.build();
+    }
+}
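A subclass only needs to supply the job-specific half of the template; a minimal hypothetical example (MyJobProcessor is illustrative, not part of the patch):

    public class MyJobProcessor extends AbstractJobProcessor {
        public MyJobProcessor(CoordinatorContext coordinatorContext) {
            super(coordinatorContext);
        }

        @Override
        protected void doProcessReportExecStatus(
                TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask) {
            // job-specific report handling goes here; the base class has already
            // recorded any failed status into the coordinator context
        }
    }

LoadProcessor and QueryProcessor below follow exactly this pattern.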
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
index ede218848c7..7e4042dde3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
@@ -19,9 +19,12 @@ package org.apache.doris.qe;
 
 import org.apache.doris.common.Status;
 import org.apache.doris.qe.runtime.PipelineExecutionTask;
+import org.apache.doris.thrift.TReportExecStatusParams;
 
 public interface JobProcessor {
-    void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask);
+    void setPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask);
 
     void cancel(Status cancelReason);
+
+    void updateFragmentExecStatus(TReportExecStatusParams params);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index a9d6becc7fa..a6f24806ed7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -233,10 +233,7 @@ public class NereidsCoordinator extends Coordinator {
 
     @Override
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        JobProcessor jobProcessor = coordinatorContext.getJobProcessor();
-        if (jobProcessor instanceof LoadProcessor) {
-            coordinatorContext.asLoadProcessor().updateFragmentExecStatus(params);
-        }
+        coordinatorContext.getJobProcessor().updateFragmentExecStatus(params);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
index 3a448521fca..fb32919d834 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
@@ -24,46 +24,39 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.hive.HMSTransaction;
 import org.apache.doris.datasource.iceberg.IcebergTransaction;
 import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.AbstractJobProcessor;
 import org.apache.doris.qe.CoordinatorContext;
-import org.apache.doris.qe.JobProcessor;
 import org.apache.doris.qe.LoadContext;
 import org.apache.doris.thrift.TFragmentInstanceReport;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-public class LoadProcessor implements JobProcessor {
+public class LoadProcessor extends AbstractJobProcessor {
     private static final Logger LOG = LogManager.getLogger(LoadProcessor.class);
 
-    public final CoordinatorContext coordinatorContext;
     public final LoadContext loadContext;
     public final long jobId;
 
     // this latch is used to wait finish for load, for example, insert into statement
     // MarkedCountDownLatch:
     //  key: fragmentId, value: backendId
-    private volatile Optional<PipelineExecutionTask> executionTask;
     private volatile Optional<MarkedCountDownLatch<Integer, Long>> latch;
-    private volatile Optional<Map<BackendFragmentId, SingleFragmentPipelineTask>> backendFragmentTasks;
     private volatile List<SingleFragmentPipelineTask> topFragmentTasks;
 
     public LoadProcessor(CoordinatorContext coordinatorContext, long jobId) {
-        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
+        super(coordinatorContext);
+
         this.loadContext = new LoadContext();
-        this.executionTask = Optional.empty();
         this.latch = Optional.empty();
         this.backendFragmentTasks = Optional.empty();
 
@@ -87,14 +80,8 @@ public class LoadProcessor implements JobProcessor {
     }
 
     @Override
-    public void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask) {
-        Preconditions.checkArgument(pipelineExecutionTask != null, "sqlPipelineTask can not be null");
-
-        this.executionTask = Optional.of(pipelineExecutionTask);
-        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
-                = buildBackendFragmentTasks(pipelineExecutionTask);
-        this.backendFragmentTasks = Optional.of(backendFragmentTasks);
-
+    protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {
+        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks = this.backendFragmentTasks.get();
         MarkedCountDownLatch<Integer, Long> latch = new MarkedCountDownLatch<>(backendFragmentTasks.size());
         for (BackendFragmentId backendFragmentId : backendFragmentTasks.keySet()) {
             latch.addMark(backendFragmentId.fragmentId, backendFragmentId.backendId);
@@ -168,34 +155,9 @@ public class LoadProcessor implements JobProcessor {
         return latch.get().await(timeout, unit);
     }
 
-    public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        SingleFragmentPipelineTask fragmentTask = backendFragmentTasks.get().get(
-                new BackendFragmentId(params.getBackendId(), params.getFragmentId()));
-        if (fragmentTask == null || !fragmentTask.processReportExecStatus(params)) {
-            return;
-        }
-        TUniqueId queryId = coordinatorContext.queryId;
-        Status status = new Status(params.status);
-        // for now, abort the query if we see any error except if the error is cancelled
-        // and returned_all_results_ is true.
-        // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
-        if (!status.ok()) {
-            if (coordinatorContext.isEos() && status.isCancelled()) {
-                LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
-                                + " is reporting failed status {}",
-                        DebugUtil.printId(queryId), params.getFragmentId(),
-                        DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.getBackendId(),
-                        status.toString());
-            } else {
-                LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
-                                + " error message: {}",
-                        DebugUtil.printId(queryId), params.getFragmentId(),
-                        DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.getBackendId(), status.toString());
-                coordinatorContext.updateStatusIfOk(status);
-            }
-        }
+
+    @Override
+    protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask) {
         LoadContext loadContext = coordinatorContext.asLoadProcessor().loadContext;
         if (params.isSetDeltaUrls()) {
             loadContext.updateDeltaUrls(params.getDeltaUrls());
@@ -234,7 +196,7 @@ public class LoadProcessor implements JobProcessor {
         if (fragmentTask.isDone()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Query {} fragment {} is marked done",
-                        DebugUtil.printId(queryId), params.getFragmentId());
+                        DebugUtil.printId(coordinatorContext.queryId), params.getFragmentId());
             }
             latch.get().markedCountDown(params.getFragmentId(), params.getBackendId());
         }
@@ -258,22 +220,6 @@ public class LoadProcessor implements JobProcessor {
         }
     }
 
-    private Map<BackendFragmentId, SingleFragmentPipelineTask> buildBackendFragmentTasks(
-            PipelineExecutionTask executionTask) {
-        ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
-                = ImmutableMap.builder();
-        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : executionTask.getChildrenTasks().entrySet()) {
-            Long backendId = backendTask.getKey();
-            for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : backendTask.getValue()
-                    .getChildrenTasks().entrySet()) {
-                Integer fragmentId = fragmentIdToTask.getKey();
-                SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue();
-                backendFragmentTasks.put(new BackendFragmentId(backendId, fragmentId), fragmentTask);
-            }
-        }
-        return backendFragmentTasks.build();
-    }
-
     /*
      * Check the state of backends in needCheckBackendExecStates.
      * return true if all of them are OK. Otherwise, return false.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
index 8c1b9714c35..ae87d59d075 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
@@ -102,7 +102,7 @@ public class PipelineExecutionTask extends AbstractRuntimeTask<Long, MultiFragme
 
     @Override
     public String toString() {
-        return "SqlPipelineTask(\n"
+        return "PipelineExecutionTask(\n"
                 + childrenTasks.allTasks()
                     .stream()
                     .map(multiFragmentsPipelineTask -> "  " + multiFragmentsPipelineTask)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
index fd00bf0e3e8..0da6f4a5fe2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
@@ -61,7 +61,7 @@ public class PipelineExecutionTaskBuilder {
                 backendServiceProxy,
                buildMultiFragmentTasks(coordinatorContext, backendServiceProxy, workerToFragmentsParam)
         );
-        coordinatorContext.getJobProcessor().setSqlPipelineTask(pipelineExecutionTask);
+        coordinatorContext.getJobProcessor().setPipelineExecutionTask(pipelineExecutionTask);
         return pipelineExecutionTask;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
index 2ec38e8cc8e..a5a5100faec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
@@ -25,13 +25,14 @@ import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWor
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.ResultSink;
+import org.apache.doris.qe.AbstractJobProcessor;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.CoordinatorContext;
-import org.apache.doris.qe.JobProcessor;
 import org.apache.doris.qe.ResultReceiver;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.base.Strings;
@@ -44,24 +45,21 @@ import org.apache.thrift.TException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-public class QueryProcessor implements JobProcessor {
+public class QueryProcessor extends AbstractJobProcessor {
     private static final Logger LOG = LogManager.getLogger(QueryProcessor.class);
 
     // constant fields
     private final long limitRows;
 
     // mutable field
-    private Optional<PipelineExecutionTask> sqlPipelineTask;
-    private final CoordinatorContext coordinatorContext;
     private final List<ResultReceiver> runningReceivers;
     private int receiverOffset;
     private long numReceivedRows;
 
     public QueryProcessor(CoordinatorContext coordinatorContext, List<ResultReceiver> runningReceivers) {
-        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
+        super(coordinatorContext);
         this.runningReceivers = new CopyOnWriteArrayList<>(
                Objects.requireNonNull(runningReceivers, "runningReceivers can not be null")
         );
@@ -69,8 +67,6 @@ public class QueryProcessor implements JobProcessor {
         this.limitRows = coordinatorContext.fragments.get(coordinatorContext.fragments.size() - 1)
                 .getPlanRoot()
                 .getLimit();
-
-        this.sqlPipelineTask = Optional.empty();
     }
 
     public static QueryProcessor build(CoordinatorContext coordinatorContext) {
@@ -109,8 +105,8 @@ public class QueryProcessor implements JobProcessor {
     }
 
     @Override
-    public void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask) {
-        this.sqlPipelineTask = Optional.ofNullable(pipelineExecutionTask);
+    protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask) {
+
     }
 
     public boolean isEos() {
@@ -178,7 +174,7 @@ public class QueryProcessor implements JobProcessor {
             receiver.cancel(cancelReason);
         }
 
-        this.sqlPipelineTask.ifPresent(sqlPipelineTask -> {
+        this.executionTask.ifPresent(sqlPipelineTask -> {
             for (MultiFragmentsPipelineTask fragmentsTask : sqlPipelineTask.getChildrenTasks().values()) {
                 fragmentsTask.cancelExecute(cancelReason);
             }
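Taken together, when a backend reports a failed fragment for a plain query, the call flow is now (a simplified trace; method names are from the patch above):

    NereidsCoordinator.updateFragmentExecStatus(params)
        -> JobProcessor.updateFragmentExecStatus(params)        // interface dispatch, no instanceof check
        -> AbstractJobProcessor.updateFragmentExecStatus(...)   // shared template method
        -> coordinatorContext.updateStatusIfOk(status)          // query is marked failed in the FE
        -> QueryProcessor.doProcessReportExecStatus(...)        // intentionally a no-op for queries

Previously the first step silently dropped reports unless the processor was a LoadProcessor, so a failed fragment never flipped the query status on the FE side.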

