This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e3d15b552 [Feature](doris compose) A tool for setup and manage doris 
docker cluster scaling easily (#21649)
2e3d15b552 is described below

commit 2e3d15b552aceb968bdcf5ce0352d9505cbb12ad
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Wed Jul 12 22:13:38 2023 +0800

    [Feature](doris compose) A tool for setup and manage doris docker cluster 
scaling easily (#21649)
---
 docker/runtime/doris-compose/Dockerfile          |  30 ++
 docker/runtime/doris-compose/Readme.md           | 102 +++++
 docker/runtime/doris-compose/cluster.py          | 438 ++++++++++++++++++
 docker/runtime/doris-compose/command.py          | 543 +++++++++++++++++++++++
 docker/runtime/doris-compose/database.py         | 239 ++++++++++
 docker/runtime/doris-compose/doris-compose.py    |  48 ++
 docker/runtime/doris-compose/requirements.txt    |  22 +
 docker/runtime/doris-compose/resource/common.sh  |  36 ++
 docker/runtime/doris-compose/resource/init_be.sh |  50 +++
 docker/runtime/doris-compose/resource/init_fe.sh |  70 +++
 docker/runtime/doris-compose/utils.py            | 253 +++++++++++
 11 files changed, 1831 insertions(+)

diff --git a/docker/runtime/doris-compose/Dockerfile 
b/docker/runtime/doris-compose/Dockerfile
new file mode 100644
index 0000000000..8f5d7e3376
--- /dev/null
+++ b/docker/runtime/doris-compose/Dockerfile
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# choose a base image
+FROM openjdk:8u342-jdk
+
+# set environment variables
+ENV JAVA_HOME="/usr/local/openjdk-8/"
+
+ADD output /opt/apache-doris/
+
+RUN apt-get update && \
+    apt-get install -y default-mysql-client python && \
+    apt-get clean
+
diff --git a/docker/runtime/doris-compose/Readme.md 
b/docker/runtime/doris-compose/Readme.md
new file mode 100644
index 0000000000..13f543d4e7
--- /dev/null
+++ b/docker/runtime/doris-compose/Readme.md
@@ -0,0 +1,102 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris compose
+
+Use doris compose to create doris docker compose clusters.
+
+## Requirements
+
+1. The doris image should contain:
+
+```
+/opt/apache-doris/{fe, be}
+```
+
+if doris is built with `sh build.sh`, its output satisfies this layout; then run 
the command below in the doris root
+
+```
+docker build -f docker/runtime/doris-compose/Dockerfile -t <image> .
+```
+
+will generate an image.
+
+2. Install the dependent python library in 
'docker/runtime/doris-compose/requirements.txt'
+
+
+```
+python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
+```
+
+## Usage
+
+### Create a cluster or recreate its containers
+
+```
+python docker/runtime/doris-compose/doris-compose.py up  <cluster-name>   
<image?> 
+    --add-fe-num  <add-fe-num>  --add-be-num <add-be-num>
+    --fe-id <fd-id> --be-id <be-id>
+
+```
+
+if it's a new cluster, you must specify the image.
+
+add fe/be nodes with the specified image, or update existing nodes with 
`--fe-id`, `--be-id`
+
+### Remove node from the cluster
+
+```
+python docker/runtime/doris-compose/doris-compose.py down  <cluster-name> 
--fe-id <fe-id>  --be-id<be-id> [--clean]  [--drop-force]
+```
+
+Down the containers and remove them from the DB.
+
+For BE, if `--drop-force` is specified, it will send a dropp sql to FE, otherwise it 
will send a decommission sql to FE.
+
+If `--clean` is specified, it will delete its data too.
+
+
+### Start, stop, restart specific nodes
+
+
+```
+python docker/runtime/doris-compose/doris-compose.py start  <cluster-name>  
--fe-id  <multiple fe ids>  --be-id <multiple be ids>
+python docker/runtime/doris-compose/doris-compose.py restart  <cluster-name>  
--fe-id  <multiple fe ids>  --be-id <multiple be ids>
+```
+
+### List doris cluster
+
+```
+python docker/runtime/doris-compose/doris-compose.py ls <-a>  <multiple 
cluster names>
+```
+
+if cluster names are specified, it will list all of those clusters' nodes.
+
+Otherwise it will just list a summary of each cluster. If `-a` is not specified, it 
will list only active clusters. 
+
+If `-a` is specified, it will list the inactive clusters too.
+
+There are more options about doris-compose. Just try 
+
+```
+python docker/runtime/doris-compose/doris-compose.py  <command> -h 
+```
+
+
+
diff --git a/docker/runtime/doris-compose/cluster.py 
b/docker/runtime/doris-compose/cluster.py
new file mode 100644
index 0000000000..2556809a3c
--- /dev/null
+++ b/docker/runtime/doris-compose/cluster.py
@@ -0,0 +1,438 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import jsonpickle
+import os
+import os.path
+import utils
+
+DOCKER_DORIS_PATH = "/opt/apache-doris"
+LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH")
+if not LOCAL_DORIS_PATH:
+    LOCAL_DORIS_PATH = "/tmp/doris"
+
+LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
+                                   "resource")
+DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource")
+
+FE_HTTP_PORT = 8030
+FE_RPC_PORT = 9020
+FE_QUERY_PORT = 9030
+FE_EDITLOG_PORT = 9010
+
+BE_PORT = 9060
+BE_WEBSVR_PORT = 8040
+BE_HEARTBEAT_PORT = 9050
+BE_BRPC_PORT = 8060
+
+ID_LIMIT = 10000
+
+IP_PART4_SIZE = 200
+
+LOG = utils.get_logger()
+
+
+def get_cluster_path(cluster_name):
+    return os.path.join(LOCAL_DORIS_PATH, cluster_name)
+
+
+def get_compose_file(cluster_name):
+    return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml")
+
+
+def get_status_path(cluster_name):
+    return os.path.join(get_cluster_path(cluster_name), "status")
+
+
+def gen_subnet_prefix16():
+    used_subnet = utils.get_docker_subnets_prefix16()
+    if os.path.exists(LOCAL_DORIS_PATH):
+        for cluster_name in os.listdir(LOCAL_DORIS_PATH):
+            try:
+                cluster = Cluster.load(cluster_name)
+                used_subnet[cluster.subnet] = True
+            except:
+                pass
+
+    for i in range(128, 192):
+        for j in range(256):
+            subnet = "{}.{}".format(i, j)
+            if not used_subnet.get(subnet, False):
+                return subnet
+
+    raise Exception("Failed to gen subnet")
+
+
+class NodeMeta(object):
+
+    def __init__(self, image):
+        self.image = image
+
+
+class Group(object):
+
+    def __init__(self, node_type):
+        self.node_type = node_type
+        self.nodes = {}  # id : NodeMeta
+        self.next_id = 1
+
+    def add(self, id, image):
+        assert image
+        if not id:
+            id = self.next_id
+            self.next_id += 1
+        if self.get_node(id):
+            raise Exception(
+                "Failed to add {} with id {}, id has exists".format(
+                    self.node_type, id))
+        if id > ID_LIMIT:
+            raise Exception(
+                "Failed to add {} with id {}, id exceeds {}".format(
+                    self.node_type, id, ID_LIMIT))
+        self.nodes[id] = NodeMeta(image)
+
+        return id
+
+    def remove(self, id):
+        self.nodes.pop(id, None)
+
+    def get_node_num(self):
+        return len(self.nodes)
+
+    def get_all_nodes(self):
+        return self.nodes
+
+    def get_node(self, id):
+        return self.nodes.get(id, None)
+
+    def on_loaded(self):
+        nodes = {}
+        for id, node in self.nodes.items():
+            nodes[int(id)] = node
+        self.nodes = nodes
+
+
+class Node(object):
+    TYPE_FE = "fe"
+    TYPE_BE = "be"
+    TYPE_ALL = [TYPE_FE, TYPE_BE]
+
+    def __init__(self, cluster_name, id, subnet, meta):
+        self.cluster_name = cluster_name
+        self.id = id
+        self.subnet = subnet
+        self.meta = meta
+
+    @staticmethod
+    def new(cluster_name, node_type, id, subnet, meta):
+        if node_type == Node.TYPE_FE:
+            return FE(cluster_name, id, subnet, meta)
+        elif node_type == Node.TYPE_BE:
+            return BE(cluster_name, id, subnet, meta)
+        else:
+            raise Exception("Unknown node type {}".format(node_type))
+
+    def init_dir(self):
+        path = self.get_path()
+        os.makedirs(path, exist_ok=True)
+
+        # copy config to local
+        conf_dir = os.path.join(path, "conf")
+        if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir):
+            utils.copy_image_directory(
+                self.get_image(), "{}/{}/conf".format(DOCKER_DORIS_PATH,
+                                                      self.node_type()),
+                conf_dir)
+            assert not utils.is_dir_empty(conf_dir), "conf directory {} is 
empty, " \
+                    "check doris path in image is correct".format(conf_dir)
+        for sub_dir in self.expose_sub_dirs():
+            os.makedirs(os.path.join(path, sub_dir), exist_ok=True)
+
+    def is_fe(self):
+        return self.node_type() == Node.TYPE_FE
+
+    def is_be(self):
+        return self.node_type() == Node.TYPE_BE
+
+    def node_type(self):
+        raise Exception("No implemented")
+
+    def expose_sub_dirs(self):
+        return ["conf", "log"]
+
+    def get_name(self):
+        return "{}-{}".format(self.node_type(), self.id)
+
+    def get_path(self):
+        return os.path.join(get_cluster_path(self.cluster_name),
+                            self.get_name())
+
+    def get_image(self):
+        return self.meta.image
+
+    def set_image(self, image):
+        self.meta.image = image
+
+    def get_ip(self):
+        seq = self.id
+        seq += IP_PART4_SIZE
+        if self.node_type() == Node.TYPE_FE:
+            seq += 0 * ID_LIMIT
+        elif self.node_type() == Node.TYPE_BE:
+            seq += 1 * ID_LIMIT
+        else:
+            seq += 2 * ID_LIMIT
+        return "{}.{}.{}".format(self.subnet, int(seq / IP_PART4_SIZE),
+                                 seq % IP_PART4_SIZE)
+
+    @staticmethod
+    def get_id_from_ip(ip):
+        pos2 = ip.rfind(".")
+        pos1 = ip.rfind(".", 0, pos2 - 1)
+        num3 = int(ip[pos1 + 1:pos2])
+        num4 = int(ip[pos2 + 1:])
+        seq = num3 * IP_PART4_SIZE + num4
+        while seq > ID_LIMIT:
+            seq -= ID_LIMIT
+        seq -= IP_PART4_SIZE
+        return seq
+
+    def service_name(self):
+        return utils.with_doris_prefix("{}-{}".format(self.cluster_name,
+                                                      self.get_name()))
+
+    def docker_env(self):
+        return {
+            "MY_IP": self.get_ip(),
+            "MY_ID": self.id,
+            "FE_QUERY_PORT": FE_QUERY_PORT,
+            "FE_EDITLOG_PORT": FE_EDITLOG_PORT,
+            "BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT,
+            "DORIS_HOME": os.path.join(DOCKER_DORIS_PATH, self.node_type()),
+        }
+
+    def docker_ports(self):
+        raise Exception("No implemented")
+
+    def compose(self):
+        return {
+            "cap_add": ["SYS_PTRACE"],
+            "hostname":
+            self.get_name(),
+            "container_name":
+            self.service_name(),
+            "command":
+            self.docker_command(),
+            "environment":
+            self.docker_env(),
+            "image":
+            self.get_image(),
+            "networks": {
+                utils.with_doris_prefix(self.cluster_name): {
+                    "ipv4_address": self.get_ip(),
+                }
+            },
+            "ports":
+            self.docker_ports(),
+            "ulimits": {
+                "core": -1
+            },
+            "security_opt": ["seccomp:unconfined"],
+            "volumes": [
+                "{}:{}/{}/{}".format(os.path.join(self.get_path(),
+                                                  sub_dir), DOCKER_DORIS_PATH,
+                                     self.node_type(), sub_dir)
+                for sub_dir in self.expose_sub_dirs()
+            ] + [
+                "{}:{}:ro".format(LOCAL_RESOURCE_PATH, DOCKER_RESOURCE_PATH),
+                "{}:{}/{}/status".format(get_status_path(self.cluster_name),
+                                         DOCKER_DORIS_PATH, self.node_type()),
+            ] + [
+                "{0}:{0}:ro".format(path)
+                for path in ("/etc/localtime", "/etc/timezone",
+                             "/usr/share/zoneinfo") if os.path.exists(path)
+            ],
+        }
+
+
+class FE(Node):
+
+    def docker_command(self):
+        return [
+            "bash",
+            os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh"),
+            #"{}/fe/bin/init_fe.sh".format(DOCKER_DORIS_PATH),
+        ]
+
+    def docker_ports(self):
+        return [FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT]
+
+    def node_type(self):
+        return Node.TYPE_FE
+
+    def expose_sub_dirs(self):
+        return super().expose_sub_dirs() + ["doris-meta"]
+
+
+class BE(Node):
+
+    def docker_command(self):
+        return [
+            "bash",
+            os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh"),
+            #"{}/be/bin/init_be.sh".format(DOCKER_DORIS_PATH),
+        ]
+
+    def docker_ports(self):
+        return [BE_WEBSVR_PORT, BE_BRPC_PORT, BE_HEARTBEAT_PORT, BE_PORT]
+
+    def node_type(self):
+        return Node.TYPE_BE
+
+    def expose_sub_dirs(self):
+        return super().expose_sub_dirs() + ["storage"]
+
+
+class Cluster(object):
+
+    def __init__(self, name, subnet, image):
+        self.name = name
+        self.subnet = subnet
+        self.image = image
+        self.groups = {
+            node_type: Group(node_type)
+            for node_type in Node.TYPE_ALL
+        }
+
+    @staticmethod
+    def new(name, image):
+        subnet = gen_subnet_prefix16()
+        cluster = Cluster(name, subnet, image)
+        os.makedirs(cluster.get_path(), exist_ok=True)
+        os.makedirs(get_status_path(name), exist_ok=True)
+        return cluster
+
+    @staticmethod
+    def load(name):
+        if not name:
+            raise Exception("Failed to load cluster, name is empty")
+        path = get_cluster_path(name)
+        if not os.path.exists(path):
+            raise Exception(
+                "Failed to load cluster, its directory {} not exists.".format(
+                    path))
+        meta_path = Cluster._get_meta_file(name)
+        if not os.path.exists(meta_path):
+            raise Exception(
+                "Failed to load cluster, its meta file {} not exists.".format(
+                    meta_path))
+        with open(meta_path, "r") as f:
+            cluster = jsonpickle.loads(f.read())
+            for group in cluster.groups.values():
+                group.on_loaded()
+            return cluster
+
+    @staticmethod
+    def _get_meta_file(name):
+        return os.path.join(get_cluster_path(name), "meta")
+
+    def get_image(self):
+        return self.image
+
+    # cluster's nodes will update image too if cluster update.
+    def set_image(self, image):
+        self.image = image
+        for _, group in self.groups.items():
+            for _, node_meta in group.nodes.items():
+                node_meta.image = image
+
+    def get_path(self):
+        return get_cluster_path(self.name)
+
+    def get_group(self, node_type):
+        group = self.groups.get(node_type, None)
+        if not group:
+            raise Exception("Unknown node_type: {}".format(node_type))
+        return group
+
+    def get_node(self, node_type, id):
+        group = self.get_group(node_type)
+        meta = group.get_node(id)
+        if not meta:
+            raise Exception("No found {} with id {}".format(node_type, id))
+        return Node.new(self.name, node_type, id, self.subnet, meta)
+
+    def get_all_nodes(self, node_type):
+        group = self.groups.get(node_type, None)
+        if not group:
+            raise Exception("Unknown node_type: {}".format(node_type))
+        return [
+            Node.new(self.name, node_type, id, self.subnet, meta)
+            for id, meta in group.get_all_nodes().items()
+        ]
+
+    def get_all_nodes_num(self):
+        num = 0
+        for group in self.groups.values():
+            num += group.get_node_num()
+        return num
+
+    def add(self, node_type, id=None):
+        id = self.get_group(node_type).add(id, self.image)
+        node = self.get_node(node_type, id)
+        node.init_dir()
+        return node
+
+    def remove(self, node_type, id):
+        group = self.get_group(node_type)
+        group.remove(id)
+
+    def save(self):
+        self._save_meta()
+        self._save_compose()
+
+    def _save_meta(self):
+        with open(Cluster._get_meta_file(self.name), "w") as f:
+            f.write(jsonpickle.dumps(self, indent=2))
+
+    def _save_compose(self):
+        services = {}
+        for node_type in self.groups.keys():
+            for node in self.get_all_nodes(node_type):
+                services[node.service_name()] = node.compose()
+
+        compose = {
+            "version": "3",
+            "networks": {
+                utils.with_doris_prefix(self.name): {
+                    "driver": "bridge",
+                    "ipam": {
+                        "config": [{
+                            "subnet": "{}.0.0/16".format(self.subnet),
+                        }]
+                    }
+                }
+            },
+            "services": services,
+        }
+
+        utils.write_compose_file(self.get_compose_file(), compose)
+
+    def get_compose_file(self):
+        global get_compose_file
+        return get_compose_file(self.name)
diff --git a/docker/runtime/doris-compose/command.py 
b/docker/runtime/doris-compose/command.py
new file mode 100644
index 0000000000..2fa556d513
--- /dev/null
+++ b/docker/runtime/doris-compose/command.py
@@ -0,0 +1,543 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import cluster as CLUSTER
+import database
+import utils
+import os
+import os.path
+import prettytable
+import shutil
+import sys
+
+LOG = utils.get_logger()
+
+
+# return for_all, related_nodes, related_node_num
+def get_ids_related_nodes(cluster, fe_ids, be_ids, ignore_not_exists=False):
+    if fe_ids is None and be_ids is None:
+        return True, None, cluster.get_all_nodes_num()
+
+    def get_ids_related_nodes_with_type(node_type, ids):
+        if ids is None:
+            return []
+        if not ids:
+            return cluster.get_all_nodes(node_type)
+        else:
+            nodes = []
+            for id in ids:
+                try:
+                    nodes.append(cluster.get_node(node_type, id))
+                except Exception as e:
+                    if ignore_not_exists:
+                        LOG.warning(
+                            utils.render_yellow(
+                                "Not found {} with id {}".format(
+                                    node_type, id)))
+                    else:
+                        raise e
+            return nodes
+
+    nodes = get_ids_related_nodes_with_type(
+        CLUSTER.Node.TYPE_FE, fe_ids) + get_ids_related_nodes_with_type(
+            CLUSTER.Node.TYPE_BE, be_ids)
+
+    related_node_num = len(nodes)
+
+    return len(nodes) == cluster.get_all_nodes_num(), nodes, len(nodes)
+
+
+class Command(object):
+
+    def __init__(self, name):
+        self.name = name
+
+    def add_parser(self, args_parsers):
+        raise Exception("No implemented")
+
+    def run(self, args):
+        raise Exception("No implemented")
+
+    def _add_parser_ids_args(self, parser):
+        group = parser.add_argument_group("for existing nodes",
+                                          "apply to the existing nodes.")
+        group.add_argument("--fe-id", nargs="*", type=int, help="Specify up fe 
ids, support multiple ids, " \
+                "if specific --fe-id but not specific ids, apply to all fe.")
+        group.add_argument("--be-id", nargs="*", type=int, help="Specify up be 
ids, support multiple ids, " \
+                "if specific --be but not specific ids, apply to all be.")
+
+    def _get_parser_bool_action(self, is_store_true):
+        if sys.version_info.major == 3 and sys.version_info.minor >= 9:
+            return argparse.BooleanOptionalAction
+        else:
+            return "store_true" if is_store_true else "store_false"
+
+
+class SimpleCommand(Command):
+
+    def __init__(self, command, help):
+        super().__init__(command)
+        self.command = command
+        self.help = help
+
+    def add_parser(self, args_parsers):
+        help = self.help + " If none of --fe-id, --be-id is specific, then 
apply to all containers."
+        parser = args_parsers.add_parser(self.command, help=help)
+        parser.add_argument("NAME", help="Specify cluster name.")
+        self._add_parser_ids_args(parser)
+
+    def run(self, args):
+        cluster = CLUSTER.Cluster.load(args.NAME)
+        _, related_nodes, related_node_num = get_ids_related_nodes(
+            cluster, args.fe_id, args.be_id)
+        utils.exec_docker_compose_command(cluster.get_compose_file(),
+                                          self.command,
+                                          nodes=related_nodes)
+        show_cmd = self.command[0].upper() + self.command[1:]
+        LOG.info(
+            utils.render_green("{} succ, total related node num {}".format(
+                show_cmd, related_node_num)))
+
+
+class UpCommand(Command):
+
+    def add_parser(self, args_parsers):
+        parser = args_parsers.add_parser("up", help="Create and upgrade doris 
containers, "\
+                "or add new containers. " \
+                "If none of --add-fe-num, --add-be-num, --fe-id, --be-id is 
specific, " \
+                "then apply to all containers.")
+        parser.add_argument("NAME", default="", help="Specific cluster name.")
+        parser.add_argument("IMAGE",
+                            default="",
+                            nargs="?",
+                            help="Specify docker image.")
+
+        group1 = parser.add_argument_group("add new nodes",
+                                           "add cluster nodes.")
+        group1.add_argument(
+            "--add-fe-num",
+            type=int,
+            help="Specify add fe num, default 3 for a new cluster.")
+        group1.add_argument(
+            "--add-be-num",
+            type=int,
+            help="Specify add be num, default 3 for a new cluster.")
+
+        self._add_parser_ids_args(parser)
+
+        group2 = parser.add_mutually_exclusive_group()
+        group2.add_argument(
+            "--no-start",
+            default=False,
+            action=self._get_parser_bool_action(True),
+            help="Not start containers, create or update config image only.")
+        group2.add_argument("--force-recreate",
+                           default=False,
+                           action=self._get_parser_bool_action(True),
+                           help="Recreate containers even if their 
configuration" \
+                                "and image haven't changed. ")
+
+    def run(self, args):
+        if not args.NAME:
+            raise Exception("Need specific not empty cluster name")
+        for_all = True
+        try:
+            cluster = CLUSTER.Cluster.load(args.NAME)
+            if args.fe_id != None or args.be_id != None or args.add_fe_num or 
args.add_be_num:
+                for_all = False
+        except:
+            # a new cluster
+            if not args.IMAGE:
+                raise Exception("New cluster must specific image")
+            if args.fe_id != None:
+                args.fe_id = None
+                LOG.warning(
+                    utils.render_yellow("Ignore --fe-id for new cluster"))
+            if args.be_id != None:
+                args.be_id = None
+                LOG.warning(
+                    utils.render_yellow("Ignore --be-id for new cluster"))
+            cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE)
+            LOG.info("Create new cluster {} succ, cluster path is {}".format(
+                args.NAME, cluster.get_path()))
+            if not args.add_fe_num:
+                args.add_fe_num = 3
+            if not args.add_be_num:
+                args.add_be_num = 3
+
+        _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
+                                                    args.be_id)
+        if not related_nodes:
+            related_nodes = []
+        if args.add_fe_num:
+            for i in range(args.add_fe_num):
+                related_nodes.append(cluster.add(CLUSTER.Node.TYPE_FE))
+        if args.add_be_num:
+            for i in range(args.add_be_num):
+                related_nodes.append(cluster.add(CLUSTER.Node.TYPE_BE))
+        if args.IMAGE:
+            for node in related_nodes:
+                node.set_image(args.IMAGE)
+        if for_all and args.IMAGE:
+            cluster.set_image(args.IMAGE)
+        cluster.save()
+
+        options = []
+        if args.no_start:
+            options.append("--no-start")
+        else:
+            options = ["-d", "--remove-orphans"]
+            if args.force_recreate:
+                options.append("--force-recreate")
+
+        related_node_num = len(related_nodes)
+        if for_all:
+            related_node_num = cluster.get_all_nodes_num()
+            related_nodes = None
+
+        utils.exec_docker_compose_command(cluster.get_compose_file(), "up",
+                                          options, related_nodes)
+        if args.no_start:
+            LOG.info(
+                utils.render_green(
+                    "Not up cluster cause specific --no-start, related node 
num {}"
+                    .format(related_node_num)))
+        else:
+            LOG.info(
+                utils.render_green(
+                    "Up cluster {} succ, related node num {}".format(
+                        args.NAME, related_node_num)))
+
+
+class DownCommand(Command):
+
+    def add_parser(self, args_parsers):
+        parser = args_parsers.add_parser("down",
+                                     help="Down doris containers, networks. "\
+                                           "It will also remove node from DB. 
" \
+                                           "If none of --fe-id, --be-id is 
specific, "\
+                                           "then apply to all containers.")
+        parser.add_argument("NAME", help="Specify cluster name")
+        self._add_parser_ids_args(parser)
+        parser.add_argument(
+            "--clean",
+            default=False,
+            action=self._get_parser_bool_action(True),
+            help=
+            "Clean container related files, include expose data, config and 
logs"
+        )
+        parser.add_argument(
+            "--drop-force",
+            default=None,
+            action=self._get_parser_bool_action(True),
+            help="Drop doris node force. For be, if specific --drop-force, "\
+                    "it will send dropp to fe, otherwise send decommission to 
fe.")
+
    def run(self, args):
        """Bring down a whole cluster or only the selected fe/be nodes.

        When neither --fe-id nor --be-id is given, the whole compose project
        is torn down.  Otherwise each selected node is first removed from the
        doris metadata (drop fe / drop or decommission be), then its container
        is removed.  With --clean the corresponding data directory is deleted
        as well.
        """
        cluster = CLUSTER.Cluster.load(args.NAME)
        for_all, related_nodes, related_node_num = get_ids_related_nodes(
            cluster, args.fe_id, args.be_id, ignore_not_exists=True)

        if for_all:
            # Remove the entire compose project, volumes included.
            utils.exec_docker_compose_command(cluster.get_compose_file(),
                                              "down",
                                              ["-v", "--remove-orphans"])
            if args.clean:
                # Containers may leave read-only files behind; make the tree
                # writable before deleting it.
                utils.enable_dir_with_rw_perm(cluster.get_path())
                shutil.rmtree(cluster.get_path())
                LOG.info(
                    utils.render_yellow(
                        "Clean cluster data cause has specific --clean"))
        else:
            db_mgr = database.get_db_mgr(cluster.name)

            for node in related_nodes:
                # Deregister the node from doris metadata before removing its
                # container.
                if node.is_fe():
                    fe_endpoint = "{}:{}".format(node.get_ip(),
                                                 CLUSTER.FE_EDITLOG_PORT)
                    db_mgr.drop_fe(fe_endpoint)
                elif node.is_be():
                    be_endpoint = "{}:{}".format(node.get_ip(),
                                                 CLUSTER.BE_HEARTBEAT_PORT)
                    if args.drop_force:
                        # DROPP removes the be immediately, without migrating
                        # its tablets first.
                        db_mgr.drop_be(be_endpoint)
                    else:
                        # Decommission blocks until tablet migration is done.
                        db_mgr.decommission_be(be_endpoint)
                else:
                    raise Exception("Unknown node type: {}".format(
                        node.node_type()))

                #utils.exec_docker_compose_command(cluster.get_compose_file(),
                #                                  "stop",
                #                                  nodes=[node])
                utils.exec_docker_compose_command(cluster.get_compose_file(),
                                                  "rm", ["-s", "-v", "-f"],
                                                  nodes=[node])
                if args.clean:
                    utils.enable_dir_with_rw_perm(node.get_path())
                    shutil.rmtree(node.get_path())
                    LOG.info(
                        utils.render_yellow(
                            "Clean {} with id {} data cause has specific --clean"
                            .format(node.node_type(), node.id)))

                # Save after each node so a failure mid-loop does not lose the
                # removals already performed.
                cluster.remove(node.node_type(), node.id)
                cluster.save()

        LOG.info(
            utils.render_green(
                "Down cluster {} succ, related node num {}".format(
                    args.NAME, related_node_num)))
+
+
class ListNode(object):
    """One row of the `ls` command's per-node output table."""

    def __init__(self):
        # id is numeric; every other field defaults to an empty string so it
        # renders as a blank table cell when unknown.
        self.node_type = ""
        self.id = 0
        for attr in ("cluster_name", "ip", "status", "container_id", "image",
                     "created", "alive", "is_master", "query_port",
                     "tablet_num", "last_heartbeat", "err_msg"):
            setattr(self, attr, "")

    def info(self):
        """Return this node's values in the table's column order."""
        display_name = "{}-{}".format(self.node_type, self.id)
        return (self.cluster_name, display_name, self.ip, self.status,
                self.container_id, self.image, self.created, self.alive,
                self.is_master, self.query_port, self.tablet_num,
                self.last_heartbeat, self.err_msg)

    def update_db_info(self, db_mgr):
        """Fill alive/master/port/tablet fields from the db state snapshot."""
        if self.node_type == CLUSTER.Node.TYPE_FE:
            fe = db_mgr.get_fe(self.id)
            if not fe:
                return
            self.alive = str(fe.alive).lower()
            self.is_master = str(fe.is_master).lower()
            self.query_port = fe.query_port
            self.last_heartbeat = fe.last_heartbeat
            self.err_msg = fe.err_msg
        elif self.node_type == CLUSTER.Node.TYPE_BE:
            be = db_mgr.get_be(self.id)
            if not be:
                return
            self.alive = str(be.alive).lower()
            self.tablet_num = be.tablet_num
            self.last_heartbeat = be.last_heartbeat
            self.err_msg = be.err_msg
+
+
class ListCommand(Command):
    """`ls` sub command.

    Without NAME arguments it prints one summary row per cluster; with one or
    more NAME arguments it prints one detail row per fe/be node of those
    clusters, merging info from the compose file, live docker containers and
    the doris metadata.
    """

    def add_parser(self, args_parsers):
        parser = args_parsers.add_parser(
            "ls", help="List running doris compose clusters.")
        parser.add_argument(
            "NAME",
            nargs="*",
            help=
            "Specify multiple clusters, if specific, show all their containers."
        )
        parser.add_argument(
            "-a",
            "--all",
            default=False,
            action=self._get_parser_bool_action(True),
            help="Show all stopped and bad doris compose projects")

    def run(self, args):
        # Markers describing the state of a cluster's compose file.
        COMPOSE_MISSING = "(missing)"
        COMPOSE_BAD = "(bad)"
        COMPOSE_GOOD = ""

        # Status shown for a node known from the compose file or the db but
        # having no live container.
        SERVICE_DEAD = "dead"

        class ComposeService(object):
            """A service declared in a compose file (container may be gone)."""

            def __init__(self, name, ip, image):
                self.name = name
                self.ip = ip
                self.image = image

        def parse_cluster_compose_file(cluster_name):
            # Returns (status, {service_name: ComposeService}).
            compose_file = CLUSTER.get_compose_file(cluster_name)
            if not os.path.exists(compose_file):
                return COMPOSE_MISSING, {}
            try:
                compose = utils.read_compose_file(compose_file)
                if not compose:
                    return COMPOSE_BAD, {}
                services = compose.get("services", {})
                if services is None:
                    return COMPOSE_BAD, {}
                return COMPOSE_GOOD, {
                    service:
                    ComposeService(
                        service,
                        list(service_conf["networks"].values())[0]
                        ["ipv4_address"], service_conf["image"])
                    for service, service_conf in services.items()
                }
            except:
                # Any parse error marks the compose file as bad.
                return COMPOSE_BAD, {}

        clusters = {}
        search_names = []
        if args.NAME:
            search_names = args.NAME
        elif os.path.exists(CLUSTER.LOCAL_DORIS_PATH):
            # No names given: scan every cluster dir under the local path.
            search_names = os.listdir(CLUSTER.LOCAL_DORIS_PATH)

        for cluster_name in search_names:
            status, services = parse_cluster_compose_file(cluster_name)
            clusters[cluster_name] = {"status": status, "services": services}

        # Overlay live docker containers on the compose-file view; a live
        # container replaces its ComposeService placeholder.
        docker_clusters = utils.get_doris_containers(args.NAME)
        for cluster_name, containers in docker_clusters.items():
            cluster_info = clusters.get(cluster_name, None)
            if not cluster_info:
                cluster_info = {"status": COMPOSE_MISSING, "services": {}}
                clusters[cluster_name] = cluster_info
            for container in containers:
                #if container.status == "running" and cluster_info[
                #        "status"] == COMPOSE_GOOD and (
                #            container.name not in cluster_info["services"]):
                #    container.status = "orphans"
                cluster_info["services"][container.name] = container

        TYPE_COMPOSESERVICE = type(ComposeService("", "", ""))
        if not args.NAME:
            # Summary mode: one row per cluster with aggregated statuses.
            headers = (utils.render_green(field)
                       for field in ("CLUSTER", "STATUS", "CONFIG FILES"))
            table = prettytable.PrettyTable(headers)
            for name in sorted(clusters.keys()):
                cluster_info = clusters[name]
                service_statuses = {}
                for _, container in cluster_info["services"].items():
                    status = SERVICE_DEAD if type(
                        container) == TYPE_COMPOSESERVICE else container.status
                    service_statuses[status] = service_statuses.get(status,
                                                                    0) + 1
                show_status = ",".join([
                    "{}({})".format(status, count)
                    for status, count in service_statuses.items()
                ])
                # Hide fully-stopped clusters unless --all was given.
                if not args.all and service_statuses.get("running", 0) == 0:
                    continue
                compose_file = CLUSTER.get_compose_file(name)
                table.add_row(
                    (name, show_status, "{}{}".format(compose_file,
                                                      cluster_info["status"])))
            print(table)
            return

        # Detail mode: one row per node of the requested clusters.
        headers = (utils.render_green(field)
                   for field in ("CLUSTER", "NAME", "IP", "STATUS",
                                 "CONTAINER ID", "IMAGE", "CREATED", "alive",
                                 "is_master", "query_port", "tablet_num",
                                 "last_heartbeat", "err_msg"))
        table = prettytable.PrettyTable(headers)

        for cluster_name in sorted(clusters.keys()):
            fe_ids = {}
            be_ids = {}
            services = clusters[cluster_name]["services"]
            db_mgr = database.get_db_mgr(cluster_name, False)

            nodes = []
            for service_name, container in services.items():
                _, node_type, id = utils.parse_service_name(container.name)
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = node_type
                node.id = id
                node.update_db_info(db_mgr)
                nodes.append(node)

                if node_type == CLUSTER.Node.TYPE_FE:
                    fe_ids[id] = True
                elif node_type == CLUSTER.Node.TYPE_BE:
                    be_ids[id] = True

                if type(container) == TYPE_COMPOSESERVICE:
                    # Declared in compose file but no live container.
                    node.ip = container.ip
                    node.image = container.image
                    node.status = SERVICE_DEAD
                else:
                    # Live docker container: pull runtime info from its attrs.
                    node.created = container.attrs.get("Created",
                                                       "")[:19].replace(
                                                           "T", " ")
                    node.ip = list(
                        container.attrs["NetworkSettings"]
                        ["Networks"].values())[0]["IPAMConfig"]["IPv4Address"]
                    node.image = ",".join(container.image.tags)
                    node.container_id = container.short_id
                    node.status = container.status

            # Nodes known to the doris metadata but having no container at
            # all: render them as dead rows.
            for id, fe in db_mgr.fe_states.items():
                if fe_ids.get(id, False):
                    continue
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = CLUSTER.Node.TYPE_FE
                node.id = id
                node.status = SERVICE_DEAD
                node.update_db_info(db_mgr)
                nodes.append(node)
            for id, be in db_mgr.be_states.items():
                if be_ids.get(id, False):
                    continue
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = CLUSTER.Node.TYPE_BE
                node.id = id
                node.status = SERVICE_DEAD
                node.update_db_info(db_mgr)
                nodes.append(node)

            def get_key(node):
                # Sort fe nodes first, then be, then unknown; by id within
                # each group (ids are < ID_LIMIT).
                key = node.id
                if node.node_type == CLUSTER.Node.TYPE_FE:
                    key += 0 * CLUSTER.ID_LIMIT
                elif node.node_type == CLUSTER.Node.TYPE_BE:
                    key += 1 * CLUSTER.ID_LIMIT
                else:
                    key += 2 * CLUSTER.ID_LIMIT
                return key

            for node in sorted(nodes, key=get_key):
                table.add_row(node.info())

        print(table)
+
+
# Registry of every sub command exposed by the doris-compose CLI;
# doris-compose.py dispatches args.command against the names in this list.
ALL_COMMANDS = [
    UpCommand("up"),
    DownCommand("down"),
    SimpleCommand("start", "Start the doris containers. "),
    SimpleCommand("stop", "Stop the doris containers. "),
    SimpleCommand("restart", "Restart the doris containers. "),
    SimpleCommand("pause", "Pause the doris containers. "),
    SimpleCommand("unpause", "Unpause the doris containers. "),
    ListCommand("ls"),
]
diff --git a/docker/runtime/doris-compose/database.py 
b/docker/runtime/doris-compose/database.py
new file mode 100644
index 0000000000..21aa400a47
--- /dev/null
+++ b/docker/runtime/doris-compose/database.py
@@ -0,0 +1,239 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cluster as CLUSTER
+import os.path
+import pymysql
+import time
+import utils
+
+LOG = utils.get_logger()
+
+
class FEState(object):
    """Snapshot of one frontend row parsed from `show frontends`."""

    def __init__(self, id, query_port, is_master, alive, last_heartbeat,
                 err_msg):
        fields = (("id", id), ("query_port", query_port),
                  ("is_master", is_master), ("alive", alive),
                  ("last_heartbeat", last_heartbeat), ("err_msg", err_msg))
        for attr, value in fields:
            setattr(self, attr, value)
+
+
class BEState(object):
    """Snapshot of one backend row parsed from `show backends`."""

    def __init__(self, id, decommissioned, alive, tablet_num, last_heartbeat,
                 err_msg):
        fields = (("id", id), ("decommissioned", decommissioned),
                  ("alive", alive), ("tablet_num", tablet_num),
                  ("last_heartbeat", last_heartbeat), ("err_msg", err_msg))
        for attr, value in fields:
            setattr(self, attr, value)
+
+
class DBManager(object):
    """MySQL-protocol client for the cluster's master fe.

    Caches the latest `show frontends` / `show backends` snapshots and
    provides the drop/decommission helpers used by the `down` command.
    The pymysql connection is created lazily on first query.
    """

    def __init__(self):
        self.fe_states = {}   # fe id -> FEState
        self.be_states = {}   # be id -> BEState
        self.query_port = -1  # master fe query port; -1 means not set yet
        self.conn = None      # lazily created pymysql connection

    def set_query_port(self, query_port):
        """Set the host port used to reach the master fe's query port."""
        self.query_port = query_port

    def get_fe(self, id):
        """Return the cached FEState for *id*, or None."""
        return self.fe_states.get(id, None)

    def get_be(self, id):
        """Return the cached BEState for *id*, or None."""
        return self.be_states.get(id, None)

    def load_states(self, query_ports):
        """Refresh fe/be snapshots; *query_ports* maps fe id -> host port."""
        self._load_fe_states(query_ports)
        self._load_be_states()

    def drop_fe(self, fe_endpoint):
        """Drop a follower fe ("ip:editlog_port"); a fe that is already gone
        from the metadata counts as success."""
        id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
        try:
            self._exec_query(
                "ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
            LOG.info("Drop fe {} with id {} from db succ.".format(
                fe_endpoint, id))
        except Exception as e:
            if "frontend does not exist" in str(e):
                LOG.info(
                    "Drop fe {} with id {} from db succ cause it does not exist in db."
                    .format(fe_endpoint, id))
                return
            raise e

    def drop_be(self, be_endpoint):
        """Force-drop a be ("ip:heartbeat_port") without tablet migration;
        a be already missing from the metadata counts as success."""
        id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
        try:
            self._exec_query(
                "ALTER SYSTEM DROPP BACKEND '{}'".format(be_endpoint))
            LOG.info("Drop be {} with id {} from db succ.".format(
                be_endpoint, id))
        except Exception as e:
            if "backend does not exists" in str(e):
                LOG.info(
                    "Drop be {} with id {} from db succ cause it does not exist in db."
                    .format(be_endpoint, id))
                return
            raise e

    def decommission_be(self, be_endpoint):
        """Decommission a be ("ip:heartbeat_port") and block, polling every
        5s, until fe finishes migrating its tablets and drops it."""
        old_tablet_num = 0
        id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
        if id not in self.be_states:
            self._load_be_states()
        if id in self.be_states:
            be = self.be_states[id]
            old_tablet_num = be.tablet_num
            if not be.alive:
                # A dead be can never finish migration; the caller must use
                # --drop-force (DROPP) instead.
                raise Exception("Decommission be {} with id {} fail " \
                        "cause it's not alive, maybe you should specific --drop-force " \
                        " to dropp it from db".format(be_endpoint, id))
        try:
            self._exec_query(
                "ALTER SYSTEM DECOMMISSION BACKEND '{}'".format(be_endpoint))
            LOG.info(
                "Mark be {} with id {} as decommissioned, start migrate its tablets, "
                "wait migrating job finish.".format(be_endpoint, id))
        except Exception as e:
            if "Backend does not exist" in str(e):
                # Bug fix: was `",format(be_endpoint, id)"` which called the
                # builtin format() with an invalid spec instead of str.format.
                LOG.info("Decommission be {} with id {} from db succ "
                         "cause it does not exist in db.".format(
                             be_endpoint, id))
                return
            raise e

        while True:
            self._load_be_states()
            be = self.be_states.get(id, None)
            if not be:
                # fe removes the be from `show backends` once migration ends.
                LOG.info("Decommission be {} succ, total migrate {} tablets, "
                         "has drop it from db.".format(be_endpoint,
                                                       old_tablet_num))
                return
            LOG.info(
                "Decommission be {} status: alive {}, decommissioned {}. "
                "It is migrating its tablets, left {}/{} tablets.".format(
                    be_endpoint, be.alive, be.decommissioned, be.tablet_num,
                    old_tablet_num))

            time.sleep(5)

    def _load_fe_states(self, query_ports):
        """Parse `show frontends` into FEState objects; reconnect if a new
        alive master with a known query port shows up."""
        fe_states = {}
        alive_master_fe_port = None
        for record in self._exec_query("show frontends;"):
            # Column positions assumed from `show frontends` output:
            # 1=ip, 7=is_master, 10=alive, 12=last_heartbeat, 14=err_msg.
            # TODO confirm when upgrading the fe version.
            ip = record[1]
            is_master = record[7] == "true"
            alive = record[10] == "true"
            last_heartbeat = record[12]
            err_msg = record[14]
            id = CLUSTER.Node.get_id_from_ip(ip)
            query_port = query_ports.get(id, None)
            fe = FEState(id, query_port, is_master, alive, last_heartbeat,
                         err_msg)
            fe_states[id] = fe
            if is_master and alive and query_port:
                alive_master_fe_port = query_port
        self.fe_states = fe_states
        if alive_master_fe_port and alive_master_fe_port != self.query_port:
            self.query_port = alive_master_fe_port
            self._reset_conn()

    def _load_be_states(self):
        """Parse `show backends` into BEState objects."""
        be_states = {}
        for record in self._exec_query("show backends;"):
            # Column positions assumed from `show backends` output:
            # 1=ip, 7=last_heartbeat, 8=alive, 9=decommissioned,
            # 10=tablet_num, 18=err_msg.  TODO confirm on fe upgrades.
            ip = record[1]
            last_heartbeat = record[7]
            alive = record[8] == "true"
            decommissioned = record[9] == "true"
            tablet_num = int(record[10])
            err_msg = record[18]
            id = CLUSTER.Node.get_id_from_ip(ip)
            be = BEState(id, decommissioned, alive, tablet_num, last_heartbeat,
                         err_msg)
            be_states[id] = be
        self.be_states = be_states

    def _exec_query(self, sql):
        """Run *sql* on the master fe and return all result rows."""
        self._prepare_conn()
        with self.conn.cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()

    def _prepare_conn(self):
        if self.conn:
            return
        if self.query_port <= 0:
            raise Exception("Not set query_port")
        self._reset_conn()

    def _reset_conn(self):
        # read_timeout keeps queries from hanging forever on a half-dead fe.
        self.conn = pymysql.connect(user="root",
                                    host="127.0.0.1",
                                    read_timeout=10,
                                    port=self.query_port)
+
+
def get_db_mgr(cluster_name, required_load_succ=True):
    """Build a DBManager for *cluster_name*.

    Picks a query port from the running fe containers, preferring the master
    recorded in the cluster's `master_fe_ip` status file, then fe-1, then any
    alive fe.  When no fe is reachable an empty manager is returned.  If
    *required_load_succ* is False, failures to load fe/be states are logged
    instead of raised.
    """
    assert cluster_name
    db_mgr = DBManager()
    containers = utils.get_doris_containers(cluster_name).get(
        cluster_name, None)
    if not containers:
        return db_mgr

    # Collect host-mapped query ports of all running fe containers.
    alive_fe_ports = {}
    for container in containers:
        if not utils.is_container_running(container):
            continue
        _, node_type, node_id = utils.parse_service_name(container.name)
        if node_type != CLUSTER.Node.TYPE_FE:
            continue
        mapped_port = utils.get_map_ports(container).get(
            CLUSTER.FE_QUERY_PORT, None)
        if mapped_port:
            alive_fe_ports[node_id] = mapped_port
    if not alive_fe_ports:
        return db_mgr

    # Prefer the master fe published via the shared status file.
    query_port = None
    master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
                                     "master_fe_ip")
    if os.path.exists(master_fe_ip_file):
        with open(master_fe_ip_file, "r") as f:
            master_fe_ip = f.read()
        if master_fe_ip:
            master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip)
            query_port = alive_fe_ports.get(master_id, None)
    if not query_port:
        # A new cluster's master is fe-1
        if 1 in alive_fe_ports:
            query_port = alive_fe_ports[1]
        else:
            query_port = list(alive_fe_ports.values())[0]

    db_mgr.set_query_port(query_port)
    try:
        db_mgr.load_states(alive_fe_ports)
    except Exception as e:
        if required_load_succ:
            raise e
        LOG.exception(e)

    return db_mgr
diff --git a/docker/runtime/doris-compose/doris-compose.py 
b/docker/runtime/doris-compose/doris-compose.py
new file mode 100644
index 0000000000..77ceb736e3
--- /dev/null
+++ b/docker/runtime/doris-compose/doris-compose.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import command
+import utils
+
+
def parse_args():
    """Build the CLI parser from every registered command.

    Returns the parsed namespace together with the formatted help text so the
    caller can print usage when no valid sub command was given.
    """
    parser = argparse.ArgumentParser(description="")
    subparsers = parser.add_subparsers(dest="command")
    for cmd in command.ALL_COMMANDS:
        cmd.add_parser(subparsers)
    return parser.parse_args(), parser.format_help()
+
+
def run(args, help):
    """Dispatch to the command named by args.command.

    The Timer logs total elapsed time when a command actually ran; it is
    canceled on the no-match path, where the help text is printed and -1 is
    returned instead.
    """
    timer = utils.Timer()
    matched = next(
        (cmd for cmd in command.ALL_COMMANDS if cmd.name == args.command),
        None)
    if matched is not None:
        return matched.run(args)
    timer.cancel()
    print(help)
    return -1
+
+
def main():
    """CLI entry point: parse arguments, then run the selected command."""
    args, help = parse_args()
    run(args, help)


if __name__ == '__main__':
    main()
diff --git a/docker/runtime/doris-compose/requirements.txt 
b/docker/runtime/doris-compose/requirements.txt
new file mode 100644
index 0000000000..039260e6c7
--- /dev/null
+++ b/docker/runtime/doris-compose/requirements.txt
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+docker
+docker-compose
+jsonpickle
+prettytable
+pymysql
diff --git a/docker/runtime/doris-compose/resource/common.sh 
b/docker/runtime/doris-compose/resource/common.sh
new file mode 100644
index 0000000000..044e26487d
--- /dev/null
+++ b/docker/runtime/doris-compose/resource/common.sh
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+export MASTER_FE_IP=""
+export MASTER_FE_IP_FILE=$DORIS_HOME/status/master_fe_ip
+
# Append a timestamp line followed by the given message to the container
# health log under $DORIS_HOME/log.
health_log() {
    local log_file="$DORIS_HOME/log/health.out"
    date >>"$log_file"
    echo "$@" >>"$log_file"
}
+
# Load the elected master fe's ip from the shared status file into the
# MASTER_FE_IP variable.
# Returns 0 when the file is readable (a master has been elected), 1 otherwise.
read_master_fe_ip() {
    # The assignment's exit status is that of the command substitution, so
    # this directly tests whether the status file could be read.
    if MASTER_FE_IP=$(cat "$MASTER_FE_IP_FILE"); then
        health_log "master fe ${MASTER_FE_IP} is ready."
        return 0
    else
        health_log "master fe is not ready."
        return 1
    fi
}
+
diff --git a/docker/runtime/doris-compose/resource/init_be.sh 
b/docker/runtime/doris-compose/resource/init_be.sh
new file mode 100644
index 0000000000..9a59876382
--- /dev/null
+++ b/docker/runtime/doris-compose/resource/init_be.sh
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+DIR=$(cd $(dirname $0);pwd)
+
+source $DIR/common.sh
+
+REGISTER_FILE=$DORIS_HOME/status/$MY_IP-register
+
# Register this be with the master fe, retrying every second until the ADD
# BACKEND statement succeeds or the be is already registered, then create a
# marker file so the registration is skipped on container restarts.
add_backend() {
    while true; do
        if ! read_master_fe_ip; then
            sleep 1
            continue
        fi

        output=$(mysql -P "$FE_QUERY_PORT" -h "$MASTER_FE_IP" -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$BE_HEARTBEAT_PORT';" 2>&1)
        res=$?
        health_log "$output"
        [ $res -eq 0 ] && break
        # Quote $output to avoid word splitting; -q keeps the matched line
        # out of stdout while preserving grep's exit status.
        echo "$output" | grep -q "Same backend already exists" && break
        sleep 1
    done

    touch "$REGISTER_FILE"
}
+
# Entry point: register this be with the cluster exactly once (the marker
# file skips the step when the container restarts), then run the be process
# in the foreground.
main() {
    if [ ! -f $REGISTER_FILE ]; then
        add_backend
    fi
    bash $DORIS_HOME/bin/start_be.sh
}

main
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh 
b/docker/runtime/doris-compose/resource/init_fe.sh
new file mode 100644
index 0000000000..fbbd335f37
--- /dev/null
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+DIR=$(cd $(dirname $0);pwd)
+
+source $DIR/common.sh
+
# Register this fe as a FOLLOWER on the current master fe, retrying every
# second until the statement succeeds or the frontend is already registered.
add_frontend() {
    while true; do
        if ! read_master_fe_ip; then
            sleep 1
            continue
        fi

        output=$(mysql -P "$FE_QUERY_PORT" -h "$MASTER_FE_IP" -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$FE_EDITLOG_PORT';" 2>&1)
        res=$?
        health_log "$output"
        [ $res -eq 0 ] && break
        # Quote $output to avoid word splitting; -q keeps the matched line
        # out of stdout while preserving grep's exit status.
        echo "$output" | grep -q "frontend already exists" && break
        sleep 1
    done
}
+
# Background watcher: every 3 seconds query this fe's own master flag via
# `show frontends`; when it reports itself master, publish MY_IP to the shared
# status file so other nodes (read_master_fe_ip) can find the master.
fe_daemon() {
    set +e
    while true; do
        # NOTE(review): awk column $8 is assumed to be the IsMaster field of
        # `show frontends` - confirm against the fe version in use.
        # NOTE(review): 2>&1 here applies to awk, not mysql, so mysql errors
        # likely are not captured into $output - verify intent.
        output=`mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;" | grep -w $MY_IP | awk '{print $8}' 2>&1`
        if [ $? -ne 0 ]; then
            health_log "$output"
        else
            echo $output | grep true
            if [ $? -eq 0 ]; then
                # This fe is currently master: publish its ip.
                echo $MY_IP > $MASTER_FE_IP_FILE
                if [ "$MASTER_FE_IP" != "$MY_IP" ]; then
                    health_log "change to master, last master is $MASTER_FE_IP"
                    MASTER_FE_IP=$MY_IP
                fi
            fi
        fi
        sleep 3
    done
}
+
# Entry point: fe-1, or any fe that already has metadata on disk, starts
# directly (and may become master).  A fresh non-first fe must first register
# itself on the current master, then start with --helper pointing at the
# master's editlog endpoint.  fe_daemon runs in the background either way.
main() {
    if [ "$MY_ID" = "1" -o  -d "${DORIS_HOME}/doris-meta/image" ]; then
        fe_daemon &
        bash $DORIS_HOME/bin/start_fe.sh
    else
        add_frontend
        fe_daemon &
        $DORIS_HOME/bin/start_fe.sh --helper $MASTER_FE_IP:$FE_EDITLOG_PORT
    fi
}

main
diff --git a/docker/runtime/doris-compose/utils.py 
b/docker/runtime/doris-compose/utils.py
new file mode 100644
index 0000000000..7317715fe9
--- /dev/null
+++ b/docker/runtime/doris-compose/utils.py
@@ -0,0 +1,253 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import docker
+import logging
+import os
+import subprocess
+import time
+import yaml
+
+DORIS_PREFIX = "doris-"
+
+
class Timer(object):
    """Logs the total elapsed wall-clock seconds when destroyed.

    Call cancel() to suppress the log line (e.g. on error paths).
    """

    def __init__(self):
        # Wall-clock start timestamp, read again at destruction time.
        self.start = time.time()
        self.canceled = False

    def cancel(self):
        """Suppress the elapsed-time log on destruction."""
        self.canceled = True

    def __del__(self):
        if self.canceled:
            return
        elapsed = int(time.time() - self.start)
        LOG.info("=== Total run time: {} s".format(elapsed))
+
+
def get_logger(name=None):
    """Return a logger configured with this tool's console format.

    Configuration is applied only once: if the logger (or an ancestor)
    already has handlers attached, it is returned untouched.
    """
    logger = logging.getLogger(name)
    if logger.hasHandlers():
        return logger

    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter(
            '%(asctime)s - %(filename)s - %(lineno)dL - %(levelname)s - %(message)s'
        ))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
+
+
+LOG = get_logger()
+
+
def _render_ansi(color_code, s):
    """Wrap str(s) in an ANSI SGR escape sequence for the given color code."""
    return "\x1B[{}m{}\x1B[0m".format(color_code, str(s))


def render_red(s):
    """Render *s* in red for terminal output."""
    return _render_ansi(31, s)


def render_green(s):
    """Render *s* in green for terminal output."""
    return _render_ansi(32, s)


def render_yellow(s):
    """Render *s* in yellow for terminal output."""
    return _render_ansi(33, s)


def render_blue(s):
    """Render *s* in blue for terminal output."""
    return _render_ansi(34, s)
+
+
def with_doris_prefix(name):
    """Prepend the shared docker name prefix to *name*."""
    return "{}{}".format(DORIS_PREFIX, name)
+
+
def parse_service_name(service_name):
    """Parse a docker service name of the form "doris-<cluster>-<type>-<id>".

    Returns (cluster_name, node_type, id) on success, or (None, None, None)
    when the name does not follow the convention.  node_type must be one of
    cluster.Node.TYPE_ALL.
    """
    # Imported lazily to avoid a circular import with cluster.py.
    import cluster
    if not service_name or not service_name.startswith(DORIS_PREFIX):
        return None, None, None
    pos2 = service_name.rfind("-")
    if pos2 < 0:
        return None, None, None
    try:
        node_id = int(service_name[pos2 + 1:])
    except ValueError:  # id suffix is not an integer (was a bare except)
        return None, None, None
    pos1 = service_name.rfind("-", len(DORIS_PREFIX), pos2 - 1)
    if pos1 < 0:
        return None, None, None
    node_type = service_name[pos1 + 1:pos2]
    if node_type not in cluster.Node.TYPE_ALL:
        return None, None, None
    return service_name[len(DORIS_PREFIX):pos1], node_type, node_id
+
+
def get_map_ports(container):
    """Map each exposed container tcp port to its published host port.

    Reads the container's NetworkSettings.Ports attrs; missing sections
    yield an empty mapping.
    """
    ports = container.attrs.get("NetworkSettings", {}).get("Ports", {})
    mapping = {}
    for inner, outer in ports.items():
        mapping[int(inner.replace("/tcp", ""))] = int(outer[0]["HostPort"])
    return mapping
+
+
def is_container_running(container):
    """Return True when docker reports the container status as "running"."""
    status = container.status
    return status == "running"
+
+
# return all doris containers when cluster_names is empty
def get_doris_containers(cluster_names):
    """Group doris containers by cluster name.

    cluster_names may be a single cluster name (str), a collection of
    names, or empty/None to collect every doris cluster.

    Returns {cluster_name: [container, ...]}.
    """
    if not cluster_names:
        filter_names = "{}*".format(DORIS_PREFIX)
        wanted = None  # no restriction: keep every parsed cluster
    elif isinstance(cluster_names, str):  # was: type(...) == type("")
        filter_names = "{}{}-*".format(DORIS_PREFIX, cluster_names)
        wanted = {cluster_names}
    else:
        filter_names = "|".join(
            "{}{}-*".format(DORIS_PREFIX, name) for name in cluster_names)
        wanted = set(cluster_names)

    clusters = {}
    client = docker.client.from_env()
    for container in client.containers.list(filters={"name": filter_names}):
        cluster_name, _, _ = parse_service_name(container.name)
        if not cluster_name:
            continue
        # Exact-name match. The original did `cluster_name not in cluster_names`,
        # which for a plain-string argument is a substring test and only worked
        # because the docker name filter had already narrowed the candidates.
        if wanted is not None and cluster_name not in wanted:
            continue
        clusters.setdefault(cluster_name, []).append(container)
    return clusters
+
+
def get_doris_running_containers(cluster_name):
    """Return {container_name: container} for the cluster's running containers."""
    members = get_doris_containers(cluster_name).get(cluster_name, [])
    return {
        member.name: member
        for member in members if is_container_running(member)
    }
+
+
def is_dir_empty(dir):
    """Return True when the directory contains no entries.

    Propagates the usual OSError from os.listdir when *dir* does not exist
    or is unreadable.  (Parameter name kept for backward compatibility.)
    """
    return not os.listdir(dir)
+
+
def exec_shell_command(command, ignore_errors=False):
    """Run *command* through the shell, echoing its combined output.

    Returns (returncode, output).  Unless ignore_errors is set, a non-zero
    exit status fails an assertion carrying the command output.
    """
    LOG.info("Exec command: {}".format(command))
    proc = subprocess.Popen(command,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    raw_out, _ = proc.communicate()
    out = raw_out.decode('utf-8')
    if not ignore_errors:
        assert proc.returncode == 0, out
    if out:
        print(out)
    return proc.returncode, out
+
+
def exec_docker_compose_command(compose_file,
                                command,
                                options=None,
                                nodes=None,
                                user_command=None):
    """Build and run a docker-compose command line for the given services.

    nodes == [] is a no-op returning (0, "Skip"); nodes == None means
    "all services".  Returns (returncode, output) from exec_shell_command.
    """
    if nodes is not None and not nodes:
        return 0, "Skip"

    option_part = " ".join(options) if options else ""
    service_part = " ".join(node.service_name()
                            for node in nodes) if nodes else ""
    tail_part = user_command if user_command else ""
    compose_cmd = "docker-compose -f {}  {}  {} {} {}".format(
        compose_file, command, option_part, service_part, tail_part)

    return exec_shell_command(compose_cmd)
+
+
def get_docker_subnets_prefix16():
    """Collect the first two octets ("a.b") of every docker network subnet.

    Returns a dict used as a set: {"a.b": True, ...}.  For subnets wider
    than /16, every possible second octet under the first octet is marked
    as occupied.  Presumably used to pick a free subnet for a new
    cluster network — verify against the caller.
    """
    subnet_prefixes = {}
    client = docker.from_env()
    for network in client.networks.list():
        if not network.attrs:
            continue
        ipam = network.attrs.get("IPAM", None)
        if not ipam:
            continue
        configs = ipam.get("Config", None)
        if not configs:
            continue
        for config in configs:
            subnet = config.get("Subnet", None)
            if not subnet:
                continue
            # Manually split the first two dotted octets of "a.b.c.d/len".
            pos1 = subnet.find(".")
            if pos1 <= 0:
                continue
            pos2 = subnet.find(".", pos1 + 1)
            if pos2 <= 0:
                continue
            num1 = subnet[0:pos1]
            num2 = subnet[pos1 + 1:pos2]
            # Default to /16 when the subnet carries no explicit prefix length.
            network_part_len = 16
            pos = subnet.find("/")
            if pos != -1:
                network_part_len = int(subnet[pos + 1:])
            if network_part_len < 16:
                # Wider than /16: the whole "num1.*" space may collide,
                # so mark all 256 second-octet prefixes as used.
                for i in range(256):
                    subnet_prefixes["{}.{}".format(num1, i)] = True
            else:
                subnet_prefixes["{}.{}".format(num1, num2)] = True

    LOG.debug("Get docker subnet prefixes: {}".format(subnet_prefixes))

    return subnet_prefixes
+
+
def copy_image_directory(image, image_dir, local_dir):
    """Copy a directory out of a docker image into a local directory.

    Runs a throwaway container from *image* with *local_dir* mounted at
    /opt/mount and cp -r's the image directory's contents into the mount.
    """
    client = docker.from_env()
    # Normalize the source to end with "." so cp copies the directory's
    # contents rather than the directory itself.
    if image_dir.endswith("/"):
        suffix = "."
    elif image_dir.endswith("."):
        suffix = ""
    else:
        suffix = "/."
    client.containers.run(
        image,
        remove=True,
        volumes=["{}:/opt/mount".format(local_dir)],
        entrypoint="cp -r  {}  /opt/mount/".format(image_dir + suffix))
+
+
def enable_dir_with_rw_perm(dir):
    """Recursively grant a+rw on *dir* via a throwaway ubuntu container.

    Useful when container-created files are owned by another uid.
    No-op when the directory does not exist.
    """
    if not os.path.exists(dir):
        return
    mount = "{}:/opt/mount".format(dir)
    docker.client.from_env().containers.run(
        "ubuntu",
        remove=True,
        volumes=[mount],
        entrypoint="chmod a+rw -R /opt/mount")
+
+
def read_compose_file(file):
    """Load a docker-compose yaml file into a python object."""
    with open(file, "r") as fp:
        content = fp.read()
    return yaml.safe_load(content)
+
+
def write_compose_file(file, compose):
    """Serialize *compose* as yaml, overwriting *file*."""
    text = yaml.dump(compose)
    with open(file, "w") as fp:
        fp.write(text)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to