This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fbcf3c8ba [api-change](http) change kill query http api by using 
query id (#12120)
7fbcf3c8ba is described below

commit 7fbcf3c8ba385bac3cb18d827269bf63ea273e2b
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Mon Aug 29 09:51:51 2022 +0800

    [api-change](http) change kill query http api by using query id (#12120)
    
    Now user can cancel query id by http by following steps:
    
    Get query id by trace id
    cancel query by query id
    The modified api has not been released yet.
---
 .../fe/manager/query-profile-action.md             | 16 +++--
 .../fe/manager/query-profile-action.md             | 11 ++--
 .../httpv2/rest/manager/QueryProfileAction.java    | 57 +++++++++++------
 .../java/org/apache/doris/qe/ConnectContext.java   | 13 ++++
 .../java/org/apache/doris/qe/ConnectScheduler.java | 24 +++++++
 .../main/java/org/apache/doris/qe/VariableMgr.java | 10 +--
 .../org/apache/doris/qe/VariableVarCallbackI.java  | 24 +++++++
 .../org/apache/doris/qe/VariableVarCallbacks.java  | 73 ++++++++++++++++++++++
 .../java/org/apache/doris/qe/VariableMgrTest.java  |  8 +++
 9 files changed, 200 insertions(+), 36 deletions(-)

diff --git 
a/docs/en/docs/admin-manual/http-actions/fe/manager/query-profile-action.md 
b/docs/en/docs/admin-manual/http-actions/fe/manager/query-profile-action.md
index 50d59191a8..0b86949483 100644
--- a/docs/en/docs/admin-manual/http-actions/fe/manager/query-profile-action.md
+++ b/docs/en/docs/admin-manual/http-actions/fe/manager/query-profile-action.md
@@ -34,9 +34,15 @@ under the License.
 
 `GET /rest/v2/manager/query/profile/text/{query_id}`
 
+`GET /rest/v2/manager/query/profile/graph/{query_id}`
+
+`GET /rest/v2/manager/query/profile/json/{query_id}`
+
 `GET /rest/v2/manager/query/profile/fragments/{query_id}`
 
-`GET /rest/v2/manager/query/profile/graph/{query_id}`
+`GET /rest/v2/manager/query/current_queries`
+
+`GET /rest/v2/manager/query/kill/{query_id}`
 
 ## Get the query information
 
@@ -342,7 +348,7 @@ Same as `show proc "/current_query_stmts"`, return current 
running queries.
 
 ## Cancel query
 
-`POST /rest/v2/manager/query/kill/{connection_id}`
+`POST /rest/v2/manager/query/kill/{query_id}`
 
 ### Description
 
@@ -350,9 +356,9 @@ Cancel query of specified connection.
     
 ### Path parameters
 
-* `{connection_id}`
+* `{query_id}`
 
-    connection id
+    query id. You can get query id by `trance_id` api.
 
 ### Query parameters
 
@@ -362,7 +368,7 @@ Cancel query of specified connection.
 {
     "msg": "success",
     "code": 0,
-    "data": "",
+    "data": null,
     "count": 0
 }
 ```
diff --git 
a/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/query-profile-action.md 
b/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/query-profile-action.md
index 1917237807..5283107279 100644
--- 
a/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/query-profile-action.md
+++ 
b/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/query-profile-action.md
@@ -42,8 +42,7 @@ under the License.
 
 `GET /rest/v2/manager/query/current_queries`
 
-`GET /rest/v2/manager/query/kill/{connection_id}`
-
+`GET /rest/v2/manager/query/kill/{query_id}`
 
 ## 获取查询信息
 
@@ -349,7 +348,7 @@ GET /rest/v2/manager/query/query_info
 
 ## 取消query
 
-`POST /rest/v2/manager/query/kill/{connection_id}`
+`POST /rest/v2/manager/query/kill/{query_id}`
 
 ### Description
 
@@ -357,9 +356,9 @@ GET /rest/v2/manager/query/query_info
     
 ### Path parameters
 
-* `{connection_id}`
+* `{query_id}`
 
-    connection id
+    query id. 你可以通过 trace_id 接口,获取 query id。
 
 ### Query parameters
 
@@ -369,7 +368,7 @@ GET /rest/v2/manager/query/query_info
 {
     "msg": "success",
     "code": 0,
-    "data": "",
+    "data": null,
     "count": 0
 }
 ```
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 4e53dbbcd5..4d0f11f893 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -45,6 +45,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.NotNull;
 import org.json.simple.JSONObject;
+import org.springframework.http.HttpMethod;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -70,7 +71,7 @@ import javax.servlet.http.HttpServletResponse;
  * 4. /trace_id/{trace_id}
  * 5. /profile/fragments/{query_id}
  * 6. /current_queries
- * 7. /kill/{connection_id}
+ * 7. /kill/{query_id}
  */
 @RestController
 @RequestMapping("/rest/v2/manager/query")
@@ -100,7 +101,8 @@ public class QueryProfileAction extends RestBaseController {
             
.add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT).add(QUERY_TYPE).add(START_TIME).add(END_TIME)
             .add(TOTAL).add(QUERY_STATE).build();
 
-    private List<String> requestAllFe(String httpPath, Map<String, String> 
arguments, String authorization) {
+    private List<String> requestAllFe(String httpPath, Map<String, String> 
arguments, String authorization,
+            HttpMethod method) {
         List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
         ImmutableMap<String, String> header = ImmutableMap.<String, 
String>builder()
                 .put(NodeAction.AUTHORIZATION, authorization).build();
@@ -108,7 +110,12 @@ public class QueryProfileAction extends RestBaseController 
{
         for (Pair<String, Integer> ipPort : frontends) {
             String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
             try {
-                String data = HttpUtils.parseResponse(HttpUtils.doGet(url, 
header));
+                String data = null;
+                if (method == HttpMethod.GET) {
+                    data = HttpUtils.parseResponse(HttpUtils.doGet(url, 
header));
+                } else if (method == HttpMethod.POST) {
+                    data = HttpUtils.parseResponse(HttpUtils.doPost(url, 
header, null));
+                }
                 if (!Strings.isNullOrEmpty(data) && !data.equals("{}")) {
                     dataList.add(data);
                 }
@@ -145,12 +152,12 @@ public class QueryProfileAction extends 
RestBaseController {
             arguments.put(SEARCH_PARA, search);
             arguments.put(IS_ALL_NODE_PARA, "false");
 
-            List<String> dataList = requestAllFe(httpPath, arguments, 
request.getHeader(NodeAction.AUTHORIZATION));
+            List<String> dataList = requestAllFe(httpPath, arguments, 
request.getHeader(NodeAction.AUTHORIZATION),
+                    HttpMethod.GET);
             for (String data : dataList) {
                 try {
-                    NodeAction.NodeInfo nodeInfo = 
GsonUtils.GSON.fromJson(data,
-                            new TypeToken<NodeAction.NodeInfo>() {
-                            }.getType());
+                    NodeAction.NodeInfo nodeInfo = 
GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
+                    }.getType());
                     queries.addAll(nodeInfo.getRows());
                 } catch (Exception e) {
                     LOG.warn("parse query info error: {}", data, e);
@@ -200,7 +207,8 @@ public class QueryProfileAction extends RestBaseController {
             String httpPath = "/rest/v2/manager/query/sql/" + queryId;
             ImmutableMap<String, String> arguments = ImmutableMap.<String, 
String>builder()
                     .put(IS_ALL_NODE_PARA, "false").build();
-            List<String> dataList = requestAllFe(httpPath, arguments, 
request.getHeader(NodeAction.AUTHORIZATION));
+            List<String> dataList = requestAllFe(httpPath, arguments, 
request.getHeader(NodeAction.AUTHORIZATION),
+                    HttpMethod.GET);
             if (!dataList.isEmpty()) {
                 try {
                     String sql = 
JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("sql").getAsString();
@@ -292,7 +300,8 @@ public class QueryProfileAction extends RestBaseController {
                 }
             }
         } else {
-            String queryId = 
ProfileManager.getInstance().getQueryIdByTraceId(traceId);
+            ExecuteEnv env = ExecuteEnv.getInstance();
+            String queryId = env.getScheduler().getQueryIdByTraceId(traceId);
             if (Strings.isNullOrEmpty(queryId)) {
                 return ResponseEntityBuilder.badRequest("Not found");
             }
@@ -415,7 +424,8 @@ public class QueryProfileAction extends RestBaseController {
         if (!Strings.isNullOrEmpty(instanceId)) {
             builder.put(INSTANCE_ID, instanceId);
         }
-        List<String> dataList = requestAllFe(httpPath, builder.build(), 
request.getHeader(NodeAction.AUTHORIZATION));
+        List<String> dataList = requestAllFe(httpPath, builder.build(), 
request.getHeader(NodeAction.AUTHORIZATION),
+                HttpMethod.GET);
         Map<String, String> result = Maps.newHashMap();
         if (!dataList.isEmpty()) {
             try {
@@ -448,7 +458,8 @@ public class QueryProfileAction extends RestBaseController {
             Map<String, String> arguments = Maps.newHashMap();
             arguments.put(IS_ALL_NODE_PARA, "false");
             List<List<String>> queries = Lists.newArrayList();
-            List<String> dataList = requestAllFe(httpPath, arguments, 
request.getHeader(NodeAction.AUTHORIZATION));
+            List<String> dataList = requestAllFe(httpPath, arguments, 
request.getHeader(NodeAction.AUTHORIZATION),
+                    HttpMethod.GET);
             for (String data : dataList) {
                 try {
                     NodeAction.NodeInfo nodeInfo = 
GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
@@ -481,25 +492,31 @@ public class QueryProfileAction extends 
RestBaseController {
     }
 
     /**
-     * kill queries with specified connection id
+     * kill queries with specific query id
      *
      * @param request
      * @param response
-     * @param connectionId
+     * @param queryId
      * @return
      */
-    @RequestMapping(path = "/kill/{connection_id}", method = 
RequestMethod.POST)
+    @RequestMapping(path = "/kill/{query_id}", method = RequestMethod.POST)
     public Object killQuery(HttpServletRequest request, HttpServletResponse 
response,
-            @PathVariable("connection_id") int connectionId) {
+            @PathVariable("query_id") String queryId,
+            @RequestParam(value = IS_ALL_NODE_PARA, required = false, 
defaultValue = "true") boolean isAllNode) {
         executeCheckPassword(request, response);
         checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.ADMIN);
 
-        ExecuteEnv env = ExecuteEnv.getInstance();
-        ConnectContext ctx = env.getScheduler().getContext(connectionId);
-        if (ctx == null) {
-            return ResponseEntityBuilder.notFound("connection not found");
+        if (isAllNode) {
+            // Get current queries from all FE
+            String httpPath = "/rest/v2/manager/query/kill/" + queryId;
+            Map<String, String> arguments = Maps.newHashMap();
+            arguments.put(IS_ALL_NODE_PARA, "false");
+            requestAllFe(httpPath, arguments, 
request.getHeader(NodeAction.AUTHORIZATION), HttpMethod.POST);
+            return ResponseEntityBuilder.ok();
         }
-        ctx.cancelQuery();
+
+        ExecuteEnv env = ExecuteEnv.getInstance();
+        env.getScheduler().cancelQuery(queryId);
         return ResponseEntityBuilder.ok();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 9757f2f556..0ad83d5176 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -38,6 +38,7 @@ import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionEntry;
 import org.apache.doris.transaction.TransactionStatus;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.opentelemetry.api.trace.Tracer;
@@ -62,6 +63,7 @@ public class ConnectContext {
     protected volatile long forwardedStmtId;
 
     protected volatile TUniqueId queryId;
+    protected volatile String traceId;
     // id for this connection
     protected volatile int connectionId;
     // mysql net
@@ -469,6 +471,17 @@ public class ConnectContext {
 
     public void setQueryId(TUniqueId queryId) {
         this.queryId = queryId;
+        if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) {
+            connectScheduler.putTraceId2QueryId(traceId, queryId);
+        }
+    }
+
+    public void setTraceId(String traceId) {
+        this.traceId = traceId;
+    }
+
+    public String traceId() {
+        return traceId;
     }
 
     public TUniqueId queryId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 534206028c..66702d438b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -21,9 +21,11 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.mysql.MysqlProto;
 import org.apache.doris.mysql.nio.NConnectContext;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -51,6 +53,9 @@ public class ConnectScheduler {
     private final ExecutorService executor = 
ThreadPoolManager.newDaemonCacheThreadPool(
             Config.max_connection_scheduler_threads_num, 
"connect-scheduler-pool", true);
 
+    // valid trace id -> query id
+    private final Map<String, TUniqueId> traceId2QueryId = 
Maps.newConcurrentMap();
+
     // Use a thread to check whether connection is timeout. Because
     // 1. If use a scheduler, the task maybe a huge number when query is messy.
     //    Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks 
in scheduler.
@@ -125,6 +130,16 @@ public class ConnectScheduler {
         return connectionMap.get(connectionId);
     }
 
+    public void cancelQuery(String queryId) {
+        for (ConnectContext ctx : connectionMap.values()) {
+            TUniqueId qid = ctx.queryId();
+            if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
+                ctx.cancelQuery();
+                break;
+            }
+        }
+    }
+
     public int getConnectionNum() {
         return numberConnection.get();
     }
@@ -143,6 +158,15 @@ public class ConnectScheduler {
         return infos;
     }
 
+    public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
+        traceId2QueryId.put(traceId, queryId);
+    }
+
+    public String getQueryIdByTraceId(String traceId) {
+        TUniqueId queryId = traceId2QueryId.get(traceId);
+        return queryId == null ? "" : DebugUtil.printId(queryId);
+    }
+
     private class LoopHandler implements Runnable {
         ConnectContext context;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index d0c77fae27..7efbf0c7a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -186,6 +186,10 @@ public class VariableMgr {
             ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, 
attr.name(), value);
         }
 
+        if (VariableVarCallbacks.hasCallback(attr.name())) {
+            VariableVarCallbacks.call(attr.name(), value);
+        }
+
         return true;
     }
 
@@ -515,9 +519,6 @@ public class VariableMgr {
 
         // Set to true if the variables need to be forwarded along with 
forward statement.
         boolean needForward() default false;
-
-        // Set to true if the variables need to be set in TQueryOptions
-        boolean isQueryOption() default false;
     }
 
     private static class VarContext {
@@ -587,8 +588,7 @@ public class VariableMgr {
             }
 
             field.setAccessible(true);
-            builder.put(attr.name(),
-                    new VarContext(field, null, GLOBAL | attr.flag(), 
getValue(null, field)));
+            builder.put(attr.name(), new VarContext(field, null, GLOBAL | 
attr.flag(), getValue(null, field)));
         }
         return builder;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java
new file mode 100644
index 0000000000..4aefa4fd9d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.DdlException;
+
+public interface VariableVarCallbackI {
+    public void call(String value) throws DdlException;
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java
new file mode 100644
index 0000000000..f1dc5e69c3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * Callback after setting the session variable
+ */
+public class VariableVarCallbacks {
+
+    public static final Map<String, VariableVarCallbackI> callbacks = 
Maps.newHashMap();
+
+    static {
+        SessionContextCallback sessionContextCallback = new 
SessionContextCallback();
+        callbacks.put(SessionVariable.SESSION_CONTEXT, sessionContextCallback);
+    }
+
+    public static Boolean hasCallback(String varName) {
+        return callbacks.containsKey(varName);
+    }
+
+    public static void call(String varName, String value) throws DdlException {
+        if (hasCallback(varName)) {
+            callbacks.get(varName).call(value);
+        }
+    }
+
+    // Converter to convert runtime filter type variable
+    public static class SessionContextCallback implements VariableVarCallbackI 
{
+        public void call(String value) throws DdlException {
+            if (Strings.isNullOrEmpty(value)) {
+                return;
+            }
+            /**
+             * The sessionContext is as follows:
+             * "k1:v1;k2:v2;..."
+             * Here we want to get value with key named "trace_id".
+             */
+            String[] parts = value.split(";");
+            for (String part : parts) {
+                String[] innerParts = part.split(":");
+                if (innerParts.length != 2) {
+                    continue;
+                }
+                if (innerParts[0].equals("trace_id")) {
+                    ConnectContext.get().setTraceId(innerParts[1]);
+                    break;
+                }
+            }
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index 8145ac1b77..1067a0c1a3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -233,4 +233,12 @@ public class VariableMgrTest {
         VariableMgr.setVar(null, setVar);
         Assert.fail("No exception throws.");
     }
+
+    @Test
+    public void testVariableCallback() throws Exception {
+        SetStmt stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set 
session_context='trace_id:123'", ctx);
+        SetExecutor executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals("123", ctx.traceId());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to