This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 85674814ebf [fix](query-forward) Fix forward query exception or stuck 
or potential query result loss (#41303) (#42369)
85674814ebf is described below

commit 85674814ebf0349bd550b8cc2182667a7934c494
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Mon Oct 28 17:39:57 2024 +0800

    [fix](query-forward) Fix forward query exception or stuck or potential 
query result loss (#41303) (#42369)
    
    ## Proposed changes
    
    1. Fix forward query exception if no status code is set in master
    execution. EOF may result in this status.
    
    2. Fix forward query stuck due to no result packet sent to mysql
    channel. Should use result packets from master.
    
    3. Fix potential forward query result loss if follower can read status
    change during query process. Should judge by the status once before
    execution.
    
    4. Add assertion for regression test.
---
 .../main/java/org/apache/doris/common/Config.java    |  5 +++++
 .../java/org/apache/doris/qe/ConnectProcessor.java   | 14 ++++++++------
 .../main/java/org/apache/doris/qe/StmtExecutor.java  | 20 ++++++++++++--------
 .../suites/query_p0/test_forward_qeury.groovy        |  5 +++--
 4 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index fe0d28aee4f..cac5355a970 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2835,4 +2835,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, description = {
             "设置为 true,如果查询无法选择到健康副本时,会打印出该tablet所有副本的详细信息,"})
     public static boolean sql_block_rule_ignore_admin = false;
+
+    @ConfField(description = {"用于测试,强制将所有的查询forward到master以验证forward query的行为",
+            "For testing purposes, all queries are forcibly forwarded to the 
master to verify"
+                    + "the behavior of forwarding queries."})
+    public static boolean force_forward_all_queries = false;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 487a2910770..2a85af3a92f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -586,11 +586,8 @@ public abstract class ConnectProcessor {
                 && ctx.getState().getStateType() != 
QueryState.MysqlStateType.ERR) {
             ShowResultSet resultSet = executor.getShowResultSet();
             if (resultSet == null) {
-                if (executor.sendProxyQueryResult()) {
-                    packet = getResultPacket();
-                } else {
-                    packet = executor.getOutputPacket();
-                }
+                executor.sendProxyQueryResult();
+                packet = executor.getOutputPacket();
             } else {
                 executor.sendResultSet(resultSet);
                 packet = getResultPacket();
@@ -742,7 +739,12 @@ public abstract class ConnectProcessor {
         if (ctx.getState().getStateType() == MysqlStateType.OK) {
             result.setStatusCode(0);
         } else {
-            result.setStatusCode(ctx.getState().getErrorCode().getCode());
+            ErrorCode errorCode = ctx.getState().getErrorCode();
+            if (errorCode != null) {
+                result.setStatusCode(errorCode.getCode());
+            } else {
+                result.setStatusCode(ErrorCode.ERR_UNKNOWN_ERROR.getCode());
+            }
             result.setErrMessage(ctx.getState().getErrorMessage());
         }
         if (executor != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 83a6a0afb8a..4cab4220eef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -274,6 +274,7 @@ public class StmtExecutor {
     private boolean isHandleQueryInFe = false;
     // The profile of this execution
     private final Profile profile;
+    private Boolean isForwardedToMaster = null;
 
     private ExecuteStmt execStmt;
     PrepareStmtContext preparedStmtCtx = null;
@@ -405,6 +406,13 @@ public class StmtExecutor {
     }
 
     public boolean isForwardToMaster() {
+        if (isForwardedToMaster == null) {
+            isForwardedToMaster = shouldForwardToMaster();
+        }
+        return isForwardedToMaster;
+    }
+
+    private boolean shouldForwardToMaster() {
         if (Env.getCurrentEnv().isMaster()) {
             return false;
         }
@@ -415,7 +423,7 @@ public class StmtExecutor {
 
         // this is a query stmt, but this non-master FE can not read, forward 
it to master
         if (isQuery() && !Env.getCurrentEnv().isMaster()
-                && (!Env.getCurrentEnv().canRead() || 
debugForwardAllQueries())) {
+                && (!Env.getCurrentEnv().canRead() || debugForwardAllQueries() 
|| Config.force_forward_all_queries)) {
             return true;
         }
 
@@ -428,7 +436,7 @@ public class StmtExecutor {
 
     private boolean debugForwardAllQueries() {
         DebugPoint debugPoint = 
DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
-        return debugPoint != null && debugPoint.param("forwardAllQueries", 
true);
+        return debugPoint != null && debugPoint.param("forwardAllQueries", 
false);
     }
 
     public ByteBuffer getOutputPacket() {
@@ -3455,17 +3463,13 @@ public class StmtExecutor {
         return ((ProxyMysqlChannel) 
context.getMysqlChannel()).getProxyResultBufferList();
     }
 
-    public boolean sendProxyQueryResult() throws IOException {
+    public void sendProxyQueryResult() throws IOException {
         if (masterOpExecutor == null) {
-            return false;
+            return;
         }
         List<ByteBuffer> queryResultBufList = 
masterOpExecutor.getQueryResultBufList();
-        if (queryResultBufList.isEmpty()) {
-            return false;
-        }
         for (ByteBuffer byteBuffer : queryResultBufList) {
             context.getMysqlChannel().sendOnePacket(byteBuffer);
         }
-        return true;
     }
 }
diff --git a/regression-test/suites/query_p0/test_forward_qeury.groovy 
b/regression-test/suites/query_p0/test_forward_qeury.groovy
index 98697ae94d4..563c2ded4aa 100644
--- a/regression-test/suites/query_p0/test_forward_qeury.groovy
+++ b/regression-test/suites/query_p0/test_forward_qeury.groovy
@@ -41,8 +41,9 @@ suite("test_forward_query") {
 
         sql """ INSERT INTO ${tbl} VALUES(1);"""
 
-        cluster.injectDebugPoints(NodeType.FE, 
['StmtExecutor.forward_all_queries' : [forwardAllQueries:true]])
+        cluster.injectDebugPoints(NodeType.FE, 
['StmtExecutor.forward_all_queries' : [forwardAllQueries:true, execute:1]])
 
-        sql """ SELECT * FROM ${tbl} """
+        def ret = sql """ SELECT * FROM ${tbl} """
+        assertEquals(ret[0][0], 1)
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to