This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 2e3d15b552 [Feature](doris compose) A tool for setup and manage doris docker cluster scaling easily (#21649)
2e3d15b552 is described below

commit 2e3d15b552aceb968bdcf5ce0352d9505cbb12ad
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Wed Jul 12 22:13:38 2023 +0800

    [Feature](doris compose) A tool for setup and manage doris docker cluster scaling easily (#21649)
---
 docker/runtime/doris-compose/Dockerfile          |  30 ++
 docker/runtime/doris-compose/Readme.md           | 102 +++++
 docker/runtime/doris-compose/cluster.py          | 438 ++++++++++++++++++
 docker/runtime/doris-compose/command.py          | 543 +++++++++++++++++++++++
 docker/runtime/doris-compose/database.py         | 239 ++++++++++
 docker/runtime/doris-compose/doris-compose.py    |  48 ++
 docker/runtime/doris-compose/requirements.txt    |  22 +
 docker/runtime/doris-compose/resource/common.sh  |  36 ++
 docker/runtime/doris-compose/resource/init_be.sh |  50 +++
 docker/runtime/doris-compose/resource/init_fe.sh |  70 +++
 docker/runtime/doris-compose/utils.py            | 253 +++++++++++
 11 files changed, 1831 insertions(+)

diff --git a/docker/runtime/doris-compose/Dockerfile b/docker/runtime/doris-compose/Dockerfile
new file mode 100644
index 0000000000..8f5d7e3376
--- /dev/null
+++ b/docker/runtime/doris-compose/Dockerfile
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# choose a base image
+FROM openjdk:8u342-jdk
+
+# set environment variables
+ENV JAVA_HOME="/usr/local/openjdk-8/"
+
+ADD output /opt/apache-doris/
+
+RUN apt-get update && \
+    apt-get install -y default-mysql-client python && \
+    apt-get clean
+
diff --git a/docker/runtime/doris-compose/Readme.md b/docker/runtime/doris-compose/Readme.md
new file mode 100644
index 0000000000..13f543d4e7
--- /dev/null
+++ b/docker/runtime/doris-compose/Readme.md
@@ -0,0 +1,102 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris compose
+
+Use doris compose to create doris docker compose clusters.
+
+## Requirements
+
+1. The doris image should contain:
+
+```
+/opt/apache-doris/{fe, be}
+```
+
+If doris is built with `sh build.sh`, its output already satisfies this layout. Then run the command below in the doris root directory
+
+```
+docker build -f docker/runtime/doris-compose/Dockerfile -t <image> .
+```
+
+to generate an image.
+
+2. Install the dependent python libraries listed in 'docker/runtime/doris-compose/requirements.txt'
+
+```
+python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
+```
+
+## Usage
+
+### Create a cluster or recreate its containers
+
+```
+python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image?>
+    --add-fe-num <add-fe-num> --add-be-num <add-be-num>
+    --fe-id <fe-id> --be-id <be-id>
+```
+
+For a new cluster, the image must be specified.
+
+Add FE/BE nodes with the specified image, or update existing nodes with `--fe-id`, `--be-id`.
+
+### Remove nodes from the cluster
+
+```
+python docker/runtime/doris-compose/doris-compose.py down <cluster-name> --fe-id <fe-id> --be-id <be-id> [--clean] [--drop-force]
+```
+
+Down the containers and remove the nodes from the DB.
+
+For a BE, if `--drop-force` is specified, it sends a DROPP sql to the FE; otherwise it sends a DECOMMISSION sql to the FE.
+
+If `--clean` is specified, the node's data is deleted too.
+
+### Start, stop, restart specific nodes
+
+```
+python docker/runtime/doris-compose/doris-compose.py start <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids>
+python docker/runtime/doris-compose/doris-compose.py restart <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids>
+```
+
+### List doris clusters
+
+```
+python docker/runtime/doris-compose/doris-compose.py ls <-a> <multiple cluster names>
+```
+
+If cluster names are specified, it lists all of those clusters' nodes. Otherwise it just lists a summary of each cluster.
+
+If `-a` is not specified, only active clusters are listed; if `-a` is specified, inactive clusters are listed too.
+
+There are more doris-compose options. Just try
+
+```
+python docker/runtime/doris-compose/doris-compose.py <command> -h
+```
+
+
diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
new file mode 100644
index 0000000000..2556809a3c
--- /dev/null
+++ b/docker/runtime/doris-compose/cluster.py
@@ -0,0 +1,438 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
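+
+# Node.get_ip() below assigns each node a fixed address inside the cluster's
+# /16 subnet, derived only from the node id and node type (with the defaults
+# IP_PART4_SIZE = 200 and ID_LIMIT = 10000). A minimal sketch of that mapping,
+# assuming a generated subnet prefix such as "128.0" (the helper names here
+# are illustrative only, not part of this module):
+#
+#   def fe_ip(subnet, id):       # fe_ip("128.0", 1) -> "128.0.1.1"
+#       seq = id + 200
+#       return "{}.{}.{}".format(subnet, seq // 200, seq % 200)
+#
+#   def be_ip(subnet, id):       # be_ip("128.0", 1) -> "128.0.51.1"
+#       seq = id + 200 + 10000
+#       return "{}.{}.{}".format(subnet, seq // 200, seq % 200)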
+ +import json +import jsonpickle +import os +import os.path +import utils + +DOCKER_DORIS_PATH = "/opt/apache-doris" +LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH") +if not LOCAL_DORIS_PATH: + LOCAL_DORIS_PATH = "/tmp/doris" + +LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), + "resource") +DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource") + +FE_HTTP_PORT = 8030 +FE_RPC_PORT = 9020 +FE_QUERY_PORT = 9030 +FE_EDITLOG_PORT = 9010 + +BE_PORT = 9060 +BE_WEBSVR_PORT = 8040 +BE_HEARTBEAT_PORT = 9050 +BE_BRPC_PORT = 8060 + +ID_LIMIT = 10000 + +IP_PART4_SIZE = 200 + +LOG = utils.get_logger() + + +def get_cluster_path(cluster_name): + return os.path.join(LOCAL_DORIS_PATH, cluster_name) + + +def get_compose_file(cluster_name): + return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml") + + +def get_status_path(cluster_name): + return os.path.join(get_cluster_path(cluster_name), "status") + + +def gen_subnet_prefix16(): + used_subnet = utils.get_docker_subnets_prefix16() + if os.path.exists(LOCAL_DORIS_PATH): + for cluster_name in os.listdir(LOCAL_DORIS_PATH): + try: + cluster = Cluster.load(cluster_name) + used_subnet[cluster.subnet] = True + except: + pass + + for i in range(128, 192): + for j in range(256): + subnet = "{}.{}".format(i, j) + if not used_subnet.get(subnet, False): + return subnet + + raise Exception("Failed to gen subnet") + + +class NodeMeta(object): + + def __init__(self, image): + self.image = image + + +class Group(object): + + def __init__(self, node_type): + self.node_type = node_type + self.nodes = {} # id : NodeMeta + self.next_id = 1 + + def add(self, id, image): + assert image + if not id: + id = self.next_id + self.next_id += 1 + if self.get_node(id): + raise Exception( + "Failed to add {} with id {}, id has exists".format( + self.node_type, id)) + if id > ID_LIMIT: + raise Exception( + "Failed to add {} with id {}, id exceeds {}".format( + self.node_type, id, ID_LIMIT)) + self.nodes[id] = NodeMeta(image) + + return id + + def remove(self, id): + self.nodes.pop(id, None) + + def get_node_num(self): + return len(self.nodes) + + def get_all_nodes(self): + return self.nodes + + def get_node(self, id): + return self.nodes.get(id, None) + + def on_loaded(self): + nodes = {} + for id, node in self.nodes.items(): + nodes[int(id)] = node + self.nodes = nodes + + +class Node(object): + TYPE_FE = "fe" + TYPE_BE = "be" + TYPE_ALL = [TYPE_FE, TYPE_BE] + + def __init__(self, cluster_name, id, subnet, meta): + self.cluster_name = cluster_name + self.id = id + self.subnet = subnet + self.meta = meta + + @staticmethod + def new(cluster_name, node_type, id, subnet, meta): + if node_type == Node.TYPE_FE: + return FE(cluster_name, id, subnet, meta) + elif node_type == Node.TYPE_BE: + return BE(cluster_name, id, subnet, meta) + else: + raise Exception("Unknown node type {}".format(node_type)) + + def init_dir(self): + path = self.get_path() + os.makedirs(path, exist_ok=True) + + # copy config to local + conf_dir = os.path.join(path, "conf") + if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir): + utils.copy_image_directory( + self.get_image(), "{}/{}/conf".format(DOCKER_DORIS_PATH, + self.node_type()), + conf_dir) + assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \ + "check doris path in image is correct".format(conf_dir) + for sub_dir in self.expose_sub_dirs(): + os.makedirs(os.path.join(path, sub_dir), exist_ok=True) + + def is_fe(self): + return self.node_type() == Node.TYPE_FE + + def 
is_be(self): + return self.node_type() == Node.TYPE_BE + + def node_type(self): + raise Exception("No implemented") + + def expose_sub_dirs(self): + return ["conf", "log"] + + def get_name(self): + return "{}-{}".format(self.node_type(), self.id) + + def get_path(self): + return os.path.join(get_cluster_path(self.cluster_name), + self.get_name()) + + def get_image(self): + return self.meta.image + + def set_image(self, image): + self.meta.image = image + + def get_ip(self): + seq = self.id + seq += IP_PART4_SIZE + if self.node_type() == Node.TYPE_FE: + seq += 0 * ID_LIMIT + elif self.node_type() == Node.TYPE_BE: + seq += 1 * ID_LIMIT + else: + seq += 2 * ID_LIMIT + return "{}.{}.{}".format(self.subnet, int(seq / IP_PART4_SIZE), + seq % IP_PART4_SIZE) + + @staticmethod + def get_id_from_ip(ip): + pos2 = ip.rfind(".") + pos1 = ip.rfind(".", 0, pos2 - 1) + num3 = int(ip[pos1 + 1:pos2]) + num4 = int(ip[pos2 + 1:]) + seq = num3 * IP_PART4_SIZE + num4 + while seq > ID_LIMIT: + seq -= ID_LIMIT + seq -= IP_PART4_SIZE + return seq + + def service_name(self): + return utils.with_doris_prefix("{}-{}".format(self.cluster_name, + self.get_name())) + + def docker_env(self): + return { + "MY_IP": self.get_ip(), + "MY_ID": self.id, + "FE_QUERY_PORT": FE_QUERY_PORT, + "FE_EDITLOG_PORT": FE_EDITLOG_PORT, + "BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT, + "DORIS_HOME": os.path.join(DOCKER_DORIS_PATH, self.node_type()), + } + + def docker_ports(self): + raise Exception("No implemented") + + def compose(self): + return { + "cap_add": ["SYS_PTRACE"], + "hostname": + self.get_name(), + "container_name": + self.service_name(), + "command": + self.docker_command(), + "environment": + self.docker_env(), + "image": + self.get_image(), + "networks": { + utils.with_doris_prefix(self.cluster_name): { + "ipv4_address": self.get_ip(), + } + }, + "ports": + self.docker_ports(), + "ulimits": { + "core": -1 + }, + "security_opt": ["seccomp:unconfined"], + "volumes": [ + "{}:{}/{}/{}".format(os.path.join(self.get_path(), + sub_dir), DOCKER_DORIS_PATH, + self.node_type(), sub_dir) + for sub_dir in self.expose_sub_dirs() + ] + [ + "{}:{}:ro".format(LOCAL_RESOURCE_PATH, DOCKER_RESOURCE_PATH), + "{}:{}/{}/status".format(get_status_path(self.cluster_name), + DOCKER_DORIS_PATH, self.node_type()), + ] + [ + "{0}:{0}:ro".format(path) + for path in ("/etc/localtime", "/etc/timezone", + "/usr/share/zoneinfo") if os.path.exists(path) + ], + } + + +class FE(Node): + + def docker_command(self): + return [ + "bash", + os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh"), + #"{}/fe/bin/init_fe.sh".format(DOCKER_DORIS_PATH), + ] + + def docker_ports(self): + return [FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT] + + def node_type(self): + return Node.TYPE_FE + + def expose_sub_dirs(self): + return super().expose_sub_dirs() + ["doris-meta"] + + +class BE(Node): + + def docker_command(self): + return [ + "bash", + os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh"), + #"{}/be/bin/init_be.sh".format(DOCKER_DORIS_PATH), + ] + + def docker_ports(self): + return [BE_WEBSVR_PORT, BE_BRPC_PORT, BE_HEARTBEAT_PORT, BE_PORT] + + def node_type(self): + return Node.TYPE_BE + + def expose_sub_dirs(self): + return super().expose_sub_dirs() + ["storage"] + + +class Cluster(object): + + def __init__(self, name, subnet, image): + self.name = name + self.subnet = subnet + self.image = image + self.groups = { + node_type: Group(node_type) + for node_type in Node.TYPE_ALL + } + + @staticmethod + def new(name, image): + subnet = gen_subnet_prefix16() + cluster = 
Cluster(name, subnet, image) + os.makedirs(cluster.get_path(), exist_ok=True) + os.makedirs(get_status_path(name), exist_ok=True) + return cluster + + @staticmethod + def load(name): + if not name: + raise Exception("Failed to load cluster, name is empty") + path = get_cluster_path(name) + if not os.path.exists(path): + raise Exception( + "Failed to load cluster, its directory {} not exists.".format( + path)) + meta_path = Cluster._get_meta_file(name) + if not os.path.exists(meta_path): + raise Exception( + "Failed to load cluster, its meta file {} not exists.".format( + meta_path)) + with open(meta_path, "r") as f: + cluster = jsonpickle.loads(f.read()) + for group in cluster.groups.values(): + group.on_loaded() + return cluster + + @staticmethod + def _get_meta_file(name): + return os.path.join(get_cluster_path(name), "meta") + + def get_image(self): + return self.image + + # cluster's nodes will update image too if cluster update. + def set_image(self, image): + self.image = image + for _, group in self.groups.items(): + for _, node_meta in group.nodes.items(): + node_meta.image = image + + def get_path(self): + return get_cluster_path(self.name) + + def get_group(self, node_type): + group = self.groups.get(node_type, None) + if not group: + raise Exception("Unknown node_type: {}".format(node_type)) + return group + + def get_node(self, node_type, id): + group = self.get_group(node_type) + meta = group.get_node(id) + if not meta: + raise Exception("No found {} with id {}".format(node_type, id)) + return Node.new(self.name, node_type, id, self.subnet, meta) + + def get_all_nodes(self, node_type): + group = self.groups.get(node_type, None) + if not group: + raise Exception("Unknown node_type: {}".format(node_type)) + return [ + Node.new(self.name, node_type, id, self.subnet, meta) + for id, meta in group.get_all_nodes().items() + ] + + def get_all_nodes_num(self): + num = 0 + for group in self.groups.values(): + num += group.get_node_num() + return num + + def add(self, node_type, id=None): + id = self.get_group(node_type).add(id, self.image) + node = self.get_node(node_type, id) + node.init_dir() + return node + + def remove(self, node_type, id): + group = self.get_group(node_type) + group.remove(id) + + def save(self): + self._save_meta() + self._save_compose() + + def _save_meta(self): + with open(Cluster._get_meta_file(self.name), "w") as f: + f.write(jsonpickle.dumps(self, indent=2)) + + def _save_compose(self): + services = {} + for node_type in self.groups.keys(): + for node in self.get_all_nodes(node_type): + services[node.service_name()] = node.compose() + + compose = { + "version": "3", + "networks": { + utils.with_doris_prefix(self.name): { + "driver": "bridge", + "ipam": { + "config": [{ + "subnet": "{}.0.0/16".format(self.subnet), + }] + } + } + }, + "services": services, + } + + utils.write_compose_file(self.get_compose_file(), compose) + + def get_compose_file(self): + global get_compose_file + return get_compose_file(self.name) diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py new file mode 100644 index 0000000000..2fa556d513 --- /dev/null +++ b/docker/runtime/doris-compose/command.py @@ -0,0 +1,543 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import argparse +import cluster as CLUSTER +import database +import utils +import os +import os.path +import prettytable +import shutil +import sys + +LOG = utils.get_logger() + + +# return for_all, related_nodes, related_node_num +def get_ids_related_nodes(cluster, fe_ids, be_ids, ignore_not_exists=False): + if fe_ids is None and be_ids is None: + return True, None, cluster.get_all_nodes_num() + + def get_ids_related_nodes_with_type(node_type, ids): + if ids is None: + return [] + if not ids: + return cluster.get_all_nodes(node_type) + else: + nodes = [] + for id in ids: + try: + nodes.append(cluster.get_node(node_type, id)) + except Exception as e: + if ignore_not_exists: + LOG.warning( + utils.render_yellow( + "Not found {} with id {}".format( + node_type, id))) + else: + raise e + return nodes + + nodes = get_ids_related_nodes_with_type( + CLUSTER.Node.TYPE_FE, fe_ids) + get_ids_related_nodes_with_type( + CLUSTER.Node.TYPE_BE, be_ids) + + related_node_num = len(nodes) + + return len(nodes) == cluster.get_all_nodes_num(), nodes, len(nodes) + + +class Command(object): + + def __init__(self, name): + self.name = name + + def add_parser(self, args_parsers): + raise Exception("No implemented") + + def run(self, args): + raise Exception("No implemented") + + def _add_parser_ids_args(self, parser): + group = parser.add_argument_group("for existing nodes", + "apply to the existing nodes.") + group.add_argument("--fe-id", nargs="*", type=int, help="Specify up fe ids, support multiple ids, " \ + "if specific --fe-id but not specific ids, apply to all fe.") + group.add_argument("--be-id", nargs="*", type=int, help="Specify up be ids, support multiple ids, " \ + "if specific --be but not specific ids, apply to all be.") + + def _get_parser_bool_action(self, is_store_true): + if sys.version_info.major == 3 and sys.version_info.minor >= 9: + return argparse.BooleanOptionalAction + else: + return "store_true" if is_store_true else "store_false" + + +class SimpleCommand(Command): + + def __init__(self, command, help): + super().__init__(command) + self.command = command + self.help = help + + def add_parser(self, args_parsers): + help = self.help + " If none of --fe-id, --be-id is specific, then apply to all containers." 
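        # Each simple subcommand (start / stop / restart / pause / unpause)
        # shares the same argument shape: a positional cluster NAME plus the
        # optional --fe-id / --be-id filters added by _add_parser_ids_args().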
+ parser = args_parsers.add_parser(self.command, help=help) + parser.add_argument("NAME", help="Specify cluster name.") + self._add_parser_ids_args(parser) + + def run(self, args): + cluster = CLUSTER.Cluster.load(args.NAME) + _, related_nodes, related_node_num = get_ids_related_nodes( + cluster, args.fe_id, args.be_id) + utils.exec_docker_compose_command(cluster.get_compose_file(), + self.command, + nodes=related_nodes) + show_cmd = self.command[0].upper() + self.command[1:] + LOG.info( + utils.render_green("{} succ, total related node num {}".format( + show_cmd, related_node_num))) + + +class UpCommand(Command): + + def add_parser(self, args_parsers): + parser = args_parsers.add_parser("up", help="Create and upgrade doris containers, "\ + "or add new containers. " \ + "If none of --add-fe-num, --add-be-num, --fe-id, --be-id is specific, " \ + "then apply to all containers.") + parser.add_argument("NAME", default="", help="Specific cluster name.") + parser.add_argument("IMAGE", + default="", + nargs="?", + help="Specify docker image.") + + group1 = parser.add_argument_group("add new nodes", + "add cluster nodes.") + group1.add_argument( + "--add-fe-num", + type=int, + help="Specify add fe num, default 3 for a new cluster.") + group1.add_argument( + "--add-be-num", + type=int, + help="Specify add be num, default 3 for a new cluster.") + + self._add_parser_ids_args(parser) + + group2 = parser.add_mutually_exclusive_group() + group2.add_argument( + "--no-start", + default=False, + action=self._get_parser_bool_action(True), + help="Not start containers, create or update config image only.") + group2.add_argument("--force-recreate", + default=False, + action=self._get_parser_bool_action(True), + help="Recreate containers even if their configuration" \ + "and image haven't changed. 
") + + def run(self, args): + if not args.NAME: + raise Exception("Need specific not empty cluster name") + for_all = True + try: + cluster = CLUSTER.Cluster.load(args.NAME) + if args.fe_id != None or args.be_id != None or args.add_fe_num or args.add_be_num: + for_all = False + except: + # a new cluster + if not args.IMAGE: + raise Exception("New cluster must specific image") + if args.fe_id != None: + args.fe_id = None + LOG.warning( + utils.render_yellow("Ignore --fe-id for new cluster")) + if args.be_id != None: + args.be_id = None + LOG.warning( + utils.render_yellow("Ignore --be-id for new cluster")) + cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE) + LOG.info("Create new cluster {} succ, cluster path is {}".format( + args.NAME, cluster.get_path())) + if not args.add_fe_num: + args.add_fe_num = 3 + if not args.add_be_num: + args.add_be_num = 3 + + _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id, + args.be_id) + if not related_nodes: + related_nodes = [] + if args.add_fe_num: + for i in range(args.add_fe_num): + related_nodes.append(cluster.add(CLUSTER.Node.TYPE_FE)) + if args.add_be_num: + for i in range(args.add_be_num): + related_nodes.append(cluster.add(CLUSTER.Node.TYPE_BE)) + if args.IMAGE: + for node in related_nodes: + node.set_image(args.IMAGE) + if for_all and args.IMAGE: + cluster.set_image(args.IMAGE) + cluster.save() + + options = [] + if args.no_start: + options.append("--no-start") + else: + options = ["-d", "--remove-orphans"] + if args.force_recreate: + options.append("--force-recreate") + + related_node_num = len(related_nodes) + if for_all: + related_node_num = cluster.get_all_nodes_num() + related_nodes = None + + utils.exec_docker_compose_command(cluster.get_compose_file(), "up", + options, related_nodes) + if args.no_start: + LOG.info( + utils.render_green( + "Not up cluster cause specific --no-start, related node num {}" + .format(related_node_num))) + else: + LOG.info( + utils.render_green( + "Up cluster {} succ, related node num {}".format( + args.NAME, related_node_num))) + + +class DownCommand(Command): + + def add_parser(self, args_parsers): + parser = args_parsers.add_parser("down", + help="Down doris containers, networks. "\ + "It will also remove node from DB. " \ + "If none of --fe-id, --be-id is specific, "\ + "then apply to all containers.") + parser.add_argument("NAME", help="Specify cluster name") + self._add_parser_ids_args(parser) + parser.add_argument( + "--clean", + default=False, + action=self._get_parser_bool_action(True), + help= + "Clean container related files, include expose data, config and logs" + ) + parser.add_argument( + "--drop-force", + default=None, + action=self._get_parser_bool_action(True), + help="Drop doris node force. 
For be, if specific --drop-force, "\ + "it will send dropp to fe, otherwise send decommission to fe.") + + def run(self, args): + cluster = CLUSTER.Cluster.load(args.NAME) + for_all, related_nodes, related_node_num = get_ids_related_nodes( + cluster, args.fe_id, args.be_id, ignore_not_exists=True) + + if for_all: + utils.exec_docker_compose_command(cluster.get_compose_file(), + "down", + ["-v", "--remove-orphans"]) + if args.clean: + utils.enable_dir_with_rw_perm(cluster.get_path()) + shutil.rmtree(cluster.get_path()) + LOG.info( + utils.render_yellow( + "Clean cluster data cause has specific --clean")) + else: + db_mgr = database.get_db_mgr(cluster.name) + + for node in related_nodes: + if node.is_fe(): + fe_endpoint = "{}:{}".format(node.get_ip(), + CLUSTER.FE_EDITLOG_PORT) + db_mgr.drop_fe(fe_endpoint) + elif node.is_be(): + be_endpoint = "{}:{}".format(node.get_ip(), + CLUSTER.BE_HEARTBEAT_PORT) + if args.drop_force: + db_mgr.drop_be(be_endpoint) + else: + db_mgr.decommission_be(be_endpoint) + else: + raise Exception("Unknown node type: {}".format( + node.node_type())) + + #utils.exec_docker_compose_command(cluster.get_compose_file(), + # "stop", + # nodes=[node]) + utils.exec_docker_compose_command(cluster.get_compose_file(), + "rm", ["-s", "-v", "-f"], + nodes=[node]) + if args.clean: + utils.enable_dir_with_rw_perm(node.get_path()) + shutil.rmtree(node.get_path()) + LOG.info( + utils.render_yellow( + "Clean {} with id {} data cause has specific --clean" + .format(node.node_type(), node.id))) + + cluster.remove(node.node_type(), node.id) + cluster.save() + + LOG.info( + utils.render_green( + "Down cluster {} succ, related node num {}".format( + args.NAME, related_node_num))) + + +class ListNode(object): + + def __init__(self): + self.node_type = "" + self.id = 0 + self.cluster_name = "" + self.ip = "" + self.status = "" + self.container_id = "" + self.image = "" + self.created = "" + self.alive = "" + self.is_master = "" + self.query_port = "" + self.tablet_num = "" + self.last_heartbeat = "" + self.err_msg = "" + + def info(self): + return (self.cluster_name, "{}-{}".format(self.node_type, self.id), + self.ip, self.status, self.container_id, self.image, + self.created, self.alive, self.is_master, self.query_port, + self.tablet_num, self.last_heartbeat, self.err_msg) + + def update_db_info(self, db_mgr): + if self.node_type == CLUSTER.Node.TYPE_FE: + fe = db_mgr.get_fe(self.id) + if fe: + self.alive = str(fe.alive).lower() + self.is_master = str(fe.is_master).lower() + self.query_port = fe.query_port + self.last_heartbeat = fe.last_heartbeat + self.err_msg = fe.err_msg + elif self.node_type == CLUSTER.Node.TYPE_BE: + be = db_mgr.get_be(self.id) + if be: + self.alive = str(be.alive).lower() + self.tablet_num = be.tablet_num + self.last_heartbeat = be.last_heartbeat + self.err_msg = be.err_msg + + +class ListCommand(Command): + + def add_parser(self, args_parsers): + parser = args_parsers.add_parser( + "ls", help="List running doris compose clusters.") + parser.add_argument( + "NAME", + nargs="*", + help= + "Specify multiple clusters, if specific, show all their containers." 
+ ) + parser.add_argument( + "-a", + "--all", + default=False, + action=self._get_parser_bool_action(True), + help="Show all stopped and bad doris compose projects") + + def run(self, args): + COMPOSE_MISSING = "(missing)" + COMPOSE_BAD = "(bad)" + COMPOSE_GOOD = "" + + SERVICE_DEAD = "dead" + + class ComposeService(object): + + def __init__(self, name, ip, image): + self.name = name + self.ip = ip + self.image = image + + def parse_cluster_compose_file(cluster_name): + compose_file = CLUSTER.get_compose_file(cluster_name) + if not os.path.exists(compose_file): + return COMPOSE_MISSING, {} + try: + compose = utils.read_compose_file(compose_file) + if not compose: + return COMPOSE_BAD, {} + services = compose.get("services", {}) + if services is None: + return COMPOSE_BAD, {} + return COMPOSE_GOOD, { + service: + ComposeService( + service, + list(service_conf["networks"].values())[0] + ["ipv4_address"], service_conf["image"]) + for service, service_conf in services.items() + } + except: + return COMPOSE_BAD, {} + + clusters = {} + search_names = [] + if args.NAME: + search_names = args.NAME + elif os.path.exists(CLUSTER.LOCAL_DORIS_PATH): + search_names = os.listdir(CLUSTER.LOCAL_DORIS_PATH) + + for cluster_name in search_names: + status, services = parse_cluster_compose_file(cluster_name) + clusters[cluster_name] = {"status": status, "services": services} + + docker_clusters = utils.get_doris_containers(args.NAME) + for cluster_name, containers in docker_clusters.items(): + cluster_info = clusters.get(cluster_name, None) + if not cluster_info: + cluster_info = {"status": COMPOSE_MISSING, "services": {}} + clusters[cluster_name] = cluster_info + for container in containers: + #if container.status == "running" and cluster_info[ + # "status"] == COMPOSE_GOOD and ( + # container.name not in cluster_info["services"]): + # container.status = "orphans" + cluster_info["services"][container.name] = container + + TYPE_COMPOSESERVICE = type(ComposeService("", "", "")) + if not args.NAME: + headers = (utils.render_green(field) + for field in ("CLUSTER", "STATUS", "CONFIG FILES")) + table = prettytable.PrettyTable(headers) + for name in sorted(clusters.keys()): + cluster_info = clusters[name] + service_statuses = {} + for _, container in cluster_info["services"].items(): + status = SERVICE_DEAD if type( + container) == TYPE_COMPOSESERVICE else container.status + service_statuses[status] = service_statuses.get(status, + 0) + 1 + show_status = ",".join([ + "{}({})".format(status, count) + for status, count in service_statuses.items() + ]) + if not args.all and service_statuses.get("running", 0) == 0: + continue + compose_file = CLUSTER.get_compose_file(name) + table.add_row( + (name, show_status, "{}{}".format(compose_file, + cluster_info["status"]))) + print(table) + return + + headers = (utils.render_green(field) + for field in ("CLUSTER", "NAME", "IP", "STATUS", + "CONTAINER ID", "IMAGE", "CREATED", "alive", + "is_master", "query_port", "tablet_num", + "last_heartbeat", "err_msg")) + table = prettytable.PrettyTable(headers) + + for cluster_name in sorted(clusters.keys()): + fe_ids = {} + be_ids = {} + services = clusters[cluster_name]["services"] + db_mgr = database.get_db_mgr(cluster_name, False) + + nodes = [] + for service_name, container in services.items(): + _, node_type, id = utils.parse_service_name(container.name) + node = ListNode() + node.cluster_name = cluster_name + node.node_type = node_type + node.id = id + node.update_db_info(db_mgr) + nodes.append(node) + + if node_type == 
CLUSTER.Node.TYPE_FE: + fe_ids[id] = True + elif node_type == CLUSTER.Node.TYPE_BE: + be_ids[id] = True + + if type(container) == TYPE_COMPOSESERVICE: + node.ip = container.ip + node.image = container.image + node.status = SERVICE_DEAD + else: + node.created = container.attrs.get("Created", + "")[:19].replace( + "T", " ") + node.ip = list( + container.attrs["NetworkSettings"] + ["Networks"].values())[0]["IPAMConfig"]["IPv4Address"] + node.image = ",".join(container.image.tags) + node.container_id = container.short_id + node.status = container.status + + for id, fe in db_mgr.fe_states.items(): + if fe_ids.get(id, False): + continue + node = ListNode() + node.cluster_name = cluster_name + node.node_type = CLUSTER.Node.TYPE_FE + node.id = id + node.status = SERVICE_DEAD + node.update_db_info(db_mgr) + nodes.append(node) + for id, be in db_mgr.be_states.items(): + if be_ids.get(id, False): + continue + node = ListNode() + node.cluster_name = cluster_name + node.node_type = CLUSTER.Node.TYPE_BE + node.id = id + node.status = SERVICE_DEAD + node.update_db_info(db_mgr) + nodes.append(node) + + def get_key(node): + key = node.id + if node.node_type == CLUSTER.Node.TYPE_FE: + key += 0 * CLUSTER.ID_LIMIT + elif node.node_type == CLUSTER.Node.TYPE_BE: + key += 1 * CLUSTER.ID_LIMIT + else: + key += 2 * CLUSTER.ID_LIMIT + return key + + for node in sorted(nodes, key=get_key): + table.add_row(node.info()) + + print(table) + + +ALL_COMMANDS = [ + UpCommand("up"), + DownCommand("down"), + SimpleCommand("start", "Start the doris containers. "), + SimpleCommand("stop", "Stop the doris containers. "), + SimpleCommand("restart", "Restart the doris containers. "), + SimpleCommand("pause", "Pause the doris containers. "), + SimpleCommand("unpause", "Unpause the doris containers. "), + ListCommand("ls"), +] diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py new file mode 100644 index 0000000000..21aa400a47 --- /dev/null +++ b/docker/runtime/doris-compose/database.py @@ -0,0 +1,239 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+import cluster as CLUSTER
+import os.path
+import pymysql
+import time
+import utils
+
+LOG = utils.get_logger()
+
+
+class FEState(object):
+
+    def __init__(self, id, query_port, is_master, alive, last_heartbeat,
+                 err_msg):
+        self.id = id
+        self.query_port = query_port
+        self.is_master = is_master
+        self.alive = alive
+        self.last_heartbeat = last_heartbeat
+        self.err_msg = err_msg
+
+
+class BEState(object):
+
+    def __init__(self, id, decommissioned, alive, tablet_num, last_heartbeat,
+                 err_msg):
+        self.id = id
+        self.decommissioned = decommissioned
+        self.alive = alive
+        self.tablet_num = tablet_num
+        self.last_heartbeat = last_heartbeat
+        self.err_msg = err_msg
+
+
+class DBManager(object):
+
+    def __init__(self):
+        self.fe_states = {}
+        self.be_states = {}
+        self.query_port = -1
+        self.conn = None
+
+    def set_query_port(self, query_port):
+        self.query_port = query_port
+
+    def get_fe(self, id):
+        return self.fe_states.get(id, None)
+
+    def get_be(self, id):
+        return self.be_states.get(id, None)
+
+    def load_states(self, query_ports):
+        self._load_fe_states(query_ports)
+        self._load_be_states()
+
+    def drop_fe(self, fe_endpoint):
+        id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
+        try:
+            self._exec_query(
+                "ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
+            LOG.info("Drop fe {} with id {} from db succ.".format(
+                fe_endpoint, id))
+        except Exception as e:
+            if str(e).find("frontend does not exist") >= 0:
+                LOG.info(
+                    "Drop fe {} with id {} from db succ cause it does not exist in db."
+                    .format(fe_endpoint, id))
+                return
+            raise e
+
+    def drop_be(self, be_endpoint):
+        id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
+        try:
+            self._exec_query(
+                "ALTER SYSTEM DROPP BACKEND '{}'".format(be_endpoint))
+            LOG.info("Drop be {} with id {} from db succ.".format(
+                be_endpoint, id))
+        except Exception as e:
+            if str(e).find("backend does not exists") >= 0:
+                LOG.info(
+                    "Drop be {} with id {} from db succ cause it does not exist in db."
+                    .format(be_endpoint, id))
+                return
+            raise e
+
+    def decommission_be(self, be_endpoint):
+        old_tablet_num = 0
+        id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
+        if id not in self.be_states:
+            self._load_be_states()
+        if id in self.be_states:
+            be = self.be_states[id]
+            old_tablet_num = be.tablet_num
+            if not be.alive:
+                raise Exception("Decommission be {} with id {} failed " \
+                        "cause it's not alive, maybe you should specify --drop-force " \
+                        "to drop it from db".format(be_endpoint, id))
+        try:
+            self._exec_query(
+                "ALTER SYSTEM DECOMMISSION BACKEND '{}'".format(be_endpoint))
+            LOG.info("Mark be {} with id {} as decommissioned, start migrating its tablets, " \
+                    "wait for the migration job to finish.".format(be_endpoint, id))
+        except Exception as e:
+            if str(e).find("Backend does not exist") >= 0:
+                LOG.info("Decommission be {} with id {} from db succ " \
+                        "cause it does not exist in db.".format(be_endpoint, id))
+                return
+            raise e
+
+        while True:
+            self._load_be_states()
+            be = self.be_states.get(id, None)
+            if not be:
+                LOG.info("Decommission be {} succ, total migrated {} tablets, " \
+                        "dropped it from db.".format(be_endpoint, old_tablet_num))
+                return
+            LOG.info(
+                "Decommission be {} status: alive {}, decommissioned {}. " \
+                "It is migrating its tablets, left {}/{} tablets."
+ .format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num)) + + time.sleep(5) + + def _load_fe_states(self, query_ports): + fe_states = {} + alive_master_fe_port = None + for record in self._exec_query("show frontends;"): + ip = record[1] + is_master = record[7] == "true" + alive = record[10] == "true" + last_heartbeat = record[12] + err_msg = record[14] + id = CLUSTER.Node.get_id_from_ip(ip) + query_port = query_ports.get(id, None) + fe = FEState(id, query_port, is_master, alive, last_heartbeat, + err_msg) + fe_states[id] = fe + if is_master and alive and query_port: + alive_master_fe_port = query_port + self.fe_states = fe_states + if alive_master_fe_port and alive_master_fe_port != self.query_port: + self.query_port = alive_master_fe_port + self._reset_conn() + + def _load_be_states(self): + be_states = {} + for record in self._exec_query("show backends;"): + ip = record[1] + last_heartbeat = record[7] + alive = record[8] == "true" + decommissioned = record[9] == "true" + tablet_num = int(record[10]) + err_msg = record[18] + id = CLUSTER.Node.get_id_from_ip(ip) + be = BEState(id, decommissioned, alive, tablet_num, last_heartbeat, + err_msg) + be_states[id] = be + self.be_states = be_states + + def _exec_query(self, sql): + self._prepare_conn() + with self.conn.cursor() as cursor: + cursor.execute(sql) + return cursor.fetchall() + + def _prepare_conn(self): + if self.conn: + return + if self.query_port <= 0: + raise Exception("Not set query_port") + self._reset_conn() + + def _reset_conn(self): + self.conn = pymysql.connect(user="root", + host="127.0.0.1", + read_timeout = 10, + port=self.query_port) + + +def get_db_mgr(cluster_name, required_load_succ=True): + assert cluster_name + db_mgr = DBManager() + containers = utils.get_doris_containers(cluster_name).get( + cluster_name, None) + if not containers: + return db_mgr + alive_fe_ports = {} + for container in containers: + if utils.is_container_running(container): + _, node_type, id = utils.parse_service_name(container.name) + if node_type == CLUSTER.Node.TYPE_FE: + query_port = utils.get_map_ports(container).get( + CLUSTER.FE_QUERY_PORT, None) + if query_port: + alive_fe_ports[id] = query_port + if not alive_fe_ports: + return db_mgr + + master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name), + "master_fe_ip") + query_port = None + if os.path.exists(master_fe_ip_file): + with open(master_fe_ip_file, "r") as f: + master_fe_ip = f.read() + if master_fe_ip: + master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip) + query_port = alive_fe_ports.get(master_id, None) + if not query_port: + # A new cluster's master is fe-1 + if 1 in alive_fe_ports: + query_port = alive_fe_ports[1] + else: + query_port = list(alive_fe_ports.values())[0] + + db_mgr.set_query_port(query_port) + try: + db_mgr.load_states(alive_fe_ports) + except Exception as e: + if required_load_succ: + raise e + LOG.exception(e) + + return db_mgr diff --git a/docker/runtime/doris-compose/doris-compose.py b/docker/runtime/doris-compose/doris-compose.py new file mode 100644 index 0000000000..77ceb736e3 --- /dev/null +++ b/docker/runtime/doris-compose/doris-compose.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import argparse +import command +import utils + + +def parse_args(): + ap = argparse.ArgumentParser(description="") + args_parsers = ap.add_subparsers(dest="command") + for cmd in command.ALL_COMMANDS: + cmd.add_parser(args_parsers) + + return ap.parse_args(), ap.format_help() + + +def run(args, help): + timer = utils.Timer() + for cmd in command.ALL_COMMANDS: + if args.command == cmd.name: + return cmd.run(args) + timer.cancel() + print(help) + return -1 + + +def main(): + args, help = parse_args() + run(args, help) + + +if __name__ == '__main__': + main() diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt new file mode 100644 index 0000000000..039260e6c7 --- /dev/null +++ b/docker/runtime/doris-compose/requirements.txt @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +docker +docker-compose +jsonpickle +prettytable +pymysql diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh new file mode 100644 index 0000000000..044e26487d --- /dev/null +++ b/docker/runtime/doris-compose/resource/common.sh @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +export MASTER_FE_IP="" +export MASTER_FE_IP_FILE=$DORIS_HOME/status/master_fe_ip + +health_log() { + date >> "$DORIS_HOME/log/health.out" + echo "$@" >> "$DORIS_HOME/log/health.out" +} + +read_master_fe_ip() { + MASTER_FE_IP=`cat $MASTER_FE_IP_FILE` + if [ $? 
-eq 0 ]; then + health_log "master fe ${MASTER_FE_IP} has ready." + return 0 + else + health_log "master fe has not ready." + return 1 + fi +} + diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh new file mode 100644 index 0000000000..9a59876382 --- /dev/null +++ b/docker/runtime/doris-compose/resource/init_be.sh @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +DIR=$(cd $(dirname $0);pwd) + +source $DIR/common.sh + +REGISTER_FILE=$DORIS_HOME/status/$MY_IP-register + +add_backend() { + while true; do + read_master_fe_ip + if [ $? -ne 0 ]; then + sleep 1 + continue + fi + + output=`mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$BE_HEARTBEAT_PORT';" 2>&1` + res=$? + health_log "$output" + [ $res -eq 0 ] && break + (echo $output | grep "Same backend already exists") && break + sleep 1 + done + + touch $REGISTER_FILE +} + +main() { + if [ ! -f $REGISTER_FILE ]; then + add_backend + fi + bash $DORIS_HOME/bin/start_be.sh +} + +main diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh new file mode 100644 index 0000000000..fbbd335f37 --- /dev/null +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +DIR=$(cd $(dirname $0);pwd) + +source $DIR/common.sh + +add_frontend() { + while true; do + read_master_fe_ip + if [ $? -ne 0 ]; then + sleep 1 + continue + fi + + output=`mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$FE_EDITLOG_PORT';" 2>&1` + res=$? + health_log "$output" + [ $res -eq 0 ] && break + (echo $output | grep "frontend already exists") && break + sleep 1 + done +} + +fe_daemon() { + set +e + while true; do + output=`mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;" | grep -w $MY_IP | awk '{print $8}' 2>&1` + if [ $? -ne 0 ]; then + health_log "$output" + else + echo $output | grep true + if [ $? 
-eq 0 ]; then + echo $MY_IP > $MASTER_FE_IP_FILE + if [ "$MASTER_FE_IP" != "$MY_IP" ]; then + health_log "change to master, last master is $MASTER_FE_IP" + MASTER_FE_IP=$MY_IP + fi + fi + fi + sleep 3 + done +} + +main() { + if [ "$MY_ID" = "1" -o -d "${DORIS_HOME}/doris-meta/image" ]; then + fe_daemon & + bash $DORIS_HOME/bin/start_fe.sh + else + add_frontend + fe_daemon & + $DORIS_HOME/bin/start_fe.sh --helper $MASTER_FE_IP:$FE_EDITLOG_PORT + fi +} + +main diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py new file mode 100644 index 0000000000..7317715fe9 --- /dev/null +++ b/docker/runtime/doris-compose/utils.py @@ -0,0 +1,253 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import docker +import logging +import os +import subprocess +import time +import yaml + +DORIS_PREFIX = "doris-" + + +class Timer(object): + + def __init__(self): + self.start = time.time() + self.canceled = False + + def __del__(self): + if not self.canceled: + LOG.info("=== Total run time: {} s".format( + int(time.time() - self.start))) + + def cancel(self): + self.canceled = True + + +def get_logger(name=None): + logger = logging.getLogger(name) + if not logger.hasHandlers(): + formatter = logging.Formatter( + '%(asctime)s - %(filename)s - %(lineno)dL - %(levelname)s - %(message)s' + ) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + ch.setFormatter(formatter) + logger.addHandler(ch) + logger.setLevel(logging.INFO) + + return logger + + +LOG = get_logger() + + +def render_red(s): + return "\x1B[31m" + str(s) + "\x1B[0m" + + +def render_green(s): + return "\x1B[32m" + str(s) + "\x1B[0m" + + +def render_yellow(s): + return "\x1B[33m" + str(s) + "\x1B[0m" + + +def render_blue(s): + return "\x1B[34m" + str(s) + "\x1B[0m" + + +def with_doris_prefix(name): + return DORIS_PREFIX + name + + +def parse_service_name(service_name): + import cluster + if not service_name or not service_name.startswith(DORIS_PREFIX): + return None, None, None + pos2 = service_name.rfind("-") + if pos2 < 0: + return None, None, None + id = None + try: + id = int(service_name[pos2 + 1:]) + except: + return None, None, None + pos1 = service_name.rfind("-", len(DORIS_PREFIX), pos2 - 1) + if pos1 < 0: + return None, None, None + node_type = service_name[pos1 + 1:pos2] + if node_type not in cluster.Node.TYPE_ALL: + return None, None, None + return service_name[len(DORIS_PREFIX):pos1], node_type, id + + +def get_map_ports(container): + return { + int(innner.replace("/tcp", "")): int(outer[0]["HostPort"]) + for innner, outer in container.attrs.get("NetworkSettings", {}).get( + "Ports", {}).items() + } + + +def is_container_running(container): + return container.status == "running" + + +# return all doris containers when cluster_names is empty +def 
get_doris_containers(cluster_names): + if cluster_names: + if type(cluster_names) == type(""): + filter_names = "{}{}-*".format(DORIS_PREFIX, cluster_names) + else: + filter_names = "|".join([ + "{}{}-*".format(DORIS_PREFIX, name) for name in cluster_names + ]) + else: + filter_names = "{}*".format(DORIS_PREFIX) + + clusters = {} + client = docker.client.from_env() + containers = client.containers.list(filters={"name": filter_names}) + for container in containers: + cluster_name, _, _ = parse_service_name(container.name) + if not cluster_name: + continue + if cluster_names and cluster_name not in cluster_names: + continue + if cluster_name not in clusters: + clusters[cluster_name] = [] + clusters[cluster_name].append(container) + return clusters + + +def get_doris_running_containers(cluster_name): + return { + container.name: container + for container in get_doris_containers(cluster_name).get( + cluster_name, []) if is_container_running(container) + } + + +def is_dir_empty(dir): + return False if os.listdir(dir) else True + + +def exec_shell_command(command, ignore_errors=False): + LOG.info("Exec command: {}".format(command)) + p = subprocess.Popen(command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + out = p.communicate()[0].decode('utf-8') + if not ignore_errors: + assert p.returncode == 0, out + if out: + print(out) + return p.returncode, out + + +def exec_docker_compose_command(compose_file, + command, + options=None, + nodes=None, + user_command=None): + if nodes != None and not nodes: + return 0, "Skip" + + compose_cmd = "docker-compose -f {} {} {} {} {}".format( + compose_file, command, " ".join(options) if options else "", + " ".join([node.service_name() for node in nodes]) if nodes else "", + user_command if user_command else "") + + return exec_shell_command(compose_cmd) + + +def get_docker_subnets_prefix16(): + subnet_prefixes = {} + client = docker.from_env() + for network in client.networks.list(): + if not network.attrs: + continue + ipam = network.attrs.get("IPAM", None) + if not ipam: + continue + configs = ipam.get("Config", None) + if not configs: + continue + for config in configs: + subnet = config.get("Subnet", None) + if not subnet: + continue + pos1 = subnet.find(".") + if pos1 <= 0: + continue + pos2 = subnet.find(".", pos1 + 1) + if pos2 <= 0: + continue + num1 = subnet[0:pos1] + num2 = subnet[pos1 + 1:pos2] + network_part_len = 16 + pos = subnet.find("/") + if pos != -1: + network_part_len = int(subnet[pos + 1:]) + if network_part_len < 16: + for i in range(256): + subnet_prefixes["{}.{}".format(num1, i)] = True + else: + subnet_prefixes["{}.{}".format(num1, num2)] = True + + LOG.debug("Get docker subnet prefixes: {}".format(subnet_prefixes)) + + return subnet_prefixes + + +def copy_image_directory(image, image_dir, local_dir): + client = docker.from_env() + volumes = ["{}:/opt/mount".format(local_dir)] + if image_dir.endswith("/"): + image_dir += "." + elif not image_dir.endswith("."): + image_dir += "/." 
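+    # A trailing "/." makes the `cp -r` below copy the directory's contents
+    # into /opt/mount (the bind-mounted local_dir) rather than creating a
+    # nested subdirectory inside it.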
+ client.containers.run( + image, + remove=True, + volumes=volumes, + entrypoint="cp -r {} /opt/mount/".format(image_dir)) + + +def enable_dir_with_rw_perm(dir): + if not os.path.exists(dir): + return + client = docker.client.from_env() + client.containers.run("ubuntu", + remove=True, + volumes=["{}:/opt/mount".format(dir)], + entrypoint="chmod a+rw -R {}".format("/opt/mount")) + + +def read_compose_file(file): + with open(file, "r") as f: + return yaml.safe_load(f.read()) + + +def write_compose_file(file, compose): + with open(file, "w") as f: + f.write(yaml.dump(compose)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org