This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 3e096dda914 [Cherry-pick]Support kill query in be (#35794)
3e096dda914 is described below

commit 3e096dda914d93aaee66f22a47b98588b89c9ae7
Author: wangbo <wan...@apache.org>
AuthorDate: Mon Jun 3 15:39:30 2024 +0800

    [Cherry-pick]Support kill query in be (#35794)
    
    ## Proposed changes
     pick #35602
    
    ```
    mysql [information_schema]>kill query '2047df937c66704d-3ac4cfaf17f65eae';
    Query OK, 0 rows affected (0.01 sec)
    
    
    I20240603 15:21:50.373333 3355508 internal_service.cpp:592] Cancel query 
2047df937c66704d-3ac4cfaf17f65eae, reason: USER_CANCEL
    
    ```
---
 .../org/apache/doris/common/util/DebugUtil.java    | 24 ++++++++++++
 .../java/org/apache/doris/qe/StmtExecutor.java     | 30 ++++++++++++---
 .../org/apache/doris/rpc/BackendServiceProxy.java  | 18 +++++++++
 .../apache/doris/common/util/DebugUtilTest.java    | 45 ++++++++++++++++++++++
 4 files changed, 112 insertions(+), 5 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
index 75fb331347e..2a52420a96d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
@@ -21,6 +21,8 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.proto.Types;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Strings;
+
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.DecimalFormat;
@@ -135,6 +137,28 @@ public class DebugUtil {
         return builder.toString();
     }
 
+    // id is a String generated by DebugUtil.printId(TUniqueId)
+    public static TUniqueId parseTUniqueIdFromString(String id) {
+        if (Strings.isNullOrEmpty(id)) {
+            throw new NumberFormatException("invalid query id");
+        }
+
+        String[] parts = id.split("-");
+        if (parts.length != 2) {
+            throw new NumberFormatException("invalid query id");
+        }
+
+        TUniqueId uniqueId = new TUniqueId();
+        try {
+            uniqueId.setHi(Long.parseUnsignedLong(parts[0], 16));
+            uniqueId.setLo(Long.parseUnsignedLong(parts[1], 16));
+        } catch (NumberFormatException e) {
+            throw new NumberFormatException("invalid query id:" + 
e.getMessage());
+        }
+
+        return uniqueId;
+    }
+
     public static String printId(final UUID id) {
         TUniqueId tUniqueId = new TUniqueId(id.getMostSignificantBits(), 
id.getLeastSignificantBits());
         StringBuilder builder = new StringBuilder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c90ca0b5ff7..ba6d200847c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -220,6 +220,7 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -1419,16 +1420,15 @@ public class StmtExecutor {
     }
 
     // Handle kill statement.
-    private void handleKill() throws DdlException {
+    private void handleKill() throws UserException {
         KillStmt killStmt = (KillStmt) parsedStmt;
         ConnectContext killCtx = null;
         int id = killStmt.getConnectionId();
         String queryId = killStmt.getQueryId();
         if (id == -1) {
+            // when killCtx == null, this means the query not in FE,
+            // then we just send kill signal to BE
             killCtx = 
context.getConnectScheduler().getContextWithQueryId(queryId);
-            if (killCtx == null) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, 
queryId);
-            }
         } else {
             killCtx = context.getConnectScheduler().getContext(id);
             if (killCtx == null) {
@@ -1436,7 +1436,27 @@ public class StmtExecutor {
             }
         }
 
-        if (context == killCtx) {
+        if (killCtx == null) {
+            TUniqueId tQueryId = null;
+            try {
+                tQueryId = DebugUtil.parseTUniqueIdFromString(queryId);
+            } catch (NumberFormatException e) {
+                throw new UserException(e.getMessage());
+            }
+            LOG.info("kill query {}", queryId);
+            Collection<Backend> nodesToPublish = 
Env.getCurrentSystemInfo().getIdToBackend().values();
+            for (Backend be : nodesToPublish) {
+                if (be.isAlive()) {
+                    try {
+                        BackendServiceProxy.getInstance()
+                                
.cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId,
+                                        
Types.PPlanFragmentCancelReason.USER_CANCEL);
+                    } catch (Throwable t) {
+                        LOG.info("send kill query {} rpc to be {} failed", 
queryId, be);
+                    }
+                }
+            }
+        } else if (context == killCtx) {
             // Suicide
             context.setKilled();
         } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 0488fec2062..0fce50c327b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -276,6 +276,24 @@ public class BackendServiceProxy {
         }
     }
 
+    public ListenableFuture<InternalService.PCancelPlanFragmentResult> 
cancelPipelineXPlanFragmentAsync(
+            TNetworkAddress address, TUniqueId queryId,
+            Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
+        final InternalService.PCancelPlanFragmentRequest pRequest = 
InternalService.PCancelPlanFragmentRequest
+                .newBuilder()
+                
.setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build())
+                .setCancelReason(cancelReason)
+                
.setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build();
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.cancelPlanFragmentAsync(pRequest);
+        } catch (Throwable e) {
+            LOG.warn("Cancel plan fragment catch a exception, address={}:{}", 
address.getHostname(), address.getPort(),
+                    e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
     public Future<InternalService.PFetchDataResult> fetchDataAsync(
             TNetworkAddress address, InternalService.PFetchDataRequest 
request) throws RpcException {
         try {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java
index bebe65cd2e0..54a3f4c388b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java
@@ -19,10 +19,13 @@ package org.apache.doris.common.util;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.thrift.TUniqueId;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.UUID;
+
 public class DebugUtilTest {
     @Test
     public void testGetUint() {
@@ -97,4 +100,46 @@ public class DebugUtilTest {
                 .contains("org.apache.doris.common.DdlException: errCode = 2, 
detailMessage = only one exception"));
         Assert.assertEquals("unknown", Util.getRootCauseStack(null));
     }
+
+    @Test
+    public void testParseIdFromString() {
+        // test null
+        TUniqueId nullTUniqueId = null;
+        try {
+            nullTUniqueId = DebugUtil.parseTUniqueIdFromString(null);
+        } catch (NumberFormatException e) {
+            Assert.assertTrue("invalid query id".equals(e.getMessage()));
+        }
+        Assert.assertTrue(nullTUniqueId == null);
+
+
+        try {
+            nullTUniqueId = DebugUtil.parseTUniqueIdFromString("");
+        } catch (NumberFormatException e) {
+            Assert.assertTrue("invalid query id".equals(e.getMessage()));
+        }
+        Assert.assertTrue(nullTUniqueId == null);
+
+        Assert.assertEquals(new TUniqueId(), 
DebugUtil.parseTUniqueIdFromString("0-0"));
+
+        try {
+            nullTUniqueId = 
DebugUtil.parseTUniqueIdFromString("INVALID-STRING");
+        } catch (NumberFormatException e) {
+            Assert.assertTrue(e.getMessage().contains("For input string"));
+        }
+        Assert.assertTrue(nullTUniqueId == null);
+
+        for (int i = 0; i < 100; i++) {
+            UUID uuid = UUID.randomUUID();
+            TUniqueId originTQueryId = new 
TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+            String originStrQueryId = DebugUtil.printId(originTQueryId);
+
+            TUniqueId convertedTQueryId = 
DebugUtil.parseTUniqueIdFromString(originStrQueryId);
+            String convertedStrQueryId = DebugUtil.printId(convertedTQueryId);
+
+            Assert.assertTrue(originTQueryId.hi == convertedTQueryId.hi);
+            Assert.assertTrue(originTQueryId.lo == convertedTQueryId.lo);
+            Assert.assertTrue(originStrQueryId.equals(convertedStrQueryId));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to