This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a59633edd8 [fix](mysql protocol)Set more stmt exists flag correctly 
when forward to master. (#55711)
7a59633edd8 is described below

commit 7a59633edd8644a520b8741c7c325d3327be537c
Author: James <[email protected]>
AuthorDate: Tue Sep 9 09:50:35 2025 +0800

    [fix](mysql protocol)Set more stmt exists flag correctly when forward to 
master. (#55711)
    
    When execute multiple statements in one batch and
    CLIENT_MULTI_STATEMENTS is set, Doris server need to set
    SERVER_MORE_RESULTS_EXISTS flag in the return packet before the last
    statement. But when the Observer forward stmt to Master, this
    SERVER_MORE_RESULTS_EXISTS is not set, cause the following statements
    failed to execute.
    This pr forward a boolean value to Master, so the Master FE knows it is
    the last statement or not, and could set SERVER_MORE_RESULTS_EXISTS
    correctly.
---
 .../java/org/apache/doris/qe/ConnectProcessor.java | 12 +++++++
 .../java/org/apache/doris/qe/FEOpExecutor.java     |  5 +++
 .../java/org/apache/doris/qe/StmtExecutor.java     | 12 +++++++
 gensrc/thrift/FrontendService.thrift               |  1 +
 .../suites/query_p0/test_multiple_stmt.groovy      | 38 ++++++++++++++++++++++
 5 files changed, 68 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 55b8113379e..43bcf61c921 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -348,6 +348,15 @@ public abstract class ConnectProcessor {
                 
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
                 
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
                 
executor.getProfile().getSummaryProfile().parsedByConnectionProcess = true;
+                // Here we set the MoreStmtExists flag without considering 
CLIENT_MULTI_STATEMENTS.
+                // So the master will always set SERVER_MORE_RESULTS_EXISTS 
when the statement is not the last one.
+                // When the Follower/Observer received the return result of 
Master, the Follower/Observer
+                // will check CLIENT_MULTI_STATEMENTS is set or not. It sends 
SERVER_MORE_RESULTS_EXISTS back to client
+                // only when CLIENT_MULTI_STATEMENTS is set.
+                // See the code below : if 
(getConnectContext().getMysqlChannel().clientMultiStatements())
+                if (i != stmts.size() - 1 && 
connectType.equals(ConnectType.MYSQL)) {
+                    executor.setMoreStmtExists(true);
+                }
                 ctx.setExecutor(executor);
 
                 if (cacheKeyType != null) {
@@ -706,6 +715,9 @@ public abstract class ConnectProcessor {
             result.setQueryId(ctx.queryId());
         }
         result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        if (request.moreResultExists) {
+            ctx.getState().serverStatus |= 
MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
+        }
         result.setPacket(getResultPacket());
         result.setStatus(ctx.getState().toString());
         if (ctx.getState().getStateType() == MysqlStateType.OK) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
index 7c87907d619..9dcd379f913 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -63,6 +63,7 @@ public class FEOpExecutor {
     protected int thriftTimeoutMs;
 
     protected boolean shouldNotRetry;
+    protected boolean moreStmtExists = false;
 
     public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, 
ConnectContext ctx, boolean isQuery) {
         this.feAddr = feAddress;
@@ -173,6 +174,7 @@ public class FEOpExecutor {
         params.setStmtId(ctx.getStmtId());
         params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
         params.setSessionId(ctx.getSessionId());
+        params.setMoreResultExists(moreStmtExists);
 
         if (Config.isCloudMode()) {
             String cluster = "";
@@ -224,6 +226,9 @@ public class FEOpExecutor {
         return result.getErrMessage();
     }
 
+    public void setMoreStmtExists(boolean moreStmtExists) {
+        this.moreStmtExists = moreStmtExists;
+    }
 
     public ByteBuffer getOutputPacket() {
         if (result == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 05450a0a6dc..0f8bb745e9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -201,6 +201,9 @@ public class StmtExecutor {
     private Boolean isForwardedToMaster = null;
     // Flag for execute prepare statement, need to use binary protocol 
resultset
     private boolean isComStmtExecute = false;
+    // Set to true if there are more stmt need to execute.
+    // Mainly for forward to master, so that master can set the mysql server 
status correctly.
+    private boolean moreStmtExists = false;
 
     // The result schema if "dry_run_query" is true.
     // Only one column to indicate the real return row numbers.
@@ -354,6 +357,14 @@ public class StmtExecutor {
         return isForwardedToMaster;
     }
 
+    public boolean isMoreStmtExists() {
+        return moreStmtExists;
+    }
+
+    public void setMoreStmtExists(boolean moreStmtExists) {
+        this.moreStmtExists = moreStmtExists;
+    }
+
     private boolean shouldForwardToMaster() {
         if (Env.getCurrentEnv().isMaster()) {
             return false;
@@ -945,6 +956,7 @@ public class StmtExecutor {
         if (LOG.isDebugEnabled()) {
             LOG.debug("need to transfer to Master. stmt: {}", 
context.getStmtId());
         }
+        masterOpExecutor.setMoreStmtExists(moreStmtExists);
         masterOpExecutor.execute();
         if (parsedStmt instanceof LogicalPlanAdapter) {
             // for nereids command
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 87d8fc8d0cf..1a801ffd958 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -383,6 +383,7 @@ struct TMasterOpRequest {
     29: optional TTxnLoadInfo txnLoadInfo
     30: optional TGroupCommitInfo groupCommitInfo
     31: optional binary prepareExecuteBuffer
+    32: optional bool moreResultExists // Server has more result to send
 
     // selectdb cloud
     1000: optional string cloud_cluster
diff --git a/regression-test/suites/query_p0/test_multiple_stmt.groovy 
b/regression-test/suites/query_p0/test_multiple_stmt.groovy
new file mode 100644
index 00000000000..4a7e75816d4
--- /dev/null
+++ b/regression-test/suites/query_p0/test_multiple_stmt.groovy
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multiple_stmt") {
+    sql """drop database if exists test_multiple_stmt"""
+    sql """create database test_multiple_stmt"""
+    sql """use test_multiple_stmt"""
+    sql """CREATE TABLE test_multiple_stmt (
+            key1 int NOT NULL,
+            value1 int NOT NULL,
+        )ENGINE=OLAP
+        DUPLICATE KEY(`key1`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`key1`) BUCKETS 2
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    def result = sql """insert into test_multiple_stmt values (1, 1); select * 
from test_multiple_stmt;"""
+    assertEquals(1, result.size())
+    assertEquals(1, result[0][0])
+    assertEquals(1, result[0][1])
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to