This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 10ccada  [fix](forward) Avoid endless forward execution (#7335)
10ccada is described below

commit 10ccadacce76fd62b67f621ee42d32fe97c97ae4
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Dec 8 16:25:04 2021 +0800

    [fix](forward) Avoid endless forward execution (#7335)

    Close related #7334

    1. Fix bug describe in [Bug] show frontends cause FE oom #7334
    2. Fix error of CurrentConnected fields in show frontends result.
    3. Add more FAQ
---
 docs/en/faq/faq.md                                 | 50 +++++++++++++++++++-
 docs/zh-CN/faq/faq.md                              | 54 +++++++++++++++++++++-
 .../doris/common/proc/FrontendsProcNode.java       | 25 ++++++----
 .../java/org/apache/doris/qe/ConnectContext.java   | 32 ++++++++++---
 .../java/org/apache/doris/qe/ConnectProcessor.java | 39 ++++++++--------
 .../java/org/apache/doris/qe/StmtExecutor.java     | 35 +++++++++-----
 .../apache/doris/service/FrontendServiceImpl.java  | 44 +++++++++---------
 7 files changed, 209 insertions(+), 70 deletions(-)

diff --git a/docs/en/faq/faq.md b/docs/en/faq/faq.md
index 3349a9e..1e157ab 100644
--- a/docs/en/faq/faq.md
+++ b/docs/en/faq/faq.md
@@ -268,4 +268,52 @@ By specifying the storage medium properties of the path, we can use Doris's hot
 It should be noted that Doris does not automatically perceive the actual storage medium type of the disk where the storage path is located. This type needs to be explicitly indicated by the user in the path configuration. For example, the path "/path/to/data1.SSD" means that this path is an SSD storage medium. And "data1.SSD" is the actual directory name. Doris determines the storage medium type based on the ".SSD" suffix behind the directory name, not the actual storage medium type. In [...]
 
-In other words, ".HDD" and ".SSD" are only used to identify the "relative" "low speed" and "high speed" of the storage directory, not the actual storage medium type.
Therefore, if the storage path on the BE node has no difference in media, there is no need to fill in the suffix. \ No newline at end of file +In other words, ".HDD" and ".SSD" are only used to identify the "relative" "low speed" and "high speed" of the storage directory, not the actual storage medium type. Therefore, if the storage path on the BE node has no difference in media, there is no need to fill in the suffix. + +### Q19. `Lost connection to MySQL server at'reading initial communication packet', system error: 0` + +If the following problems occur when using the MySQL client to connect to Doris, this is usually caused by the difference between the jdk version used when compiling FE and the jdk version used when running FE. +Note that when using docker image to compile, the default JDK version is openjdk 11, you can switch to openjdk 8 by command (see the compilation document for details). + +### Q20. -214 error + +When performing operations such as load and query, you may encounter the following errors: + +``` +failed to initialize storage reader. tablet=63416.1050661139.aa4d304e7a7aff9c-f0fa7579928c85a0, res=-214, backend=192.168.100.10 +``` + +A -214 error means that the data version of the corresponding tablet is missing. For example, the above error indicates that the data version of the replica of tablet 63416 on the BE of 192.168.100.10 is missing. (There may be other similar error codes, which can be checked and repaired in the following ways). + +Normally, if your data has multiple replicas, the system will automatically repair these problematic replicas. You can troubleshoot through the following steps: + +First, use the `show tablet 63416` statement and execute the `show proc xxx` statement in the result to view the status of each replica of the corresponding tablet. Usually we need to care about the data in the `Version` column. + +Under normal circumstances, the Version of multiple replicas of a tablet should be the same. 
And it is the same as the VisibleVersion of the corresponding partition. + +You can use `show partitions from tblx` to view the corresponding partition version (the partition corresponding to the tablet can be obtained in the `show tablet` statement.) + +At the same time, you can also visit the URL in the CompactionStatus column of the `show proc` statement (just open it in the browser) to view more specific version information, to check which version is missing. + +If there is no automatic repair for a long time, you need to use the `show proc "/cluster_balance"` statement to view the tablet repair and scheduling tasks currently being performed by the system. It may be because there are a large number of tablets waiting to be scheduled, which leads to a long repair time. You can follow the records in `pending_tablets` and `running_tablets`. + +Furthermore, you can use the `admin repair` statement to specify the priority to repair a table or partition. For details, please refer to `help admin repair`; + +If it still cannot be repaired, then in the case of multiple replicas, we use the `admin set replica status` command to force the replica to go offline. For details, please refer to the example of `help admin set replica status` to set the status of the replica to bad. (After set to bad, the replica will not be accessed again. And will be automatically repaired later. But before the operation, you should make sure that the other replicas are normal) + +### Q21. Not connected to 192.168.100.1:8060 yet, server_id=384 + +We may encounter this error when loading or querying. If you go to the corresponding BE log to check, you may also find similar errors. + +This is an RPC error, and there are usually two possibilities: 1. The corresponding BE node is down. 2. rpc congestion or other errors. + +If the BE node is down, you need to check the specific reason for the downtime. Only the problem of rpc congestion is discussed here. 
+ +One situation is OVERCROWDED, which means that a large amount of unsent data at the rpc client exceeds the threshold. BE has two parameters related to it: + +1. `brpc_socket_max_unwritten_bytes`: The default is 64MB. If the unwritten data exceeds this value, an error will be reported. You can modify this value appropriately to avoid OVERCROWDED errors. (But this cures the symptoms rather than the root cause, essentially congestion still occurs). +2. `tablet_writer_ignore_eovercrowded`: The default is false. If set to true, Doris will ignore OVERCROWDED errors during the load process. This parameter is mainly used to avoid load failure and improve the stability of load. + +The second is that the packet size of rpc exceeds `max_body_size`. This problem may occur if the query contains a very large String type or a Bitmap type. It can be circumvented by modifying the following BE parameters: + +1. `brpc_max_body_size`: The default is 200MB, if necessary, it can be modified to 3GB (in bytes). \ No newline at end of file diff --git a/docs/zh-CN/faq/faq.md b/docs/zh-CN/faq/faq.md index d835f08..b63004e 100644 --- a/docs/zh-CN/faq/faq.md +++ b/docs/zh-CN/faq/faq.md @@ -268,4 +268,56 @@ Doris支持一个BE节点配置多个存储路径。通常情况下,每块盘 需要注意的是,Doris并不会自动感知存储路径所在磁盘的实际存储介质类型。这个类型需要用户在路径配置中显式的表示。比如路径 "/path/to/data1.SSD" 即表示这个路径是SSD存储介质。而 "data1.SSD" 就是实际的目录名称。Doris是根据目录名称后面的 ".SSD" 后缀来确定存储介质类型的,而不是实际的存储介质类型。也就是说,用户可以指定任意路径为SSD存储介质,而Doris仅识别目录后缀,不会去判断存储介质是否匹配。如果不写后缀,则默认为HDD。 -换句话说,".HDD" 和 ".SSD" 只是用于标识存储目录“相对”的“低速”和“高速”之分,而并不是标识实际的存储介质类型。所以如果BE节点上的存储路径没有介质区别,则无需填写后缀。 \ No newline at end of file +换句话说,".HDD" 和 ".SSD" 只是用于标识存储目录“相对”的“低速”和“高速”之分,而并不是标识实际的存储介质类型。所以如果BE节点上的存储路径没有介质区别,则无需填写后缀。 + +### Q19. `Lost connection to MySQL server at 'reading initial communication packet', system error: 0` + +如果使用 MySQL 客户端连接 Doris 时出现如下问题,这通常是因为编译 FE 时使用的 jdk 版本和运行 FE 时使用的 jdk 版本不同导致的。 +注意使用 docker 编译镜像编译时,默认的 JDK 版本是 openjdk 11,可以通过命令切换到 openjdk 8(详见编译文档)。 + +### Q20. 
-214 错误 + +在执行导入、查询等操作时,可能会遇到如下错误: + +``` +failed to initialize storage reader. tablet=63416.1050661139.aa4d304e7a7aff9c-f0fa7579928c85a0, res=-214, backend=192.168.100.10 +``` + +-214 错误意味着对应 tablet 的数据版本缺失。比如如上错误,表示 tablet 63416 在 192.168.100.10 这个 BE 上的副本的数据版本有缺失。(可能还有其他类似错误码,都可以用如下方式进行排查和修复)。 + +通常情况下,如果你的数据是多副本的,那么系统会自动修复这些有问题的副本。可以通过以下步骤进行排查: + +首先通过 `show tablet 63416` 语句并执行结果中的 `show proc xxx` 语句来查看对应 tablet 的各个副本情况。通常我们需要关心 `Version` 这一列的数据。 + +正常情况下,一个 tablet 的多个副本的 Version 应该是相同的。并且和对应分区的 VisibleVersion 版本相同。 + +你可以通过 `show partitions from tblx` 来查看对应的分区版本(tablet 对应的分区可以在 `show tablet` 语句中获取。) + +同时,你也可以访问 `show proc` 语句中的 CompactionStatus 列中的 URL(在浏览器打开即可)来查看更具体的版本信息,来检查具体丢失的是哪些版本。 + +如果长时间没有自动修复,则需要通过 `show proc "/cluster_balance"` 语句,查看当前系统正在执行的 tablet 修复和调度任务。可能是因为有大量的 tablet 在等待被调度,导致修复时间较长。可以关注 `pending_tablets` 和 `running_tablets` 中的记录。 + +更进一步的,可以通过 `admin repair` 语句来指定优先修复某个表或分区,具体可以参阅 `help admin repair`; + +如果依然无法修复,那么在多副本的情况下,我们使用 `admin set replica status` 命令强制将有问题的副本下线。具体可参阅 `help admin set replica status` 中将副本状态置为 bad 的示例。(置为 bad 后,副本将不会再被访问。并且会后续自动修复。但在操作前,应先确保其他副本是正常的) + +### Q21. Not connected to 192.168.100.1:8060 yet, server_id=384 + +在导入或者查询时,我们可能遇到这个错误。如果你去对应的 BE 日志中查看,也可能会找到类似错误。 + +这是一个 RPC 错误,通常由两种可能:1. 对应的 BE 节点宕机。2. rpc 拥塞或其他错误。 + +如果是 BE 节点宕机,则需要查看具体的宕机原因。这里只讨论 rpc 拥塞的问题。 + +一种情况是 OVERCROWDED,即表示 rpc 源端有大量未发送的数据超过了阈值。BE 有两个参数与之相关: + +1. `brpc_socket_max_unwritten_bytes`:默认 64MB,如果未发送数据超过这个值,则会报错。可以适当修改这个值以避免 OVERCROWDED 错误。(但这个治标不治本,本质上还是有拥塞发生)。 +2. `tablet_writer_ignore_eovercrowded`:默认为 false。如果设为true,则 Doris 会忽略导入过程中出现的 OVERCROWDED 错误。这个参数主要为了避免导入失败,以提高导入的稳定性。 + +第二种是 rpc 的包大小超过 max_body_size。如果查询中带有超大 String 类型,或者 bitmap 类型时,可能出现这个问题。可以通过修改以下 BE 参数规避: + +1. 
`brpc_max_body_size`:默认200MB,如果有必要,可以修改为 3GB(单位字节)。 + + + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index c8804f9..8bfcb59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -22,8 +22,10 @@ import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Frontend; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -47,20 +49,20 @@ public class FrontendsProcNode implements ProcNodeInterface { .add("ReplayedJournalId").add("LastHeartbeat").add("IsHelper").add("ErrMsg").add("Version") .add("CurrentConnected") .build(); - + public static final int HOSTNAME_INDEX = 2; private Catalog catalog; - + public FrontendsProcNode(Catalog catalog) { this.catalog = catalog; } - + @Override public ProcResult fetchResult() { BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - + List<List<String>> infos = Lists.newArrayList(); getFrontendsInfo(catalog, infos); @@ -90,7 +92,12 @@ public class FrontendsProcNode implements ProcNodeInterface { List<Pair<String, Integer>> allFeHosts = convertToHostPortPair(allFe); List<Pair<String, Integer>> helperNodes = catalog.getHelperNodes(); - Pair<String, Integer> selfNode = Catalog.getCurrentCatalog().getSelfNode(); + // Because the `show frontend` stmt maybe forwarded from other FE. + // if we only get self node from currrent catalog, the "CurrentConnected" field will always points to Msater FE. 
+ String selfNode = Catalog.getCurrentCatalog().getSelfNode().first; + if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) { + selfNode = ConnectContext.get().getCurrentConnectedFEIp(); + } for (Frontend fe : catalog.getFrontends(null /* all */)) { @@ -115,7 +122,7 @@ public class FrontendsProcNode implements ProcNodeInterface { info.add(Integer.toString(catalog.getClusterId())); info.add(String.valueOf(isJoin(allFeHosts, fe))); - + if (fe.getHost().equals(catalog.getSelfNode().first)) { info.add("true"); info.add(Long.toString(catalog.getEditLog().getMaxJournalId())); @@ -128,12 +135,12 @@ public class FrontendsProcNode implements ProcNodeInterface { info.add(fe.getHeartbeatErrMsg()); info.add(fe.getVersion()); // To indicate which FE we currently connected - info.add(fe.getHost().equals(selfNode.first) ? "Yes" : "No"); + info.add(fe.getHost().equals(selfNode) ? "Yes" : "No"); infos.add(info); } } - + private static boolean isHelperNode(List<Pair<String, Integer>> helperNodes, Frontend fe) { return helperNodes.stream().anyMatch(p -> p.first.equals(fe.getHost()) && p.second == fe.getEditLogPort()); } @@ -146,7 +153,7 @@ public class FrontendsProcNode implements ProcNodeInterface { } return false; } - + private static List<Pair<String, Integer>> convertToHostPortPair(List<InetSocketAddress> addrs) { List<Pair<String, Integer>> hostPortPair = Lists.newArrayList(); for (InetSocketAddress addr : addrs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 5d5ce3f..4290dd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -130,6 +130,9 @@ public class ConnectContext { private String sqlHash; + // The FE ip current connected + private String currentConnectedFEIp = ""; + public static ConnectContext get() { return 
threadLocalInfo.get(); } @@ -182,12 +185,15 @@ public class ConnectContext { public boolean isTxnModel() { return txnEntry != null && txnEntry.isTxnModel(); } + public boolean isTxnIniting() { return txnEntry != null && txnEntry.isTxnIniting(); } + public boolean isTxnBegin() { return txnEntry != null && txnEntry.isTxnBegin(); } + public void closeTxn() { if (isTxnModel()) { if (isTxnBegin()) { @@ -275,11 +281,17 @@ public class ConnectContext { this.qualifiedUser = qualifiedUser; } - public boolean getIsTempUser() { return isTempUser;} + public boolean getIsTempUser() { + return isTempUser; + } - public void setIsTempUser(boolean isTempUser) { this.isTempUser = isTempUser;} + public void setIsTempUser(boolean isTempUser) { + this.isTempUser = isTempUser; + } - public PaloRole getLdapGroupsPrivs() { return ldapGroupsPrivs; } + public PaloRole getLdapGroupsPrivs() { + return ldapGroupsPrivs; + } public void setLdapGroupsPrivs(PaloRole ldapGroupsPrivs) { this.ldapGroupsPrivs = ldapGroupsPrivs; @@ -434,7 +446,7 @@ public class ConnectContext { // kill operation with no protect. public void kill(boolean killConnection) { LOG.warn("kill timeout query, {}, kill connection: {}", - getMysqlChannel().getRemoteHostPortString(), killConnection); + getMysqlChannel().getRemoteHostPortString(), killConnection); if (killConnection) { isKilled = true; @@ -460,7 +472,7 @@ public class ConnectContext { if (delta > sessionVariable.getWaitTimeoutS() * 1000) { // Need kill this connection. 
LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}", - getMysqlChannel().getRemoteHostPortString(), sessionVariable.getWaitTimeoutS()); + getMysqlChannel().getRemoteHostPortString(), sessionVariable.getWaitTimeoutS()); killFlag = true; killConnection = true; @@ -468,7 +480,7 @@ public class ConnectContext { } else { if (delta > sessionVariable.getQueryTimeoutS() * 1000) { LOG.warn("kill query timeout, remote: {}, query timeout: {}", - getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS()); + getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS()); // Only kill killFlag = true; @@ -500,6 +512,14 @@ public class ConnectContext { this.isResourceTagsSet = !this.resourceTags.isEmpty(); } + public void setCurrentConnectedFEIp(String ip) { + this.currentConnectedFEIp = ip; + } + + public String getCurrentConnectedFEIp() { + return currentConnectedFEIp; + } + public class ThreadInfo { public List<String> toRow(long nowMs) { List<String> row = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 01df0c0..7df6e1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -113,16 +113,16 @@ public class ConnectProcessor { // slow query long endTime = System.currentTimeMillis(); long elapseMs = endTime - ctx.getStartTime(); - + ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) - .setState(ctx.getState().toString()).setQueryTime(elapseMs) - .setScanBytes(statistics == null ? 0 : statistics.getScanBytes()) - .setScanRows(statistics == null ? 0 : statistics.getScanRows()) - .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs()) - .setPeakMemoryBytes(statistics == null ? 
0 : statistics.getMaxPeakMemoryBytes()) - .setReturnRows(ctx.getReturnRows()) - .setStmtId(ctx.getStmtId()) - .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId())); + .setState(ctx.getState().toString()).setQueryTime(elapseMs) + .setScanBytes(statistics == null ? 0 : statistics.getScanBytes()) + .setScanRows(statistics == null ? 0 : statistics.getScanRows()) + .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs()) + .setPeakMemoryBytes(statistics == null ? 0 : statistics.getMaxPeakMemoryBytes()) + .setReturnRows(ctx.getReturnRows()) + .setStmtId(ctx.getStmtId()) + .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId())); if (ctx.getState().isQuery()) { MetricRepo.COUNTER_QUERY_ALL.increase(1L); @@ -143,14 +143,14 @@ public class ConnectProcessor { } else { ctx.getAuditEventBuilder().setIsQuery(false); } - + ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress()); - + // We put origin query stmt at the end of audit log, for parsing the log more convenient. if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) { ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql()); } else { - if (parsedStmt instanceof InsertStmt && ((InsertStmt)parsedStmt).isValuesOrConstantSelect()) { + if (parsedStmt instanceof InsertStmt && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) { // INSERT INTO VALUES may be very long, so we only log at most 1K bytes. 
int length = Math.min(1024, origStmt.length()); ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length)); @@ -158,7 +158,7 @@ public class ConnectProcessor { ctx.getAuditEventBuilder().setStmt(origStmt); } } - + Catalog.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build()); } @@ -192,11 +192,11 @@ public class ConnectProcessor { } ctx.getAuditEventBuilder().reset(); ctx.getAuditEventBuilder() - .setTimestamp(System.currentTimeMillis()) - .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString()) - .setUser(ctx.getQualifiedUser()) - .setDb(ctx.getDatabase()) - .setSqlHash(ctx.getSqlHash()); + .setTimestamp(System.currentTimeMillis()) + .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString()) + .setUser(ctx.getQualifiedUser()) + .setDb(ctx.getDatabase()) + .setSqlHash(ctx.getSqlHash()); // execute this query. StatementBase parsedStmt = null; @@ -483,8 +483,7 @@ public class ConnectProcessor { // return error directly. TMasterOpResult result = new TMasterOpResult(); ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, "Missing current user identity. You need to upgrade this Frontend " + - "to the " + - "same version as Master Frontend."); + "to the same version as Master Frontend."); result.setMaxJournalId(Catalog.getCurrentCatalog().getMaxJournalId().longValue()); result.setPacket(getResultPacket()); return result; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 1309016..615454e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -325,6 +325,14 @@ public class StmtExecutor implements ProfileWriter { // analyze this query analyze(context.getSessionVariable().toThrift()); if (isForwardToMaster()) { + if (isProxy) { + // This is already a stmt forwarded from other FE. 
+ // If goes here, which means we can't find a valid Master FE(some error happens). + // To avoid endless forward, throw exception here. + throw new UserException("The statement has been forwarded to master FE(" + + Catalog.getCurrentCatalog().getSelfNode().first + ") and failed to execute" + + " because Master FE is not ready. You may need to check FE's status"); + } forwardToMaster(); if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) { context.setQueryId(masterOpExecutor.getQueryId()); @@ -342,7 +350,7 @@ public class StmtExecutor implements ProfileWriter { context.getState().setIsQuery(true); MetricRepo.COUNTER_QUERY_BEGIN.increase(1L); int retryTime = Config.max_query_retry_time; - for (int i = 0; i < retryTime; i ++) { + for (int i = 0; i < retryTime; i++) { try { //reset query id for each retry if (i > 0) { @@ -506,7 +514,7 @@ public class StmtExecutor implements ProfileWriter { if (isForwardToMaster()) { return; } - + analyzer = new Analyzer(context.getCatalog(), context); // Convert show statement to select statement here if (parsedStmt instanceof ShowStmt) { @@ -584,7 +592,7 @@ public class StmtExecutor implements ProfileWriter { LOG.info("analysis exception happened when parsing stmt {}, id: {}, error: {}", originStmt, context.getStmtId(), syntaxError, e); if (syntaxError == null) { - throw e; + throw e; } else { throw new AnalysisException(syntaxError, e); } @@ -626,7 +634,7 @@ public class StmtExecutor implements ProfileWriter { // types and column labels to restore them after the rewritten stmt has been // reset() and re-analyzed. 
List<Type> origResultTypes = Lists.newArrayList(); - for (Expr e: parsedStmt.getResultExprs()) { + for (Expr e : parsedStmt.getResultExprs()) { origResultTypes.add(e.getType()); } List<String> origColLabels = @@ -691,7 +699,7 @@ public class StmtExecutor implements ProfileWriter { // Only user itself and user with admin priv can kill connection if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser()) && !Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), - PrivPredicate.ADMIN)) { + PrivPredicate.ADMIN)) { ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id); } @@ -808,7 +816,7 @@ public class StmtExecutor implements ProfileWriter { break; } } - + if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) { isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, false); } @@ -855,17 +863,17 @@ public class StmtExecutor implements ProfileWriter { QueryStmt queryStmt = (QueryStmt) parsedStmt; QueryDetail queryDetail = new QueryDetail(context.getStartTime(), - DebugUtil.printId(context.queryId()), - context.getStartTime(), -1, -1, - QueryDetail.QueryMemState.RUNNING, - context.getDatabase(), - originStmt.originStmt); + DebugUtil.printId(context.queryId()), + context.getStartTime(), -1, -1, + QueryDetail.QueryMemState.RUNNING, + context.getDatabase(), + originStmt.originStmt); context.setQueryDetail(queryDetail); QueryDetailQueue.addOrUpdateQueryDetail(queryDetail); // handle selects that fe can do without be, so we can make sql tools happy, especially the setup step. 
if (parsedStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).getTableRefs().isEmpty() - && Catalog.getCurrentSystemInfo().getBackendIds(true).isEmpty() ) { + && Catalog.getCurrentSystemInfo().getBackendIds(true).isEmpty()) { SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt; if (handleSelectRequestInFe(parsedSelectStmt)) { return; @@ -1413,6 +1421,7 @@ public class StmtExecutor implements ProfileWriter { context.getState().setEof(); } + // Process show statement private void handleShow() throws IOException, AnalysisException, DdlException { ShowExecutor executor = new ShowExecutor(context, (ShowStmt) parsedStmt); @@ -1431,8 +1440,10 @@ public class StmtExecutor implements ProfileWriter { private void handleUnlockTablesStmt() { } + private void handleLockTablesStmt() { } + private void handleExplainStmt(String result) throws IOException { ShowResultSetMetaData metaData = ShowResultSetMetaData.builder() diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 81955de..ac8ee57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -73,8 +73,8 @@ import org.apache.doris.thrift.TGetDbsResult; import org.apache.doris.thrift.TGetTablesParams; import org.apache.doris.thrift.TGetTablesResult; import org.apache.doris.thrift.TIsMethodSupportedRequest; -import org.apache.doris.thrift.TListTableStatusResult; import org.apache.doris.thrift.TListPrivilegesResult; +import org.apache.doris.thrift.TListTableStatusResult; import org.apache.doris.thrift.TLoadCheckRequest; import org.apache.doris.thrift.TLoadTxnBeginRequest; import org.apache.doris.thrift.TLoadTxnBeginResult; @@ -90,6 +90,7 @@ import org.apache.doris.thrift.TMiniLoadBeginResult; import org.apache.doris.thrift.TMiniLoadEtlStatusResult; import org.apache.doris.thrift.TMiniLoadRequest; 
import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPrivilegeStatus; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TReportRequest; @@ -101,7 +102,6 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TStreamLoadPutResult; import org.apache.doris.thrift.TTableStatus; -import org.apache.doris.thrift.TPrivilegeStatus; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUpdateExportTaskStatusRequest; import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest; @@ -154,7 +154,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (params.isSetPattern()) { try { matcher = PatternMatcher.createMysqlPattern(params.getPattern(), - CaseSensibility.DATABASE.getCaseSensibility()); + CaseSensibility.DATABASE.getCaseSensibility()); } catch (AnalysisException e) { throw new TException("Pattern is in bad format: " + params.getPattern()); } @@ -163,7 +163,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { Catalog catalog = Catalog.getCurrentCatalog(); List<String> dbNames = catalog.getDbNames(); LOG.debug("get db names: {}", dbNames); - + UserIdentity currentUser = null; if (params.isSetCurrentUserIdent()) { currentUser = UserIdentity.fromThrift(params.current_user_ident); @@ -196,7 +196,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (params.isSetPattern()) { try { matcher = PatternMatcher.createMysqlPattern(params.getPattern(), - CaseSensibility.TABLE.getCaseSensibility()); + CaseSensibility.TABLE.getCaseSensibility()); } catch (AnalysisException e) { throw new TException("Pattern is in bad format: " + params.getPattern()); } @@ -215,7 +215,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { for (String tableName : db.getTableNamesWithLock()) { LOG.debug("get 
table: {}, wait to check", tableName); if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db, - tableName, PrivPredicate.SHOW)) { + tableName, PrivPredicate.SHOW)) { continue; } @@ -238,7 +238,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (params.isSetPattern()) { try { matcher = PatternMatcher.createMysqlPattern(params.getPattern(), - CaseSensibility.TABLE.getCaseSensibility()); + CaseSensibility.TABLE.getCaseSensibility()); } catch (AnalysisException e) { throw new TException("Pattern is in bad format " + params.getPattern()); } @@ -288,7 +288,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { status.setComment(table.getComment()); status.setCreateTime(table.getCreateTime()); status.setLastCheckTime(table.getLastCheckTime()); - status.setUpdateTime(table.getUpdateTime()/1000); + status.setUpdateTime(table.getUpdateTime() / 1000); status.setCheckTime(table.getLastCheckTime()); status.setCollation("utf-8"); status.setRows(table.getRowCount()); @@ -404,7 +404,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { desc.setIsAllowNull(column.isAllowNull()); final TColumnDef colDef = new TColumnDef(desc); final String comment = column.getComment(); - if(comment != null) { + if (comment != null) { colDef.setComment(comment); } columns.add(colDef); @@ -428,7 +428,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } List<List<String>> rows = VariableMgr.dump(SetType.fromThrift(params.getVarType()), ctx.getSessionVariable(), - null); + null); for (List<String> row : rows) { map.put(row.get(0), row.get(1)); } @@ -517,7 +517,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { .setState(TStatusCode.OK.name()) .setQueryTime(0) .setStmt(stmt).build(); - + Catalog.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); } @@ -599,7 +599,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override 
public TMiniLoadBeginResult miniLoadBegin(TMiniLoadBeginRequest request) throws TException { LOG.debug("receive mini load begin request. label: {}, user: {}, ip: {}", - request.getLabel(), request.getUser(), request.getUserIp()); + request.getLabel(), request.getUser(), request.getUserIp()); TMiniLoadBeginResult result = new TMiniLoadBeginResult(); TStatus status = new TStatus(TStatusCode.OK); @@ -611,7 +611,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { } // step1: check password and privs checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), - request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); + request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); // step2: check label and record metadata in load manager if (request.isSetSubLabel()) { // TODO(ml): multi mini load @@ -636,7 +636,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { public TFeResult isMethodSupported(TIsMethodSupportedRequest request) throws TException { TStatus status = new TStatus(TStatusCode.OK); TFeResult result = new TFeResult(FrontendServiceVersion.V1, status); - switch (request.getFunctionName()){ + switch (request.getFunctionName()) { case "STREAMING_MINI_LOAD": break; default: @@ -660,6 +660,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { // add this log so that we can track this stmt LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), clientAddr.getHostname()); ConnectContext context = new ConnectContext(null); + // Set current connected FE to the client address, so that we can know where this request come from. 
+ context.setCurrentConnectedFEIp(clientAddr.getHostname()); ConnectProcessor processor = new ConnectProcessor(context); TMasterOpResult result = processor.proxyExecute(params); ConnectContext.remove(); @@ -700,7 +702,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TFeResult loadCheck(TLoadCheckRequest request) throws TException { LOG.debug("receive load check request. label: {}, user: {}, ip: {}", - request.getLabel(), request.getUser(), request.getUserIp()); + request.getLabel(), request.getUser(), request.getUserIp()); TStatus status = new TStatus(TStatusCode.OK); TFeResult result = new TFeResult(FrontendServiceVersion.V1, status); @@ -711,7 +713,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { } checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), - request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); + request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } catch (UserException e) { status.setStatusCode(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(e.getMessage()); @@ -865,9 +867,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() / 2 : 5000; Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP); boolean ret = Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( - db, Lists.newArrayList(table), request.getTxnId(), - TabletCommitInfo.fromThrift(request.getCommitInfos()), - timeoutMs, TxnCommitAttachment.fromThrift(request.txnCommitAttachment)); + db, Lists.newArrayList(table), request.getTxnId(), + TabletCommitInfo.fromThrift(request.getCommitInfos()), + timeoutMs, TxnCommitAttachment.fromThrift(request.txnCommitAttachment)); if (ret) { // if commit and publish is success, load can be regarded as success MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); @@ -924,8 +926,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { } long dbId = db.getId(); Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(), - request.isSetReason() ? request.getReason() : "system cancel", - TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + request.isSetReason() ? request.getReason() : "system cancel", + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); } @Override
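
Note on the two behavioral changes in this commit: the `isProxy` check added to `StmtExecutor` and the `currentConnectedFEIp` field set in `FrontendServiceImpl` can be illustrated in isolation. The following is a minimal sketch under stated assumptions, not Doris code: `ForwardGuardSketch`, `Frontend`, `ForwardLoopException`, `isMaster`, and `knownMaster` are hypothetical names invented for the illustration, and the unchecked exception only stands in for the `UserException` thrown in the real change.

```java
// Hypothetical, simplified model of the forwarding guard.
// None of these classes are Doris classes.
final class ForwardGuardSketch {

    /** Stand-in for the exception raised instead of forwarding a second time. */
    static final class ForwardLoopException extends RuntimeException {
        ForwardLoopException(String message) {
            super(message);
        }
    }

    /** A toy frontend that either executes a statement or forwards it once. */
    static final class Frontend {
        final String host;
        boolean isMaster;      // whether this node can execute the statement itself
        Frontend knownMaster;  // the node this frontend believes is Master

        Frontend(String host, boolean isMaster) {
            this.host = host;
            this.isMaster = isMaster;
        }

        /** Entry point for a statement sent by a client connected to this node. */
        String execute(String stmt) {
            return execute(stmt, false, host);
        }

        /**
         * @param isProxy       true if the statement was forwarded by another frontend
         * @param connectedFeIp host of the frontend the client is connected to
         */
        String execute(String stmt, boolean isProxy, String connectedFeIp) {
            if (!isMaster) {
                if (isProxy) {
                    // Already forwarded once: forwarding again could loop forever.
                    throw new ForwardLoopException("statement was forwarded to " + host
                            + ", which is not a ready Master; check FE status");
                }
                // Forward exactly once and pass along where the client is connected.
                return knownMaster.execute(stmt, true, host);
            }
            return "executed on " + host + ", client connected to " + connectedFeIp;
        }
    }

    public static void main(String[] args) {
        Frontend a = new Frontend("10.0.0.1", false);
        Frontend b = new Frontend("10.0.0.2", true);
        a.knownMaster = b;
        b.knownMaster = b;
        System.out.println(a.execute("SHOW FRONTENDS"));

        // Simulate a cluster with no ready Master: the two nodes point at each other.
        b.isMaster = false;
        b.knownMaster = a;
        try {
            a.execute("SHOW FRONTENDS");
        } catch (ForwardLoopException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Without the `isProxy` branch, the second scenario in `main` would bounce the statement between the two nodes indefinitely, which is a plausible reading of the FE OOM reported in #7334. Passing the forwarder's host along mirrors how the commit lets `show frontends` report the frontend the client is actually connected to rather than the Master.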