This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 82e5fd74896 [fix](query-forward) Fix forward query exception or stuck or potential query result loss (#41303) (#42368)
82e5fd74896 is described below

commit 82e5fd7489637f1afb99496bf174882cd5d69c85
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Thu Oct 24 13:05:18 2024 +0800

    [fix](query-forward) Fix forward query exception or stuck or potential query result loss (#41303) (#42368)
    
    ## Proposed changes
    
    1. Fix the exception thrown for a forwarded query when the master's
    execution sets no status code. This can happen, for example, when
    execution ends in an EOF state.
    
    2. Fix forwarded queries getting stuck because no result packet was sent
    to the MySQL channel. The result packets from the master should be used.
    
    3. Fix potential loss of forwarded query results when the follower's
    "can read" status changes while the query is being processed. The
    forwarding decision should be made once, before execution.
    
    4. Add assertion for regression test.
---
 .../main/java/org/apache/doris/common/Config.java    |  5 +++++
 .../java/org/apache/doris/qe/ConnectProcessor.java   | 14 ++++++++------
 .../main/java/org/apache/doris/qe/StmtExecutor.java  | 20 ++++++++++++--------
 3 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 8df13b1df6d..afa39be2f06 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2471,4 +2471,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, description = {
             "设置为 true,root 和 admin 将跳过 sql block rule", "Set to true, root and 
admin will skip SQL block rule"})
     public static boolean sql_block_rule_ignore_admin = false;
+
+    @ConfField(description = {"用于测试,强制将所有的查询forward到master以验证forward query的行为",
+            "For testing purposes, all queries are forcibly forwarded to the 
master to verify"
+                    + "the behavior of forwarding queries."})
+    public static boolean force_forward_all_queries = false;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 5fb25f4efdc..9fbe6eadc00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -581,11 +581,8 @@ public class ConnectProcessor {
                 && ctx.getState().getStateType() != 
QueryState.MysqlStateType.ERR) {
             ShowResultSet resultSet = executor.getShowResultSet();
             if (resultSet == null) {
-                if (executor.sendProxyQueryResult()) {
-                    packet = getResultPacket();
-                } else {
-                    packet = executor.getOutputPacket();
-                }
+                executor.sendProxyQueryResult();
+                packet = executor.getOutputPacket();
             } else {
                 executor.sendResultSet(resultSet);
                 packet = getResultPacket();
@@ -761,7 +758,12 @@ public class ConnectProcessor {
         if (ctx.getState().getStateType() == MysqlStateType.OK) {
             result.setStatusCode(0);
         } else {
-            result.setStatusCode(ctx.getState().getErrorCode().getCode());
+            ErrorCode errorCode = ctx.getState().getErrorCode();
+            if (errorCode != null) {
+                result.setStatusCode(errorCode.getCode());
+            } else {
+                result.setStatusCode(ErrorCode.ERR_UNKNOWN_ERROR.getCode());
+            }
             result.setErrMessage(ctx.getState().getErrorMessage());
         }
         if (executor != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 02b0872fe6b..e4cb541744c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -253,6 +253,7 @@ public class StmtExecutor {
     private boolean isExecuteStmt = false;
     // The profile of this execution
     private final Profile profile;
+    private Boolean isForwardedToMaster = null;
 
     // The result schema if "dry_run_query" is true.
     // Only one column to indicate the real return row numbers.
@@ -372,13 +373,20 @@ public class StmtExecutor {
     }
 
     public boolean isForwardToMaster() {
+        if (isForwardedToMaster == null) {
+            isForwardedToMaster = shouldForwardToMaster();
+        }
+        return isForwardedToMaster;
+    }
+
+    private boolean shouldForwardToMaster() {
         if (Env.getCurrentEnv().isMaster()) {
             return false;
         }
 
         // this is a query stmt, but this non-master FE can not read, forward 
it to master
         if (isQuery() && !Env.getCurrentEnv().isMaster()
-                && (!Env.getCurrentEnv().canRead() || 
debugForwardAllQueries())) {
+                && (!Env.getCurrentEnv().canRead() || debugForwardAllQueries() 
|| Config.force_forward_all_queries)) {
             return true;
         }
 
@@ -391,7 +399,7 @@ public class StmtExecutor {
 
     private boolean debugForwardAllQueries() {
         DebugPoint debugPoint = 
DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
-        return debugPoint != null && debugPoint.param("forwardAllQueries", 
true);
+        return debugPoint != null && debugPoint.param("forwardAllQueries", 
false);
     }
 
     public ByteBuffer getOutputPacket() {
@@ -2905,18 +2913,14 @@ public class StmtExecutor {
         return ((ProxyMysqlChannel) 
context.getMysqlChannel()).getProxyResultBufferList();
     }
 
-    public boolean sendProxyQueryResult() throws IOException {
+    public void sendProxyQueryResult() throws IOException {
         if (masterOpExecutor == null) {
-            return false;
+            return;
         }
         List<ByteBuffer> queryResultBufList = 
masterOpExecutor.getQueryResultBufList();
-        if (queryResultBufList.isEmpty()) {
-            return false;
-        }
         for (ByteBuffer byteBuffer : queryResultBufList) {
             context.getMysqlChannel().sendOnePacket(byteBuffer);
         }
-        return true;
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to