This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7a59633edd8 [fix](mysql protocol)Set more stmt exists flag correctly
when forward to master. (#55711)
7a59633edd8 is described below
commit 7a59633edd8644a520b8741c7c325d3327be537c
Author: James <[email protected]>
AuthorDate: Tue Sep 9 09:50:35 2025 +0800
[fix](mysql protocol)Set more stmt exists flag correctly when forward to
master. (#55711)
When execute multiple statements in one batch and
CLIENT_MULTI_STATEMENTS is set, Doris server need to set
SERVER_MORE_RESULTS_EXISTS flag in the return packet before the last
statement. But when the Observer forward stmt to Master, this
SERVER_MORE_RESULTS_EXISTS is not set, cause the following statements
failed to execute.
This pr forward a boolean value to Master, so the Master FE knows it is
the last statement or not, and could set SERVER_MORE_RESULTS_EXISTS
correctly.
---
.../java/org/apache/doris/qe/ConnectProcessor.java | 12 +++++++
.../java/org/apache/doris/qe/FEOpExecutor.java | 5 +++
.../java/org/apache/doris/qe/StmtExecutor.java | 12 +++++++
gensrc/thrift/FrontendService.thrift | 1 +
.../suites/query_p0/test_multiple_stmt.groovy | 38 ++++++++++++++++++++++
5 files changed, 68 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 55b8113379e..43bcf61c921 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -348,6 +348,15 @@ public abstract class ConnectProcessor {
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
executor.getProfile().getSummaryProfile().parsedByConnectionProcess = true;
+ // Here we set the MoreStmtExists flag without considering
CLIENT_MULTI_STATEMENTS.
+ // So the master will always set SERVER_MORE_RESULTS_EXISTS
when the statement is not the last one.
+ // When the Follower/Observer received the return result of
Master, the Follower/Observer
+ // will check CLIENT_MULTI_STATEMENTS is set or not. It sends
SERVER_MORE_RESULTS_EXISTS back to client
+ // only when CLIENT_MULTI_STATEMENTS is set.
+ // See the code below : if
(getConnectContext().getMysqlChannel().clientMultiStatements())
+ if (i != stmts.size() - 1 &&
connectType.equals(ConnectType.MYSQL)) {
+ executor.setMoreStmtExists(true);
+ }
ctx.setExecutor(executor);
if (cacheKeyType != null) {
@@ -706,6 +715,9 @@ public abstract class ConnectProcessor {
result.setQueryId(ctx.queryId());
}
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+ if (request.moreResultExists) {
+ ctx.getState().serverStatus |=
MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
+ }
result.setPacket(getResultPacket());
result.setStatus(ctx.getState().toString());
if (ctx.getState().getStateType() == MysqlStateType.OK) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
index 7c87907d619..9dcd379f913 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -63,6 +63,7 @@ public class FEOpExecutor {
protected int thriftTimeoutMs;
protected boolean shouldNotRetry;
+ protected boolean moreStmtExists = false;
public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt,
ConnectContext ctx, boolean isQuery) {
this.feAddr = feAddress;
@@ -173,6 +174,7 @@ public class FEOpExecutor {
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
params.setSessionId(ctx.getSessionId());
+ params.setMoreResultExists(moreStmtExists);
if (Config.isCloudMode()) {
String cluster = "";
@@ -224,6 +226,9 @@ public class FEOpExecutor {
return result.getErrMessage();
}
+ public void setMoreStmtExists(boolean moreStmtExists) {
+ this.moreStmtExists = moreStmtExists;
+ }
public ByteBuffer getOutputPacket() {
if (result == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 05450a0a6dc..0f8bb745e9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -201,6 +201,9 @@ public class StmtExecutor {
private Boolean isForwardedToMaster = null;
// Flag for execute prepare statement, need to use binary protocol
resultset
private boolean isComStmtExecute = false;
+ // Set to true if there are more stmt need to execute.
+ // Mainly for forward to master, so that master can set the mysql server
status correctly.
+ private boolean moreStmtExists = false;
// The result schema if "dry_run_query" is true.
// Only one column to indicate the real return row numbers.
@@ -354,6 +357,14 @@ public class StmtExecutor {
return isForwardedToMaster;
}
+ public boolean isMoreStmtExists() {
+ return moreStmtExists;
+ }
+
+ public void setMoreStmtExists(boolean moreStmtExists) {
+ this.moreStmtExists = moreStmtExists;
+ }
+
private boolean shouldForwardToMaster() {
if (Env.getCurrentEnv().isMaster()) {
return false;
@@ -945,6 +956,7 @@ public class StmtExecutor {
if (LOG.isDebugEnabled()) {
LOG.debug("need to transfer to Master. stmt: {}",
context.getStmtId());
}
+ masterOpExecutor.setMoreStmtExists(moreStmtExists);
masterOpExecutor.execute();
if (parsedStmt instanceof LogicalPlanAdapter) {
// for nereids command
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 87d8fc8d0cf..1a801ffd958 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -383,6 +383,7 @@ struct TMasterOpRequest {
29: optional TTxnLoadInfo txnLoadInfo
30: optional TGroupCommitInfo groupCommitInfo
31: optional binary prepareExecuteBuffer
+ 32: optional bool moreResultExists // Server has more result to send
// selectdb cloud
1000: optional string cloud_cluster
diff --git a/regression-test/suites/query_p0/test_multiple_stmt.groovy
b/regression-test/suites/query_p0/test_multiple_stmt.groovy
new file mode 100644
index 00000000000..4a7e75816d4
--- /dev/null
+++ b/regression-test/suites/query_p0/test_multiple_stmt.groovy
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multiple_stmt") {
+ sql """drop database if exists test_multiple_stmt"""
+ sql """create database test_multiple_stmt"""
+ sql """use test_multiple_stmt"""
+ sql """CREATE TABLE test_multiple_stmt (
+ key1 int NOT NULL,
+ value1 int NOT NULL,
+ )ENGINE=OLAP
+ DUPLICATE KEY(`key1`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`key1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ def result = sql """insert into test_multiple_stmt values (1, 1); select *
from test_multiple_stmt;"""
+ assertEquals(1, result.size())
+ assertEquals(1, result[0][0])
+ assertEquals(1, result[0][1])
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]