This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 5dfc5d2c77c [enhancement](querycancel) print detail message when query is cancelled (#38859) 5dfc5d2c77c is described below commit 5dfc5d2c77ce0d5feaf1a67abd2669407ed19ac8 Author: yiguolei <676222...@qq.com> AuthorDate: Mon Aug 5 14:47:03 2024 +0800 [enhancement](querycancel) print detail message when query is cancelled (#38859) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- .../main/java/org/apache/doris/common/Status.java | 7 ++++ .../java/org/apache/doris/qe/CoordInterface.java | 2 +- .../main/java/org/apache/doris/qe/Coordinator.java | 43 +++++++++++++--------- .../java/org/apache/doris/qe/PointQueryExec.java | 2 +- .../org/apache/doris/qe/PointQueryExecutor.java | 2 +- .../org/apache/doris/qe/QueryCancelWorker.java | 6 ++- .../java/org/apache/doris/qe/StmtExecutor.java | 12 ++++-- 7 files changed, 47 insertions(+), 27 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java index 18852d8c04c..638c1123820 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java @@ -21,6 +21,8 @@ import org.apache.doris.proto.Types.PStatus; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import org.apache.logging.log4j.message.ParameterizedMessage; + public class Status { public static final Status OK = new Status(); public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled"); @@ -43,6 +45,11 @@ public class Status { this.errorMsg = errorMsg; } + public Status(TStatusCode code, final String errorMsg, final Object...params) { + this.errorCode = code; + this.errorMsg = ParameterizedMessage.format(errorMsg, params); + } + public Status(final TStatus status) { this.errorCode = status.status_code; if (status.isSetErrorMsgs()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java index 5718e68c6b0..baf6d922e4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java @@ -25,7 +25,7 @@ public interface CoordInterface { public RowBatch getNext() throws Exception; - public void cancel(Types.PPlanFragmentCancelReason cancelReason); + public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg); // When call exec or get next data finished, should call this method to release // some resource. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9e22f853c73..dac9be06b9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1443,7 +1443,7 @@ public class Coordinator implements CoordInterface { // We use a very conservative cancel strategy. // 0. If backends has zero process epoch, do not cancel. Zero process epoch usually arises in cluster upgrading. // 1. If process epoch is same, do not cancel. Means backends does not restart or die. - public boolean shouldCancel(List<Backend> currentBackends) { + public Status shouldCancel(List<Backend> currentBackends) { Map<Long, Backend> curBeMap = Maps.newHashMap(); for (Backend be : currentBackends) { curBeMap.put(be.getId(), be); @@ -1456,21 +1456,24 @@ public class Coordinator implements CoordInterface { for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) { Backend be = curBeMap.get(pipelineExecContext.backend.getId()); if (be == null || !be.isAlive()) { - LOG.warn("Backend {} not exists or dead, query {} should be cancelled", + Status errorStatus = new Status(TStatusCode.CANCELLED, + "Backend {} not exists or dead, query {} should be cancelled", pipelineExecContext.backend.toString(), DebugUtil.printId(queryId)); - return true; + LOG.warn(errorStatus.getErrorMsg()); + return errorStatus; } // Backend process epoch changed, indicates that this be restarts, query should be cancelled. // Check zero since during upgrading, older version oplog will not persistent be start time // so newer version follower will get zero epoch when replaying oplog or snapshot if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) { - LOG.warn("Backend process epoch changed, previous {} now {}, " - + "means this be has already restarted, should cancel this coordinator," - + " query id {}", - pipelineExecContext.beProcessEpoch, be.getProcessEpoch(), - DebugUtil.printId(queryId)); - return true; + Status errorStatus = new Status(TStatusCode.CANCELLED, + "Backend process epoch changed, previous {} now {}, " + + "means this be has already restarted, should cancel this coordinator," + + "query id {}", pipelineExecContext.beProcessEpoch, be.getProcessEpoch(), + DebugUtil.printId(queryId)); + LOG.warn(errorStatus.getErrorMsg()); + return errorStatus; } else if (be.getProcessEpoch() == 0) { LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString()); @@ -1481,23 +1484,27 @@ public class Coordinator implements CoordInterface { for (BackendExecStates beExecState : beToExecStates.values()) { Backend be = curBeMap.get(beExecState.beId); if (be == null || !be.isAlive()) { - LOG.warn("Backend {} not exists or dead, query {} should be cancelled.", + Status errorStatus = new Status(TStatusCode.CANCELLED, + "Backend {} not exists or dead, query {} should be cancelled.", beExecState.beId, DebugUtil.printId(queryId)); - return true; + LOG.warn(errorStatus.getErrorMsg()); + return errorStatus; } if (beExecState.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) { - LOG.warn("Process epoch changed, previous {} now {}, means this be has already restarted, " - + "should cancel this coordinator, query id {}", + Status errorStatus = new Status(TStatusCode.CANCELLED, + "Process epoch changed, previous {} now {}, means this be has already restarted," + + "should cancel this coordinator, query id {}", beExecState.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId)); - return true; + LOG.warn(errorStatus.getErrorMsg()); + return errorStatus; } else if (be.getProcessEpoch() == 0) { LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString()); } } } - return false; + return Status.OK; } finally { unlock(); } @@ -1507,14 +1514,14 @@ public class Coordinator implements CoordInterface { // fragment, // if any, as well as all plan fragments on remote nodes. public void cancel() { - cancel(Types.PPlanFragmentCancelReason.USER_CANCEL); + cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel"); if (queueToken != null) { queueToken.signalForCancel(); } } @Override - public void cancel(Types.PPlanFragmentCancelReason cancelReason) { + public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) { for (ScanNode scanNode : scanNodes) { scanNode.stop(); } @@ -1527,7 +1534,7 @@ public class Coordinator implements CoordInterface { DebugUtil.printId(queryId), queryStatus.toString(), new Exception("cancel failed")); } else { - queryStatus.updateStatus(TStatusCode.CANCELLED, "cancelled"); + queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg); } LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}", DebugUtil.printId(queryId), cancelReason.toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index f31251dae17..b4812280896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -169,7 +169,7 @@ public class PointQueryExec implements CoordInterface { } @Override - public void cancel(Types.PPlanFragmentCancelReason cancelReason) { + public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) { // Do nothing } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java index 572367fa33b..b0af8431471 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java @@ -163,7 +163,7 @@ public class PointQueryExecutor implements CoordInterface { } @Override - public void cancel(Types.PPlanFragmentCancelReason cancelReason) { + public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) { // Do nothing } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java index 500cad0f28d..cf1b4fec286 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.common.Status; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.proto.Types; import org.apache.doris.system.Backend; @@ -36,10 +37,11 @@ public class QueryCancelWorker extends MasterDaemon { List<Backend> allBackends = systemInfoService.getAllBackends(); for (Coordinator co : QeProcessorImpl.INSTANCE.getAllCoordinators()) { - if (co.shouldCancel(allBackends)) { + Status status = co.shouldCancel(allBackends); + if (!status.ok()) { // TODO(zhiqiang): We need more clear cancel message, so that user can figure out what happened // by searching log. - co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); + co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, status.getErrorMsg()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 30dbedbf3de..dbab3ab3957 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -103,6 +103,7 @@ import org.apache.doris.common.FormatOptions; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.NereidsException; import org.apache.doris.common.NereidsSqlCacheManager; +import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.profile.Profile; @@ -1453,7 +1454,7 @@ public class StmtExecutor { public void cancel(Types.PPlanFragmentCancelReason cancelReason) { Coordinator coordRef = coord; if (coordRef != null) { - coordRef.cancel(cancelReason); + coordRef.cancel(cancelReason, ""); } if (mysqlLoadId != null) { Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId); @@ -1874,8 +1875,11 @@ public class StmtExecutor { // notify all be cancel running fragment // in some case may block all fragment handle threads // details see issue https://github.com/apache/doris/issues/16203 - LOG.warn("cancel fragment query_id:{} cause {}", DebugUtil.printId(context.queryId()), e.getMessage()); - coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); + Status internalErrorSt = new Status(TStatusCode.INTERNAL_ERROR, + "cancel fragment query_id:{} cause {}", + DebugUtil.printId(context.queryId()), e.getMessage()); + LOG.warn(internalErrorSt.getErrorMsg()); + coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, internalErrorSt.getErrorMsg()); throw e; } finally { coordBase.close(); @@ -2257,7 +2261,7 @@ public class StmtExecutor { } boolean notTimeout = coord.join(execTimeout); if (!coord.isDone()) { - coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT); + coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT, "timeout"); if (notTimeout) { errMsg = coord.getExecStatus().getErrorMsg(); ErrorReport.reportDdlException("There exists unhealthy backend. " --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org