This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 10ccada  [fix](forward) Avoid endless forward execution (#7335)
10ccada is described below

commit 10ccadacce76fd62b67f621ee42d32fe97c97ae4
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Dec 8 16:25:04 2021 +0800

    [fix](forward) Avoid endless forward execution (#7335)
    
    Close related #7334
    
    1. Fix bug described in [Bug] show frontends cause FE oom #7334
    2. Fix the error in the CurrentConnected field of the show frontends result.
    3. Add more FAQ entries.
---
 docs/en/faq/faq.md                                 | 50 +++++++++++++++++++-
 docs/zh-CN/faq/faq.md                              | 54 +++++++++++++++++++++-
 .../doris/common/proc/FrontendsProcNode.java       | 25 ++++++----
 .../java/org/apache/doris/qe/ConnectContext.java   | 32 ++++++++++---
 .../java/org/apache/doris/qe/ConnectProcessor.java | 39 ++++++++--------
 .../java/org/apache/doris/qe/StmtExecutor.java     | 35 +++++++++-----
 .../apache/doris/service/FrontendServiceImpl.java  | 44 +++++++++---------
 7 files changed, 209 insertions(+), 70 deletions(-)

diff --git a/docs/en/faq/faq.md b/docs/en/faq/faq.md
index 3349a9e..1e157ab 100644
--- a/docs/en/faq/faq.md
+++ b/docs/en/faq/faq.md
@@ -268,4 +268,52 @@ By specifying the storage medium properties of the path, 
we can use Doris's hot
 
 It should be noted that Doris does not automatically perceive the actual 
storage medium type of the disk where the storage path is located. This type 
needs to be explicitly indicated by the user in the path configuration. For 
example, the path "/path/to/data1.SSD" means that this path is an SSD storage 
medium. And "data1.SSD" is the actual directory name. Doris determines the 
storage medium type based on the ".SSD" suffix behind the directory name, not 
the actual storage medium type. In  [...]
 
-In other words, ".HDD" and ".SSD" are only used to identify the "relative" 
"low speed" and "high speed" of the storage directory, not the actual storage 
medium type. Therefore, if the storage path on the BE node has no difference in 
media, there is no need to fill in the suffix.
\ No newline at end of file
+In other words, ".HDD" and ".SSD" are only used to identify the "relative" 
"low speed" and "high speed" of the storage directory, not the actual storage 
medium type. Therefore, if the storage path on the BE node has no difference in 
media, there is no need to fill in the suffix.
+
+### Q19. `Lost connection to MySQL server at 'reading initial communication packet', system error: 0`
+
+If this problem occurs when using the MySQL client to connect to Doris, it is usually caused by a mismatch between the JDK version used to compile FE and the JDK version used to run FE.
+Note that when compiling with the docker image, the default JDK version is openjdk 11; you can switch to openjdk 8 with a command (see the compilation document for details).
+
+### Q20. -214 error
+
+When performing operations such as load and query, you may encounter the 
following errors:
+
+```
+failed to initialize storage reader. 
tablet=63416.1050661139.aa4d304e7a7aff9c-f0fa7579928c85a0, res=-214, 
backend=192.168.100.10
+```
+
+A -214 error means that the data version of the corresponding tablet is missing. For example, the above error indicates that the data version of the replica of tablet 63416 on the BE at 192.168.100.10 is missing. (There may be other similar error codes, which can all be checked and repaired in the following way.)
+
+Normally, if your data has multiple replicas, the system will automatically 
repair these problematic replicas. You can troubleshoot through the following 
steps:
+
+First, use the `show tablet 63416` statement, then execute the `show proc xxx` statement from its result to view the status of each replica of the corresponding tablet. Usually we care about the data in the `Version` column.
+
+Under normal circumstances, the Version of multiple replicas of a tablet 
should be the same. And it is the same as the VisibleVersion of the 
corresponding partition.
+
+You can use `show partitions from tblx` to view the corresponding partition version (the partition corresponding to the tablet can be obtained from the `show tablet` statement).
+
+At the same time, you can also visit the URL in the CompactionStatus column of the `show proc` result (just open it in a browser) to view more detailed version information and check which versions are missing.
+
+If there is no automatic repair for a long time, you need to use the `show 
proc "/cluster_balance"` statement to view the tablet repair and scheduling 
tasks currently being performed by the system. It may be because there are a 
large number of tablets waiting to be scheduled, which leads to a long repair 
time. You can follow the records in `pending_tablets` and `running_tablets`.
+
+Furthermore, you can use the `admin repair` statement to repair a specified table or partition with priority. For details, please refer to `help admin repair`.
+
+If it still cannot be repaired, then in the case of multiple replicas, use the `admin set replica status` command to force the problematic replica offline. For details, refer to the example in `help admin set replica status` for setting the replica status to bad. (After being set to bad, the replica will no longer be accessed and will be repaired automatically later. Before doing this, make sure the other replicas are normal.)
+
+### Q21. Not connected to 192.168.100.1:8060 yet, server_id=384
+
+We may encounter this error when loading or querying. If you go to the 
corresponding BE log to check, you may also find similar errors.
+
+This is an RPC error, and there are usually two possibilities: 1. The corresponding BE node is down. 2. RPC congestion or other errors.
+
+If the BE node is down, you need to check the specific reason for the 
downtime. Only the problem of rpc congestion is discussed here.
+
+One situation is OVERCROWDED, which means that a large amount of unsent data 
at the rpc client exceeds the threshold. BE has two parameters related to it:
+
+1. `brpc_socket_max_unwritten_bytes`: The default is 64MB. If the unwritten 
data exceeds this value, an error will be reported. You can modify this value 
appropriately to avoid OVERCROWDED errors. (But this cures the symptoms rather 
than the root cause, essentially congestion still occurs).
+2. `tablet_writer_ignore_eovercrowded`: The default is false. If set to true, 
Doris will ignore OVERCROWDED errors during the load process. This parameter is 
mainly used to avoid load failure and improve the stability of load.
+
+The second is that the RPC packet size exceeds `max_body_size`. This problem may occur if the query contains a very large String type or a Bitmap type. It can be circumvented by modifying the following BE parameter:
+
+1. `brpc_max_body_size`: The default is 200MB, if necessary, it can be 
modified to 3GB (in bytes).
\ No newline at end of file
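
The replica check described in Q20 above — each replica's Version should equal the partition's VisibleVersion — can be sketched as follows. This is a minimal illustration only; the class and method names are invented for this example and are not Doris's internal API.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the Q20 check above: a replica is suspect when its
// Version differs from the partition's VisibleVersion. Names here are
// invented for illustration; this is not Doris's internal repair logic.
public class ReplicaVersionCheck {

    // Returns the indices of replicas whose version does not match
    // the partition's VisibleVersion.
    public static List<Integer> suspectReplicas(long visibleVersion, long[] replicaVersions) {
        List<Integer> suspects = new ArrayList<>();
        for (int i = 0; i < replicaVersions.length; i++) {
            if (replicaVersions[i] != visibleVersion) {
                suspects.add(i);
            }
        }
        return suspects;
    }

    public static void main(String[] args) {
        // Partition VisibleVersion is 10; the third replica is behind,
        // which is the situation that produces a -214 error on read.
        System.out.println(suspectReplicas(10, new long[] {10, 10, 7}));
    }
}
```

A tablet flagged this way is what `show proc "/cluster_balance"` schedules for repair, or what `admin repair` prioritizes.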
diff --git a/docs/zh-CN/faq/faq.md b/docs/zh-CN/faq/faq.md
index d835f08..b63004e 100644
--- a/docs/zh-CN/faq/faq.md
+++ b/docs/zh-CN/faq/faq.md
@@ -268,4 +268,56 @@ Doris支持一个BE节点配置多个存储路径。通常情况下,每块盘
 
 需要注意的是,Doris并不会自动感知存储路径所在磁盘的实际存储介质类型。这个类型需要用户在路径配置中显式的表示。比如路径 
"/path/to/data1.SSD" 即表示这个路径是SSD存储介质。而 "data1.SSD" 就是实际的目录名称。Doris是根据目录名称后面的 
".SSD" 
后缀来确定存储介质类型的,而不是实际的存储介质类型。也就是说,用户可以指定任意路径为SSD存储介质,而Doris仅识别目录后缀,不会去判断存储介质是否匹配。如果不写后缀,则默认为HDD。
 
-换句话说,".HDD" 和 ".SSD" 
只是用于标识存储目录“相对”的“低速”和“高速”之分,而并不是标识实际的存储介质类型。所以如果BE节点上的存储路径没有介质区别,则无需填写后缀。
\ No newline at end of file
+换句话说,".HDD" 和 ".SSD" 
只是用于标识存储目录“相对”的“低速”和“高速”之分,而并不是标识实际的存储介质类型。所以如果BE节点上的存储路径没有介质区别,则无需填写后缀。
+
+### Q19. `Lost connection to MySQL server at 'reading initial communication 
packet', system error: 0`
+
+如果使用 MySQL 客户端连接 Doris 时出现如下问题,这通常是因为编译 FE 时使用的 jdk 版本和运行 FE 时使用的 jdk 版本不同导致的。
+注意使用 docker 编译镜像编译时,默认的 JDK 版本是 openjdk 11,可以通过命令切换到 openjdk 8(详见编译文档)。
+
+### Q20. -214 错误
+
+在执行导入、查询等操作时,可能会遇到如下错误:
+
+```
+failed to initialize storage reader. 
tablet=63416.1050661139.aa4d304e7a7aff9c-f0fa7579928c85a0, res=-214, 
backend=192.168.100.10
+```
+
+-214 错误意味着对应 tablet 的数据版本缺失。比如如上错误,表示 tablet 63416 在 192.168.100.10 这个 BE 
上的副本的数据版本有缺失。(可能还有其他类似错误码,都可以用如下方式进行排查和修复)。
+
+通常情况下,如果你的数据是多副本的,那么系统会自动修复这些有问题的副本。可以通过以下步骤进行排查:
+
+首先通过 `show tablet 63416` 语句并执行结果中的 `show proc xxx` 语句来查看对应 tablet 
的各个副本情况。通常我们需要关心 `Version` 这一列的数据。
+
+正常情况下,一个 tablet 的多个副本的 Version 应该是相同的。并且和对应分区的 VisibleVersion 版本相同。
+
+你可以通过 `show partitions from tblx` 来查看对应的分区版本(tablet 对应的分区可以在 `show tablet` 
语句中获取。)
+
+同时,你也可以访问 `show proc` 语句中的 CompactionStatus 列中的 
URL(在浏览器打开即可)来查看更具体的版本信息,来检查具体丢失的是哪些版本。
+
+如果长时间没有自动修复,则需要通过 `show proc "/cluster_balance"` 语句,查看当前系统正在执行的 tablet 
修复和调度任务。可能是因为有大量的 tablet 在等待被调度,导致修复时间较长。可以关注 `pending_tablets` 和 
`running_tablets` 中的记录。
+
+更进一步的,可以通过 `admin repair` 语句来指定优先修复某个表或分区,具体可以参阅 `help admin repair`;
+
+如果依然无法修复,那么在多副本的情况下,我们使用 `admin set replica status` 命令强制将有问题的副本下线。具体可参阅 `help 
admin set replica status` 中将副本状态置为 bad 的示例。(置为 bad 
后,副本将不会再被访问。并且会后续自动修复。但在操作前,应先确保其他副本是正常的)
+
+### Q21. Not connected to 192.168.100.1:8060 yet, server_id=384
+
+在导入或者查询时,我们可能遇到这个错误。如果你去对应的 BE 日志中查看,也可能会找到类似错误。
+
+这是一个 RPC 错误,通常由两种可能:1. 对应的 BE 节点宕机。2. rpc 拥塞或其他错误。
+
+如果是 BE 节点宕机,则需要查看具体的宕机原因。这里只讨论 rpc 拥塞的问题。
+
+一种情况是 OVERCROWDED,即表示 rpc 源端有大量未发送的数据超过了阈值。BE 有两个参数与之相关:
+
+1. `brpc_socket_max_unwritten_bytes`:默认 64MB,如果未发送数据超过这个值,则会报错。可以适当修改这个值以避免 
OVERCROWDED 错误。(但这个治标不治本,本质上还是有拥塞发生)。
+2. `tablet_writer_ignore_eovercrowded`:默认为 false。如果设为true,则 Doris 会忽略导入过程中出现的 
OVERCROWDED 错误。这个参数主要为了避免导入失败,以提高导入的稳定性。
+
+第二种是 rpc 的包大小超过 max_body_size。如果查询中带有超大 String 类型,或者 bitmap 
类型时,可能出现这个问题。可以通过修改以下 BE 参数规避:
+
+1. `brpc_max_body_size`:默认200MB,如果有必要,可以修改为 3GB(单位字节)。
+
+
+
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
index c8804f9..8bfcb59 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
@@ -22,8 +22,10 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Frontend;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -47,20 +49,20 @@ public class FrontendsProcNode implements ProcNodeInterface 
{
             
.add("ReplayedJournalId").add("LastHeartbeat").add("IsHelper").add("ErrMsg").add("Version")
             .add("CurrentConnected")
             .build();
-    
+
     public static final int HOSTNAME_INDEX = 2;
 
     private Catalog catalog;
-    
+
     public FrontendsProcNode(Catalog catalog) {
         this.catalog = catalog;
     }
-    
+
     @Override
     public ProcResult fetchResult() {
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
-        
+
         List<List<String>> infos = Lists.newArrayList();
 
         getFrontendsInfo(catalog, infos);
@@ -90,7 +92,12 @@ public class FrontendsProcNode implements ProcNodeInterface {
         List<Pair<String, Integer>> allFeHosts = convertToHostPortPair(allFe);
         List<Pair<String, Integer>> helperNodes = catalog.getHelperNodes();
 
-        Pair<String, Integer> selfNode = 
Catalog.getCurrentCatalog().getSelfNode();
+        // Because the `show frontends` stmt may be forwarded from another FE,
+        // if we only get the self node from the current catalog, the "CurrentConnected" field will always point to the Master FE.
+        String selfNode = Catalog.getCurrentCatalog().getSelfNode().first;
+        if (ConnectContext.get() != null && 
!Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) {
+            selfNode = ConnectContext.get().getCurrentConnectedFEIp();
+        }
 
         for (Frontend fe : catalog.getFrontends(null /* all */)) {
 
@@ -115,7 +122,7 @@ public class FrontendsProcNode implements ProcNodeInterface 
{
 
             info.add(Integer.toString(catalog.getClusterId()));
             info.add(String.valueOf(isJoin(allFeHosts, fe)));
-            
+
             if (fe.getHost().equals(catalog.getSelfNode().first)) {
                 info.add("true");
                 
info.add(Long.toString(catalog.getEditLog().getMaxJournalId()));
@@ -128,12 +135,12 @@ public class FrontendsProcNode implements 
ProcNodeInterface {
             info.add(fe.getHeartbeatErrMsg());
             info.add(fe.getVersion());
             // To indicate which FE we currently connected
-            info.add(fe.getHost().equals(selfNode.first) ? "Yes" : "No");
+            info.add(fe.getHost().equals(selfNode) ? "Yes" : "No");
 
             infos.add(info);
         }
     }
-    
+
     private static boolean isHelperNode(List<Pair<String, Integer>> 
helperNodes, Frontend fe) {
         return helperNodes.stream().anyMatch(p -> p.first.equals(fe.getHost()) 
&& p.second == fe.getEditLogPort());
     }
@@ -146,7 +153,7 @@ public class FrontendsProcNode implements ProcNodeInterface 
{
         }
         return false;
     }
-    
+
     private static List<Pair<String, Integer>> 
convertToHostPortPair(List<InetSocketAddress> addrs) {
         List<Pair<String, Integer>> hostPortPair = Lists.newArrayList();
         for (InetSocketAddress addr : addrs) {
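
The CurrentConnected fix above boils down to one selection rule: prefer the FE IP recorded on the client's connection, and fall back to the catalog's self node only when none is recorded. A minimal sketch of that rule, with an invented class and method name:

```java
// Sketch of the selection rule behind the "CurrentConnected" column.
// When `show frontends` is forwarded to the Master FE, the catalog's
// self node would be the Master, so the IP recorded on the connection
// (the FE the client actually connected to) takes precedence.
public class CurrentConnectedSketch {

    public static String currentConnectedHost(String catalogSelfHost, String ctxConnectedIp) {
        if (ctxConnectedIp != null && !ctxConnectedIp.isEmpty()) {
            // Forwarded case: report the FE the client really connected to.
            return ctxConnectedIp;
        }
        // Local case: no recorded IP, fall back to the catalog's self node.
        return catalogSelfHost;
    }

    public static void main(String[] args) {
        System.out.println(currentConnectedHost("10.0.0.1", "10.0.0.2"));
        System.out.println(currentConnectedHost("10.0.0.1", ""));
    }
}
```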
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 5d5ce3f..4290dd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -130,6 +130,9 @@ public class ConnectContext {
 
     private String sqlHash;
 
+    // The IP of the FE that this client is currently connected to
+    private String currentConnectedFEIp = "";
+
     public static ConnectContext get() {
         return threadLocalInfo.get();
     }
@@ -182,12 +185,15 @@ public class ConnectContext {
     public boolean isTxnModel() {
         return txnEntry != null && txnEntry.isTxnModel();
     }
+
     public boolean isTxnIniting() {
         return txnEntry != null && txnEntry.isTxnIniting();
     }
+
     public boolean isTxnBegin() {
         return txnEntry != null && txnEntry.isTxnBegin();
     }
+
     public void closeTxn() {
         if (isTxnModel()) {
             if (isTxnBegin()) {
@@ -275,11 +281,17 @@ public class ConnectContext {
         this.qualifiedUser = qualifiedUser;
     }
 
-    public boolean getIsTempUser() { return isTempUser;}
+    public boolean getIsTempUser() {
+        return isTempUser;
+    }
 
-    public void setIsTempUser(boolean isTempUser) { this.isTempUser = 
isTempUser;}
+    public void setIsTempUser(boolean isTempUser) {
+        this.isTempUser = isTempUser;
+    }
 
-    public PaloRole getLdapGroupsPrivs() { return ldapGroupsPrivs; }
+    public PaloRole getLdapGroupsPrivs() {
+        return ldapGroupsPrivs;
+    }
 
     public void setLdapGroupsPrivs(PaloRole ldapGroupsPrivs) {
         this.ldapGroupsPrivs = ldapGroupsPrivs;
@@ -434,7 +446,7 @@ public class ConnectContext {
     // kill operation with no protect.
     public void kill(boolean killConnection) {
         LOG.warn("kill timeout query, {}, kill connection: {}",
-                 getMysqlChannel().getRemoteHostPortString(), killConnection);
+                getMysqlChannel().getRemoteHostPortString(), killConnection);
 
         if (killConnection) {
             isKilled = true;
@@ -460,7 +472,7 @@ public class ConnectContext {
             if (delta > sessionVariable.getWaitTimeoutS() * 1000) {
                 // Need kill this connection.
                 LOG.warn("kill wait timeout connection, remote: {}, wait 
timeout: {}",
-                         getMysqlChannel().getRemoteHostPortString(), 
sessionVariable.getWaitTimeoutS());
+                        getMysqlChannel().getRemoteHostPortString(), 
sessionVariable.getWaitTimeoutS());
 
                 killFlag = true;
                 killConnection = true;
@@ -468,7 +480,7 @@ public class ConnectContext {
         } else {
             if (delta > sessionVariable.getQueryTimeoutS() * 1000) {
                 LOG.warn("kill query timeout, remote: {}, query timeout: {}",
-                         getMysqlChannel().getRemoteHostPortString(), 
sessionVariable.getQueryTimeoutS());
+                        getMysqlChannel().getRemoteHostPortString(), 
sessionVariable.getQueryTimeoutS());
 
                 // Only kill
                 killFlag = true;
@@ -500,6 +512,14 @@ public class ConnectContext {
         this.isResourceTagsSet = !this.resourceTags.isEmpty();
     }
 
+    public void setCurrentConnectedFEIp(String ip) {
+        this.currentConnectedFEIp = ip;
+    }
+
+    public String getCurrentConnectedFEIp() {
+        return currentConnectedFEIp;
+    }
+
     public class ThreadInfo {
         public List<String> toRow(long nowMs) {
             List<String> row = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 01df0c0..7df6e1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -113,16 +113,16 @@ public class ConnectProcessor {
         // slow query
         long endTime = System.currentTimeMillis();
         long elapseMs = endTime - ctx.getStartTime();
-        
+
         ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
-            .setState(ctx.getState().toString()).setQueryTime(elapseMs)
-            .setScanBytes(statistics == null ? 0 : statistics.getScanBytes())
-            .setScanRows(statistics == null ? 0 : statistics.getScanRows())
-            .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
-            .setPeakMemoryBytes(statistics == null ? 0 : 
statistics.getMaxPeakMemoryBytes())
-            .setReturnRows(ctx.getReturnRows())
-            .setStmtId(ctx.getStmtId())
-            .setQueryId(ctx.queryId() == null ? "NaN" : 
DebugUtil.printId(ctx.queryId()));
+                .setState(ctx.getState().toString()).setQueryTime(elapseMs)
+                .setScanBytes(statistics == null ? 0 : 
statistics.getScanBytes())
+                .setScanRows(statistics == null ? 0 : statistics.getScanRows())
+                .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
+                .setPeakMemoryBytes(statistics == null ? 0 : 
statistics.getMaxPeakMemoryBytes())
+                .setReturnRows(ctx.getReturnRows())
+                .setStmtId(ctx.getStmtId())
+                .setQueryId(ctx.queryId() == null ? "NaN" : 
DebugUtil.printId(ctx.queryId()));
 
         if (ctx.getState().isQuery()) {
             MetricRepo.COUNTER_QUERY_ALL.increase(1L);
@@ -143,14 +143,14 @@ public class ConnectProcessor {
         } else {
             ctx.getAuditEventBuilder().setIsQuery(false);
         }
-        
+
         
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
-        
+
         // We put origin query stmt at the end of audit log, for parsing the 
log more convenient.
         if (!ctx.getState().isQuery() && (parsedStmt != null && 
parsedStmt.needAuditEncryption())) {
             ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
         } else {
-            if (parsedStmt instanceof InsertStmt && 
((InsertStmt)parsedStmt).isValuesOrConstantSelect()) {
+            if (parsedStmt instanceof InsertStmt && ((InsertStmt) 
parsedStmt).isValuesOrConstantSelect()) {
                 // INSERT INTO VALUES may be very long, so we only log at most 
1K bytes.
                 int length = Math.min(1024, origStmt.length());
                 ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, 
length));
@@ -158,7 +158,7 @@ public class ConnectProcessor {
                 ctx.getAuditEventBuilder().setStmt(origStmt);
             }
         }
-        
+
         
Catalog.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
     }
 
@@ -192,11 +192,11 @@ public class ConnectProcessor {
         }
         ctx.getAuditEventBuilder().reset();
         ctx.getAuditEventBuilder()
-            .setTimestamp(System.currentTimeMillis())
-            .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString())
-            .setUser(ctx.getQualifiedUser())
-            .setDb(ctx.getDatabase())
-            .setSqlHash(ctx.getSqlHash());
+                .setTimestamp(System.currentTimeMillis())
+                .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString())
+                .setUser(ctx.getQualifiedUser())
+                .setDb(ctx.getDatabase())
+                .setSqlHash(ctx.getSqlHash());
 
         // execute this query.
         StatementBase parsedStmt = null;
@@ -483,8 +483,7 @@ public class ConnectProcessor {
             // return error directly.
             TMasterOpResult result = new TMasterOpResult();
             ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, 
"Missing current user identity. You need to upgrade this Frontend " +
-                    "to the " +
-                    "same version as Master Frontend.");
+                    "to the same version as Master Frontend.");
             
result.setMaxJournalId(Catalog.getCurrentCatalog().getMaxJournalId().longValue());
             result.setPacket(getResultPacket());
             return result;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 1309016..615454e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -325,6 +325,14 @@ public class StmtExecutor implements ProfileWriter {
                 // analyze this query
                 analyze(context.getSessionVariable().toThrift());
                 if (isForwardToMaster()) {
+                    if (isProxy) {
+                        // This is already a stmt forwarded from another FE.
+                        // If we get here, it means we can't find a valid Master FE (some error happened).
+                        // To avoid endless forwarding, throw an exception here.
+                        throw new UserException("The statement has been 
forwarded to master FE("
+                                + 
Catalog.getCurrentCatalog().getSelfNode().first + ") and failed to execute" +
+                                " because Master FE is not ready. You may need 
to check FE's status");
+                    }
                     forwardToMaster();
                     if (masterOpExecutor != null && 
masterOpExecutor.getQueryId() != null) {
                         context.setQueryId(masterOpExecutor.getQueryId());
@@ -342,7 +350,7 @@ public class StmtExecutor implements ProfileWriter {
                 context.getState().setIsQuery(true);
                 MetricRepo.COUNTER_QUERY_BEGIN.increase(1L);
                 int retryTime = Config.max_query_retry_time;
-                for (int i = 0; i < retryTime; i ++) {
+                for (int i = 0; i < retryTime; i++) {
                     try {
                         //reset query id for each retry
                         if (i > 0) {
@@ -506,7 +514,7 @@ public class StmtExecutor implements ProfileWriter {
         if (isForwardToMaster()) {
             return;
         }
-        
+
         analyzer = new Analyzer(context.getCatalog(), context);
         // Convert show statement to select statement here
         if (parsedStmt instanceof ShowStmt) {
@@ -584,7 +592,7 @@ public class StmtExecutor implements ProfileWriter {
                 LOG.info("analysis exception happened when parsing stmt {}, 
id: {}, error: {}",
                         originStmt, context.getStmtId(), syntaxError, e);
                 if (syntaxError == null) {
-                    throw  e;
+                    throw e;
                 } else {
                     throw new AnalysisException(syntaxError, e);
                 }
@@ -626,7 +634,7 @@ public class StmtExecutor implements ProfileWriter {
                 // types and column labels to restore them after the rewritten 
stmt has been
                 // reset() and re-analyzed.
                 List<Type> origResultTypes = Lists.newArrayList();
-                for (Expr e: parsedStmt.getResultExprs()) {
+                for (Expr e : parsedStmt.getResultExprs()) {
                     origResultTypes.add(e.getType());
                 }
                 List<String> origColLabels =
@@ -691,7 +699,7 @@ public class StmtExecutor implements ProfileWriter {
             // Only user itself and user with admin priv can kill connection
             if 
(!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser())
                     && 
!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(),
-                                                                              
PrivPredicate.ADMIN)) {
+                    PrivPredicate.ADMIN)) {
                 
ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, id);
             }
 
@@ -808,7 +816,7 @@ public class StmtExecutor implements ProfileWriter {
                 break;
             }
         }
-        
+
         if (cacheResult != null && cacheAnalyzer.getHitRange() == 
Cache.HitRange.Right) {
             isSendFields = sendCachedValues(channel, 
cacheResult.getValuesList(), newSelectStmt, isSendFields, false);
         }
@@ -855,17 +863,17 @@ public class StmtExecutor implements ProfileWriter {
         QueryStmt queryStmt = (QueryStmt) parsedStmt;
 
         QueryDetail queryDetail = new QueryDetail(context.getStartTime(),
-                                                  
DebugUtil.printId(context.queryId()),
-                                                  context.getStartTime(), -1, 
-1,
-                                                  
QueryDetail.QueryMemState.RUNNING,
-                                                  context.getDatabase(),
-                                                  originStmt.originStmt);
+                DebugUtil.printId(context.queryId()),
+                context.getStartTime(), -1, -1,
+                QueryDetail.QueryMemState.RUNNING,
+                context.getDatabase(),
+                originStmt.originStmt);
         context.setQueryDetail(queryDetail);
         QueryDetailQueue.addOrUpdateQueryDetail(queryDetail);
 
         // handle selects that fe can do without be, so we can make sql tools 
happy, especially the setup step.
         if (parsedStmt instanceof SelectStmt && ((SelectStmt) 
parsedStmt).getTableRefs().isEmpty()
-                    && 
Catalog.getCurrentSystemInfo().getBackendIds(true).isEmpty() ) {
+                && 
Catalog.getCurrentSystemInfo().getBackendIds(true).isEmpty()) {
             SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
             if (handleSelectRequestInFe(parsedSelectStmt)) {
                 return;
@@ -1413,6 +1421,7 @@ public class StmtExecutor implements ProfileWriter {
 
         context.getState().setEof();
     }
+
     // Process show statement
     private void handleShow() throws IOException, AnalysisException, 
DdlException {
         ShowExecutor executor = new ShowExecutor(context, (ShowStmt) 
parsedStmt);
@@ -1431,8 +1440,10 @@ public class StmtExecutor implements ProfileWriter {
 
     private void handleUnlockTablesStmt() {
     }
+
     private void handleLockTablesStmt() {
     }
+
     private void handleExplainStmt(String result) throws IOException {
         ShowResultSetMetaData metaData =
                 ShowResultSetMetaData.builder()
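
The guard added in `StmtExecutor` above can be illustrated in isolation: a statement that already arrived via forwarding (`isProxy`) must not be forwarded again, otherwise a statement could bounce between FEs forever when no valid Master is available. The following is a simplified sketch with invented names, not the actual Doris classes:

```java
// Sketch of the endless-forward guard: `isProxy` marks a statement that
// was already forwarded from another FE. Without the guard, such a
// statement would be forwarded again and could loop between FEs forever.
public class ForwardGuardSketch {

    static class ForwardException extends Exception {
        ForwardException(String msg) { super(msg); }
    }

    // Decides what to do with a statement that needs the Master FE:
    // forward a fresh client statement, but reject one that has
    // already been forwarded once.
    public static String handleForward(boolean isProxy) throws ForwardException {
        if (isProxy) {
            throw new ForwardException("already forwarded once; Master FE is not ready");
        }
        return "forwarded";
    }

    public static void main(String[] args) {
        try {
            System.out.println(handleForward(false)); // fresh statement: forwarded
            handleForward(true);                      // forwarded statement: rejected
        } catch (ForwardException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```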
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 81955de..ac8ee57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -73,8 +73,8 @@ import org.apache.doris.thrift.TGetDbsResult;
 import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
 import org.apache.doris.thrift.TIsMethodSupportedRequest;
-import org.apache.doris.thrift.TListTableStatusResult;
 import org.apache.doris.thrift.TListPrivilegesResult;
+import org.apache.doris.thrift.TListTableStatusResult;
 import org.apache.doris.thrift.TLoadCheckRequest;
 import org.apache.doris.thrift.TLoadTxnBeginRequest;
 import org.apache.doris.thrift.TLoadTxnBeginResult;
@@ -90,6 +90,7 @@ import org.apache.doris.thrift.TMiniLoadBeginResult;
 import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
 import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPrivilegeStatus;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
 import org.apache.doris.thrift.TReportRequest;
@@ -101,7 +102,6 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TStreamLoadPutResult;
 import org.apache.doris.thrift.TTableStatus;
-import org.apache.doris.thrift.TPrivilegeStatus;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
 import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest;
@@ -154,7 +154,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (params.isSetPattern()) {
             try {
                 matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
-                                                            CaseSensibility.DATABASE.getCaseSensibility());
+                        CaseSensibility.DATABASE.getCaseSensibility());
             } catch (AnalysisException e) {
                 throw new TException("Pattern is in bad format: " + params.getPattern());
             }
@@ -163,7 +163,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         Catalog catalog = Catalog.getCurrentCatalog();
         List<String> dbNames = catalog.getDbNames();
         LOG.debug("get db names: {}", dbNames);
-        
+
         UserIdentity currentUser = null;
         if (params.isSetCurrentUserIdent()) {
             currentUser = UserIdentity.fromThrift(params.current_user_ident);
@@ -196,7 +196,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (params.isSetPattern()) {
             try {
                 matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
-                                                            CaseSensibility.TABLE.getCaseSensibility());
+                        CaseSensibility.TABLE.getCaseSensibility());
             } catch (AnalysisException e) {
                 throw new TException("Pattern is in bad format: " + params.getPattern());
             }
@@ -215,7 +215,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             for (String tableName : db.getTableNamesWithLock()) {
                 LOG.debug("get table: {}, wait to check", tableName);
                 if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db,
-                                                                        tableName, PrivPredicate.SHOW)) {
+                        tableName, PrivPredicate.SHOW)) {
                     continue;
                 }
 
@@ -238,7 +238,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (params.isSetPattern()) {
             try {
                 matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
-                                                            CaseSensibility.TABLE.getCaseSensibility());
+                        CaseSensibility.TABLE.getCaseSensibility());
             } catch (AnalysisException e) {
                 throw new TException("Pattern is in bad format " + params.getPattern());
             }
@@ -288,7 +288,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     status.setComment(table.getComment());
                     status.setCreateTime(table.getCreateTime());
                     status.setLastCheckTime(table.getLastCheckTime());
-                    status.setUpdateTime(table.getUpdateTime()/1000);
+                    status.setUpdateTime(table.getUpdateTime() / 1000);
                     status.setCheckTime(table.getLastCheckTime());
                     status.setCollation("utf-8");
                     status.setRows(table.getRowCount());
@@ -404,7 +404,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                         desc.setIsAllowNull(column.isAllowNull());
                         final TColumnDef colDef = new TColumnDef(desc);
                         final String comment = column.getComment();
-                        if(comment != null) {
+                        if (comment != null) {
                             colDef.setComment(comment);
                         }
                         columns.add(colDef);
@@ -428,7 +428,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             return result;
         }
         List<List<String>> rows = VariableMgr.dump(SetType.fromThrift(params.getVarType()), ctx.getSessionVariable(),
-                                                   null);
+                null);
         for (List<String> row : rows) {
             map.put(row.get(0), row.get(1));
         }
@@ -517,7 +517,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 .setState(TStatusCode.OK.name())
                 .setQueryTime(0)
                 .setStmt(stmt).build();
-        
+
         Catalog.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
     }
 
@@ -599,7 +599,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     @Override
     public TMiniLoadBeginResult miniLoadBegin(TMiniLoadBeginRequest request) throws TException {
         LOG.debug("receive mini load begin request. label: {}, user: {}, ip: {}",
-                 request.getLabel(), request.getUser(), request.getUserIp());
+                request.getLabel(), request.getUser(), request.getUserIp());
 
         TMiniLoadBeginResult result = new TMiniLoadBeginResult();
         TStatus status = new TStatus(TStatusCode.OK);
@@ -611,7 +611,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             }
             // step1: check password and privs
             checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
-                                  request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
+                    request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
             // step2: check label and record metadata in load manager
             if (request.isSetSubLabel()) {
                 // TODO(ml): multi mini load
@@ -636,7 +636,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     public TFeResult isMethodSupported(TIsMethodSupportedRequest request) throws TException {
         TStatus status = new TStatus(TStatusCode.OK);
         TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
-        switch (request.getFunctionName()){
+        switch (request.getFunctionName()) {
            case "STREAMING_MINI_LOAD":
                 break;
             default:
@@ -660,6 +660,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // add this log so that we can track this stmt
         LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), clientAddr.getHostname());
         ConnectContext context = new ConnectContext(null);
+        // Set current connected FE to the client address, so that we can know where this request comes from.
+        context.setCurrentConnectedFEIp(clientAddr.getHostname());
         ConnectProcessor processor = new ConnectProcessor(context);
         TMasterOpResult result = processor.proxyExecute(params);
         ConnectContext.remove();
@@ -700,7 +702,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     @Override
     public TFeResult loadCheck(TLoadCheckRequest request) throws TException {
         LOG.debug("receive load check request. label: {}, user: {}, ip: {}",
-                 request.getLabel(), request.getUser(), request.getUserIp());
+                request.getLabel(), request.getUser(), request.getUserIp());
 
         TStatus status = new TStatus(TStatusCode.OK);
         TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
@@ -711,7 +713,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             }
 
             checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
-                                  request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
+                    request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
         } catch (UserException e) {
             status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
             status.addToErrorMsgs(e.getMessage());
@@ -865,9 +867,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
         Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
         boolean ret = Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
-                        db, Lists.newArrayList(table), request.getTxnId(),
-                        TabletCommitInfo.fromThrift(request.getCommitInfos()),
-                        timeoutMs, TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
+                db, Lists.newArrayList(table), request.getTxnId(),
+                TabletCommitInfo.fromThrift(request.getCommitInfos()),
+                timeoutMs, TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
         if (ret) {
             // if commit and publish is success, load can be regarded as success
             MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
@@ -924,8 +926,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         long dbId = db.getId();
         Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(),
-                                                                  request.isSetReason() ? request.getReason() : "system cancel",
-                                                                  TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+                request.isSetReason() ? request.getReason() : "system cancel",
+                TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
    }
 
     @Override
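
The key change in the hunk at `@@ -660,6 +660,8 @@` above is that the FE handling a forwarded statement now records the sending FE's address on the new `ConnectContext`. The following is a minimal standalone sketch, not Doris source (the class and method names besides `setCurrentConnectedFEIp` are illustrative), of how such a marker lets a non-master FE refuse to re-forward an already-forwarded statement, which is how the endless forward execution described in #7334 can be broken:

```java
// Illustrative sketch only: tagging a request with the sending frontend's
// IP lets a node detect and refuse a second forward hop.
public class ForwardLoopDemo {
    static class Context {
        // Null means the request came directly from a MySQL client.
        private String currentConnectedFEIp;

        void setCurrentConnectedFEIp(String ip) {
            this.currentConnectedFEIp = ip;
        }

        boolean isForwardedRequest() {
            return currentConnectedFEIp != null;
        }
    }

    // A non-master FE normally forwards statements to the master, but a
    // request that already arrived via another FE must be handled locally,
    // never forwarded again, or two non-master FEs could bounce it forever.
    static boolean shouldForwardToMaster(Context ctx, boolean isMaster) {
        return !isMaster && !ctx.isForwardedRequest();
    }

    public static void main(String[] args) {
        Context fromClient = new Context();
        System.out.println(shouldForwardToMaster(fromClient, false)); // true

        Context fromPeerFe = new Context();
        fromPeerFe.setCurrentConnectedFEIp("10.0.0.1");
        System.out.println(shouldForwardToMaster(fromPeerFe, false)); // false
    }
}
```

The same recorded address also backs the corrected `CurrentConnected` field in `SHOW FRONTENDS` (see the `FrontendsProcNode` change in this commit).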
