This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cae93219814 branch-3.0: [fix](docker case) Fix `test_sql_mode_node_mgr` and add cloud multi f… #44124 (#45239)
cae93219814 is described below

commit cae9321981412d79aa3af8dc47193422a0661e25
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 11 22:54:12 2024 +0800

    branch-3.0: [fix](docker case) Fix `test_sql_mode_node_mgr` and add cloud multi f… #44124 (#45239)
    
    Cherry-picked from #44124
    
    Co-authored-by: deardeng <deng...@selectdb.com>
---
 docker/runtime/doris-compose/cluster.py            |  12 +-
 docker/runtime/doris-compose/command.py            |  32 +++---
 docker/runtime/doris-compose/database.py           |  58 ++++++----
 docker/runtime/doris-compose/doris-compose.py      |  25 ++--
 docker/runtime/doris-compose/requirements.txt      |   3 +-
 docker/runtime/doris-compose/resource/init_fe.sh   |   7 --
 docker/runtime/doris-compose/utils.py              |  55 +++++----
 .../doris/cloud/catalog/CloudClusterChecker.java   |   4 +
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |   3 +-
 .../doris/common/proc/FrontendsProcNode.java       |  25 +++-
 .../doris/regression/suite/SuiteCluster.groovy     |  21 ++--
 .../node_mgr/test_sql_mode_node_mgr.groovy         | 127 +++++++++------------
 12 files changed, 206 insertions(+), 166 deletions(-)

diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
index ba834167bd1..f4522181d4b 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -23,6 +23,7 @@ import jsonpickle
 import os
 import os.path
 import utils
+import time
 
 DOCKER_DORIS_PATH = "/opt/apache-doris"
 LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")
@@ -139,11 +140,15 @@ def gen_subnet_prefix16():
 
 def get_master_fe_endpoint(cluster_name):
     master_fe_ip_file = get_cluster_path(cluster_name) + "/status/master_fe_ip"
-    if os.path.exists(master_fe_ip_file):
-        with open(master_fe_ip_file, "r") as f:
-            return "{}:{}".format(f.read().strip(), FE_QUERY_PORT)
+    max_retries = 10
+    for attempt in range(max_retries):
+        if os.path.exists(master_fe_ip_file):
+            with open(master_fe_ip_file, "r") as f:
+                return "{}:{}".format(f.read().strip(), FE_QUERY_PORT)
+        time.sleep(1)
     try:
         cluster = Cluster.load(cluster_name)
+        LOG.info("master file not exist, master ip get from node 1")
         return "{}:{}".format(
             cluster.get_node(Node.TYPE_FE, 1).get_ip(), FE_QUERY_PORT)
     except:
@@ -468,6 +473,7 @@ class FE(Node):
             for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"):
                 value = parser["dummy_section"].get(key)
                 if value:
+                    value = value.strip().strip('"')
                     cfg.append(
                         f"{key} = \"{value} 
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\""
                     )
diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
index 7a2f3f3c195..638c1c465d7 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -183,7 +183,7 @@ class Command(object):
         return sys.version_info.major == 3 and sys.version_info.minor >= 9
 
     def _print_table(self, header, datas):
-        if utils.is_enable_log():
+        if utils.is_log_stdout():
             table = prettytable.PrettyTable(
                 [utils.render_green(field) for field in header])
             for row in datas:
@@ -598,13 +598,6 @@ class UpCommand(Command):
                                           related_nodes,
                                           output_real_time=output_real_time)
 
-        ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + 
cluster.name
-        LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
-        LOG.info(
-            "Master fe query address: " +
-            utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
-            "\n")
-
         if not args.start:
             LOG.info(
                 utils.render_green(
@@ -618,14 +611,18 @@ class UpCommand(Command):
             LOG.info("Waiting for FE master to be elected...")
             expire_ts = time.time() + 30
             while expire_ts > time.time():
+                ready = False
                 db_mgr = database.get_db_mgr(args.NAME, False)
                 for id in add_fe_ids:
                     fe_state = db_mgr.get_fe(id)
                     if fe_state is not None and fe_state.alive:
+                        ready = True
                         break
-                    LOG.info("there is no fe ready")
-                time.sleep(5)
-
+                if ready:
+                    break
+                LOG.info("there is no fe ready")
+                time.sleep(1)
+            LOG.info("after Waiting for FE master to be elected...")
             if cluster.is_cloud and args.sql_mode_node_mgr:
                 db_mgr = database.get_db_mgr(args.NAME, False)
                 master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
@@ -635,7 +632,9 @@ class UpCommand(Command):
                     fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}"
                     if fe_endpoint != master_fe_endpoint:
                         try:
-                            db_mgr.add_fe(fe_endpoint)
+                            db_mgr.add_fe(
+                                fe_endpoint, "FOLLOWER"
+                                if cluster.fe_follower else "OBSERVER")
                             LOG.info(f"Added FE {fe_endpoint} successfully.")
                         except Exception as e:
                             LOG.error(
@@ -661,6 +660,12 @@ class UpCommand(Command):
                     "Up cluster {} succ, related node num {}".format(
                         args.NAME, related_node_num)))
 
+        ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + 
cluster.name
+        LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
+        LOG.info(
+            "Master fe query address: " +
+            utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
+            "\n")
         return {
             "fe": {
                 "add_list": add_fe_ids,
@@ -1066,8 +1071,7 @@ class ListCommand(Command):
                 if services is None:
                     return COMPOSE_BAD, {}
                 return COMPOSE_GOOD, {
-                    service:
-                    ComposeService(
+                    service: ComposeService(
                         service,
                         list(service_conf["networks"].values())[0]
                         ["ipv4_address"], service_conf["image"])
diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py
index 46cdd961c9f..370f1d5ee2a 100644
--- a/docker/runtime/doris-compose/database.py
+++ b/docker/runtime/doris-compose/database.py
@@ -27,12 +27,13 @@ LOG = utils.get_logger()
 
 class FEState(object):
 
-    def __init__(self, id, is_master, alive, last_heartbeat, err_msg):
+    def __init__(self, id, is_master, alive, last_heartbeat, err_msg, role):
         self.id = id
         self.is_master = is_master
         self.alive = alive
         self.last_heartbeat = last_heartbeat
         self.err_msg = err_msg
+        self.role = role
 
 
 class BEState(object):
@@ -66,11 +67,11 @@ class DBManager(object):
         self._load_fe_states()
         self._load_be_states()
 
-    def add_fe(self, fe_endpoint):
+    def add_fe(self, fe_endpoint, role):
         try:
-            sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'"
+            sql = f"ALTER SYSTEM ADD {role} '{fe_endpoint}'"
             self._exec_query(sql)
-            LOG.info(f"Added FE {fe_endpoint} via SQL successfully.")
+            LOG.info(f"Added {role} FE {fe_endpoint} via SQL successfully.")
         except Exception as e:
             LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}")
             raise
@@ -78,8 +79,9 @@ class DBManager(object):
     def drop_fe(self, fe_endpoint):
         id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
         try:
-            self._exec_query(
-                "ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
+            role = self.get_fe(id).role if self.get_fe(id) else "FOLLOWER"
+            self._exec_query("ALTER SYSTEM DROP {} '{}'".format(
+                role, fe_endpoint))
             LOG.info("Drop fe {} with id {} from db succ.".format(
                 fe_endpoint, id))
         except Exception as e:
@@ -152,7 +154,7 @@ class DBManager(object):
                .format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num,
                         int(time.time() - start_ts)))
 
-            time.sleep(5)
+            time.sleep(1)
 
     def create_default_storage_vault(self, cloud_store_config):
         try:
@@ -194,7 +196,7 @@ class DBManager(object):
             id = CLUSTER.Node.get_id_from_ip(ip)
             last_heartbeat = utils.escape_null(record["LastHeartbeat"])
             err_msg = record["ErrMsg"]
-            fe = FEState(id, is_master, alive, last_heartbeat, err_msg)
+            fe = FEState(id, is_master, alive, last_heartbeat, err_msg, role)
             fe_states[id] = fe
             if is_master and alive:
                 alive_master_fe_ip = ip
@@ -223,13 +225,23 @@ class DBManager(object):
         self.be_states = be_states
 
     # return rows, and each row is a record map
-    def _exec_query(self, sql):
+    def _exec_query(self, sql, retries=3):
         self._prepare_conn()
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql)
-            fields = [field_md[0] for field_md in cursor.description
-                      ] if cursor.description else []
-            return [dict(zip(fields, row)) for row in cursor.fetchall()]
+        for attempt in range(retries):
+            try:
+                with self.conn.cursor() as cursor:
+                    cursor.execute(sql)
+                    fields = [field_md[0] for field_md in cursor.description
+                              ] if cursor.description else []
+                    return [dict(zip(fields, row)) for row in cursor.fetchall()]
+            except Exception as e:
+                LOG.warn(f"Error occurred: {e}")
+                if "timed out" in str(e).lower() and attempt < retries - 1:
+                    LOG.warn(f"Query timed out. Retrying {attempt + 
1}/{retries}...")
+                    self._reset_conn()
+                else:
+                    raise e
+        raise Exception("Max retries exceeded")
 
     def _prepare_conn(self):
         if self.conn:
@@ -257,19 +269,23 @@ def get_db_mgr(cluster_name, required_load_succ=True):
     if not master_fe_ip:
         return db_mgr
 
-    has_alive_fe = False
+    alive_fe = None
+    cluster = CLUSTER.Cluster.load(cluster_name)
     containers = utils.get_doris_containers(cluster_name).get(cluster_name, [])
     for container in containers:
         if utils.is_container_running(container):
-            _, node_type, _ = utils.parse_service_name(container.name)
+            _, node_type, id = utils.parse_service_name(container.name)
             if node_type == CLUSTER.Node.TYPE_FE:
-                has_alive_fe = True
-                break
-
-    if not has_alive_fe:
+                node = cluster.get_node(node_type, id)
+                if not alive_fe:
+                    alive_fe = node
+                if node.get_ip() == master_fe_ip:
+                    alive_fe = node
+                    break
+    if not alive_fe:
         return db_mgr
 
-    db_mgr.master_fe_ip = master_fe_ip
+    db_mgr.master_fe_ip = alive_fe.get_ip()
     try:
         db_mgr.load_states()
     except Exception as e:
diff --git a/docker/runtime/doris-compose/doris-compose.py b/docker/runtime/doris-compose/doris-compose.py
index a2d3a517553..cf3692d5321 100644
--- a/docker/runtime/doris-compose/doris-compose.py
+++ b/docker/runtime/doris-compose/doris-compose.py
@@ -16,7 +16,9 @@
 # under the License.
 
 import argparse
+import cluster as CLUSTER
 import command
+import os.path
 import sys
 import traceback
 import utils
@@ -31,12 +33,12 @@ def parse_args():
     return ap.parse_args(), ap.format_help()
 
 
-def run(args, disable_log, help):
+def run(args, disable_log_stdout, help):
     for cmd in command.ALL_COMMANDS:
         if args.command == cmd.name:
             timer = utils.Timer()
             result = cmd.run(args)
-            if cmd.print_use_time() and not disable_log:
+            if cmd.print_use_time() and not disable_log_stdout:
                 timer.show()
             return result
     print(help)
@@ -48,19 +50,26 @@ if __name__ == '__main__':
     verbose = getattr(args, "verbose", False)
     if verbose:
         utils.set_log_verbose()
-    disable_log = getattr(args, "output_json", False)
-    if disable_log:
-        utils.set_enable_log(False)
+    disable_log_stdout = getattr(args, "output_json", False)
+    if disable_log_stdout:
+        log_file_name = ""
+        cluster_name = getattr(args, "NAME", "")
+        if cluster_name:
+            if isinstance(cluster_name, list):
+                cluster_name = cluster_name[0]
+            log_file_name = os.path.join(
+                CLUSTER.get_cluster_path(cluster_name), "doris-compose.log")
+        utils.set_log_to(log_file_name, False)
 
     code = None
     try:
-        data = run(args, disable_log, help)
-        if disable_log:
+        data = run(args, disable_log_stdout, help)
+        if disable_log_stdout:
             print(utils.pretty_json({"code": 0, "data": data}), flush=True)
         code = 0
     except:
         err = traceback.format_exc()
-        if disable_log:
+        if disable_log_stdout:
             print(utils.pretty_json({"code": 1, "err": err}), flush=True)
         else:
             print(err, flush=True)
diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt
index 2f962ed68d8..1f32223a02e 100644
--- a/docker/runtime/doris-compose/requirements.txt
+++ b/docker/runtime/doris-compose/requirements.txt
@@ -22,5 +22,6 @@ jsonpickle
 prettytable
 pymysql
 python-dateutil
-#pyyaml==5.4.1
+# if installing pyyaml fails on macOS, try a different pyyaml version
+#pyyaml==5.3.1
 requests<=2.31.0
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh
index b69ac3a209e..a58723db1d7 100755
--- a/docker/runtime/doris-compose/resource/init_fe.sh
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -102,9 +102,6 @@ start_cloud_fe() {
         fe_daemon &
         run_fe
 
-        if [ "$MY_ID" == "1" ]; then
-            echo $MY_IP >$MASTER_FE_IP_FILE
-        fi
         return
     fi
 
@@ -168,10 +165,6 @@ start_cloud_fe() {
 
     fe_daemon &
     run_fe
-
-    if [ "$MY_ID" == "1" ]; then
-        echo $MY_IP >$MASTER_FE_IP_FILE
-    fi
 }
 
 stop_frontend() {
diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py
index 4332ae6cf48..dcb821ddffd 100644
--- a/docker/runtime/doris-compose/utils.py
+++ b/docker/runtime/doris-compose/utils.py
@@ -23,6 +23,7 @@ import os
 import pwd
 import socket
 import subprocess
+import sys
 import time
 import yaml
 
@@ -30,7 +31,7 @@ DORIS_PREFIX = "doris-"
 
 LOG = None
 
-ENABLE_LOG = True
+ENABLE_LOG_STDOUT = True
 
 
 class Timer(object):
@@ -48,39 +49,41 @@ class Timer(object):
         self.canceled = True
 
 
-def set_enable_log(enabled):
-    global ENABLE_LOG
-    ENABLE_LOG = enabled
-    get_logger().disabled = not enabled
-
-
-def is_enable_log():
-    return ENABLE_LOG
+def is_log_stdout():
+    return ENABLE_LOG_STDOUT
 
 
 def set_log_verbose():
     get_logger().setLevel(logging.DEBUG)
 
 
-def get_logger(name=None):
-    global LOG
-    if LOG != None:
-        return LOG
-
-    logger = logging.getLogger(name)
-    if not logger.hasHandlers():
+def set_log_to(log_file_name, is_to_stdout):
+    logger = get_logger()
+    for ch in logger.handlers:
+        logger.removeHandler(ch)
+    if log_file_name:
+        os.makedirs(os.path.dirname(log_file_name), exist_ok=True)
+        logger.addHandler(logging.FileHandler(log_file_name))
+    global ENABLE_LOG_STDOUT
+    ENABLE_LOG_STDOUT = is_to_stdout
+    if is_to_stdout:
+        logger.addHandler(logging.StreamHandler(sys.stdout))
+    for ch in logger.handlers:
         formatter = logging.Formatter(
             '%(asctime)s - %(filename)s - %(lineno)dL - %(levelname)s - %(message)s'
         )
-        ch = logging.StreamHandler()
         ch.setLevel(logging.DEBUG)
         ch.setFormatter(formatter)
-        logger.addHandler(ch)
-        logger.setLevel(logging.INFO)
 
-    LOG = logger
 
-    return logger
+def get_logger(name="doris-compose"):
+    global LOG
+    if LOG is None:
+        LOG = logging.getLogger(name)
+        LOG.setLevel(logging.INFO)
+        set_log_to(None, True)
+
+    return LOG
 
 
 get_logger()
@@ -196,15 +199,17 @@ def exec_shell_command(command, ignore_errors=False, output_real_time=False):
     if output_real_time:
         while p.poll() is None:
             s = p.stdout.readline().decode('utf-8')
-            if ENABLE_LOG and s.rstrip():
-                print(s.rstrip())
+            if s.rstrip():
+                for line in s.strip().splitlines():
+                    LOG.info("(docker) " + line)
             out += s
         exitcode = p.wait()
     else:
         out = p.communicate()[0].decode('utf-8')
         exitcode = p.returncode
-        if ENABLE_LOG and out:
-            print(out)
+        if out:
+            for line in out.splitlines():
+                LOG.info("(docker) " + line)
     if not ignore_errors:
         assert exitcode == 0, out
     return exitcode, out
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index e27339c2aac..9468c8acecd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -170,6 +170,10 @@ public class CloudClusterChecker extends MasterDaemon {
             String endpoint = addr + ":" + node.getHeartbeatPort();
             Cloud.NodeStatusPB status = node.getStatus();
             Backend be = currentMap.get(endpoint);
+            if (be == null) {
+                LOG.warn("cant get valid be {} from fe mem, ignore it checker 
will add this be at next", endpoint);
+                continue;
+            }
 
             if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONING) {
                 if (!be.isDecommissioned()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index f4c6005a0d8..89338c228fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -415,7 +415,8 @@ public class CloudEnv extends Env {
 
         Frontend frontend = checkFeExist(host, port);
         if (frontend == null) {
-            throw new DdlException("Frontend does not exist.");
+            throw new DdlException("frontend does not exist[" + NetUtils
+                .getHostPortInAccessibleFormat(host, port) + "]");
         }
 
         if (frontend.getRole() != role) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
index eb75bc9312a..ede8cb56258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
@@ -126,7 +126,9 @@ public class FrontendsProcNode implements ProcNodeInterface {
             selfNode = ConnectContext.get().getCurrentConnectedFEIp();
         }
 
-        for (Frontend fe : env.getFrontends(null /* all */)) {
+        List<Frontend> envFes = env.getFrontends(null /* all */);
+        LOG.info("bdbje fes {}, env fes {}", allFe, envFes);
+        for (Frontend fe : envFes) {
             List<String> info = new ArrayList<String>();
             info.add(fe.getNodeName());
             info.add(fe.getHost());
@@ -211,11 +213,6 @@ public class FrontendsProcNode implements ProcNodeInterface {
             if (fe.getEditLogPort() != addr.getPort()) {
                 continue;
             }
-            if (!Strings.isNullOrEmpty(addr.getHostName())) {
-                if (addr.getHostName().equals(fe.getHost())) {
-                    return true;
-                }
-            }
            // if hostname of InetSocketAddress is ip, addr.getHostName() may be not equal to fe.getIp()
             // so we need to compare fe.getIp() with address.getHostAddress()
             InetAddress address = addr.getAddress();
@@ -227,6 +224,22 @@ public class FrontendsProcNode implements ProcNodeInterface {
                 return true;
             }
         }
+
+        // Compare hostnames in a second pass, so that getHostName() is only
+        // called after the IP comparison above fails; don't merge the loops.
+        for (InetSocketAddress addr : allFeHosts) {
+            if (fe.getEditLogPort() != addr.getPort()) {
+                continue;
+            }
+            // https://bugs.openjdk.org/browse/JDK-8143378#:~:text=getHostName()%3B%20takes%20about%205,millisecond%20on%20JDK%20update%2051
+            // getHostName() can be slow (around 5s on some JDK versions)
+            String host = addr.getHostName();
+            if (!Strings.isNullOrEmpty(host)) {
+                if (host.equals(fe.getHost())) {
+                    return true;
+                }
+            }
+        }
         return false;
     }
 }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 856b0e76956..e77658793fe 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -23,12 +23,14 @@ import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.NodeType
 
 import com.google.common.collect.Maps
+import org.awaitility.Awaitility
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
 import groovy.json.JsonSlurper
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
+import static java.util.concurrent.TimeUnit.SECONDS
 import java.util.stream.Collectors
 import java.sql.Connection
 
@@ -333,7 +335,7 @@ class SuiteCluster {
 
         sqlModeNodeMgr = options.sqlModeNodeMgr
 
-        runCmd(cmd.join(' '), -1)
+        runCmd(cmd.join(' '), 180)
 
         // wait be report disk
         Thread.sleep(5000)
@@ -483,6 +485,9 @@ class SuiteCluster {
             if (followerMode) {
                 sb.append('--fe-follower' + ' ')
             }
+            if (sqlModeNodeMgr) {
+                sb.append('--sql-mode-node-mgr' + ' ')
+            }
         }
         if (beNum > 0) {
             sb.append('--add-be-num ' + beNum + ' ')
@@ -492,7 +497,7 @@ class SuiteCluster {
         }
         sb.append('--wait-timeout 60')
 
-        def data = (Map<String, Map<String, Object>>) runCmd(sb.toString(), -1)
+        def data = (Map<String, Map<String, Object>>) runCmd(sb.toString(), 
180)
         def newFrontends = (List<Integer>) data.get('fe').get('add_list')
         def newBackends = (List<Integer>) data.get('be').get('add_list')
 
@@ -636,17 +641,15 @@ class SuiteCluster {
     }
 
     private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception {
-        def fullCmd = String.format('python -W ignore %s %s --output-json', config.dorisComposePath, cmd)
+        def fullCmd = String.format('python -W ignore %s %s -v --output-json', config.dorisComposePath, cmd)
         logger.info('Run doris compose cmd: {}', fullCmd)
         def proc = fullCmd.execute()
         def outBuf = new StringBuilder()
         def errBuf = new StringBuilder()
-        proc.consumeProcessOutput(outBuf, errBuf)
-        if (timeoutSecond > 0) {
-            proc.waitForOrKill(timeoutSecond * 1000)
-        } else {
-            proc.waitFor()
-        }
+        Awaitility.await().atMost(timeoutSecond, SECONDS).until({
+            proc.waitForProcessOutput(outBuf, errBuf)
+            return true
+        })
         if (proc.exitValue() != 0) {
            throw new Exception(String.format('Exit value: %s != 0, stdout: %s, stderr: %s',
                                              proc.exitValue(), outBuf.toString(), errBuf.toString()))
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
index 7405cb864d8..70372f68ab8 100644
--- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
+++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
@@ -38,6 +38,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
         options.sqlModeNodeMgr = true
         options.waitTimeout = 0
         options.feNum = 3
+        options.useFollowersMode = true
         options.feConfigs += ["resource_not_ready_sleep_seconds=1",
                 "heartbeat_interval_second=1",]
     }
@@ -121,6 +122,9 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
                 
                 // Check FE number
                 def frontendResult = sql_return_maparray """SHOW FRONTENDS;"""
+                // Check that all frontends are alive
+                def aliveCount = frontendResult.count { it['Alive'] == 'true' }
+                assert aliveCount == expectedFeNum, "Expected all 
$expectedFeNum frontends to be alive, but only ${aliveCount} are alive"
                 assert frontendResult.size() == expectedFeNum, "Expected 
${expectedFeNum} frontends, but got ${frontendResult.size()}"
                 logger.info("FE number check passed: ${frontendResult.size()} 
FEs found")
 
@@ -272,28 +276,23 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
             def feEditLogPort = feToDropMap['EditLogPort']
             def feRole = feToDropMap['Role']
 
-            logger.info("Dropping non-master frontend: {}:{}", feHost, 
feEditLogPort)
+            def dropFeInx = cluster.getFrontends().find { it.host == feHost 
}.index 
+            logger.info("Dropping non-master frontend: {}:{}, index: {}", 
feHost, feEditLogPort, dropFeInx)
 
             // Drop the selected non-master frontend
             sql """ ALTER SYSTEM DROP ${feRole} "${feHost}:${feEditLogPort}"; 
"""
-
+            // After the drop, the container on feHost will exit
+            cluster.dropFrontends(true, dropFeInx)
+            sleep(3 * 1000)
+            logger.info("Dropped frontend index: {}, removing it from docker compose", dropFeInx)
             // Wait for the frontend to be fully dropped
-            maxWaitSeconds = 300
-            waited = 0
-            while (waited < maxWaitSeconds) {
+
+            dockerAwaitUntil(300) {
                 reconnectFe()
                 def currentFrontends = sql_return_maparray("SHOW FRONTENDS")
-                if (currentFrontends.size() == frontends.size() - 1) {
-                    logger.info("Non-master frontend successfully dropped")
-                    break
-                }
-                sleep(10000)
-                waited += 10
+                currentFrontends.size() == frontends.size() - 1
             }
 
-            if (waited >= maxWaitSeconds) {
-                throw new Exception("Timeout waiting for non-master frontend 
to be dropped")
-            }
 
             checkClusterStatus(2, 3, 4)
 
@@ -309,86 +308,72 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
             }
             
             assert droppedFE != null, "Could not find the dropped frontend"
-
-            feHost = droppedFE['Host']
-            feEditLogPort = droppedFE['EditLogPort']
-
-            logger.info("Adding back frontend: {}:{}", feHost, feEditLogPort)
-
-            // Add the frontend back
-            sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """
+            
+            // Bring up a new follower FE and add it to docker compose
+            // ATTN: in sql node mode, addFrontend will execute `ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}";`
+            boolean fuzzyUpFollower = (getRandomBoolean() == "true")
+            logger.info("Bringing up a new frontend with role [{}]", fuzzyUpFollower ? "FOLLOWER" : "OBSERVER")
+            def addList = cluster.addFrontend(1, fuzzyUpFollower)
+            logger.info("Up a new frontend, addList: {}", addList)
+
+            def addFE = cluster.getFeByIndex(addList[0])
+            feHost = addFE['Host']
+            feEditLogPort = addFE['EditLogPort']
+            def showFes = sql """SHOW FRONTENDS"""
+            logger.info("Adding back frontend: {}", showFes)
 
             // Wait for the frontend to be fully added back
-            maxWaitSeconds = 300
-            waited = 0
-            while (waited < maxWaitSeconds) {
+            dockerAwaitUntil(300, 5) {
                 def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
-                if (updatedFrontends.size() == frontends.size()) {
-                    logger.info("Frontend successfully added back")
-                    break
-                }
-                sleep(10000)
-                waited += 10
+                updatedFrontends.size() == frontends.size()
             }
-
-            if (waited >= maxWaitSeconds) {
-                throw new Exception("Timeout waiting for frontend to be added 
back")
-            }
-
-            // Verify cluster status after adding the frontend back
+           
             checkClusterStatus(3, 3, 5)
 
             logger.info("Frontend successfully added back and cluster status 
verified")
 
             // CASE 6. Drop frontend and add back again
             logger.info("Dropping frontend and adding back again")
-
             // Get the frontend to be dropped
-            def frontendToDrop = frontends.find { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }
+            currentFrontends = sql_return_maparray("SHOW FRONTENDS")
+
+            int observerCount = currentFrontends.count { it['Role'] == 'OBSERVER' }
+            String fuzzyDropRole
+            if (observerCount != 0) {
+                fuzzyDropRole = (getRandomBoolean() == "true") ? "FOLLOWER" : "OBSERVER"
+            } else {
+                fuzzyDropRole = "FOLLOWER"
+            }
+
+            def frontendToDrop = currentFrontends.find { it['IsMaster'] == "false" && it['Role'] == fuzzyDropRole }
+            logger.info("Found frontend to drop again: {}, drop role [{}]", frontendToDrop, fuzzyDropRole)
             assert frontendToDrop != null, "Could not find the frontend to drop"
 
+            def role = frontendToDrop.Role
             // Drop the frontend
-            sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; 
"""
-            sleep(30000)
+            sql """ ALTER SYSTEM DROP $role 
"${frontendToDrop.Host}:${frontendToDrop.EditLogPort}"; """
+            dropFeInx = cluster.getFrontends().find { it.host == 
frontendToDrop.Host }.index 
+            // After drop frontendToDrop.Host container will exit
+            cluster.dropFrontends(true, dropFeInx)
+            logger.info("Dropping again frontend index: {}, remove it from 
docker compose", dropFeInx)
+            sleep(3 * 1000)
             reconnectFe()
 
             // Wait for the frontend to be fully dropped
-            maxWaitSeconds = 300
-            waited = 0
-            while (waited < maxWaitSeconds) {
+            dockerAwaitUntil(300, 5) {
                 def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
-                if (!updatedFrontends.any { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }) {
-                    logger.info("Frontend successfully dropped")
-                    break
-                }
-                sleep(10000)
-                waited += 10
-            }
-
-            if (waited >= maxWaitSeconds) {
-                throw new Exception("Timeout waiting for frontend to be 
dropped")
+                !updatedFrontends.any { it['Host'] == frontendToDrop.Host && 
it['EditLogPort'] == frontendToDrop.EditLogPort }
             }
 
-            // Add the frontend back
-            sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """
+            // Bring up a new follower FE and add it to docker compose
+            // ATTN: in sql node mode, addFrontend will execute `ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}";`
+            addList = cluster.addFrontend(1, true)
+            logger.info("Brought up a new frontend, addList: {}", addList)
 
-            // Wait for the frontend to be fully added back
-            maxWaitSeconds = 300
-            waited = 0
-            while (waited < maxWaitSeconds) {
+            dockerAwaitUntil(300, 5) {
                 def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
-                if (updatedFrontends.any { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }) {
-                    logger.info("Frontend successfully added back")
-                    break
-                }
-                sleep(10000)
-                waited += 10
+                updatedFrontends.size() == 3
             }
-
-            if (waited >= maxWaitSeconds) {
-                throw new Exception("Timeout waiting for frontend to be added 
back")
-            }
-
             // Verify cluster status after adding the frontend back
             checkClusterStatus(3, 3, 6)
 

