This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 3e096dda914 [Cherry-pick]Support kill query in be (#35794) 3e096dda914 is described below commit 3e096dda914d93aaee66f22a47b98588b89c9ae7 Author: wangbo <wan...@apache.org> AuthorDate: Mon Jun 3 15:39:30 2024 +0800 [Cherry-pick]Support kill query in be (#35794) ## Proposed changes pick #35602 ``` mysql [information_schema]>kill query '2047df937c66704d-3ac4cfaf17f65eae'; Query OK, 0 rows affected (0.01 sec) I20240603 15:21:50.373333 3355508 internal_service.cpp:592] Cancel query 2047df937c66704d-3ac4cfaf17f65eae, reason: USER_CANCEL ``` --- .../org/apache/doris/common/util/DebugUtil.java | 24 ++++++++++++ .../java/org/apache/doris/qe/StmtExecutor.java | 30 ++++++++++++--- .../org/apache/doris/rpc/BackendServiceProxy.java | 18 +++++++++ .../apache/doris/common/util/DebugUtilTest.java | 45 ++++++++++++++++++++++ 4 files changed, 112 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java index 75fb331347e..2a52420a96d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java @@ -21,6 +21,8 @@ import org.apache.doris.common.Pair; import org.apache.doris.proto.Types; import org.apache.doris.thrift.TUniqueId; +import com.google.common.base.Strings; + import java.io.PrintWriter; import java.io.StringWriter; import java.text.DecimalFormat; @@ -135,6 +137,28 @@ public class DebugUtil { return builder.toString(); } + // id is a String generated by DebugUtil.printId(TUniqueId) + public static TUniqueId parseTUniqueIdFromString(String id) { + if (Strings.isNullOrEmpty(id)) { + throw new NumberFormatException("invalid query id"); + } + + String[] parts = id.split("-"); + if (parts.length != 2) { + throw new NumberFormatException("invalid query id"); + } + + TUniqueId uniqueId = new TUniqueId(); + try { + uniqueId.setHi(Long.parseUnsignedLong(parts[0], 16)); + uniqueId.setLo(Long.parseUnsignedLong(parts[1], 16)); + } catch (NumberFormatException e) { + throw new NumberFormatException("invalid query id:" + e.getMessage()); + } + + return uniqueId; + } + public static String printId(final UUID id) { TUniqueId tUniqueId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits()); StringBuilder builder = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c90ca0b5ff7..ba6d200847c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -220,6 +220,7 @@ import java.io.IOException; import java.io.StringReader; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -1419,16 +1420,15 @@ public class StmtExecutor { } // Handle kill statement. - private void handleKill() throws DdlException { + private void handleKill() throws UserException { KillStmt killStmt = (KillStmt) parsedStmt; ConnectContext killCtx = null; int id = killStmt.getConnectionId(); String queryId = killStmt.getQueryId(); if (id == -1) { + // when killCtx == null, this means the query not in FE, + // then we just send kill signal to BE killCtx = context.getConnectScheduler().getContextWithQueryId(queryId); - if (killCtx == null) { - ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId); - } } else { killCtx = context.getConnectScheduler().getContext(id); if (killCtx == null) { @@ -1436,7 +1436,27 @@ public class StmtExecutor { } } - if (context == killCtx) { + if (killCtx == null) { + TUniqueId tQueryId = null; + try { + tQueryId = DebugUtil.parseTUniqueIdFromString(queryId); + } catch (NumberFormatException e) { + throw new UserException(e.getMessage()); + } + LOG.info("kill query {}", queryId); + Collection<Backend> nodesToPublish = Env.getCurrentSystemInfo().getIdToBackend().values(); + for (Backend be : nodesToPublish) { + if (be.isAlive()) { + try { + BackendServiceProxy.getInstance() + .cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId, + Types.PPlanFragmentCancelReason.USER_CANCEL); + } catch (Throwable t) { + LOG.info("send kill query {} rpc to be {} failed", queryId, be); + } + } + } + } else if (context == killCtx) { // Suicide context.setKilled(); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 0488fec2062..0fce50c327b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -276,6 +276,24 @@ public class BackendServiceProxy { } } + public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync( + TNetworkAddress address, TUniqueId queryId, + Types.PPlanFragmentCancelReason cancelReason) throws RpcException { + final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest + .newBuilder() + .setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build()) + .setCancelReason(cancelReason) + .setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build(); + try { + final BackendServiceClient client = getProxy(address); + return client.cancelPlanFragmentAsync(pRequest); + } catch (Throwable e) { + LOG.warn("Cancel plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(), + e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future<InternalService.PFetchDataResult> fetchDataAsync( TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java index bebe65cd2e0..54a3f4c388b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java @@ -19,10 +19,13 @@ package org.apache.doris.common.util; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; +import org.apache.doris.thrift.TUniqueId; import org.junit.Assert; import org.junit.Test; +import java.util.UUID; + public class DebugUtilTest { @Test public void testGetUint() { @@ -97,4 +100,46 @@ public class DebugUtilTest { .contains("org.apache.doris.common.DdlException: errCode = 2, detailMessage = only one exception")); Assert.assertEquals("unknown", Util.getRootCauseStack(null)); } + + @Test + public void testParseIdFromString() { + // test null + TUniqueId nullTUniqueId = null; + try { + nullTUniqueId = DebugUtil.parseTUniqueIdFromString(null); + } catch (NumberFormatException e) { + Assert.assertTrue("invalid query id".equals(e.getMessage())); + } + Assert.assertTrue(nullTUniqueId == null); + + + try { + nullTUniqueId = DebugUtil.parseTUniqueIdFromString(""); + } catch (NumberFormatException e) { + Assert.assertTrue("invalid query id".equals(e.getMessage())); + } + Assert.assertTrue(nullTUniqueId == null); + + Assert.assertEquals(new TUniqueId(), DebugUtil.parseTUniqueIdFromString("0-0")); + + try { + nullTUniqueId = DebugUtil.parseTUniqueIdFromString("INVALID-STRING"); + } catch (NumberFormatException e) { + Assert.assertTrue(e.getMessage().contains("For input string")); + } + Assert.assertTrue(nullTUniqueId == null); + + for (int i = 0; i < 100; i++) { + UUID uuid = UUID.randomUUID(); + TUniqueId originTQueryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + String originStrQueryId = DebugUtil.printId(originTQueryId); + + TUniqueId convertedTQueryId = DebugUtil.parseTUniqueIdFromString(originStrQueryId); + String convertedStrQueryId = DebugUtil.printId(convertedTQueryId); + + Assert.assertTrue(originTQueryId.hi == convertedTQueryId.hi); + Assert.assertTrue(originTQueryId.lo == convertedTQueryId.lo); + Assert.assertTrue(originStrQueryId.equals(convertedStrQueryId)); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org