This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 570dfb41e65 [fix](cloud) Fix cloud meet e-230 not retry query from observer (#37625)
570dfb41e65 is described below

commit 570dfb41e65fe22facb64c5f51728919f7dd80ee
Author: deardeng <565620...@qq.com>
AuthorDate: Fri Jul 12 15:55:34 2024 +0800

    [fix](cloud) Fix cloud meet e-230 not retry query from observer (#37625)
---
 .../java/org/apache/doris/qe/ConnectProcessor.java |   3 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  10 +-
 .../cloud_p0/query_retry/test_retry_e-230.groovy   | 205 +++++++++++----------
 3 files changed, 116 insertions(+), 102 deletions(-)
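
For context, this patch moves the retry loop out of StmtExecutor.execute() into a new public entry point StmtExecutor.queryRetry(TUniqueId), which ConnectProcessor now calls directly, and in cloud mode it rethrows command failures whose detail message contains FeConstants.CLOUD_RETRY_E230 so the loop can retry them instead of handing the error back to the client. The standalone Java sketch below only illustrates that retry pattern; apart from the names visible in the diff (Config.max_query_retry_time, FeConstants.CLOUD_RETRY_E230) the classes, constants, and values are assumptions for illustration, not Doris APIs.

import java.util.UUID;
import java.util.function.Consumer;

public class RetryOnE230Sketch {
    // Stand-ins for FeConstants.CLOUD_RETRY_E230 and Config.max_query_retry_time
    // from the diff; the concrete values here are assumptions for the sketch.
    static final String CLOUD_RETRY_E230 = "e-230";
    static final int MAX_QUERY_RETRY_TIME = 3;

    // Mirrors the shape of the new queryRetry(queryId): the first attempt keeps the
    // caller-supplied query id, later attempts get a fresh one, and only the cloud
    // e-230 error is retried.
    static void queryRetry(UUID firstQueryId, Consumer<UUID> execute) {
        // Same normalization as the diff: a non-positive config means "try once".
        int retryTime = MAX_QUERY_RETRY_TIME <= 0 ? 1 : MAX_QUERY_RETRY_TIME + 1;
        UUID queryId = firstQueryId;
        RuntimeException last = null;
        for (int i = 1; i <= retryTime; i++) {
            try {
                execute.accept(queryId);
                return;                       // success, stop retrying
            } catch (RuntimeException e) {
                last = e;
                String msg = e.getMessage();
                if (msg == null || !msg.contains(CLOUD_RETRY_E230)) {
                    throw e;                  // non-retryable error, fail immediately
                }
                queryId = UUID.randomUUID();  // retry under a new query id
            }
        }
        throw last;                           // retries exhausted
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        queryRetry(UUID.randomUUID(), id -> {
            if (++attempts[0] < 3) {
                throw new RuntimeException("capture_rs_readers failed: " + CLOUD_RETRY_E230);
            }
            System.out.println("query " + id + " succeeded on attempt " + attempts[0]);
        });
    }
}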

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index fb13633319b..cf1573810c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -758,8 +758,7 @@ public abstract class ConnectProcessor {
                 UUID uuid = UUID.randomUUID();
                 queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
             }
-
-            executor.execute(queryId);
+            executor.queryRetry(queryId);
         } catch (IOException e) {
             // Client failed.
             LOG.warn("Process one query failed because IOException: ", e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index fcd0d25a130..783b6b95490 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -528,10 +528,15 @@ public class StmtExecutor {
     public void execute() throws Exception {
         UUID uuid = UUID.randomUUID();
         TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-        TUniqueId firstQueryId = queryId;
         if (Config.enable_print_request_before_execution) {
             LOG.info("begin to execute query {} {}", queryId, originStmt == null ? "null" : originStmt.originStmt);
         }
+        queryRetry(queryId);
+    }
+
+    public void queryRetry(TUniqueId queryId) throws Exception {
+        TUniqueId firstQueryId = queryId;
+        UUID uuid;
         int retryTime = Config.max_query_retry_time;
         retryTime = retryTime <= 0 ? 1 : retryTime + 1;
         for (int i = 1; i <= retryTime; i++) {
@@ -751,6 +756,9 @@ public class StmtExecutor {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Command({}) process failed.", originStmt.originStmt, e);
                 }
+                if (Config.isCloudMode() && e.getDetailMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
+                    throw e;
+                }
                 context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
                 throw new NereidsException("Command (" + originStmt.originStmt + ") process failed",
                         new AnalysisException(e.getMessage(), e));
diff --git a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
index 8f96d1fd9d2..2d8ca3f5296 100644
--- a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
+++ b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
@@ -24,120 +24,127 @@ suite("test_retry_e-230") {
     }
     def options = new ClusterOptions()
     options.enableDebugPoints()
-    options.setFeNum(1)
+    // one master, one observer
+    options.setFeNum(2)
     options.feConfigs.add('max_query_retry_time=100')
     options.feConfigs.add('sys_log_verbose_modules=org')
     options.setBeNum(1)
     options.cloudMode = true
-    docker(options) {
-    def tbl = 'test_retry_e_230_tbl'
-    def tbl1 = 'table_1'
-    def tbl2 = 'table_2'
-    sql """ DROP TABLE IF EXISTS ${tbl} """
-    sql """ DROP TABLE IF EXISTS ${tbl1} """
-    sql """ DROP TABLE IF EXISTS ${tbl2} """
-    try {
-        sql """set global experimental_enable_pipeline_x_engine=false"""
-        cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
+    // 1. connect to master
+    options.connectToFollower = false
+    for (def j = 0; j < 2; j++) {
+        docker(options) {
+            def tbl = 'test_retry_e_230_tbl'
+            def tbl1 = 'table_1'
+            def tbl2 = 'table_2'
+            sql """ DROP TABLE IF EXISTS ${tbl} """
+            sql """ DROP TABLE IF EXISTS ${tbl1} """
+            sql """ DROP TABLE IF EXISTS ${tbl2} """
+            try {
+                sql """set global 
experimental_enable_pipeline_x_engine=false"""
+                cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
 
-        sql """
-            CREATE TABLE ${tbl} (
-            `k1` int(11) NULL,
-            `k2` int(11) NULL
-            )
-            DUPLICATE KEY(`k1`, `k2`)
-            COMMENT 'OLAP'
-            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
-            PROPERTIES (
-            "replication_num"="1"
-            );
-            """
-        for (def i = 1; i <= 5; i++) {
-            sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
-        }
+                sql """
+                    CREATE TABLE ${tbl} (
+                    `k1` int(11) NULL,
+                    `k2` int(11) NULL
+                    )
+                    DUPLICATE KEY(`k1`, `k2`)
+                    COMMENT 'OLAP'
+                    DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+                    PROPERTIES (
+                    "replication_num"="1"
+                    );
+                    """
+                for (def i = 1; i <= 5; i++) {
+                    sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+                }
 
-        cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
-        def futrue1 = thread {
-            Thread.sleep(3000)
-            cluster.clearBackendDebugPoints()
-        }
+                cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
+                def futrue1 = thread {
+                    Thread.sleep(3000)
+                    cluster.clearBackendDebugPoints()
+                }
 
-        def begin = System.currentTimeMillis();
-        def futrue2 = thread {
-            def result = try_sql """select * from ${tbl}"""
-        }
+                def begin = System.currentTimeMillis();
+                def futrue2 = thread {
+                    def result = try_sql """select * from ${tbl}"""
+                }
 
-        futrue2.get()
-        def cost = System.currentTimeMillis() - begin;
-        log.info("time cost: {}", cost)
-        futrue1.get()
-        // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
-        assertTrue(cost > 3000 && cost < 100000)
+                futrue2.get()
+                def cost = System.currentTimeMillis() - begin;
+                log.info("time cost: {}", cost)
+                futrue1.get()
+                // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
+                assertTrue(cost > 3000 && cost < 100000)
 
-        sql """
-            CREATE TABLE IF NOT EXISTS ${tbl1} (
-            `siteid` int(11) NOT NULL COMMENT "",
-            `citycode` int(11) NOT NULL COMMENT "",
-            `userid` int(11) NOT NULL COMMENT "",
-            `pv` int(11) NOT NULL COMMENT ""
-            ) ENGINE=OLAP
-            DUPLICATE KEY(`siteid`)
-            COMMENT "OLAP"
-            DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
-            PROPERTIES (
-                "replication_allocation" = "tag.location.default: 1",
-                "in_memory" = "false",
-                "storage_format" = "V2"
-            )
-        """
+                sql """
+                    CREATE TABLE IF NOT EXISTS ${tbl1} (
+                    `siteid` int(11) NOT NULL COMMENT "",
+                    `citycode` int(11) NOT NULL COMMENT "",
+                    `userid` int(11) NOT NULL COMMENT "",
+                    `pv` int(11) NOT NULL COMMENT ""
+                    ) ENGINE=OLAP
+                    DUPLICATE KEY(`siteid`)
+                    COMMENT "OLAP"
+                    DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
+                    PROPERTIES (
+                        "replication_allocation" = "tag.location.default: 1",
+                        "in_memory" = "false",
+                        "storage_format" = "V2"
+                    )
+                """
 
-        sql """
-            CREATE TABLE IF NOT EXISTS ${tbl2} (
-            `siteid` int(11) NOT NULL COMMENT "",
-            `citycode` int(11) NOT NULL COMMENT "",
-            `userid` int(11) NOT NULL COMMENT "",
-            `pv` int(11) NOT NULL COMMENT ""
-            ) ENGINE=OLAP
-            DUPLICATE KEY(`siteid`)
-            COMMENT "OLAP"
-            DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
-            PROPERTIES (
-                "replication_allocation" = "tag.location.default: 1",
-                "in_memory" = "false",
-                "storage_format" = "V2"
-            )
-        """
+                sql """
+                    CREATE TABLE IF NOT EXISTS ${tbl2} (
+                    `siteid` int(11) NOT NULL COMMENT "",
+                    `citycode` int(11) NOT NULL COMMENT "",
+                    `userid` int(11) NOT NULL COMMENT "",
+                    `pv` int(11) NOT NULL COMMENT ""
+                    ) ENGINE=OLAP
+                    DUPLICATE KEY(`siteid`)
+                    COMMENT "OLAP"
+                    DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
+                    PROPERTIES (
+                        "replication_allocation" = "tag.location.default: 1",
+                        "in_memory" = "false",
+                        "storage_format" = "V2"
+                    )
+                """
 
-        sql """
-            insert into ${tbl1} values (9,10,11,12), (1,2,3,4)
-        """
+                sql """
+                    insert into ${tbl1} values (9,10,11,12), (1,2,3,4)
+                """
 
-        // dp again
-        cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
+                // dp again
+                cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
 
-        def futrue3 = thread {
-            Thread.sleep(4000)
-            cluster.clearBackendDebugPoints()
-        }
+                def futrue3 = thread {
+                    Thread.sleep(4000)
+                    cluster.clearBackendDebugPoints()
+                }
 
-        begin = System.currentTimeMillis();
-        def futrue4 = thread {
-            def result = try_sql """insert into ${tbl2} select * from 
${tbl1}"""
-        }
+                begin = System.currentTimeMillis();
+                def futrue4 = thread {
+                    def result = try_sql """insert into ${tbl2} select * from 
${tbl1}"""
+                }
 
-        futrue4.get()
-        cost = System.currentTimeMillis() - begin;
-        log.info("time cost insert into select : {}", cost)
-        futrue3.get()
-        // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
-        assertTrue(cost > 4000 && cost < 100000)
+                futrue4.get()
+                cost = System.currentTimeMillis() - begin;
+                log.info("time cost insert into select : {}", cost)
+                futrue3.get()
+                // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
+                assertTrue(cost > 4000 && cost < 100000)
 
-    } finally {
-        cluster.clearFrontendDebugPoints()
-        cluster.clearBackendDebugPoints()   
-        sql """ DROP TABLE IF EXISTS ${tbl} """
-        sql """ DROP TABLE IF EXISTS ${tbl1} """
-        sql """ DROP TABLE IF EXISTS ${tbl2} """
-    }
+            } finally {
+                cluster.clearFrontendDebugPoints()
+                cluster.clearBackendDebugPoints()   
+                sql """ DROP TABLE IF EXISTS ${tbl} """
+                sql """ DROP TABLE IF EXISTS ${tbl1} """
+                sql """ DROP TABLE IF EXISTS ${tbl2} """
+            }
+        }
+        // 2. connect to follower
+        options.connectToFollower = true
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
