This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9a283b2968e branch-3.1: [fix](mysql protocol)Set more stmt exists flag
correctly when forwardto master. #55711 (#55871)
9a283b2968e is described below
commit 9a283b2968e46c25e157ed6155c15cac36fe8d1b
Author: James <[email protected]>
AuthorDate: Thu Sep 11 10:39:50 2025 +0800
branch-3.1: [fix](mysql protocol)Set more stmt exists flag correctly when
forwardto master. #55711 (#55871)
backport: #55711
---
.../java/org/apache/doris/qe/ConnectProcessor.java | 12 +++++++
.../java/org/apache/doris/qe/FEOpExecutor.java | 5 +++
.../java/org/apache/doris/qe/StmtExecutor.java | 12 +++++++
gensrc/thrift/FrontendService.thrift | 1 +
.../suites/query_p0/test_multiple_stmt.groovy | 38 ++++++++++++++++++++++
5 files changed, 68 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 93ce46dd93c..606a457c8f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -350,6 +350,15 @@ public abstract class ConnectProcessor {
executor = new StmtExecutor(ctx, parsedStmt);
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
+ // Here we set the MoreStmtExists flag without considering CLIENT_MULTI_STATEMENTS.
+ // So the master will always set SERVER_MORE_RESULTS_EXISTS when the statement is not the last one.
+ // When the Follower/Observer received the return result of Master, the Follower/Observer
+ // will check CLIENT_MULTI_STATEMENTS is set or not. It sends SERVER_MORE_RESULTS_EXISTS back to client
+ // only when CLIENT_MULTI_STATEMENTS is set.
+ // See the code below : if (getConnectContext().getMysqlChannel().clientMultiStatements())
+ if (i != stmts.size() - 1 &&
connectType.equals(ConnectType.MYSQL)) {
+ executor.setMoreStmtExists(true);
+ }
ctx.setExecutor(executor);
if (cacheKeyType != null) {
@@ -745,6 +754,9 @@ public abstract class ConnectProcessor {
result.setQueryId(ctx.queryId());
}
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+ if (request.moreResultExists) {
+ ctx.getState().serverStatus |=
MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
+ }
result.setPacket(getResultPacket());
result.setStatus(ctx.getState().toString());
if (ctx.getState().getStateType() == MysqlStateType.OK) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
index c80c0bfc2b0..95da359547a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -63,6 +63,7 @@ public class FEOpExecutor {
protected int thriftTimeoutMs;
protected boolean shouldNotRetry;
+ protected boolean moreStmtExists = false;
public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt,
ConnectContext ctx, boolean isQuery) {
this.feAddr = feAddress;
@@ -173,6 +174,7 @@ public class FEOpExecutor {
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
params.setSessionId(ctx.getSessionId());
+ params.setMoreResultExists(moreStmtExists);
if (Config.isCloudMode()) {
String cluster = "";
@@ -226,6 +228,9 @@ public class FEOpExecutor {
return result.getErrMessage();
}
+ public void setMoreStmtExists(boolean moreStmtExists) {
+ this.moreStmtExists = moreStmtExists;
+ }
public ByteBuffer getOutputPacket() {
if (result == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ddd9ed971d4..f5630b80d31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -293,6 +293,9 @@ public class StmtExecutor {
private Boolean isForwardedToMaster = null;
// Flag for execute prepare statement, need to use binary protocol resultset
private boolean isComStmtExecute = false;
+ // Set to true if there are more stmt need to execute.
+ // Mainly for forward to master, so that master can set the mysql server status correctly.
+ private boolean moreStmtExists = false;
// The result schema if "dry_run_query" is true.
// Only one column to indicate the real return row numbers.
@@ -439,6 +442,14 @@ public class StmtExecutor {
return isForwardedToMaster;
}
+ public boolean isMoreStmtExists() {
+ return moreStmtExists;
+ }
+
+ public void setMoreStmtExists(boolean moreStmtExists) {
+ this.moreStmtExists = moreStmtExists;
+ }
+
private boolean shouldForwardToMaster() {
if (Env.getCurrentEnv().isMaster()) {
return false;
@@ -1248,6 +1259,7 @@ public class StmtExecutor {
if (LOG.isDebugEnabled()) {
LOG.debug("need to transfer to Master. stmt: {}",
context.getStmtId());
}
+ masterOpExecutor.setMoreStmtExists(moreStmtExists);
masterOpExecutor.execute();
if (parsedStmt instanceof SetStmt) {
SetStmt setStmt = (SetStmt) parsedStmt;
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 1a942228735..bdd18eb2646 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -598,6 +598,7 @@ struct TMasterOpRequest {
29: optional TTxnLoadInfo txnLoadInfo
30: optional TGroupCommitInfo groupCommitInfo
31: optional binary prepareExecuteBuffer
+ 32: optional bool moreResultExists // Server has more result to send
// selectdb cloud
1000: optional string cloud_cluster
diff --git a/regression-test/suites/query_p0/test_multiple_stmt.groovy
b/regression-test/suites/query_p0/test_multiple_stmt.groovy
new file mode 100644
index 00000000000..4a7e75816d4
--- /dev/null
+++ b/regression-test/suites/query_p0/test_multiple_stmt.groovy
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multiple_stmt") {
+ sql """drop database if exists test_multiple_stmt"""
+ sql """create database test_multiple_stmt"""
+ sql """use test_multiple_stmt"""
+ sql """CREATE TABLE test_multiple_stmt (
+ key1 int NOT NULL,
+ value1 int NOT NULL,
+ )ENGINE=OLAP
+ DUPLICATE KEY(`key1`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`key1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ def result = sql """insert into test_multiple_stmt values (1, 1); select *
from test_multiple_stmt;"""
+ assertEquals(1, result.size())
+ assertEquals(1, result[0][0])
+ assertEquals(1, result[0][1])
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]