This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 1ac00ea9830 branch-2.1: [feat](doris compose) Copy latest compose code from master branch (#43464)
1ac00ea9830 is described below

commit 1ac00ea983004d6209c0ed68dba300ed11374c87
Author: yujun <yu...@selectdb.com>
AuthorDate: Fri Nov 8 09:47:19 2024 +0800

    branch-2.1: [feat](doris compose) Copy latest compose code from master branch (#43464)
    
    Copy latest code from master branch to support running docker suites
    without an external doris cluster, enabling the jvm debug port, etc.
---
 docker/runtime/doris-compose/Dockerfile            |  23 +-
 docker/runtime/doris-compose/Readme.md             |  58 +++-
 docker/runtime/doris-compose/cluster.py            | 225 ++++++++++---
 docker/runtime/doris-compose/command.py            | 349 +++++++++++++++++----
 docker/runtime/doris-compose/database.py           | 171 ++++++----
 docker/runtime/doris-compose/doris-compose.py      |   9 +-
 .../{requirements.txt => format-code.sh}           |   9 +-
 docker/runtime/doris-compose/requirements.txt      |   2 +
 docker/runtime/doris-compose/resource/common.sh    |  13 +-
 .../runtime/doris-compose/resource/entrypoint.sh   |  68 ++++
 docker/runtime/doris-compose/resource/init_be.sh   |   8 +-
 .../runtime/doris-compose/resource/init_cloud.sh   |  12 +-
 docker/runtime/doris-compose/resource/init_fe.sh   |  43 ++-
 docker/runtime/doris-compose/utils.py              |  23 +-
 .../org/apache/doris/regression/Config.groovy      |  22 +-
 .../org/apache/doris/regression/suite/Suite.groovy |   4 +-
 .../doris/regression/suite/SuiteCluster.groovy     | 232 +++++++++++---
 .../suites/demo_p0/docker_action.groovy            |   2 -
 18 files changed, 999 insertions(+), 274 deletions(-)

diff --git a/docker/runtime/doris-compose/Dockerfile b/docker/runtime/doris-compose/Dockerfile
index 73561e6410e..c64b732fe34 100644
--- a/docker/runtime/doris-compose/Dockerfile
+++ b/docker/runtime/doris-compose/Dockerfile
@@ -29,16 +29,7 @@ ARG JDK_IMAGE=openjdk:17-jdk-slim
 
 FROM ${JDK_IMAGE}
 
-RUN <<EOF
-    if [ -d "/usr/local/openjdk-17" ]; then
-        ln -s /usr/local/openjdk-17  /usr/local/openjdk
-    else \
-        ln -s /usr/local/openjdk-8  /usr/local/openjdk
-    fi
-EOF
-
 # set environment variables
-ENV JAVA_HOME="/usr/local/openjdk"
 ENV JACOCO_VERSION 0.8.8
 
 RUN mkdir -p /opt/apache-doris/coverage
@@ -47,7 +38,7 @@ RUN  sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list
 RUN  apt-get clean
 
 RUN apt-get update && \
-    apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps && \
+    apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps util-linux gosu && \
     ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
     dpkg-reconfigure -f noninteractive tzdata && \
     apt-get clean
@@ -57,16 +48,14 @@ RUN curl -f https://repo1.maven.org/maven2/org/jacoco/jacoco/${JACOCO_VERSION}/j
     unzip jacoco.zip -d /jacoco
 
 # cloud
-#COPY cloud/CMakeLists.txt cloud/output* output/ms* /opt/apache-doris/cloud/
-RUN <<EOF
-    mkdir /opt/apache-doris/fdb
-    if [ -d /opt/apache-doris/cloud/bin ]; then
-        sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/cloud/bin/start.sh
+# COPY --chmod=777 README.md cloud/output* output/ms* /opt/apache-doris/cloud/
+RUN mkdir /opt/apache-doris/fdb
+RUN if [ -d /opt/apache-doris/cloud/bin ]; then                            \
+        sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/cloud/bin/start.sh ; \
     fi
-EOF
 
 # fe and be
-COPY output /opt/apache-doris/
+COPY --chmod=777 output /opt/apache-doris/
 # in docker, run 'chmod 755 doris_be' first time cost 1min, remove it.
 RUN sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/be/bin/start_be.sh
 
diff --git a/docker/runtime/doris-compose/Readme.md b/docker/runtime/doris-compose/Readme.md
index a83fa81e761..c4c4dc0990f 100644
--- a/docker/runtime/doris-compose/Readme.md
+++ b/docker/runtime/doris-compose/Readme.md
@@ -23,7 +23,16 @@ Use doris compose to create doris docker compose clusters.
 
 ## Requirements
 
-1. The doris image should contains:
+##### 1. Make sure you have docker permissions
+
+Run:
+```
+docker run hello-world
+```
+
+If you get a permission denied error, see [add-docker-permission](https://docs.docker.com/engine/install/linux-postinstall/).
+
+##### 2. The doris image should contain
 
 ```
 /opt/apache-doris/{fe, be, cloud}
@@ -32,16 +41,14 @@ Use doris compose to create doris docker compose clusters.
 if don't create cloud cluster, the image no need to contains the cloud pkg.
 
 
-if build doris use `sh build.sh --fe --be --cloud`, then its output satisfy with all above, then run command in doris root
+If you build doris with `sh build.sh --fe --be --cloud`, its output satisfies all of the above; then running the following command in the doris root directory
+will generate such an image.
 
 ```
 docker build -f docker/runtime/doris-compose/Dockerfile -t <image> .
 ```
 
-will generate a image.
-
-2. Install the dependent python library in 'docker/runtime/doris-compose/requirements.txt'
-
+##### 3. Install the dependent python libraries in 'docker/runtime/doris-compose/requirements.txt'
 
 ```
 python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
@@ -49,6 +56,20 @@ python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
 
 ## Usage
 
+### Notice
+
+Each cluster has a directory under '/tmp/doris/{cluster-name}'; users can set the env variable LOCAL_DORIS_PATH to change the parent directory.
+
+For example, if a user exports LOCAL_DORIS_PATH=/mydoris, then the cluster's directory is '/mydoris/{cluster-name}'.
+
+A cluster's directory contains all of its containers' logs and data, e.g. fe-1, fe-2, be-1, etc.
+
+If multiple users run doris-compose on the same machine, they should either keep the default LOCAL_DORIS_PATH or export the same LOCAL_DORIS_PATH.
+
+When creating a new cluster, doris-compose searches the local doris path and chooses a docker network different from those of the clusters under that path.
+
+So if multiple users use different LOCAL_DORIS_PATH values, their clusters may have docker network conflicts!
+
 ### Create a cluster or recreate its containers
 
 ```
@@ -65,9 +86,11 @@ add fe/be nodes with the specific image, or update existing nodes with `--fe-id`
 
 
 For create a cloud cluster, steps are as below:
+
 1. Write cloud s3 store config file, its default path is '/tmp/doris/cloud.ini'.
    It's defined in environment variable DORIS_CLOUD_CFG_FILE, user can change this env var to change its path.
    A Example file is locate in 'docker/runtime/doris-compose/resource/cloud.ini.example'.
+
+2. Use doris compose up command with option '--cloud' to create a new cloud cluster.
 
 The simplest way to create a cloud cluster:
@@ -127,7 +150,26 @@ Generate regression-conf-custom.groovy to connect to the specific docker cluster
 
 steps:
 
-1. Create a new cluster:  `python doris-compose.py up my-cluster  my-image  --add-fe-num 2  --add-be-num 4 --cloud`
-2. Generate regression-conf-custom.groovy: `python doris-compose.py config my-cluster  <doris-root-path> --connect-follow-fe`
+1. Create a new cluster:  `python docker/runtime/doris-compose/doris-compose.py up my-cluster  my-image  --add-fe-num 2  --add-be-num 4 --cloud`
+2. Generate regression-conf-custom.groovy: `python docker/runtime/doris-compose/doris-compose.py config my-cluster  <doris-root-path> --connect-follow-fe`
 3. Run regression test: `bash run-regression-test.sh --run -times 1 -parallel 1 -suiteParallel 1 -d cloud/multi_cluster`
 
+## Problem investigation
+
+#### Log
+
+Each cluster has logs in /tmp/doris/{cluster-name}/{node-xxx}/log. For each node, doris compose will also write its own log to /tmp/doris/{cluster-name}/{node-xxx}/log/health.out.
+
+#### Up cluster using non-detach mode
+
+```
+python docker/runtime/doris-compose/doris-compose.py up ...   -no-detach
+```
+
+## Developer
+
+Before submitting code, please format the code.
+
+```
+bash format-code.sh
+```
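The Notice section above explains how LOCAL_DORIS_PATH controls where each cluster's data and logs live. A minimal sketch of that resolution, mirroring the LOCAL_DORIS_PATH default and the get_cluster_path helper that appear in cluster.py later in this diff:

```python
import os

# cluster.py reads the parent directory from LOCAL_DORIS_PATH, defaulting to /tmp/doris.
LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")

def get_cluster_path(cluster_name):
    # e.g. /tmp/doris/my-cluster, or /mydoris/my-cluster when LOCAL_DORIS_PATH=/mydoris
    return os.path.join(LOCAL_DORIS_PATH, cluster_name)

print(get_cluster_path("my-cluster"))
```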
diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
index 5381c094cf2..ba834167bd1 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -15,8 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import configparser
 import filelock
-import json
+import getpass
+import hashlib
 import jsonpickle
 import os
 import os.path
@@ -24,7 +26,10 @@ import utils
 
 DOCKER_DORIS_PATH = "/opt/apache-doris"
 LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")
-DORIS_SUBNET_START = int(os.getenv("DORIS_SUBNET_START", 128))
+
+# an integer between 128 and 191, generally no need to set
+DORIS_SUBNET_START = os.getenv("DORIS_SUBNET_START")
+
 LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                    "resource")
 DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource")
@@ -35,6 +40,7 @@ FE_HTTP_PORT = 8030
 FE_RPC_PORT = 9020
 FE_QUERY_PORT = 9030
 FE_EDITLOG_PORT = 9010
+FE_JAVA_DBG_PORT = 5005
 
 BE_PORT = 9060
 BE_WEBSVR_PORT = 8040
@@ -49,6 +55,8 @@ ID_LIMIT = 10000
 
 IP_PART4_SIZE = 200
 
+CLUSTER_ID = "12345678"
+
 LOG = utils.get_logger()
 
 
@@ -56,6 +64,15 @@ def get_cluster_path(cluster_name):
     return os.path.join(LOCAL_DORIS_PATH, cluster_name)
 
 
+def get_node_name(node_type, id):
+    return "{}-{}".format(node_type, id)
+
+
+def get_node_path(cluster_name, node_type, id):
+    return os.path.join(get_cluster_path(cluster_name),
+                        get_node_name(node_type, id))
+
+
 def get_compose_file(cluster_name):
     return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml")
 
@@ -83,11 +100,39 @@ def gen_subnet_prefix16():
         except:
             pass
 
-    for i in range(DORIS_SUBNET_START, 192):
-        for j in range(256):
-            subnet = "{}.{}".format(i, j)
-            if not used_subnet.get(subnet, False):
-                return subnet
+    subnet_begin = 128
+    subnet_end = 192
+
+    subnet_part_1 = None
+    subnet_part_2 = None
+    if DORIS_SUBNET_START:
+        subnet_part_1 = int(DORIS_SUBNET_START)
+        subnet_part_2 = 0
+    else:
+        m = hashlib.md5()
+        m.update(getpass.getuser().encode("utf-8"))
+        hash_val = int(m.hexdigest(), 16)
+        # want subnet part ii to be a small num, just less than 100, so don't use 256 here.
+        small_width = 100
+        slot_num = (subnet_end - subnet_begin) * small_width
+        idx = hash_val % slot_num
+        if idx < 0:
+            idx += slot_num
+        subnet_part_1 = subnet_begin + int(idx / small_width)
+        subnet_part_2 = idx % small_width
+
+    intervals = [
+        [(subnet_part_1, subnet_part_1 + 1), (subnet_part_2, 256)],
+        [(subnet_part_1 + 1, subnet_end), (0, 256)],
+        [(subnet_begin, subnet_part_1), (0, 256)],
+        [(subnet_part_1, subnet_part_1 + 1), (0, subnet_part_2)],
+    ]
+    for interval in intervals:
+        for i in range(interval[0][0], interval[0][1]):
+            for j in range(interval[1][0], interval[1][1]):
+                subnet = "{}.{}".format(i, j)
+                if not used_subnet.get(subnet, False):
+                    return subnet
 
     raise Exception("Failed to gen subnet")
 
@@ -207,8 +252,6 @@ class Node(object):
         path = self.get_path()
         os.makedirs(path, exist_ok=True)
 
-        config = self.get_add_init_config()
-
         # copy config to local
         conf_dir = os.path.join(path, "conf")
         if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir):
@@ -216,12 +259,15 @@ class Node(object):
             assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \
                     "check doris path in image is correct".format(conf_dir)
             utils.enable_dir_with_rw_perm(conf_dir)
+            config = self.get_add_init_config()
             if config:
                 with open(os.path.join(conf_dir, self.conf_file_name()),
                           "a") as f:
                     f.write("\n")
+                    f.write("#### start doris-compose add config ####\n\n")
                     for item in config:
                         f.write(item + "\n")
+                    f.write("\n#### end doris-compose add config ####\n")
         for sub_dir in self.expose_sub_dirs():
             os.makedirs(os.path.join(path, sub_dir), exist_ok=True)
 
@@ -246,11 +292,10 @@ class Node(object):
         return ["conf", "log"]
 
     def get_name(self):
-        return "{}-{}".format(self.node_type(), self.id)
+        return get_node_name(self.node_type(), self.id)
 
     def get_path(self):
-        return os.path.join(get_cluster_path(self.cluster.name),
-                            self.get_name())
+        return get_node_path(self.cluster.name, self.node_type(), self.id)
 
     def get_image(self):
         return self.meta.image
@@ -292,11 +337,18 @@ class Node(object):
             "DORIS_HOME": os.path.join(self.docker_home_dir()),
             "STOP_GRACE": 1 if enable_coverage else 0,
             "IS_CLOUD": 1 if self.cluster.is_cloud else 0,
+            "SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0,
         }
 
         if self.cluster.is_cloud:
             envs["META_SERVICE_ENDPOINT"] = self.cluster.get_meta_server_addr()
 
+        # run as host user
+        if not getattr(self.cluster, 'is_root_user', True):
+            envs["HOST_USER"] = getpass.getuser()
+            envs["HOST_UID"] = os.getuid()
+            envs["HOST_GID"] = os.getgid()
+
         if enable_coverage:
             outfile = "{}/coverage/{}-coverage-{}-{}".format(
                 DOCKER_DORIS_PATH, self.node_type(), self.cluster.name,
@@ -311,6 +363,15 @@ class Node(object):
 
         return envs
 
+    def entrypoint(self):
+        if self.start_script():
+            return [
+                "bash",
+                os.path.join(DOCKER_RESOURCE_PATH, "entrypoint.sh")
+            ] + self.start_script()
+        else:
+            return None
+
     def get_add_init_config(self):
         return []
 
@@ -334,10 +395,16 @@ class Node(object):
             for path in ("/etc/localtime", "/etc/timezone",
                          "/usr/share/zoneinfo") if os.path.exists(path)
         ]
+
         if self.cluster.coverage_dir:
             volumes.append("{}:{}/coverage".format(self.cluster.coverage_dir,
                                                    DOCKER_DORIS_PATH))
 
+        extra_hosts = [
+            "{}:{}".format(node.get_name(), node.get_ip())
+            for node in self.cluster.get_all_nodes()
+        ]
+
         content = {
             "cap_add": ["SYS_PTRACE"],
             "hostname": self.get_name(),
@@ -349,6 +416,7 @@ class Node(object):
                     "ipv4_address": self.get_ip(),
                 }
             },
+            "extra_hosts": extra_hosts,
             "ports": self.docker_ports(),
             "ulimits": {
                 "core": -1
@@ -365,37 +433,74 @@ class Node(object):
 
 class FE(Node):
 
+    def init(self):
+        super().init()
+        self.init_is_follower()
+
     def get_add_init_config(self):
         cfg = []
         if self.cluster.fe_config:
             cfg += self.cluster.fe_config
         if self.cluster.is_cloud:
             cfg += [
-                "cloud_unique_id = " + self.cloud_unique_id(),
                 "meta_service_endpoint = {}".format(
                     self.cluster.get_meta_server_addr()),
                 "",
                 "# For regression-test",
                 "ignore_unsupported_properties_in_cloud_mode = true",
                 "merge_on_write_forced_to_false = true",
+                "deploy_mode = cloud",
             ]
 
+            if self.cluster.sql_mode_node_mgr:
+                cfg += [
+                    "cluster_id = " + CLUSTER_ID,
+                ]
+            else:
+                cfg += [
+                    "cloud_unique_id = " + self.cloud_unique_id(),
+                ]
+
+        with open("{}/conf/{}".format(self.get_path(), self.conf_file_name()),
+                  "r") as f:
+            parser = configparser.ConfigParser()
+            parser.read_string('[dummy_section]\n' + f.read())
+            for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"):
+                value = parser["dummy_section"].get(key)
+                if value:
+                    cfg.append(
+                        f"{key} = \"{value} 
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\""
+                    )
+
         return cfg
 
+    def init_is_follower(self):
+        if self.cluster.is_cloud and self.cluster.fe_follower:
+            with open(self._is_follower_path(), "w") as f:
+                f.write("true")
+
+    def _is_follower_path(self):
+        return "{}/conf/is_follower".format(self.get_path())
+
     def docker_env(self):
         envs = super().docker_env()
         if self.cluster.is_cloud:
             envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
+        if os.path.exists(self._is_follower_path()):
+            envs["IS_FE_FOLLOWER"] = 1
         return envs
 
     def cloud_unique_id(self):
         return "sql_server_{}".format(self.id)
 
-    def entrypoint(self):
-        return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")]
+    def start_script(self):
+        return ["init_fe.sh"]
 
     def docker_ports(self):
-        return [FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT]
+        return [
+            FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT,
+            FE_JAVA_DBG_PORT
+        ]
 
     def docker_home_dir(self):
         return os.path.join(DOCKER_DORIS_PATH, "fe")
@@ -421,14 +526,26 @@ class BE(Node):
             cfg += self.cluster.be_config
         if self.cluster.is_cloud:
             cfg += [
-                "cloud_unique_id = " + self.cloud_unique_id(),
-                "meta_service_endpoint = {}".format(
-                    self.cluster.get_meta_server_addr()),
                 'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
                 'enable_file_cache = true',
                 'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]'
                 .format(self.docker_home_dir()),
+                "deploy_mode = cloud",
             ]
+
+            if self.cluster.be_metaservice_endpoint:
+                cfg += [
+                    "meta_service_endpoint = {}".format(
+                        self.cluster.get_meta_server_addr()),
+                ]
+            if self.cluster.be_cluster_id:
+                cfg += [
+                    "cluster_id = " + CLUSTER_ID,
+                ]
+            if not self.cluster.sql_mode_node_mgr:
+                cfg += [
+                    "cloud_unique_id = " + self.cloud_unique_id(),
+                ]
         return cfg
 
     def init_cluster_name(self):
@@ -477,8 +594,8 @@ class BE(Node):
             storage_root_path = ";".join(dir_descs) if dir_descs else '""'
             f.write("\nstorage_root_path = {}\n".format(storage_root_path))
 
-    def entrypoint(self):
-        return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh")]
+    def start_script(self):
+        return ["init_be.sh"]
 
     def docker_env(self):
         envs = super().docker_env()
@@ -529,12 +646,8 @@ class MS(CLOUD):
             cfg += self.cluster.ms_config
         return cfg
 
-    def entrypoint(self):
-        return [
-            "bash",
-            os.path.join(DOCKER_RESOURCE_PATH, "init_cloud.sh"),
-            "--meta-service"
-        ]
+    def start_script(self):
+        return ["init_cloud.sh", "--meta-service"]
 
     def node_type(self):
         return Node.TYPE_MS
@@ -554,11 +667,8 @@ class RECYCLE(CLOUD):
             cfg += self.cluster.recycle_config
         return cfg
 
-    def entrypoint(self):
-        return [
-            "bash",
-            os.path.join(DOCKER_RESOURCE_PATH, "init_cloud.sh"), "--recycler"
-        ]
+    def start_script(self):
+        return ["init_cloud.sh", "--recycler"]
 
     def node_type(self):
         return Node.TYPE_RECYCLE
@@ -579,8 +689,8 @@ class FDB(Node):
         with open(os.path.join(local_conf_dir, "fdb.cluster"), "w") as f:
             f.write(self.cluster.get_fdb_cluster())
 
-    def entrypoint(self):
-        return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fdb.sh")]
+    def start_script(self):
+        return ["init_fdb.sh"]
 
     def docker_home_dir(self):
         return os.path.join(DOCKER_DORIS_PATH, "fdb")
@@ -597,17 +707,20 @@ class FDB(Node):
 
 class Cluster(object):
 
-    def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
-                 ms_config, recycle_config, be_disks, be_cluster, reg_be,
-                 coverage_dir, cloud_store_config):
+    def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config,
+                 be_config, ms_config, recycle_config, fe_follower, be_disks,
+                 be_cluster, reg_be, coverage_dir, cloud_store_config,
+                 sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id):
         self.name = name
         self.subnet = subnet
         self.image = image
         self.is_cloud = is_cloud
+        self.is_root_user = is_root_user
         self.fe_config = fe_config
         self.be_config = be_config
         self.ms_config = ms_config
         self.recycle_config = recycle_config
+        self.fe_follower = fe_follower
         self.be_disks = be_disks
         self.be_cluster = be_cluster
         self.reg_be = reg_be
@@ -617,18 +730,29 @@ class Cluster(object):
             node_type: Group(node_type)
             for node_type in Node.TYPE_ALL
         }
+        self.sql_mode_node_mgr = sql_mode_node_mgr
+        self.be_metaservice_endpoint = be_metaservice_endpoint
+        self.be_cluster_id = be_cluster_id
 
     @staticmethod
-    def new(name, image, is_cloud, fe_config, be_config, ms_config,
-            recycle_config, be_disks, be_cluster, reg_be, coverage_dir,
-            cloud_store_config):
-        os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
-        with filelock.FileLock(os.path.join(LOCAL_DORIS_PATH, "lock")):
+    def new(name, image, is_cloud, is_root_user, fe_config, be_config,
+            ms_config, recycle_config, fe_follower, be_disks, be_cluster,
+            reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr,
+            be_metaservice_endpoint, be_cluster_id):
+        if not os.path.exists(LOCAL_DORIS_PATH):
+            os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
+            os.chmod(LOCAL_DORIS_PATH, 0o777)
+        lock_file = os.path.join(LOCAL_DORIS_PATH, "lock")
+        with filelock.FileLock(lock_file):
+            if os.getuid() == utils.get_path_uid(lock_file):
+                os.chmod(lock_file, 0o666)
             subnet = gen_subnet_prefix16()
-            cluster = Cluster(name, subnet, image, is_cloud, fe_config,
-                              be_config, ms_config, recycle_config, be_disks,
-                              be_cluster, reg_be, coverage_dir,
-                              cloud_store_config)
+            cluster = Cluster(name, subnet, image, is_cloud, is_root_user,
+                              fe_config, be_config, ms_config, recycle_config,
+                              fe_follower, be_disks, be_cluster, reg_be,
+                              coverage_dir, cloud_store_config,
+                              sql_mode_node_mgr, be_metaservice_endpoint,
+                              be_cluster_id)
             os.makedirs(cluster.get_path(), exist_ok=True)
             os.makedirs(get_status_path(name), exist_ok=True)
             cluster._save_meta()
@@ -686,7 +810,14 @@ class Cluster(object):
             raise Exception("No found {} with id {}".format(node_type, id))
         return Node.new(self, node_type, id, meta)
 
-    def get_all_nodes(self, node_type):
+    def get_all_nodes(self, node_type=None):
+        if node_type is None:
+            nodes = []
+            for nt, group in self.groups.items():
+                for id, meta in group.get_all_nodes().items():
+                    nodes.append(Node.new(self, nt, id, meta))
+            return nodes
+
         group = self.groups.get(node_type, None)
         if not group:
             raise Exception("Unknown node_type: {}".format(node_type))
diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
index 87ae862236a..7a2f3f3c195 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -30,6 +30,47 @@ import time
 LOG = utils.get_logger()
 
 
+def wait_ready_service(wait_timeout, cluster, fe_ids, be_ids):
+
+    def is_fe_ready_service(ip):
+        return utils.is_socket_avail(ip, CLUSTER.FE_QUERY_PORT)
+
+    def is_be_ready_service(ip):
+        return utils.is_socket_avail(ip, CLUSTER.BE_HEARTBEAT_PORT)
+
+    if wait_timeout == 0:
+        return
+    if wait_timeout == -1:
+        wait_timeout = 1000000000
+    expire_ts = time.time() + wait_timeout
+    while True:
+        db_mgr = database.get_db_mgr(cluster.name, False)
+        dead_frontends = []
+        for id in fe_ids:
+            fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id)
+            fe_state = db_mgr.get_fe(id)
+            if not fe_state or not fe_state.alive or not is_fe_ready_service(
+                    fe.get_ip()):
+                dead_frontends.append(id)
+        dead_backends = []
+        for id in be_ids:
+            be = cluster.get_node(CLUSTER.Node.TYPE_BE, id)
+            be_state = db_mgr.get_be(id)
+            if not be_state or not be_state.alive or not is_be_ready_service(
+                    be.get_ip()):
+                dead_backends.append(id)
+        if not dead_frontends and not dead_backends:
+            break
+        if time.time() >= expire_ts:
+            err = ""
+            if dead_frontends:
+                err += "dead fe: " + str(dead_frontends) + ". "
+            if dead_backends:
+                err += "dead be: " + str(dead_backends) + ". "
+            raise Exception(err)
+        time.sleep(1)
+
+
 # return for_all, related_nodes, related_node_num
 def get_ids_related_nodes(cluster,
                           fe_ids,
@@ -92,7 +133,12 @@ class Command(object):
     def run(self, args):
         raise Exception("No implemented")
 
-    def _add_parser_output_json(self, parser):
+    def _add_parser_common_args(self, parser):
+        parser.add_argument("-v",
+                            "--verbose",
+                            default=False,
+                            action=self._get_parser_bool_action(True),
+                            help="verbose logging.")
         parser.add_argument("--output-json",
                             default=False,
                             action=self._get_parser_bool_action(True),
@@ -136,6 +182,18 @@ class Command(object):
     def _support_boolean_action(self):
         return sys.version_info.major == 3 and sys.version_info.minor >= 9
 
+    def _print_table(self, header, datas):
+        if utils.is_enable_log():
+            table = prettytable.PrettyTable(
+                [utils.render_green(field) for field in header])
+            for row in datas:
+                table.add_row(row)
+            print(table)
+            return ""
+        else:
+            datas.insert(0, header)
+            return datas
+
 
 class SimpleCommand(Command):
 
@@ -150,11 +208,12 @@ class SimpleCommand(Command):
         parser = args_parsers.add_parser(self.command, help=help)
         parser.add_argument("NAME", help="Specify cluster name.")
         self._add_parser_ids_args(parser)
-        self._add_parser_output_json(parser)
+        self._add_parser_common_args(parser)
+        return parser
 
     def run(self, args):
         cluster = CLUSTER.Cluster.load(args.NAME)
-        _, related_nodes, related_node_num = get_ids_related_nodes(
+        for_all, related_nodes, related_node_num = get_ids_related_nodes(
             cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
             args.fdb_id)
         utils.exec_docker_compose_command(cluster.get_compose_file(),
@@ -165,6 +224,31 @@ class SimpleCommand(Command):
             utils.render_green("{} succ, total related node num {}".format(
                 show_cmd, related_node_num)))
 
+        if for_all:
+            related_nodes = cluster.get_all_nodes()
+        return cluster, related_nodes
+
+
+class NeedStartCommand(SimpleCommand):
+
+    def add_parser(self, args_parsers):
+        parser = super().add_parser(args_parsers)
+        parser.add_argument(
+            "--wait-timeout",
+            type=int,
+            default=0,
+            help=
+            "Specify wait seconds for fe/be ready for service: 0 not wait 
(default), "\
+            "> 0 max wait seconds, -1 wait unlimited."
+        )
+        return parser
+
+    def run(self, args):
+        cluster, related_nodes = super().run(args)
+        fe_ids = [node.id for node in related_nodes if node.is_fe()]
+        be_ids = [node.id for node in related_nodes if node.is_be()]
+        wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids)
+
 
 class UpCommand(Command):
 
@@ -180,6 +264,7 @@ class UpCommand(Command):
                             nargs="?",
                             help="Specify docker image.")
 
+        self._add_parser_common_args(parser)
         parser.add_argument(
             "--cloud",
             default=False,
@@ -187,6 +272,13 @@ class UpCommand(Command):
             help=
             "Create cloud cluster, default is false. Only use when creating 
new cluster."
         )
+        parser.add_argument(
+            "--root",
+            default=False,
+            action=self._get_parser_bool_action(True),
+            help=
+            "Run cluster as root user, default is false, it will run as host 
user."
+        )
 
         parser.add_argument(
             "--wait-timeout",
@@ -197,8 +289,6 @@ class UpCommand(Command):
             "> 0 max wait seconds, -1 wait unlimited."
         )
 
-        self._add_parser_output_json(parser)
-
         group1 = parser.add_argument_group("add new nodes",
                                            "add cluster nodes.")
         group1.add_argument(
@@ -245,6 +335,13 @@ class UpCommand(Command):
                             type=str,
                             help="Specify recycle configs for 
doris_cloud.conf. "\
                                     "Example: --recycle-config \"log_level = 
warn\".")
+        group1.add_argument(
+            "--fe-follower",
+            default=False,
+            action=self._get_parser_bool_action(True),
+            help=
+            "The new added fe is follower but not observer. Only support in 
cloud mode."
+        )
         group1.add_argument("--be-disks",
                             nargs="*",
                             default=["HDD=1"],
@@ -283,13 +380,50 @@ class UpCommand(Command):
         group2.add_argument("--force-recreate",
                            default=False,
                            action=self._get_parser_bool_action(True),
-                           help="Recreate containers even if their 
configuration" \
+                           help="Recreate containers even if their 
configuration " \
                                 "and image haven't changed. ")
 
         parser.add_argument("--coverage-dir",
                             default="",
                             help="Set code coverage output directory")
 
+        parser.add_argument("--sql-mode-node-mgr",
+                            default=False,
+                            action=self._get_parser_bool_action(True),
+                            help="Manager fe be via sql instead of http")
+
+        if self._support_boolean_action():
+            parser.add_argument(
+                "--be-metaservice-endpoint",
+                default=True,
+                action=self._get_parser_bool_action(False),
+                help=
+                "Do not set BE meta service endpoint in conf. Default is 
False."
+            )
+        else:
+            parser.add_argument(
+                "--no-be-metaservice-endpoint",
+                dest='be_metaservice_endpoint',
+                default=True,
+                action=self._get_parser_bool_action(False),
+                help=
+                "Do not set BE meta service endpoint in conf. Default is 
False."
+            )
+
+        if self._support_boolean_action():
+            parser.add_argument(
+                "--be-cluster-id",
+                default=True,
+                action=self._get_parser_bool_action(False),
+                help="Do not set BE cluster ID in conf. Default is False.")
+        else:
+            parser.add_argument(
+                "--no-be-cluster-id",
+                dest='be_cluster_id',
+                default=True,
+                action=self._get_parser_bool_action(False),
+                help="Do not set BE cluser ID in conf. Default is False.")
+
         parser.add_argument(
             "--fdb-version",
             type=str,
@@ -383,18 +517,21 @@ class UpCommand(Command):
                 args.add_ms_num = 0
                 args.add_recycle_num = 0
 
-            cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud,
-                                          args.fe_config, args.be_config,
-                                          args.ms_config, args.recycle_config,
-                                          args.be_disks, args.be_cluster,
-                                          args.reg_be, args.coverage_dir,
-                                          cloud_store_config)
+            cluster = CLUSTER.Cluster.new(
+                args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
+                args.be_config, args.ms_config, args.recycle_config,
+                args.fe_follower, args.be_disks, args.be_cluster, args.reg_be,
+                args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
+                args.be_metaservice_endpoint, args.be_cluster_id)
             LOG.info("Create new cluster {} succ, cluster path is {}".format(
                 args.NAME, cluster.get_path()))
 
         if args.be_cluster and cluster.is_cloud:
             cluster.be_cluster = args.be_cluster
 
+        if cluster.is_cloud:
+            cluster.fe_follower = args.fe_follower
+
         _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
                                                     args.be_id, args.ms_id,
                                                     args.recycle_id,
@@ -474,32 +611,51 @@ class UpCommand(Command):
                     "Not up cluster cause specific --no-start, related node 
num {}"
                     .format(related_node_num)))
         else:
-            if args.wait_timeout != 0:
-                if args.wait_timeout == -1:
-                    args.wait_timeout = 1000000000
-                expire_ts = time.time() + args.wait_timeout
-                while True:
-                    db_mgr = database.get_db_mgr(args.NAME, False)
-                    dead_frontends = []
-                    for id in add_fe_ids:
-                        fe_state = db_mgr.get_fe(id)
-                        if not fe_state or not fe_state.alive:
-                            dead_frontends.append(id)
-                    dead_backends = []
-                    for id in add_be_ids:
-                        be_state = db_mgr.get_be(id)
-                        if not be_state or not be_state.alive:
-                            dead_backends.append(id)
-                    if not dead_frontends and not dead_backends:
+            LOG.info("Using SQL mode for node management ? {}".format(
+                args.sql_mode_node_mgr))
+
+            # Wait for FE master to be elected
+            LOG.info("Waiting for FE master to be elected...")
+            expire_ts = time.time() + 30
+            while expire_ts > time.time():
+                db_mgr = database.get_db_mgr(args.NAME, False)
+                for id in add_fe_ids:
+                    fe_state = db_mgr.get_fe(id)
+                    if fe_state is not None and fe_state.alive:
                         break
-                    if time.time() >= expire_ts:
-                        err = ""
-                        if dead_frontends:
-                            err += "dead fe: " + str(dead_frontends) + ". "
-                        if dead_backends:
-                            err += "dead be: " + str(dead_backends) + ". "
-                        raise Exception(err)
-                    time.sleep(1)
+                    LOG.info("there is no fe ready")
+                time.sleep(5)
+
+            if cluster.is_cloud and args.sql_mode_node_mgr:
+                db_mgr = database.get_db_mgr(args.NAME, False)
+                master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
+                    cluster.name)
+                # Add FEs except master_fe
+                for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE):
+                    fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}"
+                    if fe_endpoint != master_fe_endpoint:
+                        try:
+                            db_mgr.add_fe(fe_endpoint)
+                            LOG.info(f"Added FE {fe_endpoint} successfully.")
+                        except Exception as e:
+                            LOG.error(
+                                f"Failed to add FE {fe_endpoint}: {str(e)}")
+
+                # Add BEs
+                for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE):
+                    be_endpoint = f"{be.get_ip()}:{CLUSTER.BE_HEARTBEAT_PORT}"
+                    try:
+                        db_mgr.add_be(be_endpoint)
+                        LOG.info(f"Added BE {be_endpoint} successfully.")
+                    except Exception as e:
+                        LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}")
+
+                cloud_store_config = self._get_cloud_store_config()
+
+                db_mgr.create_default_storage_vault(cloud_store_config)
+
+            wait_ready_service(args.wait_timeout, cluster, add_fe_ids,
+                               add_be_ids)
             LOG.info(
                 utils.render_green(
                     "Up cluster {} succ, related node num {}".format(
@@ -576,7 +732,7 @@ class DownCommand(Command):
                                            "then apply to all containers.")
         parser.add_argument("NAME", help="Specify cluster name")
         self._add_parser_ids_args(parser)
-        self._add_parser_output_json(parser)
+        self._add_parser_common_args(parser)
         parser.add_argument(
             "--clean",
             default=False,
@@ -606,6 +762,8 @@ class DownCommand(Command):
             args.fdb_id,
             ignore_not_exists=True)
 
+        LOG.info("down cluster " + args.NAME + " for all " + str(for_all))
+
         if for_all:
             if os.path.exists(cluster.get_compose_file()):
                 try:
@@ -687,7 +845,6 @@ class ListNode(object):
         self.created = ""
         self.alive = ""
         self.is_master = ""
-        self.query_port = ""
         self.tablet_num = ""
         self.last_heartbeat = ""
         self.err_msg = ""
@@ -702,16 +859,23 @@ class ListNode(object):
         if detail:
             query_port = ""
             http_port = ""
+            heartbeat_port = ""
+            edit_log_port = ""
+            node_path = CLUSTER.get_node_path(self.cluster_name,
+                                              self.node_type, self.id)
             if self.node_type == CLUSTER.Node.TYPE_FE:
                 query_port = CLUSTER.FE_QUERY_PORT
                 http_port = CLUSTER.FE_HTTP_PORT
+                edit_log_port = CLUSTER.FE_EDITLOG_PORT
             elif self.node_type == CLUSTER.Node.TYPE_BE:
                 http_port = CLUSTER.BE_WEBSVR_PORT
+                heartbeat_port = CLUSTER.BE_HEARTBEAT_PORT
+            elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE:
+                http_port = CLUSTER.MS_PORT
             else:
                 pass
             result += [
-                query_port,
-                http_port,
+                query_port, http_port, node_path, edit_log_port, heartbeat_port
             ]
         return result
 
@@ -721,7 +885,6 @@ class ListNode(object):
             if fe:
                 self.alive = str(fe.alive).lower()
                 self.is_master = str(fe.is_master).lower()
-                self.query_port = fe.query_port
                 self.last_heartbeat = fe.last_heartbeat
                 self.err_msg = fe.err_msg
         elif self.node_type == CLUSTER.Node.TYPE_BE:
@@ -812,7 +975,16 @@ cloudUniqueId= "{fe_cloud_unique_id}"
                 print("\nNo write regression custom file.")
                 return
 
+        annotation_start = "//---------- Start auto generate by doris-compose.py---------"
+        annotation_end = "//---------- End auto generate by doris-compose.py---------"
+
+        old_contents = []
+        if os.path.exists(regression_conf_custom):
+            with open(regression_conf_custom, "r") as f:
+                old_contents = f.readlines()
         with open(regression_conf_custom, "w") as f:
+            # write auto gen config
+            f.write(annotation_start)
             f.write(base_conf.format(fe_ip=fe_ip))
             if cluster.is_cloud:
                 multi_cluster_bes = ",".join([
@@ -831,6 +1003,23 @@ cloudUniqueId= "{fe_cloud_unique_id}"
                         multi_cluster_bes=multi_cluster_bes,
                         fe_cloud_unique_id=cluster.get_node(
                             CLUSTER.Node.TYPE_FE, 1).cloud_unique_id()))
+            f.write(annotation_end + "\n\n")
+            annotation_end_line_count = -1
+
+            # write not-auto gen config
+            in_annotation = False
+            annotation_end_line_idx = -100
+            for line_idx, line in enumerate(old_contents):
+                line = line.rstrip()
+                if line == annotation_start:
+                    in_annotation = True
+                elif line == annotation_end:
+                    in_annotation = False
+                    annotation_end_line_idx = line_idx
+                elif not in_annotation:
+                    if line or line_idx != annotation_end_line_idx + 1:
+                        f.write(line + "\n")
+
         print("\nWrite succ: " + regression_conf_custom)
 
 
@@ -845,24 +1034,12 @@ class ListCommand(Command):
             help=
             "Specify multiple clusters, if specific, show all their 
containers."
         )
-        self._add_parser_output_json(parser)
+        self._add_parser_common_args(parser)
         parser.add_argument("--detail",
                             default=False,
                             action=self._get_parser_bool_action(True),
                             help="Print more detail fields.")
 
-    def _handle_data(self, header, datas):
-        if utils.is_enable_log():
-            table = prettytable.PrettyTable(
-                [utils.render_green(field) for field in header])
-            for row in datas:
-                table.add_row(row)
-            print(table)
-            return ""
-        else:
-            datas.insert(0, header)
-            return datas
-
     def run(self, args):
         COMPOSE_MISSING = "(missing)"
         COMPOSE_BAD = "(bad)"
@@ -954,7 +1131,7 @@ class ListCommand(Command):
                              CLUSTER.get_master_fe_endpoint(name), is_cloud,
                              "{}{}".format(compose_file,
                                            cluster_info["status"])))
-            return self._handle_data(header, rows)
+            return self._print_table(header, rows)
 
         header = [
             "CLUSTER", "NAME", "IP", "STATUS", "CONTAINER ID", "IMAGE",
@@ -965,6 +1142,9 @@ class ListCommand(Command):
             header += [
                 "query_port",
                 "http_port",
+                "path",
+                "edit_log_port",
+                "heartbeat_port",
             ]
 
         rows = []
@@ -1038,17 +1218,68 @@ class ListCommand(Command):
             for node in sorted(nodes, key=get_node_seq):
                 rows.append(node.info(args.detail))
 
-        return self._handle_data(header, rows)
+        return self._print_table(header, rows)
+
+
+class InfoCommand(Command):
+
+    def add_parser(self, args_parsers):
+        parser = args_parsers.add_parser(
+            "info", help="Show info like cloud.ini, port, path, etc")
+        self._add_parser_common_args(parser)
+
+    def run(self, args):
+
+        header = ["key", "value", "scope"]
+        cloud_cfg_file_env = os.getenv("DORIS_CLOUD_CFG_FILE")
+        cloud_cfg_file = cloud_cfg_file_env if cloud_cfg_file_env else "${LOCAL_DORIS_PATH}/cloud.ini"
+        rows = [
+            ("LOCAL_DORIS_PATH", CLUSTER.LOCAL_DORIS_PATH, "env variable"),
+            ("DORIS_CLOUD_CFG_FILE", cloud_cfg_file, "env variable"),
+            ("FE_QUERY_PORT", CLUSTER.FE_QUERY_PORT, "constant"),
+            ("FE_HTTP_PORT", CLUSTER.FE_HTTP_PORT, "constant"),
+            ("FE_EDITLOG_PORT", CLUSTER.FE_EDITLOG_PORT, "constant"),
+            ("FE_JAVA_DBG_PORT", CLUSTER.FE_JAVA_DBG_PORT, "constant"),
+            ("BE_HEARTBEAT_PORT", CLUSTER.BE_HEARTBEAT_PORT, "constant"),
+            ("BE_WEBSVR_PORT", CLUSTER.BE_WEBSVR_PORT, "constant"),
+            ("MS_PORT", CLUSTER.MS_PORT, "constant"),
+            ("RECYCLER_PORT", CLUSTER.MS_PORT, "constant"),
+        ]
+
+        with open(CLUSTER.CLOUD_CFG_FILE, "r") as f:
+            for line in f:
+                line = line.strip()
+                if line and not line.startswith('#'):
+                    key, value = line.split('=', 1)
+                    rows.append((key.strip(), value.strip(), "cloud.ini"))
+
+        return self._print_table(header, rows)
+
+
+class AddRWPermCommand(Command):
+
+    def add_parser(self, args_parsers):
+        parser = args_parsers.add_parser(
+            "add-rw-perm",
+            help="Add read and write permissions to the cluster files")
+        parser.add_argument("NAME", help="Specify cluster name.")
+        self._add_parser_common_args(parser)
+
+    def run(self, args):
+        utils.enable_dir_with_rw_perm(CLUSTER.get_cluster_path(args.NAME))
+        return ""
 
 
 ALL_COMMANDS = [
     UpCommand("up"),
     DownCommand("down"),
-    SimpleCommand("start", "Start the doris containers. "),
+    NeedStartCommand("start", "Start the doris containers. "),
     SimpleCommand("stop", "Stop the doris containers. "),
-    SimpleCommand("restart", "Restart the doris containers. "),
+    NeedStartCommand("restart", "Restart the doris containers. "),
     SimpleCommand("pause", "Pause the doris containers. "),
     SimpleCommand("unpause", "Unpause the doris containers. "),
     GenConfCommand("config"),
+    InfoCommand("info"),
     ListCommand("ls"),
+    AddRWPermCommand("add-rw-perm"),
 ]
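wait_ready_service above treats an FE/BE as ready only when its registration is alive and its service port accepts TCP connections via utils.is_socket_avail. That helper is not part of this hunk, but a plain-socket probe along these lines would behave the way the callers expect (a sketch, not the actual utils.py implementation):

```python
import socket

def is_socket_avail(ip, port, timeout=1.0):
    """Return True if a TCP connection to ip:port succeeds within the timeout."""
    try:
        with socket.create_connection((ip, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example: probe an FE query port (9030) and a BE heartbeat port (9050).
print(is_socket_avail("127.0.0.1", 9030), is_socket_avail("127.0.0.1", 9050))
```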
diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py
index d29905e94a9..46cdd961c9f 100644
--- a/docker/runtime/doris-compose/database.py
+++ b/docker/runtime/doris-compose/database.py
@@ -20,16 +20,15 @@ import os.path
 import pymysql
 import time
 import utils
+import uuid
 
 LOG = utils.get_logger()
 
 
 class FEState(object):
 
-    def __init__(self, id, query_port, is_master, alive, last_heartbeat,
-                 err_msg):
+    def __init__(self, id, is_master, alive, last_heartbeat, err_msg):
         self.id = id
-        self.query_port = query_port
         self.is_master = is_master
         self.alive = alive
         self.last_heartbeat = last_heartbeat
@@ -54,11 +53,8 @@ class DBManager(object):
     def __init__(self):
         self.fe_states = {}
         self.be_states = {}
-        self.query_port = -1
         self.conn = None
-
-    def set_query_port(self, query_port):
-        self.query_port = query_port
+        self.master_fe_ip = ""
 
     def get_fe(self, id):
         return self.fe_states.get(id, None)
@@ -66,10 +62,19 @@ class DBManager(object):
     def get_be(self, id):
         return self.be_states.get(id, None)
 
-    def load_states(self, query_ports):
-        self._load_fe_states(query_ports)
+    def load_states(self):
+        self._load_fe_states()
         self._load_be_states()
 
+    def add_fe(self, fe_endpoint):
+        try:
+            sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'"
+            self._exec_query(sql)
+            LOG.info(f"Added FE {fe_endpoint} via SQL successfully.")
+        except Exception as e:
+            LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}")
+            raise
+
     def drop_fe(self, fe_endpoint):
         id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
         try:
@@ -85,6 +90,15 @@ class DBManager(object):
                 return
             raise e
 
+    def add_be(self, be_endpoint):
+        try:
+            sql = f"ALTER SYSTEM ADD BACKEND '{be_endpoint}'"
+            self._exec_query(sql)
+            LOG.info(f"Added BE {be_endpoint} via SQL successfully.")
+        except Exception as e:
+            LOG.error(f"Failed to add BE {be_endpoint} via SQL: {str(e)}")
+            raise
+
     def drop_be(self, be_endpoint):
         id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
         try:
@@ -140,103 +154,124 @@ class DBManager(object):
 
             time.sleep(5)
 
-    def _load_fe_states(self, query_ports):
+    def create_default_storage_vault(self, cloud_store_config):
+        try:
+            # Create storage vault
+            create_vault_sql = f"""
+            CREATE STORAGE VAULT IF NOT EXISTS default_vault
+            PROPERTIES (
+                "type" = "S3",
+                "s3.access_key" = "{cloud_store_config['DORIS_CLOUD_AK']}",
+                "s3.secret_key" = "{cloud_store_config['DORIS_CLOUD_SK']}",
+                "s3.endpoint" = "{cloud_store_config['DORIS_CLOUD_ENDPOINT']}",
+                "s3.bucket" = "{cloud_store_config['DORIS_CLOUD_BUCKET']}",
+                "s3.region" = "{cloud_store_config['DORIS_CLOUD_REGION']}",
+                "s3.root.path" = "{str(uuid.uuid4())}",
+                "provider" = "{cloud_store_config['DORIS_CLOUD_PROVIDER']}"
+            );
+            """
+            self._exec_query(create_vault_sql)
+            LOG.info("Created storage vault 'default_vault'")
+
+            # Set as default storage vault
+            set_default_vault_sql = "SET default_vault as DEFAULT STORAGE VAULT;"
+            self._exec_query(set_default_vault_sql)
+            LOG.info("Set 'default_vault' as the default storage vault")
+
+        except Exception as e:
+            LOG.error(f"Failed to create default storage vault: {str(e)}")
+            raise
+
+    def _load_fe_states(self):
         fe_states = {}
-        alive_master_fe_port = None
-        for record in self._exec_query('''
-            select Host, IsMaster, Alive, LastHeartbeat, ErrMsg
-            from frontends()'''):
-            ip, is_master, alive, last_heartbeat, err_msg = record
-            is_master = utils.is_true(is_master)
-            alive = utils.is_true(alive)
+        alive_master_fe_ip = None
+        for record in self._exec_query("show frontends"):
+            name = record["Name"]
+            ip = record["Host"]
+            role = record["Role"]
+            is_master = utils.is_true(record["IsMaster"])
+            alive = utils.is_true(record["Alive"])
             id = CLUSTER.Node.get_id_from_ip(ip)
-            query_port = query_ports.get(id, "")
-            last_heartbeat = utils.escape_null(last_heartbeat)
-            fe = FEState(id, query_port, is_master, alive, last_heartbeat,
-                         err_msg)
+            last_heartbeat = utils.escape_null(record["LastHeartbeat"])
+            err_msg = record["ErrMsg"]
+            fe = FEState(id, is_master, alive, last_heartbeat, err_msg)
             fe_states[id] = fe
-            if is_master and alive and query_port:
-                alive_master_fe_port = query_port
+            if is_master and alive:
+                alive_master_fe_ip = ip
+            LOG.debug(
+                "record of show frontends, name {}, ip {}, alive {}, is_master 
{}, role {}"
+                .format(name, ip, alive, is_master, role))
+
         self.fe_states = fe_states
-        if alive_master_fe_port and alive_master_fe_port != self.query_port:
-            self.query_port = alive_master_fe_port
+        if alive_master_fe_ip and alive_master_fe_ip != self.master_fe_ip:
+            self.master_fe_ip = alive_master_fe_ip
             self._reset_conn()
 
     def _load_be_states(self):
         be_states = {}
-        for record in self._exec_query('''
-            select BackendId, Host, LastHeartbeat, Alive, SystemDecommissioned, TabletNum, ErrMsg
-            from backends()'''):
-            backend_id, ip, last_heartbeat, alive, decommissioned, tablet_num, err_msg = record
-            backend_id = int(backend_id)
-            alive = utils.is_true(alive)
-            decommissioned = utils.is_true(decommissioned)
-            tablet_num = int(tablet_num)
-            id = CLUSTER.Node.get_id_from_ip(ip)
-            last_heartbeat = utils.escape_null(last_heartbeat)
+        for record in self._exec_query("show backends"):
+            backend_id = int(record["BackendId"])
+            alive = utils.is_true(record["Alive"])
+            decommissioned = utils.is_true(record["SystemDecommissioned"])
+            tablet_num = int(record["TabletNum"])
+            id = CLUSTER.Node.get_id_from_ip(record["Host"])
+            last_heartbeat = utils.escape_null(record["LastHeartbeat"])
+            err_msg = record["ErrMsg"]
             be = BEState(id, backend_id, decommissioned, alive, tablet_num,
                          last_heartbeat, err_msg)
             be_states[id] = be
         self.be_states = be_states
 
+    # return rows, and each row is a record map
     def _exec_query(self, sql):
         self._prepare_conn()
         with self.conn.cursor() as cursor:
             cursor.execute(sql)
-            return cursor.fetchall()
+            fields = [field_md[0] for field_md in cursor.description
+                      ] if cursor.description else []
+            return [dict(zip(fields, row)) for row in cursor.fetchall()]
 
     def _prepare_conn(self):
         if self.conn:
             return
-        if self.query_port <= 0:
-            raise Exception("Not set query_port")
         self._reset_conn()
 
     def _reset_conn(self):
         self.conn = pymysql.connect(user="root",
-                                    host="127.0.0.1",
+                                    host=self.master_fe_ip,
                                     read_timeout=10,
-                                    port=self.query_port)
+                                    connect_timeout=3,
+                                    port=CLUSTER.FE_QUERY_PORT)
 
 
 def get_db_mgr(cluster_name, required_load_succ=True):
     assert cluster_name
     db_mgr = DBManager()
-    containers = utils.get_doris_containers(cluster_name).get(
-        cluster_name, None)
-    if not containers:
+    master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
+                                     "master_fe_ip")
+    master_fe_ip = None
+    if os.path.exists(master_fe_ip_file):
+        with open(master_fe_ip_file, "r") as f:
+            master_fe_ip = f.read().strip()
+
+    if not master_fe_ip:
         return db_mgr
-    alive_fe_ports = {}
+
+    has_alive_fe = False
+    containers = utils.get_doris_containers(cluster_name).get(cluster_name, [])
     for container in containers:
         if utils.is_container_running(container):
-            _, node_type, id = utils.parse_service_name(container.name)
+            _, node_type, _ = utils.parse_service_name(container.name)
             if node_type == CLUSTER.Node.TYPE_FE:
-                query_port = utils.get_map_ports(container).get(
-                    CLUSTER.FE_QUERY_PORT, None)
-                if query_port:
-                    alive_fe_ports[id] = query_port
-    if not alive_fe_ports:
+                has_alive_fe = True
+                break
+
+    if not has_alive_fe:
         return db_mgr
 
-    master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
-                                     "master_fe_ip")
-    query_port = None
-    if os.path.exists(master_fe_ip_file):
-        with open(master_fe_ip_file, "r") as f:
-            master_fe_ip = f.read()
-            if master_fe_ip:
-                master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip)
-                query_port = alive_fe_ports.get(master_id, None)
-    if not query_port:
-        # A new cluster's master is fe-1
-        if 1 in alive_fe_ports:
-            query_port = alive_fe_ports[1]
-        else:
-            query_port = list(alive_fe_ports.values())[0]
-
-    db_mgr.set_query_port(query_port)
+    db_mgr.master_fe_ip = master_fe_ip
     try:
-        db_mgr.load_states(alive_fe_ports)
+        db_mgr.load_states()
     except Exception as e:
         if required_load_succ:
             raise e
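
The reworked _exec_query above returns each row as a column-name-to-value map instead of a positional tuple, which is what lets _load_fe_states and _load_be_states read records by header name. A minimal standalone sketch of the same pattern, assuming a locally reachable FE (the host and port below are placeholders, not values taken from this commit):

    import pymysql

    def exec_query(conn, sql):
        # Return rows as dicts keyed by column name, mirroring the
        # dict(zip(fields, row)) idiom used in database.py.
        with conn.cursor() as cursor:
            cursor.execute(sql)
            fields = [md[0] for md in cursor.description] if cursor.description else []
            return [dict(zip(fields, row)) for row in cursor.fetchall()]

    if __name__ == "__main__":
        # Placeholder endpoint; any reachable FE query port behaves the same way.
        conn = pymysql.connect(user="root", host="127.0.0.1", port=9030,
                               connect_timeout=3, read_timeout=10)
        for record in exec_query(conn, "show frontends"):
            print(record["Host"], record["Alive"], record["IsMaster"])
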
diff --git a/docker/runtime/doris-compose/doris-compose.py b/docker/runtime/doris-compose/doris-compose.py
index 0091b70eae9..a2d3a517553 100644
--- a/docker/runtime/doris-compose/doris-compose.py
+++ b/docker/runtime/doris-compose/doris-compose.py
@@ -45,6 +45,9 @@ def run(args, disable_log, help):
 
 if __name__ == '__main__':
     args, help = parse_args()
+    verbose = getattr(args, "verbose", False)
+    if verbose:
+        utils.set_log_verbose()
     disable_log = getattr(args, "output_json", False)
     if disable_log:
         utils.set_enable_log(False)
@@ -53,13 +56,13 @@ if __name__ == '__main__':
     try:
         data = run(args, disable_log, help)
         if disable_log:
-            print(utils.pretty_json({"code": 0, "data": data}))
+            print(utils.pretty_json({"code": 0, "data": data}), flush=True)
         code = 0
     except:
         err = traceback.format_exc()
         if disable_log:
-            print(utils.pretty_json({"code": 1, "err": err}))
+            print(utils.pretty_json({"code": 1, "err": err}), flush=True)
         else:
-            print(err)
+            print(err, flush=True)
         code = 1
     sys.exit(code)
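
With --output-json the script now prints exactly one JSON object, either {"code": 0, "data": ...} or {"code": 1, "err": ...}, flushed immediately. A rough sketch of how a caller might consume that contract; the script path and arguments below are illustrative only:

    import json
    import subprocess

    def run_compose(compose_py, args):
        # Invoke doris-compose.py with --output-json and parse its single JSON result.
        cmd = ["python", compose_py] + args + ["--output-json"]
        proc = subprocess.run(cmd, capture_output=True, text=True)
        result = json.loads(proc.stdout)
        if result.get("code") != 0:
            raise RuntimeError(result.get("err", "doris-compose failed"))
        return result.get("data")

    # Illustrative usage: run_compose("doris-compose.py", ["ls", "my-cluster", "--detail"])
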
diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/format-code.sh
similarity index 90%
copy from docker/runtime/doris-compose/requirements.txt
copy to docker/runtime/doris-compose/format-code.sh
index ac177eddf82..0626662e641 100644
--- a/docker/runtime/doris-compose/requirements.txt
+++ b/docker/runtime/doris-compose/format-code.sh
@@ -15,10 +15,5 @@
 # specific language governing permissions and limitations
 # under the License.
 
-docker
-docker-compose
-filelock
-jsonpickle
-prettytable
-pymysql
-python-dateutil
+yapf -i *.py
+shfmt -w resource/*.sh
diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt
index ac177eddf82..2f962ed68d8 100644
--- a/docker/runtime/doris-compose/requirements.txt
+++ b/docker/runtime/doris-compose/requirements.txt
@@ -22,3 +22,5 @@ jsonpickle
 prettytable
 pymysql
 python-dateutil
+#pyyaml==5.4.1
+requests<=2.31.0
diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh
index de6ba29865a..40833d01dc6 100644
--- a/docker/runtime/doris-compose/resource/common.sh
+++ b/docker/runtime/doris-compose/resource/common.sh
@@ -23,7 +23,7 @@ export LOG_FILE=$DORIS_HOME/log/health.out
 export LOCK_FILE=$DORIS_HOME/status/token
 
 health_log() {
-    echo "$(date +'%Y-%m-%d %H:%M:%S') $@" >>$LOG_FILE
+    echo "$(date +'%Y-%m-%d %H:%M:%S') $@" | tee -a $LOG_FILE
 }
 
 # concurrent write meta service server will failed due to fdb txn conflict.
@@ -120,10 +120,11 @@ wait_pid() {
     health_log ""
     health_log "ps -elf\n$(ps -elf)\n"
     if [ -z $pid ]; then
-        health_log "pid not exist"
+        health_log "pid $pid not exist"
         exit 1
     fi
 
+    health_log "pid $pid exist"
     health_log "wait process $pid"
     while true; do
         ps -p $pid >/dev/null
@@ -132,5 +133,13 @@ wait_pid() {
         fi
         sleep 1s
     done
+
+    health_log "show dmesg -T: "
+    dmesg -T | tail -n 50 | tee -a $LOG_FILE
+
+    health_log "show ps -elf"
+    health_log "ps -elf\n$(ps -elf)\n"
+    health_log "pid $pid not exist"
+
     health_log "wait end"
 }
diff --git a/docker/runtime/doris-compose/resource/entrypoint.sh b/docker/runtime/doris-compose/resource/entrypoint.sh
new file mode 100644
index 00000000000..a3cdaaae8f1
--- /dev/null
+++ b/docker/runtime/doris-compose/resource/entrypoint.sh
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+DIR=$(
+    cd $(dirname $0)
+    pwd
+)
+
+source $DIR/common.sh
+
+RUN_USER=root
+
+create_host_user() {
+    if [ -z ${HOST_USER} ]; then
+        health_log "no specific run user, run as root"
+        return
+    fi
+    id ${HOST_USER}
+    if [ $? -eq 0 ]; then
+        health_log "contain user ${HOST_USER}, no create new user"
+        RUN_USER=${HOST_USER}
+        return
+    fi
+    id ${HOST_UID}
+    if [ $? -eq 0 ]; then
+        health_log "contain uid ${HOST_UID}, no create new user"
+        return
+    fi
+    addgroup --gid ${HOST_GID} ${HOST_USER}
+    if [ $? -eq 0 ]; then
+        health_log "create group ${HOST_USER} with gid ${HOST_GID} succ"
+    else
+        health_log "create group ${HOST_USER} with gid ${HOST_GID} failed"
+        return
+    fi
+    adduser --disabled-password --shell /bin/bash --gecos "" --uid ${HOST_UID} --gid ${HOST_GID} ${HOST_USER}
+    if [ $? -eq 0 ]; then
+        health_log "create user ${HOST_USER} with uid ${HOST_UID} succ"
+        RUN_USER=${HOST_USER}
+    else
+        health_log "create user ${HOST_USER} with uid ${HOST_UID} failed"
+    fi
+}
+
+create_host_user
+
+if command -v gosu 2>&1 >/dev/null; then
+    if [ -f ${LOG_FILE} ]; then
+        chown ${RUN_USER}:${RUN_USER} ${LOG_FILE}
+    fi
+    gosu ${RUN_USER} bash ${DIR}/${1} ${@:2}
+else
+    bash ${DIR}/${1} ${@:2}
+fi
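
The new entrypoint.sh creates a container user matching the host's uid/gid and re-executes the real init script through gosu, so files written into the mounted volumes keep the calling user's ownership. A hypothetical sketch of how the HOST_USER/HOST_UID/HOST_GID values it consumes could be gathered on the host side (the compose code may derive them differently; this helper is illustrative only):

    import getpass
    import os

    def host_user_env():
        # Hypothetical helper: collect the env vars entrypoint.sh expects
        # (HOST_USER, HOST_UID, HOST_GID) from the invoking host user.
        return {
            "HOST_USER": getpass.getuser(),
            "HOST_UID": str(os.getuid()),
            "HOST_GID": str(os.getgid()),
        }

    if __name__ == "__main__":
        for key, value in host_user_env().items():
            print(f"{key}={value}")
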
diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh
index d9b7953b534..08cc914f6af 100755
--- a/docker/runtime/doris-compose/resource/init_be.sh
+++ b/docker/runtime/doris-compose/resource/init_be.sh
@@ -48,6 +48,12 @@ add_cloud_be() {
         return
     fi
 
+    # Check if SQL_MODE_NODE_MGR is set to 1
+    if [ "$SQL_MODE_NODE_MGR" -eq 1 ]; then
+        health_log "SQL_MODE_NODE_MGR is set to 1, skipping cluster creation"
+        return
+    fi
+
     cluster_file_name="${DORIS_HOME}/conf/CLUSTER_NAME"
     cluster_name=$(cat $cluster_file_name)
     if [ -z $cluster_name ]; then
@@ -167,7 +173,7 @@ main() {
     add_be_to_cluster
 
     health_log "run start_be.sh"
-    bash $DORIS_HOME/bin/start_be.sh --daemon
+    bash $DORIS_HOME/bin/start_be.sh --daemon | tee -a $DORIS_HOME/log/be.out
 
     wait_process
 }
diff --git a/docker/runtime/doris-compose/resource/init_cloud.sh b/docker/runtime/doris-compose/resource/init_cloud.sh
index 78152b5330b..18dfc4430e2 100644
--- a/docker/runtime/doris-compose/resource/init_cloud.sh
+++ b/docker/runtime/doris-compose/resource/init_cloud.sh
@@ -50,6 +50,13 @@ check_init_cloud() {
 
         lock_cluster
 
+        # Check if SQL_MODE_NODE_MGR is set
+        if [[ "$SQL_MODE_NODE_MGR" -eq 1 ]]; then
+            health_log "SQL_MODE_NODE_MGR is set, skipping create_instance"
+            touch $HAS_CREATE_INSTANCE_FILE
+            return
+        fi
+
+        output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \
             -d '{"instance_id":"default_instance_id",
                     "name": "default_instance",
@@ -109,9 +116,8 @@ main() {
 
     check_init_cloud &
 
-    health_log "input args: $ARGS"
-
-    bash bin/start.sh $ARGS --daemon
+    health_log "run starts.sh with args: $ARGS"
+    bash bin/start.sh $ARGS --daemon | tee -a $DORIS_HOME/log/doris_cloud.out
 
     wait_process
 }
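
For context on the hunk above: when SQL_MODE_NODE_MGR is not set, init_cloud.sh registers a default instance by POSTing to the meta service's create_instance endpoint (the curl call shown). A rough Python equivalent using the requests dependency this commit adds to requirements.txt; the payload fields beyond the instance_id/name visible in the hunk are omitted here, and the endpoint is whatever META_SERVICE_ENDPOINT resolves to:

    import requests

    def create_instance(meta_service_endpoint, token="greedisgood9999"):
        # Mirrors the curl call in init_cloud.sh; the real payload carries more
        # fields than shown here (the hunk truncates them).
        url = f"{meta_service_endpoint}/MetaService/http/create_instance?token={token}"
        payload = {
            "instance_id": "default_instance_id",
            "name": "default_instance",
        }
        resp = requests.post(url, json=payload, timeout=10)
        resp.raise_for_status()
        return resp.json()
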
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh
index e532d0d56e1..b69ac3a209e 100755
--- a/docker/runtime/doris-compose/resource/init_fe.sh
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -45,8 +45,8 @@ fe_daemon() {
         sleep 1
         output=$(mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;")
         code=$?
-        health_log "$output"
         if [ $code -ne 0 ]; then
+            health_log "exec show frontends bad: $output"
             continue
         fi
         header=$(grep IsMaster <<<$output)
@@ -81,8 +81,30 @@ fe_daemon() {
     done
 }
 
-add_cloud_fe() {
+run_fe() {
+    health_log "run start_fe.sh"
+    bash $DORIS_HOME/bin/start_fe.sh --daemon $@ | tee -a $DORIS_HOME/log/fe.out
+}
+
+start_cloud_fe() {
     if [ -f "$REGISTER_FILE" ]; then
+        fe_daemon &
+        run_fe
+        return
+    fi
+
+    # Check if SQL_MODE_NODE_MGR is set to 1
+    if [ "${SQL_MODE_NODE_MGR}" = "1" ]; then
+        health_log "SQL_MODE_NODE_MGR is set to 1. Skipping add FE."
+
+        touch $REGISTER_FILE
+
+        fe_daemon &
+        run_fe
+
+        if [ "$MY_ID" == "1" ]; then
+            echo $MY_IP >$MASTER_FE_IP_FILE
+        fi
         return
     fi
 
@@ -96,6 +118,10 @@ add_cloud_fe() {
         node_type=FE_OBSERVER
     fi
 
+    if [ "a$IS_FE_FOLLOWER" == "a1" ]; then
+        node_type=FE_FOLLOWER
+    fi
+
     nodes='{
         "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'",
         "ip": "'"${MY_IP}"'",
@@ -139,6 +165,10 @@ add_cloud_fe() {
     fi
 
     touch $REGISTER_FILE
+
+    fe_daemon &
+    run_fe
+
     if [ "$MY_ID" == "1" ]; then
         echo $MY_IP >$MASTER_FE_IP_FILE
     fi
@@ -174,19 +204,14 @@ start_local_fe() {
 
     if [ -f $REGISTER_FILE ]; then
         fe_daemon &
-        bash $DORIS_HOME/bin/start_fe.sh --daemon
+        run_fe
     else
         add_local_fe
         fe_daemon &
-        bash $DORIS_HOME/bin/start_fe.sh --helper $MASTER_FE_IP:$FE_EDITLOG_PORT --daemon
+        run_fe --helper $MASTER_FE_IP:$FE_EDITLOG_PORT
     fi
 }
 
-start_cloud_fe() {
-    add_cloud_fe
-    bash $DORIS_HOME/bin/start_fe.sh --daemon
-}
-
 main() {
     trap stop_frontend SIGTERM
 
diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py
index 54255b597bc..4332ae6cf48 100644
--- a/docker/runtime/doris-compose/utils.py
+++ b/docker/runtime/doris-compose/utils.py
@@ -15,11 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import contextlib
 import docker
-import json
+import jsonpickle
 import logging
 import os
 import pwd
+import socket
 import subprocess
 import time
 import yaml
@@ -56,6 +58,10 @@ def is_enable_log():
     return ENABLE_LOG
 
 
+def set_log_verbose():
+    get_logger().setLevel(logging.DEBUG)
+
+
 def get_logger(name=None):
     global LOG
     if LOG != None:
@@ -274,6 +280,12 @@ def copy_image_directory(image, image_dir, local_dir):
         entrypoint="cp -r  {}  /opt/mount/".format(image_dir))
 
 
+def is_socket_avail(ip, port):
+    with contextlib.closing(socket.socket(socket.AF_INET,
+                                          socket.SOCK_STREAM)) as sock:
+        return sock.connect_ex((ip, port)) == 0
+
+
 def enable_dir_with_rw_perm(dir):
     if not os.path.exists(dir):
         return
@@ -291,6 +303,13 @@ def get_path_owner(path):
         return ""
 
 
+def get_path_uid(path):
+    try:
+        return os.stat(path).st_uid
+    except:
+        return ""
+
+
 def read_compose_file(file):
     with open(file, "r") as f:
         return yaml.safe_load(f.read())
@@ -302,7 +321,7 @@ def write_compose_file(file, compose):
 
 
 def pretty_json(json_data):
-    return json.dumps(json_data, indent=4, sort_keys=True)
+    return jsonpickle.dumps(json_data, indent=4)
 
 
 def is_true(val):
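
The new is_socket_avail helper above is a plain TCP connect probe. A small standalone usage sketch, for example to wait until an FE query port starts accepting connections; the host/port and the added per-attempt timeout are assumptions of this sketch, not part of utils.py:

    import contextlib
    import socket
    import time

    def is_socket_avail(ip, port):
        # Same connect_ex probe as utils.py; a short timeout keeps each attempt bounded.
        with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)
            return sock.connect_ex((ip, port)) == 0

    def wait_port(ip, port, timeout_sec=60):
        # Poll once per second until the port answers or the deadline passes.
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            if is_socket_avail(ip, port):
                return True
            time.sleep(1)
        return False

    if __name__ == "__main__":
        print(wait_port("127.0.0.1", 9030, timeout_sec=5))
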
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
index 0a440bade64..3675bbf4e31 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
@@ -308,8 +308,8 @@ class Config {
         Properties props = cmd.getOptionProperties("conf")
         config.otherConfigs.putAll(props)
 
-        config.tryCreateDbIfNotExist()
-        config.buildUrlWithDefaultDb()
+        // mainly auth_xxx cases use defaultDb; these suites had better not use defaultDb
+        config.createDefaultDb()
 
         return config
     }
@@ -630,6 +630,24 @@ class Config {
         return null
     }
 
+    void createDefaultDb() {
+        String dbName = null
+        try {
+            tryCreateDbIfNotExist(defaultDb)
+            dbName = defaultDb
+        } catch (Exception e) {
+            // defaultDb is not needed for most cases.
+            // when running docker suites without an external fe/be, createDefaultDb will fail, but this exception can be ignored.
+            // In fact, mainly the auth_xxx cases use defaultDb, and they just use jdbcUrl in the connect function.
+            // They could avoid using defaultDb too, but modifying all these cases takes a lot of work.
+            // We had better remove all usage of defaultDb in suites later; every suite should use its own db, not the defaultDb.
+            log.warn("create default db failed ${defaultDb}".toString())
+        }
+
+        jdbcUrl = buildUrlWithDb(jdbcUrl, dbName)
+        log.info("Reset jdbcUrl to ${jdbcUrl}".toString())
+    }
+
     void tryCreateDbIfNotExist(String dbName = defaultDb) {
         // connect without specify default db
         try {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 6e392ef2ac4..54f2cdfc722 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -255,13 +255,13 @@ class Suite implements GroovyInterceptable {
             return
         }
 
-        boolean pipelineIsCloud = isCloudCluster()
+        boolean pipelineIsCloud = isCloudMode()
         boolean dockerIsCloud = false
         if (options.cloudMode == null) {
             dockerIsCloud = pipelineIsCloud
         } else {
             dockerIsCloud = options.cloudMode
-            if (dockerIsCloud != pipelineIsCloud && 
options.skipRunWhenPipelineDiff) {
+            if (dockerIsCloud != pipelineIsCloud) {
                 return
             }
         }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 49bfbc18792..856b0e76956 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -17,8 +17,9 @@
 package org.apache.doris.regression.suite
 
 import org.apache.doris.regression.Config
-import org.apache.doris.regression.util.Http
 import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.NodeType
 
 import com.google.common.collect.Maps
@@ -29,17 +30,29 @@ import groovy.json.JsonSlurper
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
 import java.util.stream.Collectors
+import java.sql.Connection
 
 class ClusterOptions {
 
     int feNum = 1
     int beNum = 3
 
+    Boolean sqlModeNodeMgr = false
+    Boolean beMetaServiceEndpoint = true
+    Boolean beClusterId = false
+
+    int waitTimeout = 180
+
+    // don't add whitespace in feConfigs items,
+    // for example, ' xx = yy ' is bad, should use 'xx=yy'
     List<String> feConfigs = [
         'heartbeat_interval_second=5',
     ]
 
+    // don't add whitespace in beConfigs items,
+    // for example, ' xx = yy ' is bad, should use 'xx=yy'
     List<String> beConfigs = [
+        'max_sys_mem_available_low_water_mark_bytes=0', // do not check available memory
         'report_disk_state_interval_seconds=2',
         'report_random_wait=false',
     ]
@@ -51,9 +64,11 @@ class ClusterOptions {
     // 3. cloudMode = null, create both cloud and none-cloud cluster, depend on the running pipeline mode.
     Boolean cloudMode = false
 
-    // when cloudMode = true/false,  but the running pipeline is diff with cloudMode,
-    // skip run this docker test or not.
-    boolean skipRunWhenPipelineDiff = true
+    // in cloud mode, deployment methods are divided into:
+    // 1. master - multi observers
+    // 2. multi followers - multi observers
+    // default is 1
+    Boolean useFollowersMode = false
 
     // each be disks, a disks format is: disk_type=disk_num[,disk_capacity]
     // here disk_type=HDD or SSD,  disk capacity is in gb unit.
@@ -95,12 +110,14 @@ class ServerNode {
     String host
     int httpPort
     boolean alive
+    String path
 
     static void fromCompose(ServerNode node, ListHeader header, int index, List<Object> fields) {
         node.index = index
         node.host = (String) fields.get(header.indexOf('IP'))
         node.httpPort = (Integer) fields.get(header.indexOf('http_port'))
         node.alive = fields.get(header.indexOf('alive')) == 'true'
+        node.path = (String) fields.get(header.indexOf('path'))
     }
 
     static long toLongOrDefault(Object val, long defValue) {
@@ -130,10 +147,19 @@ class ServerNode {
         assert false : 'Unknown node type'
     }
 
+    String getLogFilePath() {
+        assert false : 'Unknown node type'
+    }
+
+    String getConfFilePath() {
+        assert false : 'Unknown node type'
+    }
+
 }
 
 class Frontend extends ServerNode {
 
+    int editLogPort
     int queryPort
     boolean isMaster
 
@@ -141,6 +167,7 @@ class Frontend extends ServerNode {
         Frontend fe = new Frontend()
         ServerNode.fromCompose(fe, header, index, fields)
         fe.queryPort = (Integer) fields.get(header.indexOf('query_port'))
+        fe.editLogPort = (Integer) fields.get(header.indexOf('edit_log_port'))
         fe.isMaster = fields.get(header.indexOf('is_master')) == 'true'
         return fe
     }
@@ -149,18 +176,29 @@ class Frontend extends ServerNode {
         return NodeType.FE
     }
 
+    String getLogFilePath() {
+        return path + '/log/fe.log'
+    }
+
+    String getConfFilePath() {
+        return path + '/conf/fe.conf'
+    }
+
 }
 
 class Backend extends ServerNode {
 
+    int heartbeatPort
     long backendId
     int tabletNum
 
     static Backend fromCompose(ListHeader header, int index, List<Object> fields) {
         Backend be = new Backend()
         ServerNode.fromCompose(be, header, index, fields)
+        be.heartbeatPort = (Integer) fields.get(header.indexOf('heartbeat_port'))
         be.backendId = toLongOrDefault(fields.get(header.indexOf('backend_id')), -1L)
         be.tabletNum = (int) toLongOrDefault(fields.get(header.indexOf('tablet_num')), 0L)
+
         return be
     }
 
@@ -168,6 +206,58 @@ class Backend extends ServerNode {
         return NodeType.BE
     }
 
+    String getLogFilePath() {
+        return path + '/log/be.INFO'
+    }
+
+    String getConfFilePath() {
+        return path + '/conf/be.conf'
+    }
+
+}
+
+class MetaService extends ServerNode {
+
+    static MetaService fromCompose(ListHeader header, int index, List<Object> fields) {
+        MetaService ms = new MetaService()
+        ServerNode.fromCompose(ms, header, index, fields)
+        return ms
+    }
+
+    NodeType getNodeType() {
+        return NodeType.MS
+    }
+
+    String getLogFilePath() {
+        return path + '/log/meta_service.INFO'
+    }
+
+    String getConfFilePath() {
+        return path + '/conf/doris_cloud.conf'
+    }
+
+}
+
+class Recycler extends ServerNode {
+
+    static Recycler fromCompose(ListHeader header, int index, List<Object> fields) {
+        Recycler rs = new Recycler()
+        ServerNode.fromCompose(rs, header, index, fields)
+        return rs
+    }
+
+    NodeType getNodeType() {
+        return NodeType.RECYCLER
+    }
+
+    String getLogFilePath() {
+        return path + '/log/recycler.INFO'
+    }
+
+    String getConfFilePath() {
+        return path + '/conf/doris_cloud.conf'
+    }
+
 }
 
 @Slf4j
@@ -179,6 +269,8 @@ class SuiteCluster {
     final String name
     final Config config
     private boolean running
+    private boolean sqlModeNodeMgr = false
+    private boolean isCloudMode = false
 
     SuiteCluster(String name, Config config) {
         this.name = name
@@ -191,6 +283,8 @@ class SuiteCluster {
         assert options.feNum > 0 || options.beNum > 0
         assert config.image != null && config.image != ''
 
+        this.isCloudMode = isCloud
+
         def cmd = [
             'up', name, config.image
         ]
@@ -220,7 +314,24 @@ class SuiteCluster {
         if (isCloud) {
             cmd += ['--cloud']
         }
-        cmd += ['--wait-timeout', String.valueOf(180)]
+
+        if (isCloud && options.useFollowersMode) {
+            cmd += ['--fe-follower']
+        }
+
+        if (options.sqlModeNodeMgr) {
+            cmd += ['--sql-mode-node-mgr']
+        }
+        if (!options.beMetaServiceEndpoint) {
+            cmd += ['--no-be-metaservice-endpoint']
+        }
+        if (!options.beClusterId) {
+            cmd += ['--no-be-cluster-id']
+        }
+
+        cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)]
+
+        sqlModeNodeMgr = options.sqlModeNodeMgr
 
         runCmd(cmd.join(' '), -1)
 
@@ -287,25 +398,44 @@ class SuiteCluster {
         return getBackends().stream().filter(be -> be.alive || !needAlive).collect(Collectors.toList());
     }
 
+    List<MetaService> getAllMetaservices(boolean needAlive = false) {
+        return getMetaservices().stream().filter(ms -> ms.alive || !needAlive).collect(Collectors.toList());
+    }
+
+    List<MetaService> getAllRecyclers(boolean needAlive = false) {
+        return getRecyclers().stream().filter(rc -> rc.alive || !needAlive).collect(Collectors.toList());
+    }
+
     private List<Frontend> getFrontends() {
-        List<Frontend> frontends = []
-        List<Backend> backends = []
-        getAllNodes(frontends, backends)
-        return frontends
+        def ret = getAllNodes()
+        return ret.getV1()
     }
 
     private List<Backend> getBackends() {
-        List<Frontend> frontends = []
-        List<Backend> backends = []
-        getAllNodes(frontends, backends)
-        return backends
+        def ret = getAllNodes()
+        return ret.getV2()
+    }
+
+    private List<MetaService> getMetaservices() {
+        def ret = getAllNodes()
+        return ret.getV3()
     }
 
-    private void getAllNodes(List<Frontend> frontends, List<Backend> backends) 
{
+    private List<Recycler> getRecyclers() {
+        def ret = getAllNodes()
+        return ret.getV4()
+    }
+
+    private Tuple4<List<Frontend>, List<Backend>, List<MetaService>, 
List<Recycler>> getAllNodes() {
+        List<Frontend> frontends = []
+        List<Backend> backends = []
+        List<MetaService> metaservices = []
+        List<Recycler> recyclers = []
         def cmd = 'ls ' + name + ' --detail'
         def data = runCmd(cmd)
         assert data instanceof List
         def rows = (List<List<Object>>) data
+        logger.info('get all nodes {}', rows)
         def header = new ListHeader(rows.get(0))
         for (int i = 1; i < rows.size(); i++) {
             def row = (List<Object>) rows.get(i)
@@ -316,34 +446,49 @@ class SuiteCluster {
             } else if (name.startsWith('fe-')) {
                 int index = name.substring('fe-'.length()) as int
                 frontends.add(Frontend.fromCompose(header, index, row))
-            } else if (name.startsWith('ms-') || name.startsWith('recycle-') || name.startsWith('fdb-')) {
-            // TODO: handle these nodes
+            } else if (name.startsWith('ms-')) {
+                int index = name.substring('ms-'.length()) as int
+                metaservices.add(MetaService.fromCompose(header, index, row))
+            } else if (name.startsWith('recycle-')) {
+                int index = name.substring('recycle-'.length()) as int
+                recyclers.add(Recycler.fromCompose(header, index, row))
+            } else if (name.startsWith('fdb-')) {
+            // currently not used
             } else {
                 assert false : 'Unknown node type with name: ' + name
             }
         }
+        return new Tuple4(frontends, backends, metaservices, recyclers)
     }
 
-    List<Integer> addFrontend(int num) throws Exception {
-        def result = add(num, 0)
+    List<Integer> addFrontend(int num, boolean followerMode=false) throws 
Exception {
+        def result = add(num, 0, null, followerMode)
         return result.first
     }
 
-    List<Integer> addBackend(int num) throws Exception {
-        def result = add(0, num)
+    List<Integer> addBackend(int num, String ClusterName='') throws Exception {
+        def result = add(0, num, ClusterName)
         return result.second
     }
 
-    Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum) throws 
Exception {
+    // ATTN: clusterName just used for cloud mode, 1 cluster has n bes
+    // ATTN: followerMode just used for cloud mode
+    Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String 
clusterName, boolean followerMode=false) throws Exception {
         assert feNum > 0 || beNum > 0
 
         def sb = new StringBuilder()
         sb.append('up ' + name + ' ')
         if (feNum > 0) {
             sb.append('--add-fe-num ' + feNum + ' ')
+            if (followerMode) {
+                sb.append('--fe-follower' + ' ')
+            }
         }
         if (beNum > 0) {
             sb.append('--add-be-num ' + beNum + ' ')
+            if (clusterName != null && !clusterName.isEmpty()) {
+                sb.append(' --be-cluster ' + clusterName + ' ')
+            }
         }
         sb.append('--wait-timeout 60')
 
@@ -373,40 +518,42 @@ class SuiteCluster {
         return running
     }
 
+    boolean isCloudMode() {
+        return this.isCloudMode
+    }
+
+    int START_WAIT_TIMEOUT = 120
+
     // if not specific fe indices, then start all frontends
     void startFrontends(int... indices) {
-        runFrontendsCmd('start', indices)
-        waitHbChanged()
+        runFrontendsCmd(START_WAIT_TIMEOUT + 5, "start  --wait-timeout 
${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific be indices, then start all backends
     void startBackends(int... indices) {
-        runBackendsCmd('start', indices)
-        waitHbChanged()
+        runBackendsCmd(START_WAIT_TIMEOUT + 5, "start  --wait-timeout 
${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific fe indices, then stop all frontends
     void stopFrontends(int... indices) {
-        runFrontendsCmd('stop', indices)
+        runFrontendsCmd(60, 'stop', indices)
         waitHbChanged()
     }
 
     // if not specific be indices, then stop all backends
     void stopBackends(int... indices) {
-        runBackendsCmd('stop', indices)
+        runBackendsCmd(60, 'stop', indices)
         waitHbChanged()
     }
 
     // if not specific fe indices, then restart all frontends
     void restartFrontends(int... indices) {
-        runFrontendsCmd('restart', indices)
-        waitHbChanged()
+        runFrontendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout 
${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific be indices, then restart all backends
     void restartBackends(int... indices) {
-        runBackendsCmd('restart', indices)
-        waitHbChanged()
+        runBackendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout 
${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific fe indices, then drop all frontends
@@ -415,7 +562,7 @@ class SuiteCluster {
         if (clean) {
             cmd += ' --clean'
         }
-        runFrontendsCmd(cmd, indices)
+        runFrontendsCmd(60, cmd, indices)
     }
 
     // if not specific be indices, then decommission all backends
@@ -433,7 +580,7 @@ class SuiteCluster {
         if (clean) {
             cmd += ' --clean'
         }
-        runBackendsCmd(cmd, indices)
+        runBackendsCmd(60, cmd, indices)
     }
 
     void checkFeIsAlive(int index, boolean isAlive) {
@@ -468,27 +615,28 @@ class SuiteCluster {
         }
     }
 
+    void addRWPermToAllFiles() {
+        def cmd = 'add-rw-perm ' + name
+        runCmd(cmd)
+    }
+
     private void waitHbChanged() {
         // heart beat interval is 5s
         Thread.sleep(7000)
     }
 
-    private void runFrontendsCmd(String op, int... indices) {
+    private void runFrontendsCmd(int timeoutSecond, String op, int... indices) {
         def cmd = op + ' ' + name + ' --fe-id ' + indices.join(' ')
-        runCmd(cmd)
+        runCmd(cmd, timeoutSecond)
     }
 
-    private void runBackendsCmd(Integer timeoutSecond = null, String op, int... indices) {
+    private void runBackendsCmd(int timeoutSecond, String op, int... indices) {
         def cmd = op + ' ' + name + ' --be-id ' + indices.join(' ')
-        if (timeoutSecond == null) {
-            runCmd(cmd)
-        } else {
-            runCmd(cmd, timeoutSecond)
-        }
+        runCmd(cmd, timeoutSecond)
     }
 
     private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception {
-        def fullCmd = String.format('python %s %s --output-json', config.dorisComposePath, cmd)
+        def fullCmd = String.format('python -W ignore %s %s --output-json', config.dorisComposePath, cmd)
         logger.info('Run doris compose cmd: {}', fullCmd)
         def proc = fullCmd.execute()
         def outBuf = new StringBuilder()
diff --git a/regression-test/suites/demo_p0/docker_action.groovy b/regression-test/suites/demo_p0/docker_action.groovy
index 6d62d6ea7be..d59c0f43774 100644
--- a/regression-test/suites/demo_p0/docker_action.groovy
+++ b/regression-test/suites/demo_p0/docker_action.groovy
@@ -55,8 +55,6 @@ suite('docker_action') {
     options2.beNum = 1
     // create cloud cluster
     options2.cloudMode = true
-    //// cloud docker only run in cloud pipeline, but enable it run in none-cloud pipeline
-    // options2.skipRunWhenPipelineDiff = false
     // run another docker, create a cloud cluster
     docker(options2) {
     // cloud cluster will ignore replication_num, always set to 1. so create table succ even has 1 be.

