This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7fbcf3c8ba [api-change](http) change kill query http api by using query id (#12120) 7fbcf3c8ba is described below commit 7fbcf3c8ba385bac3cb18d827269bf63ea273e2b Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Mon Aug 29 09:51:51 2022 +0800 [api-change](http) change kill query http api by using query id (#12120) Now user can cancel query id by http by following steps: Get query id by trace id cancel query by query id The modified api has not been released yet. --- .../fe/manager/query-profile-action.md | 16 +++-- .../fe/manager/query-profile-action.md | 11 ++-- .../httpv2/rest/manager/QueryProfileAction.java | 57 +++++++++++------ .../java/org/apache/doris/qe/ConnectContext.java | 13 ++++ .../java/org/apache/doris/qe/ConnectScheduler.java | 24 +++++++ .../main/java/org/apache/doris/qe/VariableMgr.java | 10 +-- .../org/apache/doris/qe/VariableVarCallbackI.java | 24 +++++++ .../org/apache/doris/qe/VariableVarCallbacks.java | 73 ++++++++++++++++++++++ .../java/org/apache/doris/qe/VariableMgrTest.java | 8 +++ 9 files changed, 200 insertions(+), 36 deletions(-) diff --git a/docs/en/docs/admin-manual/http-actions/fe/manager/query-profile-action.md b/docs/en/docs/admin-manual/http-actions/fe/manager/query-profile-action.md index 50d59191a8..0b86949483 100644 --- a/docs/en/docs/admin-manual/http-actions/fe/manager/query-profile-action.md +++ b/docs/en/docs/admin-manual/http-actions/fe/manager/query-profile-action.md @@ -34,9 +34,15 @@ under the License. 
`GET /rest/v2/manager/query/profile/text/{query_id}` +`GET /rest/v2/manager/query/profile/graph/{query_id}` + +`GET /rest/v2/manager/query/profile/json/{query_id}` + `GET /rest/v2/manager/query/profile/fragments/{query_id}` -`GET /rest/v2/manager/query/profile/graph/{query_id}` +`GET /rest/v2/manager/query/current_queries` + +`GET /rest/v2/manager/query/kill/{query_id}` ## Get the query information @@ -342,7 +348,7 @@ Same as `show proc "/current_query_stmts"`, return current running queries. ## Cancel query -`POST /rest/v2/manager/query/kill/{connection_id}` +`POST /rest/v2/manager/query/kill/{query_id}` ### Description @@ -350,9 +356,9 @@ Cancel query of specified connection. ### Path parameters -* `{connection_id}` +* `{query_id}` - connection id + query id. You can get query id by `trace_id` api. ### Query parameters @@ -362,7 +368,7 @@ Cancel query of specified connection. { "msg": "success", "code": 0, - "data": "", + "data": null, "count": 0 } ``` diff --git a/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/query-profile-action.md b/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/query-profile-action.md index 1917237807..5283107279 100644 --- a/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/query-profile-action.md +++ b/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/query-profile-action.md @@ -42,8 +42,7 @@ under the License. `GET /rest/v2/manager/query/current_queries` -`GET /rest/v2/manager/query/kill/{connection_id}` - +`GET /rest/v2/manager/query/kill/{query_id}` ## 获取查询信息 @@ -349,7 +348,7 @@ GET /rest/v2/manager/query/query_info ## 取消query -`POST /rest/v2/manager/query/kill/{connection_id}` +`POST /rest/v2/manager/query/kill/{query_id}` ### Description @@ -357,9 +356,9 @@ GET /rest/v2/manager/query/query_info ### Path parameters -* `{connection_id}` +* `{query_id}` - connection id + query id. 
你可以通过 trace_id 接口,获取 query id。 ### Query parameters @@ -369,7 +368,7 @@ GET /rest/v2/manager/query/query_info { "msg": "success", "code": 0, - "data": "", + "data": null, "count": 0 } ``` diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java index 4e53dbbcd5..4d0f11f893 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -45,6 +45,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.json.simple.JSONObject; +import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; @@ -70,7 +71,7 @@ import javax.servlet.http.HttpServletResponse; * 4. /trace_id/{trace_id} * 5. /profile/fragments/{query_id} * 6. /current_queries - * 7. /kill/{connection_id} + * 7. 
/kill/{query_id} */ @RestController @RequestMapping("/rest/v2/manager/query") @@ -100,7 +101,8 @@ public class QueryProfileAction extends RestBaseController { .add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT).add(QUERY_TYPE).add(START_TIME).add(END_TIME) .add(TOTAL).add(QUERY_STATE).build(); - private List<String> requestAllFe(String httpPath, Map<String, String> arguments, String authorization) { + private List<String> requestAllFe(String httpPath, Map<String, String> arguments, String authorization, + HttpMethod method) { List<Pair<String, Integer>> frontends = HttpUtils.getFeList(); ImmutableMap<String, String> header = ImmutableMap.<String, String>builder() .put(NodeAction.AUTHORIZATION, authorization).build(); @@ -108,7 +110,12 @@ public class QueryProfileAction extends RestBaseController { for (Pair<String, Integer> ipPort : frontends) { String url = HttpUtils.concatUrl(ipPort, httpPath, arguments); try { - String data = HttpUtils.parseResponse(HttpUtils.doGet(url, header)); + String data = null; + if (method == HttpMethod.GET) { + data = HttpUtils.parseResponse(HttpUtils.doGet(url, header)); + } else if (method == HttpMethod.POST) { + data = HttpUtils.parseResponse(HttpUtils.doPost(url, header, null)); + } if (!Strings.isNullOrEmpty(data) && !data.equals("{}")) { dataList.add(data); } @@ -145,12 +152,12 @@ public class QueryProfileAction extends RestBaseController { arguments.put(SEARCH_PARA, search); arguments.put(IS_ALL_NODE_PARA, "false"); - List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION)); + List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION), + HttpMethod.GET); for (String data : dataList) { try { - NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data, - new TypeToken<NodeAction.NodeInfo>() { - }.getType()); + NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() { + }.getType()); 
queries.addAll(nodeInfo.getRows()); } catch (Exception e) { LOG.warn("parse query info error: {}", data, e); @@ -200,7 +207,8 @@ public class QueryProfileAction extends RestBaseController { String httpPath = "/rest/v2/manager/query/sql/" + queryId; ImmutableMap<String, String> arguments = ImmutableMap.<String, String>builder() .put(IS_ALL_NODE_PARA, "false").build(); - List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION)); + List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION), + HttpMethod.GET); if (!dataList.isEmpty()) { try { String sql = JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("sql").getAsString(); @@ -292,7 +300,8 @@ public class QueryProfileAction extends RestBaseController { } } } else { - String queryId = ProfileManager.getInstance().getQueryIdByTraceId(traceId); + ExecuteEnv env = ExecuteEnv.getInstance(); + String queryId = env.getScheduler().getQueryIdByTraceId(traceId); if (Strings.isNullOrEmpty(queryId)) { return ResponseEntityBuilder.badRequest("Not found"); } @@ -415,7 +424,8 @@ public class QueryProfileAction extends RestBaseController { if (!Strings.isNullOrEmpty(instanceId)) { builder.put(INSTANCE_ID, instanceId); } - List<String> dataList = requestAllFe(httpPath, builder.build(), request.getHeader(NodeAction.AUTHORIZATION)); + List<String> dataList = requestAllFe(httpPath, builder.build(), request.getHeader(NodeAction.AUTHORIZATION), + HttpMethod.GET); Map<String, String> result = Maps.newHashMap(); if (!dataList.isEmpty()) { try { @@ -448,7 +458,8 @@ public class QueryProfileAction extends RestBaseController { Map<String, String> arguments = Maps.newHashMap(); arguments.put(IS_ALL_NODE_PARA, "false"); List<List<String>> queries = Lists.newArrayList(); - List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION)); + List<String> dataList = requestAllFe(httpPath, arguments, 
request.getHeader(NodeAction.AUTHORIZATION), + HttpMethod.GET); for (String data : dataList) { try { NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() { @@ -481,25 +492,31 @@ public class QueryProfileAction extends RestBaseController { } /** - * kill queries with specified connection id + * kill queries with specific query id * * @param request * @param response - * @param connectionId + * @param queryId * @return */ - @RequestMapping(path = "/kill/{connection_id}", method = RequestMethod.POST) + @RequestMapping(path = "/kill/{query_id}", method = RequestMethod.POST) public Object killQuery(HttpServletRequest request, HttpServletResponse response, - @PathVariable("connection_id") int connectionId) { + @PathVariable("query_id") String queryId, + @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") boolean isAllNode) { executeCheckPassword(request, response); checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); - ExecuteEnv env = ExecuteEnv.getInstance(); - ConnectContext ctx = env.getScheduler().getContext(connectionId); - if (ctx == null) { - return ResponseEntityBuilder.notFound("connection not found"); + if (isAllNode) { + // Get current queries from all FE + String httpPath = "/rest/v2/manager/query/kill/" + queryId; + Map<String, String> arguments = Maps.newHashMap(); + arguments.put(IS_ALL_NODE_PARA, "false"); + requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION), HttpMethod.POST); + return ResponseEntityBuilder.ok(); } - ctx.cancelQuery(); + + ExecuteEnv env = ExecuteEnv.getInstance(); + env.getScheduler().cancelQuery(queryId); return ResponseEntityBuilder.ok(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 9757f2f556..0ad83d5176 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -38,6 +38,7 @@ import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionStatus; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.opentelemetry.api.trace.Tracer; @@ -62,6 +63,7 @@ public class ConnectContext { protected volatile long forwardedStmtId; protected volatile TUniqueId queryId; + protected volatile String traceId; // id for this connection protected volatile int connectionId; // mysql net @@ -469,6 +471,17 @@ public class ConnectContext { public void setQueryId(TUniqueId queryId) { this.queryId = queryId; + if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) { + connectScheduler.putTraceId2QueryId(traceId, queryId); + } + } + + public void setTraceId(String traceId) { + this.traceId = traceId; + } + + public String traceId() { + return traceId; } public TUniqueId queryId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 534206028c..66702d438b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -21,9 +21,11 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.mysql.MysqlProto; import org.apache.doris.mysql.nio.NConnectContext; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -51,6 +53,9 @@ public class ConnectScheduler { private final ExecutorService executor = 
ThreadPoolManager.newDaemonCacheThreadPool( Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true); + // valid trace id -> query id + private final Map<String, TUniqueId> traceId2QueryId = Maps.newConcurrentMap(); + // Use a thread to check whether connection is timeout. Because // 1. If use a scheduler, the task maybe a huge number when query is messy. // Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler. @@ -125,6 +130,16 @@ public class ConnectScheduler { return connectionMap.get(connectionId); } + public void cancelQuery(String queryId) { + for (ConnectContext ctx : connectionMap.values()) { + TUniqueId qid = ctx.queryId(); + if (qid != null && DebugUtil.printId(qid).equals(queryId)) { + ctx.cancelQuery(); + break; + } + } + } + public int getConnectionNum() { return numberConnection.get(); } @@ -143,6 +158,15 @@ public class ConnectScheduler { return infos; } + public void putTraceId2QueryId(String traceId, TUniqueId queryId) { + traceId2QueryId.put(traceId, queryId); + } + + public String getQueryIdByTraceId(String traceId) { + TUniqueId queryId = traceId2QueryId.get(traceId); + return queryId == null ? "" : DebugUtil.printId(queryId); + } + private class LoopHandler implements Runnable { ConnectContext context; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java index d0c77fae27..7efbf0c7a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -186,6 +186,10 @@ public class VariableMgr { ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, attr.name(), value); } + if (VariableVarCallbacks.hasCallback(attr.name())) { + VariableVarCallbacks.call(attr.name(), value); + } + return true; } @@ -515,9 +519,6 @@ public class VariableMgr { // Set to true if the variables need to be forwarded along with forward statement. 
boolean needForward() default false; - - // Set to true if the variables need to be set in TQueryOptions - boolean isQueryOption() default false; } private static class VarContext { @@ -587,8 +588,7 @@ public class VariableMgr { } field.setAccessible(true); - builder.put(attr.name(), - new VarContext(field, null, GLOBAL | attr.flag(), getValue(null, field))); + builder.put(attr.name(), new VarContext(field, null, GLOBAL | attr.flag(), getValue(null, field))); } return builder; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java new file mode 100644 index 0000000000..4aefa4fd9d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbackI.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe; + +import org.apache.doris.common.DdlException; + +public interface VariableVarCallbackI { + public void call(String value) throws DdlException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java new file mode 100644 index 0000000000..f1dc5e69c3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableVarCallbacks.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe; + +import org.apache.doris.common.DdlException; + +import com.google.common.base.Strings; +import com.google.common.collect.Maps; + +import java.util.Map; + +/** + * Callback after setting the session variable + */ +public class VariableVarCallbacks { + + public static final Map<String, VariableVarCallbackI> callbacks = Maps.newHashMap(); + + static { + SessionContextCallback sessionContextCallback = new SessionContextCallback(); + callbacks.put(SessionVariable.SESSION_CONTEXT, sessionContextCallback); + } + + public static Boolean hasCallback(String varName) { + return callbacks.containsKey(varName); + } + + public static void call(String varName, String value) throws DdlException { + if (hasCallback(varName)) { + callbacks.get(varName).call(value); + } + } + + // Converter to convert runtime filter type variable + public static class SessionContextCallback implements VariableVarCallbackI { + public void call(String value) throws DdlException { + if (Strings.isNullOrEmpty(value)) { + return; + } + /** + * The sessionContext is as follows: + * "k1:v1;k2:v2;..." + * Here we want to get value with key named "trace_id". 
+ */ + String[] parts = value.split(";"); + for (String part : parts) { + String[] innerParts = part.split(":"); + if (innerParts.length != 2) { + continue; + } + if (innerParts[0].equals("trace_id")) { + ConnectContext.get().setTraceId(innerParts[1]); + break; + } + } + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java index 8145ac1b77..1067a0c1a3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java @@ -233,4 +233,12 @@ public class VariableMgrTest { VariableMgr.setVar(null, setVar); Assert.fail("No exception throws."); } + + @Test + public void testVariableCallback() throws Exception { + SetStmt stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set session_context='trace_id:123'", ctx); + SetExecutor executor = new SetExecutor(ctx, stmt); + executor.execute(); + Assert.assertEquals("123", ctx.traceId()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org