This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 37118d8359d branch-3.0: [opt](doris compose) pick lastest code from master and fix some unstable regression test (#50747) 37118d8359d is described below commit 37118d8359d2982e62d1ab78fc180094fa20105a Author: yujun <yu...@selectdb.com> AuthorDate: Wed May 14 15:14:41 2025 +0800 branch-3.0: [opt](doris compose) pick lastest code from master and fix some unstable regression test (#50747) --- docker/runtime/doris-compose/Dockerfile | 68 ++- docker/runtime/doris-compose/Readme.md | 82 ++-- docker/runtime/doris-compose/cluster.py | 341 ++++++++----- docker/runtime/doris-compose/command.py | 527 ++++++++++++++------- docker/runtime/doris-compose/database.py | 134 ++++-- docker/runtime/doris-compose/format-code.sh | 4 + docker/runtime/doris-compose/requirements.txt | 5 +- docker/runtime/doris-compose/resource/common.sh | 20 +- docker/runtime/doris-compose/resource/init_be.sh | 13 +- .../runtime/doris-compose/resource/init_cloud.sh | 16 +- docker/runtime/doris-compose/resource/init_fe.sh | 46 +- docker/runtime/doris-compose/utils.py | 24 + .../doris/regression/suite/SuiteCluster.groovy | 13 +- .../suites/clone_p0/test_decommission_mtmv.groovy | 2 +- .../suites/demo_p0/docker_action.groovy | 7 +- .../test_min_load_replica_num_complicate.groovy | 2 +- 16 files changed, 888 insertions(+), 416 deletions(-) diff --git a/docker/runtime/doris-compose/Dockerfile b/docker/runtime/doris-compose/Dockerfile index 501574e372e..04b86a30ed1 100644 --- a/docker/runtime/doris-compose/Dockerfile +++ b/docker/runtime/doris-compose/Dockerfile @@ -25,37 +25,59 @@ ARG JDK_IMAGE=openjdk:17-jdk-slim #ARG JDK_IMAGE=openjdk:8u342-jdk +# user can download a doris release package, extract it, then build its image used arg `OUTPUT_PATH` +# for example: +# +# ``` +# cd ~/tmp +# wget https://apache-doris-releases.oss-accelerate.aliyuncs.com/apache-doris-3.0.5-bin-x64.tar.gz +# tar xvf apache-doris-3.0.5-bin-x64.tar.gz # after extract, there will be a directory ./apache-doris-3.0.5-bin-x64/{fe, be, ms} +# +# docker build \ +# --build-arg OUTPUT_PATH=./apache-doris-3.0.5-bin-x64 \ +# -f ~/workspace/doris/docker/runtime/doris-compose/Dockerfile \ +# -t my-doris:v3.0.5 \ +# . 
+# ``` +ARG OUTPUT_PATH=./output + #### END ARG #### FROM ${JDK_IMAGE} # set environment variables -ENV JACOCO_VERSION 0.8.8 +ENV JACOCO_VERSION=0.8.8 -RUN mkdir -p /opt/apache-doris/coverage +RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list \ + && apt-get clean \ + && apt-get update \ + && apt-get install -y --no-install-recommends \ + default-mysql-client \ + python \ + lsof \ + tzdata \ + curl \ + unzip \ + patchelf \ + jq \ + procps \ + util-linux \ + gosu \ + && ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ + && dpkg-reconfigure -f noninteractive tzdata \ + && apt-get clean -RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list -RUN apt-get clean +RUN mkdir -p /opt/apache-doris/{fdb,coverage} -RUN apt-get update && \ - apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps util-linux gosu && \ - ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ - dpkg-reconfigure -f noninteractive tzdata && \ - apt-get clean +RUN curl -f https://repo1.maven.org/maven2/org/jacoco/jacoco/${JACOCO_VERSION}/jacoco-${JACOCO_VERSION}.zip -o jacoco.zip \ + && mkdir /jacoco \ + && unzip jacoco.zip -d /jacoco -RUN curl -f https://repo1.maven.org/maven2/org/jacoco/jacoco/${JACOCO_VERSION}/jacoco-${JACOCO_VERSION}.zip -o jacoco.zip && \ - mkdir /jacoco && \ - unzip jacoco.zip -d /jacoco - -# cloud -COPY --chmod=777 README.md cloud/output* output/ms* /opt/apache-doris/cloud/ -RUN mkdir /opt/apache-doris/fdb -RUN if [ -d /opt/apache-doris/cloud/bin ]; then \ - sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/cloud/bin/start.sh ; \ - fi - -# fe and be -COPY --chmod=777 output /opt/apache-doris/ +# COPY need refine ARG after FROM +ARG OUTPUT_PATH +COPY --chmod=777 ${OUTPUT_PATH} /opt/apache-doris/ # in docker, run 'chmod 755 doris_be' first time cost 1min, remove it. RUN sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/be/bin/start_be.sh - +RUN if [ -d /opt/apache-doris/ms/bin ]; then \ + sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/ms/bin/start.sh ; \ + fi \ No newline at end of file diff --git a/docker/runtime/doris-compose/Readme.md b/docker/runtime/doris-compose/Readme.md index fea13ab1821..98b703be59e 100644 --- a/docker/runtime/doris-compose/Readme.md +++ b/docker/runtime/doris-compose/Readme.md @@ -23,10 +23,11 @@ Use doris compose to create doris docker compose clusters. ## Requirements -##### 1. Make sure you have docker permissions +### 1. Make sure you have docker permissions run: -``` + +```shell docker run hello-world ``` @@ -34,50 +35,60 @@ if have problem with permission denied, then [add-docker-permission](https://doc Make sure BuildKit configured in the machine. if not follow [docker-with-BuildKit](https://docs.docker.com/build/buildkit/). -##### 2. The doris image should contains +### 2. The doris image should contains -``` -/opt/apache-doris/{fe, be, cloud} +```shell +/opt/apache-doris/{fe, be, ms} ``` -if don't create cloud cluster, the image no need to contains the cloud pkg. +If don't create cloud cluster, the image no need to contains the ms pkg. +If build doris use `sh build.sh --fe --be --cloud` **without do any change on their origin conf or shells**, then its `output/` satisfy with all above, then run command in doris root directory will generate such a image. If you want to pack a product that is not the `output/` directory, you can modify `Dockerfile` by yourself. 
-if build doris use `sh build.sh --fe --be --cloud`, then its output satisfy with all above, then run command in doris root directory - will generate such a image. - -``` +```shell docker build -f docker/runtime/doris-compose/Dockerfile -t <image> . ``` -##### 3. Install the dependent python library in 'docker/runtime/doris-compose/requirements.txt' +The `<image>` is the name you want the docker image to have. -``` +### 3. Install the dependent python library in 'docker/runtime/doris-compose/requirements.txt' + +`PyYAML` of certain version not always fit other libraries' requirements. So we suggest to use a individual environment using `venv` or `conda`. + +```shell python -m pip install --user -r docker/runtime/doris-compose/requirements.txt ``` +if it failed, change content of `requirements.txt` to: + +```Dockerfile +pyyaml==5.3.1 +docker==6.1.3 +...... +``` + ## Usage ### Notice -Each cluster will have a directory in '/tmp/doris/{cluster-name}', user can set env LOCAL_DORIS_PATH to change its directory. +Each cluster will have a directory in '/tmp/doris/{cluster-name}', user can set env `LOCAL_DORIS_PATH` to change its directory. -For example, if user export LOCAL_DORIS_PATH=/mydoris, then the cluster's directory is '/mydoris/{cluster-name}'. +For example, if user export `LOCAL_DORIS_PATH=/mydoris`, then the cluster's directory is '/mydoris/{cluster-name}'. -And cluster's directory will contains all its containers's logs and data, like fe-1, fe-2, be-1, ..., etc. +And cluster's directory will contains all its containers's logs and data, like `fe-1`, `fe-2`, `be-1`, ..., etc. -If there are multiple users run doris-compose on the same machine, suggest don't change LOCAL_DORIS_PATH or they should export the same LOCAL_DORIS_PATH. +If there are multiple users run doris-compose on the same machine, suggest don't change `LOCAL_DORIS_PATH` or they should export the same `LOCAL_DORIS_PATH`. Because when create a new cluster, doris-compose will search the local doris path, and choose a docker network which is different with this path's clusters. -So if multiple users use different LOCAL_DORIS_PATH, their clusters may have docker network conflict!!! +So if multiple users use different `LOCAL_DORIS_PATH`, their clusters may have docker network conflict!!! ### Create a cluster or recreate its containers -``` +```shell python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image?> --add-fe-num <add-fe-num> --add-be-num <add-be-num> - --fe-id <fd-id> --be-id <be-id> + [--fe-id <fd-id> --be-id <be-id>] ... [ --cloud ] ``` @@ -86,18 +97,17 @@ if it's a new cluster, must specific the image. add fe/be nodes with the specific image, or update existing nodes with `--fe-id`, `--be-id` - For create a cloud cluster, steps are as below: 1. Write cloud s3 store config file, its default path is '/tmp/doris/cloud.ini'. - It's defined in environment variable DORIS_CLOUD_CFG_FILE, user can change this env var to change its path. + It's defined in environment variable `DORIS_CLOUD_CFG_FILE`, user can change this env var to change its path. A Example file is locate in 'docker/runtime/doris-compose/resource/cloud.ini.example'. -2. Use doris compose up command with option '--cloud' to create a new cloud cluster. +2. Use doris compose up command with option `--cloud` to create a new cloud cluster. 
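Putting the two steps above together, a minimal sketch of the cloud-cluster workflow (the `mkdir`/`cp` lines and the cluster name `my-cloud` are illustrative placeholders; only the example file path, the `DORIS_CLOUD_CFG_FILE` variable, and the `up ... --cloud` command come from this document):

```shell
# step 1: write the cloud s3 store config at its default path /tmp/doris/cloud.ini,
# starting from the shipped example file
mkdir -p /tmp/doris
cp docker/runtime/doris-compose/resource/cloud.ini.example /tmp/doris/cloud.ini
# edit /tmp/doris/cloud.ini and fill in the s3 store settings

# optional: keep the config elsewhere and point DORIS_CLOUD_CFG_FILE at it
export DORIS_CLOUD_CFG_FILE=/tmp/doris/cloud.ini

# step 2: create the cloud cluster with --cloud ("my-cloud" and <image> are placeholders)
python docker/runtime/doris-compose/doris-compose.py up my-cloud <image> --cloud
```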
The simplest way to create a cloud cluster: -``` +```shell python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image> --cloud ``` @@ -105,7 +115,7 @@ It will create 1 fdb, 1 meta service server, 1 recycler, 3 fe and 3 be. ### Remove node from the cluster -``` +```shell python docker/runtime/doris-compose/doris-compose.py down <cluster-name> --fe-id <fe-id> --be-id<be-id> [--clean] [--drop-force] ``` @@ -115,18 +125,16 @@ For BE, if specific drop force, it will send dropp sql to FE, otherwise it will If specific `--clean`, it will delete its data too. - ### Start, stop, restart specific nodes - -``` +```shell python docker/runtime/doris-compose/doris-compose.py start <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids> python docker/runtime/doris-compose/doris-compose.py restart <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids> ``` ### List doris cluster -``` +```shell python docker/runtime/doris-compose/doris-compose.py ls <multiple cluster names> ``` @@ -134,15 +142,15 @@ if specific cluster names, it will list all the cluster's nodes. Otherwise it will just list summary of each clusters. -There are more options about doris-compose. Just try +There are more options about doris-compose. Just try -``` -python docker/runtime/doris-compose/doris-compose.py <command> -h +```shell +python docker/runtime/doris-compose/doris-compose.py <command> -h ``` ### Generate regression custom conf file -``` +```shell python docker/runtime/doris-compose/doris-compose.py config <cluster-name> <doris-root-path> [-q] [--connect-follow-fe] ``` @@ -158,13 +166,13 @@ steps: ## Problem investigation -#### Log +### Log -Each cluster has logs in /tmp/doris/{cluster-name}/{node-xxx}/log. For each node, doris compose will also print log in /tmp/doris/{cluster-name}/{node-xxx}/log/health.out +Each cluster has logs in Docker in '/tmp/doris/{cluster-name}/{node-xxx}/log/'. For each node, doris compose will also print log in '/tmp/doris/{cluster-name}/{node-xxx}/log/health.out' -#### Up cluster using non-detach mode +### Up cluster using non-detach mode -``` +```shell python docker/runtime/doris-compose/doris-compose.py up ... -no-detach ``` @@ -172,6 +180,6 @@ python docker/runtime/doris-compose/doris-compose.py up ... -no-detach Before submitting code, pls format code. 
-``` +```shell bash format-code.sh ``` diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index f4522181d4b..552b9c038c9 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -42,11 +42,13 @@ FE_RPC_PORT = 9020 FE_QUERY_PORT = 9030 FE_EDITLOG_PORT = 9010 FE_JAVA_DBG_PORT = 5005 +FE_ARROW_FLIGHT_SQL_PORT = 8070 BE_PORT = 9060 BE_WEBSVR_PORT = 8040 BE_HEARTBEAT_PORT = 9050 BE_BRPC_PORT = 8060 +BE_ARROW_FLIGHT_SQL_PORT = 8050 FDB_PORT = 4500 @@ -82,6 +84,10 @@ def get_status_path(cluster_name): return os.path.join(get_cluster_path(cluster_name), "status") +def get_master_fe_addr_path(cluster_name): + return get_status_path(cluster_name) + "/master_fe_query_addr" + + def get_all_cluster_names(): if not os.path.exists(LOCAL_DORIS_PATH): return [] @@ -138,19 +144,30 @@ def gen_subnet_prefix16(): raise Exception("Failed to gen subnet") -def get_master_fe_endpoint(cluster_name): - master_fe_ip_file = get_cluster_path(cluster_name) + "/status/master_fe_ip" - max_retries = 10 - for attempt in range(max_retries): - if os.path.exists(master_fe_ip_file): - with open(master_fe_ip_file, "r") as f: - return "{}:{}".format(f.read().strip(), FE_QUERY_PORT) - time.sleep(1) +def get_master_fe_endpoint(cluster_name, wait_master_fe_query_addr_file=False): + cluster_path = get_cluster_path(cluster_name) + if os.path.exists(cluster_path): + master_fe_query_addr_file = get_master_fe_addr_path(cluster_name) + max_retries = 10 if wait_master_fe_query_addr_file else 0 + i = 0 + while True: + if os.path.exists(master_fe_query_addr_file): + with open(master_fe_query_addr_file, "r") as f: + return f.read().strip() + i += 1 + if i < max_retries: + time.sleep(1) + else: + break try: cluster = Cluster.load(cluster_name) LOG.info("master file not exist, master ip get from node 1") - return "{}:{}".format( - cluster.get_node(Node.TYPE_FE, 1).get_ip(), FE_QUERY_PORT) + if cluster.is_host_network(): + return cluster.remote_master_fe + else: + master_fe = cluster.get_node(Node.TYPE_FE, 1) + return "{}:{}".format(master_fe.get_ip(), + master_fe.meta["ports"]["query_port"]) except: return "" @@ -173,12 +190,6 @@ def get_node_seq(node_type, id): return seq -class NodeMeta(object): - - def __init__(self, image): - self.image = image - - class Group(object): def __init__(self, node_type): @@ -187,7 +198,7 @@ class Group(object): self.next_id = 1 def add(self, id, node_meta): - assert node_meta.image + assert node_meta["image"] if not id: id = self.next_id self.next_id += 1 @@ -222,6 +233,15 @@ class Group(object): self.nodes = nodes +class NodeNetInfo(object): + + def __init__(self, type, id, ip, ports): + self.type = type + self.id = id + self.ip = ip + self.ports = ports + + class Node(object): TYPE_FE = "fe" TYPE_BE = "be" @@ -250,9 +270,19 @@ class Node(object): else: raise Exception("Unknown node type {}".format(node_type)) + # only run once at create the node, later restart or upgrade image will not run def init(self): + self.init_ports() self.init_conf() + def init_ports(self): + if self.cluster.is_host_network(): + self.meta["ports"] = dict( + (port_name, utils.get_avail_port()) + for port_name in self.get_default_named_ports().keys()) + else: + self.meta["ports"] = self.get_default_named_ports() + def init_conf(self): path = self.get_path() os.makedirs(path, exist_ok=True) @@ -303,15 +333,28 @@ class Node(object): return get_node_path(self.cluster.name, self.node_type(), self.id) def get_image(self): - return self.meta.image + return 
self.meta["image"] def set_image(self, image): - self.meta.image = image + self.meta["image"] = image def get_ip(self): - seq = get_node_seq(self.node_type(), self.id) - return "{}.{}.{}".format(self.cluster.subnet, int(seq / IP_PART4_SIZE), - seq % IP_PART4_SIZE) + if self.cluster.is_host_network(): + # this is a remote node + if self.meta.get("is_remote", False): + return self.cluster.remote_master_fe.split(":")[0] + else: + return self.cluster.local_network_ip + else: + seq = get_node_seq(self.node_type(), self.id) + return "{}.{}.{}".format(self.cluster.subnet, + int(seq / IP_PART4_SIZE), + seq % IP_PART4_SIZE) + + def get_default_named_ports(self): + # port_name : default_port + # the port_name come from fe.conf, be.conf, cloud.conf, etc + return {} @staticmethod def get_id_from_ip(ip): @@ -336,9 +379,6 @@ class Node(object): "MY_IP": self.get_ip(), "MY_ID": self.id, "MY_TYPE": self.node_type(), - "FE_QUERY_PORT": FE_QUERY_PORT, - "FE_EDITLOG_PORT": FE_EDITLOG_PORT, - "BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT, "DORIS_HOME": os.path.join(self.docker_home_dir()), "STOP_GRACE": 1 if enable_coverage else 0, "IS_CLOUD": 1 if self.cluster.is_cloud else 0, @@ -349,7 +389,7 @@ class Node(object): envs["META_SERVICE_ENDPOINT"] = self.cluster.get_meta_server_addr() # run as host user - if not getattr(self.cluster, 'is_root_user', True): + if not self.cluster.is_root_user: envs["HOST_USER"] = getpass.getuser() envs["HOST_UID"] = os.getuid() envs["HOST_GID"] = os.getgid() @@ -378,10 +418,17 @@ class Node(object): return None def get_add_init_config(self): - return [] + cfg = [] + if self.cluster.is_host_network(): + cfg.append(f"priority_networks = {self.cluster.local_network_ip}") + cfg += [ + f"{port_name} = {port}" + for port_name, port in self.meta["ports"].items() + ] + return cfg def docker_ports(self): - raise Exception("No implemented") + return list(self.get_default_named_ports().values()) def docker_home_dir(self): raise Exception("No implemented") @@ -405,24 +452,11 @@ class Node(object): volumes.append("{}:{}/coverage".format(self.cluster.coverage_dir, DOCKER_DORIS_PATH)) - extra_hosts = [ - "{}:{}".format(node.get_name(), node.get_ip()) - for node in self.cluster.get_all_nodes() - ] - content = { "cap_add": ["SYS_PTRACE"], - "hostname": self.get_name(), "container_name": self.service_name(), "environment": self.docker_env(), "image": self.get_image(), - "networks": { - utils.with_doris_prefix(self.cluster.name): { - "ipv4_address": self.get_ip(), - } - }, - "extra_hosts": extra_hosts, - "ports": self.docker_ports(), "ulimits": { "core": -1 }, @@ -430,6 +464,21 @@ class Node(object): "volumes": volumes, } + if self.cluster.is_host_network(): + content["network_mode"] = "host" + else: + content["hostname"] = self.get_name() + content["networks"] = { + utils.with_doris_prefix(self.cluster.name): { + "ipv4_address": self.get_ip(), + } + } + content["extra_hosts"] = [ + "{}:{}".format(node.get_name(), node.get_ip()) + for node in self.cluster.get_all_nodes() + ] + content["ports"] = self.docker_ports() + if self.entrypoint(): content["entrypoint"] = self.entrypoint() @@ -439,21 +488,19 @@ class Node(object): class FE(Node): def init(self): + # for cloud mode, fe is follower or observer, default is observer + self.meta[ + "is_cloud_follower"] = self.cluster.is_cloud and self.cluster.fe_follower super().init() - self.init_is_follower() def get_add_init_config(self): - cfg = [] + cfg = super().get_add_init_config() if self.cluster.fe_config: cfg += self.cluster.fe_config if 
self.cluster.is_cloud: cfg += [ "meta_service_endpoint = {}".format( self.cluster.get_meta_server_addr()), - "", - "# For regression-test", - "ignore_unsupported_properties_in_cloud_mode = true", - "merge_on_write_forced_to_false = true", "deploy_mode = cloud", ] @@ -468,46 +515,54 @@ class FE(Node): with open("{}/conf/{}".format(self.get_path(), self.conf_file_name()), "r") as f: + java_debug_port = self.meta["ports"]["java_debug_port"] parser = configparser.ConfigParser() parser.read_string('[dummy_section]\n' + f.read()) + section = parser['dummy_section'] for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"): - value = parser["dummy_section"].get(key) + value = section.get(key) if value: value = value.strip().strip('"') - cfg.append( - f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\"" - ) + if key == "JAVA_OPTS": + # java 8 + cfg.append( + f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address={java_debug_port}\"" + ) + else: + # JAVA_OPTS_FOR_JDK_17 + # >= java 9 + cfg.append( + f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{java_debug_port}\"" + ) return cfg - def init_is_follower(self): - if self.cluster.is_cloud and self.cluster.fe_follower: - with open(self._is_follower_path(), "w") as f: - f.write("true") - - def _is_follower_path(self): - return "{}/conf/is_follower".format(self.get_path()) - def docker_env(self): envs = super().docker_env() if self.cluster.is_cloud: envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id() - if os.path.exists(self._is_follower_path()): - envs["IS_FE_FOLLOWER"] = 1 + if self.meta["is_cloud_follower"]: + envs["is_fe_follower"] = 1 + envs["MY_QUERY_PORT"] = self.meta["ports"]["query_port"] + envs["MY_EDITLOG_PORT"] = self.meta["ports"]["edit_log_port"] return envs + def get_default_named_ports(self): + return { + "http_port": FE_HTTP_PORT, + "rpc_port": FE_RPC_PORT, + "query_port": FE_QUERY_PORT, + "edit_log_port": FE_EDITLOG_PORT, + "arrow_flight_sql_port": FE_ARROW_FLIGHT_SQL_PORT, + "java_debug_port": FE_JAVA_DBG_PORT, + } + def cloud_unique_id(self): return "sql_server_{}".format(self.id) def start_script(self): return ["init_fe.sh"] - def docker_ports(self): - return [ - FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT, - FE_JAVA_DBG_PORT - ] - def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "fe") @@ -523,11 +578,11 @@ class BE(Node): def init(self): super().init() if self.cluster.is_cloud: - self.init_cluster_name() + self.meta["cluster_name"] = self.cluster.be_cluster self.init_disk(self.cluster.be_disks) def get_add_init_config(self): - cfg = [] + cfg = super().get_add_init_config() if self.cluster.be_config: cfg += self.cluster.be_config if self.cluster.is_cloud: @@ -554,14 +609,6 @@ class BE(Node): ] return cfg - def init_cluster_name(self): - with open("{}/conf/CLUSTER_NAME".format(self.get_path()), "w") as f: - f.write(self.cluster.be_cluster) - - def get_cluster_name(self): - with open("{}/conf/CLUSTER_NAME".format(self.get_path()), "r") as f: - return f.read().strip() - def init_disk(self, be_disks): path = self.get_path() dirs = [] @@ -605,20 +652,29 @@ class BE(Node): def docker_env(self): envs = super().docker_env() + envs["MY_HEARTBEAT_PORT"] = self.meta["ports"][ + "heartbeat_service_port"] if self.cluster.is_cloud: envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id() envs["REG_BE_TO_MS"] = 1 if self.cluster.reg_be else 0 + envs["CLUSTER_NAME"] = self.meta["cluster_name"] return envs + def 
get_default_named_ports(self): + return { + "be_port": BE_PORT, + "webserver_port": BE_WEBSVR_PORT, + "heartbeat_service_port": BE_HEARTBEAT_PORT, + "brpc_port": BE_BRPC_PORT, + "arrow_flight_sql_port": BE_ARROW_FLIGHT_SQL_PORT, + } + def cloud_unique_id(self): return "compute_node_{}".format(self.id) def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "be") - def docker_ports(self): - return [BE_WEBSVR_PORT, BE_BRPC_PORT, BE_HEARTBEAT_PORT, BE_PORT] - def node_type(self): return Node.TYPE_BE @@ -629,13 +685,17 @@ class BE(Node): class CLOUD(Node): def get_add_init_config(self): - return ["fdb_cluster = " + self.cluster.get_fdb_cluster()] + cfg = super().get_add_init_config() + cfg.append("fdb_cluster = " + self.cluster.get_fdb_cluster()) + return cfg def docker_home_dir(self): - return os.path.join(DOCKER_DORIS_PATH, "cloud") + return os.path.join(DOCKER_DORIS_PATH, "ms") - def docker_ports(self): - return [MS_PORT] + def get_default_named_ports(self): + return { + "brpc_listen_port": MS_PORT, + } def conf_file_name(self): for file in os.listdir(os.path.join(self.get_path(), "conf")): @@ -682,13 +742,17 @@ class RECYCLE(CLOUD): class FDB(Node): + def get_add_init_config(self): + return [] + def copy_conf_to_local(self, local_conf_dir): os.makedirs(local_conf_dir, exist_ok=True) with open(os.path.join(LOCAL_RESOURCE_PATH, "fdb.conf"), "r") as read_file: with open(os.path.join(local_conf_dir, self.conf_file_name()), "w") as f: - publish_addr = "{}:{}".format(self.get_ip(), FDB_PORT) + publish_addr = "{}:{}".format(self.get_ip(), + self.meta["ports"]["fdb_port"]) f.write(read_file.read().replace("${PUBLISH-ADDRESS}", publish_addr)) @@ -701,8 +765,8 @@ class FDB(Node): def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "fdb") - def docker_ports(self): - return [FDB_PORT] + def get_default_named_ports(self): + return {"fdb_port": FDB_PORT} def node_type(self): return Node.TYPE_FDB @@ -714,9 +778,10 @@ class FDB(Node): class Cluster(object): def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config, - be_config, ms_config, recycle_config, fe_follower, be_disks, - be_cluster, reg_be, coverage_dir, cloud_store_config, - sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id): + be_config, ms_config, recycle_config, remote_master_fe, + local_network_ip, fe_follower, be_disks, be_cluster, reg_be, + coverage_dir, cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cluster_id): self.name = name self.subnet = subnet self.image = image @@ -726,6 +791,8 @@ class Cluster(object): self.be_config = be_config self.ms_config = ms_config self.recycle_config = recycle_config + self.remote_master_fe = remote_master_fe + self.local_network_ip = local_network_ip self.fe_follower = fe_follower self.be_disks = be_disks self.be_cluster = be_cluster @@ -736,15 +803,19 @@ class Cluster(object): node_type: Group(node_type) for node_type in Node.TYPE_ALL } + if self.remote_master_fe: + # preserve fe id = 1 for the remote master:fe + self.groups[Node.TYPE_FE].next_id = 2 self.sql_mode_node_mgr = sql_mode_node_mgr self.be_metaservice_endpoint = be_metaservice_endpoint self.be_cluster_id = be_cluster_id @staticmethod def new(name, image, is_cloud, is_root_user, fe_config, be_config, - ms_config, recycle_config, fe_follower, be_disks, be_cluster, - reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr, - be_metaservice_endpoint, be_cluster_id): + ms_config, recycle_config, remote_master_fe, local_network_ip, + fe_follower, be_disks, be_cluster, reg_be, 
coverage_dir, + cloud_store_config, sql_mode_node_mgr, be_metaservice_endpoint, + be_cluster_id): if not os.path.exists(LOCAL_DORIS_PATH): os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) os.chmod(LOCAL_DORIS_PATH, 0o777) @@ -752,13 +823,13 @@ class Cluster(object): with filelock.FileLock(lock_file): if os.getuid() == utils.get_path_uid(lock_file): os.chmod(lock_file, 0o666) - subnet = gen_subnet_prefix16() + subnet = gen_subnet_prefix16() if not remote_master_fe else "" cluster = Cluster(name, subnet, image, is_cloud, is_root_user, fe_config, be_config, ms_config, recycle_config, - fe_follower, be_disks, be_cluster, reg_be, - coverage_dir, cloud_store_config, - sql_mode_node_mgr, be_metaservice_endpoint, - be_cluster_id) + remote_master_fe, local_network_ip, fe_follower, + be_disks, be_cluster, reg_be, coverage_dir, + cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cluster_id) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() @@ -788,6 +859,21 @@ class Cluster(object): def _get_meta_file(name): return os.path.join(get_cluster_path(name), "meta") + def is_host_network(self): + return getattr(self, "remote_master_fe", "") + + def get_remote_fe_node(self): + if not self.is_host_network(): + return None + meta = { + "image": "", + "is_remote": True, + "ports": { + "query_port": int(self.remote_master_fe.split(":")[1]) + }, + } + return Node.new(self, Node.TYPE_FE, 1, meta) + def get_image(self): return self.image @@ -798,7 +884,7 @@ class Cluster(object): if node_type == Node.TYPE_FDB: continue for _, node_meta in group.nodes.items(): - node_meta.image = image + node_meta["image"] = image def get_path(self): return get_cluster_path(self.name) @@ -809,28 +895,48 @@ class Cluster(object): raise Exception("Unknown node_type: {}".format(node_type)) return group - def get_node(self, node_type, id): + def get_all_node_net_infos(self): + return [ + NodeNetInfo(node.node_type(), node.id, node.get_ip(), + node.meta["ports"]) + for node in self.get_all_nodes(None, True) + ] + + def get_node(self, node_type, id, include_remote=False): group = self.get_group(node_type) meta = group.get_node(id) if not meta: + if include_remote and node_type == Node.TYPE_FE: + node = self.get_remote_fe_node() + if node and node.id == id: + return node raise Exception("No found {} with id {}".format(node_type, id)) return Node.new(self, node_type, id, meta) - def get_all_nodes(self, node_type=None): + def get_all_nodes(self, node_type=None, include_remote=False): if node_type is None: nodes = [] for nt, group in self.groups.items(): for id, meta in group.get_all_nodes().items(): nodes.append(Node.new(self, nt, id, meta)) + if include_remote: + node = self.get_remote_fe_node() + if node: + nodes.append(node) return nodes group = self.groups.get(node_type, None) if not group: raise Exception("Unknown node_type: {}".format(node_type)) - return [ + nodes = [ Node.new(self, node_type, id, meta) for id, meta in group.get_all_nodes().items() ] + if include_remote: + node = self.get_remote_fe_node() + if node: + nodes.append(node) + return nodes def get_all_nodes_num(self): num = 0 @@ -839,7 +945,8 @@ class Cluster(object): return num def add(self, node_type, id=None): - node_meta = NodeMeta(self.image) + node_meta = {} + node_meta["image"] = self.image id = self.get_group(node_type).add(id, node_meta) node = self.get_node(node_type, id) if not os.path.exists(node.get_path()): @@ -848,15 +955,19 @@ class Cluster(object): return node def 
get_fdb_cluster(self): - return "123456:123456@{}:{}".format( - self.get_node(Node.TYPE_FDB, 1).get_ip(), FDB_PORT) + fdb = self.get_node(Node.TYPE_FDB, 1) + return "123456:123456@{}:{}".format(fdb.get_ip(), + fdb.meta["ports"]["fdb_port"]) def get_meta_server_addr(self): - return "{}:{}".format(self.get_node(Node.TYPE_MS, 1).get_ip(), MS_PORT) + meta_server = self.get_node(Node.TYPE_MS, 1) + return "{}:{}".format(meta_server.get_ip(), + meta_server.meta["ports"]["brpc_listen_port"]) def get_recycle_addr(self): - return "{}:{}".format( - self.get_node(Node.TYPE_RECYCLE, 1).get_ip(), MS_PORT) + recycler = self.get_node(Node.TYPE_RECYCLE, 1) + return "{}:{}".format(recycler.get_ip(), + recycler.meta["ports"]["brpc_listen_port"]) def remove(self, node_type, id): group = self.get_group(node_type) @@ -875,21 +986,21 @@ class Cluster(object): for node_type in self.groups.keys(): for node in self.get_all_nodes(node_type): services[node.service_name()] = node.compose() - compose = { "version": "3", - "networks": { + "services": services, + } + if not self.is_host_network(): + compose["networks"] = { utils.with_doris_prefix(self.name): { "driver": "bridge", "ipam": { "config": [{ "subnet": "{}.0.0/16".format(self.subnet), }] - } - } - }, - "services": services, - } + }, + }, + } utils.write_compose_file(self.get_compose_file(), compose) diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index df3d47cabd9..f3e3754464e 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -30,43 +30,46 @@ import time LOG = utils.get_logger() -def wait_ready_service(wait_timeout, cluster, fe_ids, be_ids): - - def is_fe_ready_service(ip): - return utils.is_socket_avail(ip, CLUSTER.FE_QUERY_PORT) - - def is_be_ready_service(ip): - return utils.is_socket_avail(ip, CLUSTER.BE_HEARTBEAT_PORT) - +def wait_service(need_alive, wait_timeout, cluster, fe_ids, be_ids): if wait_timeout == 0: return if wait_timeout == -1: wait_timeout = 1000000000 expire_ts = time.time() + wait_timeout while True: - db_mgr = database.get_db_mgr(cluster.name, False) - dead_frontends = [] + db_mgr = database.get_db_mgr(cluster.name, + cluster.get_all_node_net_infos(), False) + failed_frontends = [] for id in fe_ids: fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id) fe_state = db_mgr.get_fe(id) - if not fe_state or not fe_state.alive or not is_fe_ready_service( - fe.get_ip()): - dead_frontends.append(id) - dead_backends = [] + fe_alive = fe_state and fe_state.alive + if fe_alive and need_alive: + # if need alive, check port available, + # if need dead, don't check port available because it take some time for the disconnect socket + fe_alive = utils.is_socket_avail( + fe.get_ip(), fe.meta["ports"]["query_port"]) + if fe_alive != need_alive: + failed_frontends.append(id) + failed_backends = [] for id in be_ids: be = cluster.get_node(CLUSTER.Node.TYPE_BE, id) be_state = db_mgr.get_be(id) - if not be_state or not be_state.alive or not is_be_ready_service( - be.get_ip()): - dead_backends.append(id) - if not dead_frontends and not dead_backends: + be_alive = be_state and be_state.alive + if be_alive and need_alive: + be_alive = utils.is_socket_avail( + be.get_ip(), be.meta["ports"]["webserver_port"]) + if be_alive != need_alive: + failed_backends.append(id) + if not failed_frontends and not failed_backends: break if time.time() >= expire_ts: err = "" - if dead_frontends: - err += "dead fe: " + str(dead_frontends) + ". 
" - if dead_backends: - err += "dead be: " + str(dead_backends) + ". " + failed_status = "dead" if need_alive else "alive" + if failed_frontends: + err += failed_status + " fe: " + str(failed_frontends) + ". " + if failed_backends: + err += failed_status + " be: " + str(failed_backends) + ". " raise Exception(err) time.sleep(1) @@ -114,8 +117,6 @@ def get_ids_related_nodes(cluster, for node_type, ids in type_ids: nodes.extend(get_ids_related_nodes_with_type(node_type, ids)) - related_node_num = len(nodes) - return len(nodes) == cluster.get_all_nodes_num(), nodes, len(nodes) @@ -197,10 +198,11 @@ class Command(object): class SimpleCommand(Command): - def __init__(self, command, help): + def __init__(self, command, help, options=[]): super().__init__(command) self.command = command self.help = help + self.options = options def add_parser(self, args_parsers): help = self.help + " If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, "\ @@ -218,18 +220,21 @@ class SimpleCommand(Command): args.fdb_id) utils.exec_docker_compose_command(cluster.get_compose_file(), self.command, + options=self.options, nodes=related_nodes) show_cmd = self.command[0].upper() + self.command[1:] + + if for_all: + related_nodes = cluster.get_all_nodes() + LOG.info( utils.render_green("{} succ, total related node num {}".format( show_cmd, related_node_num))) - if for_all: - related_nodes = cluster.get_all_nodes() return cluster, related_nodes -class NeedStartCommand(SimpleCommand): +class StartBaseCommand(SimpleCommand): def add_parser(self, args_parsers): parser = super().add_parser(args_parsers) @@ -247,7 +252,48 @@ class NeedStartCommand(SimpleCommand): cluster, related_nodes = super().run(args) fe_ids = [node.id for node in related_nodes if node.is_fe()] be_ids = [node.id for node in related_nodes if node.is_be()] - wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids) + if not cluster.is_host_network(): + wait_service(True, args.wait_timeout, cluster, fe_ids, be_ids) + return cluster, related_nodes + + +class StartCommand(StartBaseCommand): + + def __init__(self, command): + super().__init__(command, "Start the doris containers. "), + + +class RestartCommand(StartBaseCommand): + + def __init__(self, command): + super().__init__(command, "Restart the doris containers. ", + ["-t", "1"]), + + +class StopCommand(SimpleCommand): + + def __init__(self, command): + super().__init__(command, "Stop the doris containers. ", ["-t", "1"]), + + def add_parser(self, args_parsers): + parser = super().add_parser(args_parsers) + parser.add_argument( + "--wait-timeout", + type=int, + default=0, + help= + "Specify wait seconds for fe/be close for service: 0 not wait (default), "\ + "> 0 max wait seconds, -1 wait unlimited." + ) + return parser + + def run(self, args): + cluster, related_nodes = super().run(args) + fe_ids = [node.id for node in related_nodes if node.is_fe()] + be_ids = [node.id for node in related_nodes if node.is_be()] + if not cluster.is_host_network(): + wait_service(False, args.wait_timeout, cluster, fe_ids, be_ids) + return cluster, related_nodes class UpCommand(Command): @@ -335,6 +381,7 @@ class UpCommand(Command): type=str, help="Specify recycle configs for doris_cloud.conf. 
"\ "Example: --recycle-config \"log_level = warn\".") + group1.add_argument( "--fe-follower", default=False, @@ -392,6 +439,21 @@ class UpCommand(Command): action=self._get_parser_bool_action(True), help="Manager fe be via sql instead of http") + parser.add_argument( + "--remote-master-fe", + type=str, + help= + "Specify remote master fe address with ip:query_port, and all the container use host network. " \ + "Only use when creating new cluster." + ) + + parser.add_argument( + "--local-network-ip", + type=str, + help= "Specify local network ip, no need specify, will auto chose a proper ip. "\ + "Only use when creating new cluster and specify --remote-master-fe." + ) + if self._support_boolean_action(): parser.add_argument( "--be-metaservice-endpoint", @@ -422,7 +484,7 @@ class UpCommand(Command): dest='be_cluster_id', default=True, action=self._get_parser_bool_action(False), - help="Do not set BE cluser ID in conf. Default is False.") + help="Do not set BE cluster ID in conf. Default is False.") parser.add_argument( "--fdb-version", @@ -466,6 +528,7 @@ class UpCommand(Command): raise Exception("Need specific not empty cluster name") for_all = True add_fdb_num = 0 + is_new_cluster = False try: cluster = CLUSTER.Cluster.load(args.NAME) @@ -483,6 +546,7 @@ class UpCommand(Command): for_all = False except: # a new cluster + is_new_cluster = True if not args.IMAGE: raise Exception("New cluster must specific image") from None if args.fe_id != None: @@ -499,7 +563,7 @@ class UpCommand(Command): args.recycle_id = None if args.add_fe_num is None: - args.add_fe_num = 3 + args.add_fe_num = 0 if args.remote_master_fe else 3 if args.add_be_num is None: args.add_be_num = 3 @@ -517,11 +581,26 @@ class UpCommand(Command): args.add_ms_num = 0 args.add_recycle_num = 0 + if args.remote_master_fe: + if not args.local_network_ip: + args.local_network_ip = utils.get_local_ip() + parts = args.remote_master_fe.split(":") + if len(parts) != 2: + raise Exception( + f"invalid --remote-master-fe-addr {args.remote_master_fe}, should be 'ip:query_port'" + ) + if not parts[0]: + args.remote_master_fe = args.local_network_ip + ":" + parts[ + 1] + if args.cloud: + args.sql_mode_node_mgr = True + cluster = CLUSTER.Cluster.new( args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config, args.be_config, args.ms_config, args.recycle_config, - args.fe_follower, args.be_disks, args.be_cluster, args.reg_be, - args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr, + args.remote_master_fe, args.local_network_ip, args.fe_follower, + args.be_disks, args.be_cluster, args.reg_be, args.coverage_dir, + cloud_store_config, args.sql_mode_node_mgr, args.be_metaservice_endpoint, args.be_cluster_id) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) @@ -605,32 +684,80 @@ class UpCommand(Command): .format(related_node_num))) else: LOG.info("Using SQL mode for node management ? 
{}".format( - args.sql_mode_node_mgr)) - - # Wait for FE master to be elected - LOG.info("Waiting for FE master to be elected...") - expire_ts = time.time() + 30 - while expire_ts > time.time(): - ready = False - db_mgr = database.get_db_mgr(args.NAME, False) - for id in add_fe_ids: - fe_state = db_mgr.get_fe(id) - if fe_state is not None and fe_state.alive: - ready = True + cluster.sql_mode_node_mgr)) + + if cluster.remote_master_fe: + if is_new_cluster: + with open(CLUSTER.get_master_fe_addr_path(cluster.name), + "w") as f: + f.write(cluster.remote_master_fe) + if cluster.is_cloud: + cloud_config = "\n".join([ + f"meta_service_endpoint = {cluster.get_meta_server_addr()}", + "deploy_mode = cloud", + f"cluster_id = {CLUSTER.CLUSTER_ID}", + ]) + # write add conf to remote_master_fe_add.conf, remote fe can send ssh to get this content. + with open( + os.path.join( + CLUSTER.get_status_path(cluster.name), + "remote_master_fe_add.conf"), "w") as f: + f.write(cloud_config) + ans = input( + utils.render_red( + f"\nAdd remote fe {cluster.remote_master_fe} fe.conf with follow config: " + ) + "\n\n" + f"{cloud_config}\n\nConfirm ? y/n: ") + if ans != 'y': + LOG.info( + "Up cluster failed due to not confirm write the above config." + ) + return + + LOG.info("Waiting connect to remote FE...") + expire_ts = time.time() + 3600 * 5 + parts = cluster.remote_master_fe.split(":") + fe_ip = parts[0] + fe_port = int(parts[1]) + ready = False + while expire_ts > time.time(): + if utils.is_socket_avail(fe_ip, fe_port): + ready = True + break + if not ready: + raise Exception( + "Cannot connect to remote master fe: " + + cluster.remote_master_fe) + + LOG.info("After connect to remote FE...") + else: + # Wait for FE master to be elected + LOG.info("Waiting for FE master to be elected...") + expire_ts = time.time() + 30 + while expire_ts > time.time(): + ready = False + db_mgr = database.get_db_mgr( + args.NAME, cluster.get_all_node_net_infos(), False) + for id in add_fe_ids: + fe_state = db_mgr.get_fe(id) + if fe_state is not None and fe_state.alive: + ready = True + break + if ready: break - if ready: - break - LOG.info("there is no fe ready") - time.sleep(1) - LOG.info("after Waiting for FE master to be elected...") - if cluster.is_cloud and args.sql_mode_node_mgr: - db_mgr = database.get_db_mgr(args.NAME, False) + LOG.info("there is no fe ready") + time.sleep(1) + LOG.info("after Waiting for FE master to be elected...") + if cluster.is_cloud and cluster.sql_mode_node_mgr: + db_mgr = database.get_db_mgr(args.NAME, + cluster.get_all_node_net_infos(), + False) master_fe_endpoint = CLUSTER.get_master_fe_endpoint( - cluster.name) + cluster.name, True) # Add FEs except master_fe for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE): - fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}" - if fe_endpoint != master_fe_endpoint: + fe_querypoint = f"{fe.get_ip()}:{fe.meta['ports']['query_port']}" + fe_endpoint = f"{fe.get_ip()}:{fe.meta['ports']['edit_log_port']}" + if fe_querypoint != master_fe_endpoint: try: db_mgr.add_fe( fe_endpoint, "FOLLOWER" @@ -642,19 +769,19 @@ class UpCommand(Command): # Add BEs for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE): - be_endpoint = f"{be.get_ip()}:{CLUSTER.BE_HEARTBEAT_PORT}" + be_endpoint = f"{be.get_ip()}:{be.meta['ports']['heartbeat_service_port']}" try: db_mgr.add_be(be_endpoint) LOG.info(f"Added BE {be_endpoint} successfully.") except Exception as e: LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}") + if is_new_cluster: + cloud_store_config = 
self._get_cloud_store_config() + db_mgr.create_default_storage_vault(cloud_store_config) - cloud_store_config = self._get_cloud_store_config() - - db_mgr.create_default_storage_vault(cloud_store_config) - - wait_ready_service(args.wait_timeout, cluster, add_fe_ids, - add_be_ids) + if not cluster.is_host_network(): + wait_service(True, args.wait_timeout, cluster, add_fe_ids, + add_be_ids) LOG.info( utils.render_green( "Up cluster {} succ, related node num {}".format( @@ -753,51 +880,68 @@ class DownCommand(Command): "it will send dropp to fe, otherwise send decommission to fe.") def run(self, args): + cluster_name = args.NAME cluster = None + stop_grace = False + try: - cluster = CLUSTER.Cluster.load(args.NAME) - except: - return "Cluster not exists or load failed" - for_all, related_nodes, related_node_num = get_ids_related_nodes( - cluster, - args.fe_id, - args.be_id, - args.ms_id, - args.recycle_id, - args.fdb_id, - ignore_not_exists=True) + cluster = CLUSTER.Cluster.load(cluster_name) + for_all, related_nodes, related_node_num = get_ids_related_nodes( + cluster, + args.fe_id, + args.be_id, + args.ms_id, + args.recycle_id, + args.fdb_id, + ignore_not_exists=True) + stop_grace = cluster.coverage_dir + except Exception as e: + for_all = not args.fe_id and not args.be_id and not args.ms_id and not args.recycle_id + related_nodes = [] + related_node_num = 0 + if not for_all: + raise e - LOG.info("down cluster " + args.NAME + " for all " + str(for_all)) + LOG.info("down cluster " + args.NAME + " for all " + + str(for_all).lower()) if for_all: - if os.path.exists(cluster.get_compose_file()): + compose_file = CLUSTER.get_compose_file(cluster_name) + if os.path.exists(compose_file): try: - utils.exec_docker_compose_command( - cluster.get_compose_file(), "down", - ["-v", "--remove-orphans"]) + options = ["-v", "--remove-orphans"] + if not stop_grace: + options.extend(["-t", "1"]) + utils.exec_docker_compose_command(compose_file, + "down", + options=options) except Exception as e: LOG.warn("down cluster has exception: " + str(e)) try: - utils.remove_docker_network(cluster.name) + utils.remove_docker_network(cluster_name) except Exception as e: LOG.warn("remove network has exception: " + str(e)) if args.clean: - utils.enable_dir_with_rw_perm(cluster.get_path()) - shutil.rmtree(cluster.get_path()) + cluster_path = CLUSTER.get_cluster_path(cluster_name) + if os.path.exists(cluster_path): + utils.enable_dir_with_rw_perm(cluster_path) + shutil.rmtree(cluster_path) LOG.info( utils.render_yellow( "Clean cluster data cause has specific --clean")) else: - db_mgr = database.get_db_mgr(cluster.name) + db_mgr = database.get_db_mgr(cluster.name, + cluster.get_all_node_net_infos()) for node in related_nodes: if node.is_fe(): - fe_endpoint = "{}:{}".format(node.get_ip(), - CLUSTER.FE_EDITLOG_PORT) + fe_endpoint = "{}:{}".format( + node.get_ip(), node.meta["ports"]["edit_log_port"]) db_mgr.drop_fe(fe_endpoint) elif node.is_be(): - be_endpoint = "{}:{}".format(node.get_ip(), - CLUSTER.BE_HEARTBEAT_PORT) + be_endpoint = "{}:{}".format( + node.get_ip(), + node.meta["ports"]["heartbeat_service_port"]) if args.drop_force: db_mgr.drop_be(be_endpoint) else: @@ -817,7 +961,7 @@ class DownCommand(Command): shutil.rmtree(node.get_path()) register_file = "{}/{}-{}-register".format( CLUSTER.get_status_path(cluster.name), - node.node_type(), node.get_ip()) + node.node_type(), node.id) if os.path.exists(register_file): os.remove(register_file) LOG.info( @@ -831,7 +975,7 @@ class DownCommand(Command): LOG.info( 
utils.render_green( "Down cluster {} succ, related node num {}".format( - args.NAME, related_node_num))) + cluster_name, related_node_num))) return "down cluster succ" @@ -853,6 +997,11 @@ class ListNode(object): self.tablet_num = "" self.last_heartbeat = "" self.err_msg = "" + self.query_port = "" + self.http_port = "" + self.heartbeat_port = "" + self.edit_log_port = "" + self.heartbeat_port = "" def info(self, detail): result = [ @@ -862,29 +1011,20 @@ class ListNode(object): self.last_heartbeat, self.err_msg ] if detail: - query_port = "" - http_port = "" - heartbeat_port = "" - edit_log_port = "" node_path = CLUSTER.get_node_path(self.cluster_name, self.node_type, self.id) - if self.node_type == CLUSTER.Node.TYPE_FE: - query_port = CLUSTER.FE_QUERY_PORT - http_port = CLUSTER.FE_HTTP_PORT - edit_log_port = CLUSTER.FE_EDITLOG_PORT - elif self.node_type == CLUSTER.Node.TYPE_BE: - http_port = CLUSTER.BE_WEBSVR_PORT - heartbeat_port = CLUSTER.BE_HEARTBEAT_PORT - elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE: - http_port = CLUSTER.MS_PORT - else: - pass result += [ - query_port, http_port, node_path, edit_log_port, heartbeat_port + self.query_port, self.http_port, node_path, self.edit_log_port, + self.heartbeat_port ] return result - def update_db_info(self, db_mgr): + def update_db_info(self, cluster, db_mgr): + try: + node = cluster.get_node(self.node_type, self.id, True) + except: + node = None + ports = node.meta["ports"] if node else {} if self.node_type == CLUSTER.Node.TYPE_FE: fe = db_mgr.get_fe(self.id) if fe: @@ -892,6 +1032,9 @@ class ListNode(object): self.is_master = str(fe.is_master).lower() self.last_heartbeat = fe.last_heartbeat self.err_msg = fe.err_msg + self.query_port = fe.query_port + self.http_port = fe.http_port + self.edit_log_port = fe.edit_log_port elif self.node_type == CLUSTER.Node.TYPE_BE: self.backend_id = -1 be = db_mgr.get_be(self.id) @@ -901,6 +1044,15 @@ class ListNode(object): self.tablet_num = be.tablet_num self.last_heartbeat = be.last_heartbeat self.err_msg = be.err_msg + self.http_port = be.http_port + self.heartbeat_port = be.heartbeat_service_port + elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE: + if ports: + self.http_port = ports.get("brpc_listen_port", -1) + if node and node.meta.get("is_remote", False): + self.ip = node.get_ip() + self.container_id = "<remote>" + self.image = "<remote>" class GenConfCommand(Command): @@ -929,12 +1081,12 @@ class GenConfCommand(Command): def run(self, args): base_conf = ''' -jdbcUrl = "jdbc:mysql://{fe_ip}:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" -targetJdbcUrl = "jdbc:mysql://{fe_ip}:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" -feSourceThriftAddress = "{fe_ip}:9020" -feTargetThriftAddress = "{fe_ip}:9020" +jdbcUrl = "jdbc:mysql://{fe_ip}:{query_port}/?useLocalSessionState=true&allowLoadLocalInfile=true" +targetJdbcUrl = "jdbc:mysql://{fe_ip}:{query_port}/?useLocalSessionState=true&allowLoadLocalInfile=true" +feSourceThriftAddress = "{fe_ip}:{rpc_port}" +feTargetThriftAddress = "{fe_ip}:{rpc_port}" syncerAddress = "{fe_ip}:9190" -feHttpAddress = "{fe_ip}:8030" +feHttpAddress = "{fe_ip}:{http_port}" ''' cloud_conf = ''' @@ -954,21 +1106,24 @@ cloudUniqueId= "{fe_cloud_unique_id}" args.NAME, CLUSTER.LOCAL_DORIS_PATH)) return - master_fe_ip = master_fe_ip_ep[:master_fe_ip_ep.find(':')] + db_mgr = database.get_db_mgr(args.NAME, + cluster.get_all_node_net_infos(), False) fe_ip = "" - if not 
args.connect_follow_fe: - fe_ip = master_fe_ip - else: - for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE): - if fe.get_ip() == master_fe_ip: - continue - else: - fe_ip = fe.get_ip() - break - if not fe_ip: - raise Exception( - "Not found follow fe, pls add a follow fe use command `up <your-cluster> --add-fe-num 1`" - ) + rpc_port = 0 + query_port = 0 + http_port = 0 + for fe in db_mgr.get_all_fe().values(): + if not fe.alive: + continue + if fe.is_master == (not args.connect_follow_fe): + fe_ip = fe.ip + rpc_port = fe.rpc_port + query_port = fe.query_port + http_port = fe.http_port + break + if not fe_ip: + role = "follower" if args.connect_follow_fe else "master" + raise Exception(f"Not found an alive {role} fe") relative_custom_file_path = "regression-test/conf/regression-conf-custom.groovy" regression_conf_custom = os.path.join(args.DORIS_ROOT_PATH, @@ -990,26 +1145,31 @@ cloudUniqueId= "{fe_cloud_unique_id}" with open(regression_conf_custom, "w") as f: # write auto gen config f.write(annotation_start) - f.write(base_conf.format(fe_ip=fe_ip)) + f.write( + base_conf.format(fe_ip=fe_ip, + rpc_port=rpc_port, + query_port=query_port, + http_port=http_port)) if cluster.is_cloud: multi_cluster_bes = ",".join([ - "{}:{}:{}:{}:{}".format(be.get_ip(), - CLUSTER.BE_HEARTBEAT_PORT, - CLUSTER.BE_WEBSVR_PORT, - be.cloud_unique_id(), - CLUSTER.BE_BRPC_PORT) + "{}:{}:{}:{}:{}".format( + be.get_ip(), + be.meta["ports"]["heartbeat_service_port"], + be.meta["ports"]["webserver_port"], + be.cloud_unique_id(), be.meta["ports"]["brpc_port"]) for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE) ]) + master_fe = cluster.get_remote_fe_node( + ) if cluster.is_host_network() else cluster.get_node( + CLUSTER.Node.TYPE_FE, 1) f.write( cloud_conf.format( fe_ip=fe_ip, ms_endpoint=cluster.get_meta_server_addr(), recycle_endpoint=cluster.get_recycle_addr(), multi_cluster_bes=multi_cluster_bes, - fe_cloud_unique_id=cluster.get_node( - CLUSTER.Node.TYPE_FE, 1).cloud_unique_id())) + fe_cloud_unique_id=master_fe.cloud_unique_id())) f.write(annotation_end + "\n\n") - annotation_end_line_count = -1 # write not-auto gen config in_annotation = False @@ -1045,12 +1205,31 @@ class ListCommand(Command): action=self._get_parser_bool_action(True), help="Print more detail fields.") + def _hint_cluster_bad(self, cluster_name): + cluster_path = CLUSTER.get_cluster_path(cluster_name) + if not os.path.exists(cluster_path): + LOG.info( + utils.render_yellow( + f"Not exits cluster directory in '{CLUSTER.LOCAL_DORIS_PATH}'" + )) + elif not os.path.exists(CLUSTER.Cluster._get_meta_file(cluster_name)): + LOG.error( + utils.render_red( + f"Not exits cluster meta file in '{cluster_path}'")) + else: + try: + CLUSTER.Cluster.load(cluster_name) + except: + LOG.error(utils.render_red("meta file is bad or incompatible with current doris-compose.py. 
" \ + "Run command `down --clean` to destroy it then recreate a new one")) + def run(self, args): COMPOSE_MISSING = "(missing)" COMPOSE_BAD = "(bad)" COMPOSE_GOOD = "" SERVICE_DEAD = "dead" + SERVICE_RUNNING = "running" class ComposeService(object): @@ -1061,6 +1240,12 @@ class ListCommand(Command): def parse_cluster_compose_file(cluster_name): compose_file = CLUSTER.get_compose_file(cluster_name) + try: + cluster = CLUSTER.Cluster.load(cluster_name) + ip_for_host_mode = cluster.local_network_ip if cluster.is_host_network( + ) else "" + except: + ip_for_host_mode = "" if not os.path.exists(compose_file): return COMPOSE_MISSING, {} try: @@ -1071,8 +1256,9 @@ class ListCommand(Command): if services is None: return COMPOSE_BAD, {} return COMPOSE_GOOD, { - service: ComposeService( - service, + service: + ComposeService( + service, ip_for_host_mode if ip_for_host_mode else list(service_conf["networks"].values())[0] ["ipv4_address"], service_conf["image"]) for service, service_conf in services.items() @@ -1107,7 +1293,7 @@ class ListCommand(Command): TYPE_COMPOSESERVICE = type(ComposeService("", "", "")) if not args.NAME: header = ("CLUSTER", "OWNER", "STATUS", "MASTER FE", "CLOUD", - "CONFIG FILES") + "NETWORK MODE", "CONFIG FILES") rows = [] for name in sorted(clusters.keys()): cluster_info = clusters[name] @@ -1125,16 +1311,20 @@ class ListCommand(Command): compose_file = CLUSTER.get_compose_file(name) is_cloud = "" + network_mode = "" try: cluster = CLUSTER.Cluster.load(name) is_cloud = "true" if cluster.is_cloud else "false" + network_mode = "host" if cluster.is_host_network( + ) else "bridge" except: pass - rows.append((name, owner, show_status, - CLUSTER.get_master_fe_endpoint(name), is_cloud, - "{}{}".format(compose_file, - cluster_info["status"]))) + rows.append( + (name, owner, show_status, + CLUSTER.get_master_fe_endpoint(name), is_cloud, + network_mode, "{}{}".format(compose_file, + cluster_info["status"]))) return self._print_table(header, rows) header = [ @@ -1156,8 +1346,15 @@ class ListCommand(Command): fe_ids = {} be_ids = {} services = clusters[cluster_name]["services"] - db_mgr = database.get_db_mgr(cluster_name, False) + cluster = None + try: + cluster = CLUSTER.Cluster.load(cluster_name) + except: + pass + db_mgr = database.get_db_mgr( + cluster_name, + cluster.get_all_node_net_infos() if cluster else [], False) nodes = [] for service_name, container in services.items(): _, node_type, id = utils.parse_service_name(container.name) @@ -1165,7 +1362,7 @@ class ListCommand(Command): node.cluster_name = cluster_name node.node_type = node_type node.id = id - node.update_db_info(db_mgr) + node.update_db_info(cluster, db_mgr) nodes.append(node) if node_type == CLUSTER.Node.TYPE_FE: @@ -1181,12 +1378,15 @@ class ListCommand(Command): node.created = dateutil.parser.parse( container.attrs.get("Created")).astimezone().strftime( "%Y-%m-%d %H:%M:%S") - node.ip = list( - container.attrs["NetworkSettings"] - ["Networks"].values())[0]["IPAMConfig"]["IPv4Address"] - node.image = ",".join(container.image.tags) + if cluster and cluster.is_host_network(): + node.ip = cluster.local_network_ip + else: + node.ip = list( + container.attrs["NetworkSettings"]["Networks"]. 
+ values())[0]["IPAMConfig"]["IPv4Address"] + node.image = container.attrs["Config"]["Image"] if not node.image: - node.image = container.attrs["Config"]["Image"] + node.image = ",".join(container.image.tags) node.container_id = container.short_id node.status = container.status if node.container_id and \ @@ -1202,8 +1402,8 @@ class ListCommand(Command): node.cluster_name = cluster_name node.node_type = CLUSTER.Node.TYPE_FE node.id = id - node.status = SERVICE_DEAD - node.update_db_info(db_mgr) + node.status = SERVICE_RUNNING if fe.alive else SERVICE_DEAD + node.update_db_info(cluster, db_mgr) nodes.append(node) for id, be in db_mgr.be_states.items(): if be_ids.get(id, False): @@ -1212,8 +1412,8 @@ class ListCommand(Command): node.cluster_name = cluster_name node.node_type = CLUSTER.Node.TYPE_BE node.id = id - node.status = SERVICE_DEAD - node.update_db_info(db_mgr) + node.status = SERVICE_RUNNING if be.alive else SERVICE_DEAD + node.update_db_info(cluster, db_mgr) nodes.append(node) def get_node_seq(node): @@ -1222,7 +1422,11 @@ class ListCommand(Command): for node in sorted(nodes, key=get_node_seq): rows.append(node.info(args.detail)) - return self._print_table(header, rows) + ret = self._print_table(header, rows) + if len(args.NAME) == 1 and len(rows) == 0: + self._hint_cluster_bad(args.NAME[0]) + + return ret class InfoCommand(Command): @@ -1250,12 +1454,13 @@ class InfoCommand(Command): ("RECYCLER_PORT", CLUSTER.MS_PORT, "constant"), ] - with open(CLUSTER.CLOUD_CFG_FILE, "r") as f: - for line in f: - line = line.strip() - if line and not line.startswith('#'): - key, value = line.split('=', 1) - rows.append((key.strip(), value.strip(), "cloud.ini")) + if os.path.exists(CLUSTER.CLOUD_CFG_FILE): + with open(CLUSTER.CLOUD_CFG_FILE, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + key, value = line.split("=", 1) + rows.append((key.strip(), value.strip(), "cloud.ini")) return self._print_table(header, rows) @@ -1277,9 +1482,9 @@ class AddRWPermCommand(Command): ALL_COMMANDS = [ UpCommand("up"), DownCommand("down"), - NeedStartCommand("start", "Start the doris containers. "), - SimpleCommand("stop", "Stop the doris containers. "), - NeedStartCommand("restart", "Restart the doris containers. "), + StartCommand("start"), + StopCommand("stop"), + RestartCommand("restart"), SimpleCommand("pause", "Pause the doris containers. "), SimpleCommand("unpause", "Unpause the doris containers. 
"), GenConfCommand("config"), diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py index 370f1d5ee2a..f46635033b8 100644 --- a/docker/runtime/doris-compose/database.py +++ b/docker/runtime/doris-compose/database.py @@ -27,35 +27,49 @@ LOG = utils.get_logger() class FEState(object): - def __init__(self, id, is_master, alive, last_heartbeat, err_msg, role): + def __init__(self, id, ip, is_master, alive, last_heartbeat, err_msg, role, + query_port, rpc_port, http_port, edit_log_port): self.id = id + self.ip = ip self.is_master = is_master self.alive = alive self.last_heartbeat = last_heartbeat self.err_msg = err_msg self.role = role + self.query_port = query_port + self.rpc_port = rpc_port + self.http_port = http_port + self.edit_log_port = edit_log_port class BEState(object): - def __init__(self, id, backend_id, decommissioned, alive, tablet_num, - last_heartbeat, err_msg): + def __init__(self, id, ip, backend_id, decommissioned, alive, tablet_num, + last_heartbeat, err_msg, http_port, heartbeat_service_port): self.id = id + self.ip = ip self.backend_id = backend_id self.decommissioned = decommissioned self.alive = alive self.tablet_num = tablet_num self.last_heartbeat = last_heartbeat self.err_msg = err_msg + self.http_port = http_port + self.heartbeat_service_port = heartbeat_service_port class DBManager(object): - def __init__(self): + def __init__(self, all_node_net_infos): + self.all_node_net_infos = all_node_net_infos self.fe_states = {} self.be_states = {} self.conn = None - self.master_fe_ip = "" + self.fe_ip = "" + self.fe_port = -1 + + def get_all_fe(self): + return self.fe_states def get_fe(self, id): return self.fe_states.get(id, None) @@ -172,6 +186,8 @@ class DBManager(object): "provider" = "{cloud_store_config['DORIS_CLOUD_PROVIDER']}" ); """ + # create hk storage vault from beijing cost 14s + self._reset_conn(read_timeout=20) self._exec_query(create_vault_sql) LOG.info("Created storage vault 'default_vault'") @@ -187,26 +203,42 @@ class DBManager(object): def _load_fe_states(self): fe_states = {} alive_master_fe_ip = None + alive_master_fe_port = None for record in self._exec_query("show frontends"): name = record["Name"] ip = record["Host"] role = record["Role"] is_master = utils.is_true(record["IsMaster"]) alive = utils.is_true(record["Alive"]) - id = CLUSTER.Node.get_id_from_ip(ip) + query_port = int(record["QueryPort"]) + http_port = int(record["HttpPort"]) + rpc_port = int(record["RpcPort"]) + edit_log_port = int(record["EditLogPort"]) + id = None + for net_info in self.all_node_net_infos: + if net_info.ip == ip and net_info.ports.get("query_port", + -1) == query_port: + id = net_info.id + break + if not id: + id = CLUSTER.Node.get_id_from_ip(ip) last_heartbeat = utils.escape_null(record["LastHeartbeat"]) err_msg = record["ErrMsg"] - fe = FEState(id, is_master, alive, last_heartbeat, err_msg, role) + fe = FEState(id, ip, is_master, alive, last_heartbeat, err_msg, + role, query_port, rpc_port, http_port, edit_log_port) fe_states[id] = fe if is_master and alive: alive_master_fe_ip = ip + alive_master_fe_port = query_port LOG.debug( - "record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}" - .format(name, ip, alive, is_master, role)) + "record of show frontends, name {}, ip {}, port {}, alive {}, is_master {}, role {}" + .format(name, ip, query_port, alive, is_master, role)) self.fe_states = fe_states - if alive_master_fe_ip and alive_master_fe_ip != self.master_fe_ip: - self.master_fe_ip = alive_master_fe_ip + 
if alive_master_fe_ip and (alive_master_fe_ip != self.fe_ip + or alive_master_fe_port != self.fe_port): + self.fe_ip = alive_master_fe_ip + self.fe_port = alive_master_fe_port self._reset_conn() def _load_be_states(self): @@ -216,11 +248,23 @@ class DBManager(object): alive = utils.is_true(record["Alive"]) decommissioned = utils.is_true(record["SystemDecommissioned"]) tablet_num = int(record["TabletNum"]) - id = CLUSTER.Node.get_id_from_ip(record["Host"]) + ip = record["Host"] + heartbeat_service_port = int(record["HeartbeatPort"]) + http_port = int(record["HttpPort"]) + id = None + for net_info in self.all_node_net_infos: + if net_info.ip == ip and net_info.ports.get( + "heartbeat_service_port", + -1) == heartbeat_service_port: + id = net_info.id + break + if not id: + id = CLUSTER.Node.get_id_from_ip(ip) last_heartbeat = utils.escape_null(record["LastHeartbeat"]) err_msg = record["ErrMsg"] - be = BEState(id, backend_id, decommissioned, alive, tablet_num, - last_heartbeat, err_msg) + be = BEState(id, ip, backend_id, decommissioned, alive, tablet_num, + last_heartbeat, err_msg, http_port, + heartbeat_service_port) be_states[id] = be self.be_states = be_states @@ -233,11 +277,17 @@ class DBManager(object): cursor.execute(sql) fields = [field_md[0] for field_md in cursor.description ] if cursor.description else [] - return [dict(zip(fields, row)) for row in cursor.fetchall()] + return [ + dict(zip(fields, row)) for row in cursor.fetchall() + ] except Exception as e: - LOG.warn(f"Error occurred: {e}") + LOG.warn( + f"Error occurred: fe {self.fe_ip}:{self.fe_port}, sql `{sql}`, err {e}" + ) if "timed out" in str(e).lower() and attempt < retries - 1: - LOG.warn(f"Query timed out. Retrying {attempt + 1}/{retries}...") + LOG.warn( + f"Query timed out, fe {self.fe_ip}:{self.fe_port}. Retrying {attempt + 1}/{retries}..." 
+ ) self._reset_conn() else: raise e @@ -248,27 +298,32 @@ class DBManager(object): return self._reset_conn() - def _reset_conn(self): + def _reset_conn(self, read_timeout=10, connect_timeout=3): self.conn = pymysql.connect(user="root", - host=self.master_fe_ip, - read_timeout=10, - connect_timeout=3, - port=CLUSTER.FE_QUERY_PORT) + host=self.fe_ip, + read_timeout=read_timeout, + connect_timeout=connect_timeout, + port=self.fe_port) -def get_db_mgr(cluster_name, required_load_succ=True): +def get_db_mgr(cluster_name, all_node_net_infos, required_load_succ=True): assert cluster_name - db_mgr = DBManager() - master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name), - "master_fe_ip") - master_fe_ip = None - if os.path.exists(master_fe_ip_file): - with open(master_fe_ip_file, "r") as f: - master_fe_ip = f.read().strip() - - if not master_fe_ip: + db_mgr = DBManager(all_node_net_infos) + master_fe_query_addr_file = CLUSTER.get_master_fe_addr_path(cluster_name) + master_fe_query_addr = None + if os.path.exists(master_fe_query_addr_file): + with open(master_fe_query_addr_file, "r") as f: + master_fe_query_addr = f.read().strip() + + if not master_fe_query_addr: return db_mgr + pos = master_fe_query_addr.find(':') + master_fe_ip = master_fe_query_addr[:pos] + master_fe_port = int(master_fe_query_addr[pos + 1:]) + + chose_fe_ip = "" + chose_fe_port = -1 alive_fe = None cluster = CLUSTER.Cluster.load(cluster_name) containers = utils.get_doris_containers(cluster_name).get(cluster_name, []) @@ -279,13 +334,22 @@ def get_db_mgr(cluster_name, required_load_succ=True): node = cluster.get_node(node_type, id) if not alive_fe: alive_fe = node - if node.get_ip() == master_fe_ip: + if node.get_ip() == master_fe_ip and node.meta["ports"][ + "query_port"] == master_fe_port: alive_fe = node break - if not alive_fe: + if alive_fe: + chose_fe_ip = alive_fe.get_ip() + chose_fe_port = alive_fe.meta["ports"]["query_port"] + elif utils.is_socket_avail(master_fe_ip, master_fe_port): + chose_fe_ip = master_fe_ip + chose_fe_port = master_fe_port + else: + LOG.debug("no available alive fe") return db_mgr - db_mgr.master_fe_ip = alive_fe.get_ip() + db_mgr.fe_ip = chose_fe_ip + db_mgr.fe_port = chose_fe_port try: db_mgr.load_states() except Exception as e: diff --git a/docker/runtime/doris-compose/format-code.sh b/docker/runtime/doris-compose/format-code.sh index 0626662e641..6ec88624f26 100644 --- a/docker/runtime/doris-compose/format-code.sh +++ b/docker/runtime/doris-compose/format-code.sh @@ -17,3 +17,7 @@ yapf -i *.py shfmt -w resource/*.sh + +# format docker file +# go install github.com/reteps/dockerfmt@latest +dockerfmt Dockerfile -w diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt index 1f32223a02e..46eebbd0a3f 100644 --- a/docker/runtime/doris-compose/requirements.txt +++ b/docker/runtime/doris-compose/requirements.txt @@ -15,6 +15,9 @@ # specific language governing permissions and limitations # under the License. 
+# if install docker failed, specific pyyaml version and docker version +#pyyaml==5.3.1 +#docker==6.1.3 docker docker-compose filelock @@ -22,6 +25,4 @@ jsonpickle prettytable pymysql python-dateutil -# if mac install pyyaml failed, change pyyaml version -#pyyaml==5.3.1 requests<=2.31.0 diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh index 40833d01dc6..2c53ca587a5 100644 --- a/docker/runtime/doris-compose/resource/common.sh +++ b/docker/runtime/doris-compose/resource/common.sh @@ -16,11 +16,13 @@ # under the License. export MASTER_FE_IP="" -export MASTER_FE_IP_FILE=$DORIS_HOME/status/master_fe_ip +export MASTER_FE_PORT="" +export MASTER_FE_QUERY_ADDR_FILE=$DORIS_HOME/status/master_fe_query_addr export HAS_INIT_FDB_FILE=${DORIS_HOME}/status/has_init_fdb export HAS_CREATE_INSTANCE_FILE=$DORIS_HOME/status/has_create_instance export LOG_FILE=$DORIS_HOME/log/health.out export LOCK_FILE=$DORIS_HOME/status/token +export MY_TYPE_ID="${MY_TYPE}-${MY_ID}" health_log() { echo "$(date +'%Y-%m-%d %H:%M:%S') $@" | tee -a $LOG_FILE @@ -32,7 +34,7 @@ lock_cluster() { health_log "start acquire token" while true; do if [ -f $LOCK_FILE ]; then - if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + if [ "a$(cat $LOCK_FILE)" == "a${MY_TYPE_ID}" ]; then health_log "rm $LOCK_FILE generate by myself" rm $LOCK_FILE continue @@ -57,12 +59,12 @@ lock_cluster() { fi if [ ! -f $LOCK_FILE ]; then - echo $MY_IP >$LOCK_FILE + echo ${MY_TYPE_ID} >$LOCK_FILE fi sleep 0.1 - if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + if [ "a$(cat $LOCK_FILE)" == "a${MY_TYPE_ID}" ]; then break fi @@ -77,16 +79,18 @@ unlock_cluster() { return fi - if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + if [ "a$(cat $LOCK_FILE)" == "a${MY_TYPE_ID}" ]; then rm $LOCK_FILE fi } wait_master_fe_ready() { while true; do - MASTER_FE_IP=$(cat $MASTER_FE_IP_FILE) - if [ -n "$MASTER_FE_IP" ]; then - health_log "master fe ${MASTER_FE_IP} has ready." + master_fe_query_addr=$(cat $MASTER_FE_QUERY_ADDR_FILE) + if [ -n "$master_fe_query_addr" ]; then + MASTER_FE_IP=$(echo ${master_fe_query_addr} | cut -d ":" -f 1) + MASTER_FE_PORT=$(echo ${master_fe_query_addr} | cut -d ":" -f 2) + health_log "master fe ${master_fe_query_addr} has ready." break fi health_log "master fe has not ready." diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh index 08cc914f6af..e4ac48bda76 100755 --- a/docker/runtime/doris-compose/resource/init_be.sh +++ b/docker/runtime/doris-compose/resource/init_be.sh @@ -22,19 +22,19 @@ DIR=$( source $DIR/common.sh -REGISTER_FILE=$DORIS_HOME/status/be-$MY_IP-register +REGISTER_FILE="${DORIS_HOME}/status/be-${MY_ID}-register" add_local_be() { wait_master_fe_ready while true; do - #lsof -i:$BE_HEARTBEAT_PORT + #lsof -i:$MY_HEARTBEAT_PORT #if [ $? -ne 0 ]; then # sleep 1 # continue #fi - output=$(mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$BE_HEARTBEAT_PORT';" 2>&1) + output=$(mysql -P $MASTER_FE_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$MY_HEARTBEAT_PORT';" 2>&1) res=$? 
health_log "$output" [ $res -eq 0 ] && break @@ -54,10 +54,9 @@ add_cloud_be() { return fi - cluster_file_name="${DORIS_HOME}/conf/CLUSTER_NAME" - cluster_name=$(cat $cluster_file_name) + cluster_name="${CLUSTER_NAME}" if [ -z $cluster_name ]; then - health_log "Empty cluster name, it should specific in file ${cluster_file_name}" + health_log "Empty cluster name, should specific with --be-cluster" exit 1 fi @@ -68,7 +67,7 @@ add_cloud_be() { nodes='{ "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", "ip": "'"${MY_IP}"'", - "heartbeat_port": "'"${BE_HEARTBEAT_PORT}"'" + "heartbeat_port": "'"${MY_HEARTBEAT_PORT}"'" }' lock_cluster diff --git a/docker/runtime/doris-compose/resource/init_cloud.sh b/docker/runtime/doris-compose/resource/init_cloud.sh index 18dfc4430e2..22883ab2e40 100644 --- a/docker/runtime/doris-compose/resource/init_cloud.sh +++ b/docker/runtime/doris-compose/resource/init_cloud.sh @@ -38,11 +38,18 @@ wait_fdb_ready() { } check_init_cloud() { + if [ "$MY_TYPE" != "ms" -o "$MY_ID" != "1" ]; then + return + fi + if [ -f $HAS_CREATE_INSTANCE_FILE ]; then return fi - if [ "$MY_TYPE" != "ms" -o "$MY_ID" != "1" ]; then + # Check if SQL_MODE_NODE_MGR is set + if [[ "$SQL_MODE_NODE_MGR" -eq 1 ]]; then + health_log "SQL_MODE_NODE_MGR is set, skipping create_instance" + touch $HAS_CREATE_INSTANCE_FILE return fi @@ -50,13 +57,6 @@ check_init_cloud() { lock_cluster - # Check if SQL_MODE_NODE_MGR is set - if [[ "$SQL_MODE_NODE_MGR" -eq 1 ]]; then - health_log "SQL_MODE_NODE_MGR is set, skipping create_instance" - touch $HAS_CREATE_INSTANCE_FILE - return - fi - output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \ -d '{"instance_id":"default_instance_id", "name": "default_instance", diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh index a58723db1d7..022928cbce6 100755 --- a/docker/runtime/doris-compose/resource/init_fe.sh +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -22,13 +22,14 @@ DIR=$( source $DIR/common.sh -REGISTER_FILE=$DORIS_HOME/status/fe-$MY_IP-register +REGISTER_FILE="${DORIS_HOME}/status/fe-${MY_ID}-register" +MASTER_EDITLOG_PORT="" add_local_fe() { wait_master_fe_ready while true; do - output=$(mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$FE_EDITLOG_PORT';" 2>&1) + output=$(mysql -P $MASTER_FE_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$MY_EDITLOG_PORT';" 2>&1) res=$? health_log "${output}\n" [ $res -eq 0 ] && break @@ -43,10 +44,10 @@ fe_daemon() { set +e while true; do sleep 1 - output=$(mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;") + output=$(mysql -P $MY_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;") code=$? 
if [ $code -ne 0 ]; then - health_log "exec show frontends bad: $output" + health_log "daemon get frontends exec show frontends bad: $output" continue fi header=$(grep IsMaster <<<$output) @@ -56,10 +57,12 @@ fe_daemon() { fi host_index=-1 is_master_index=-1 + query_port_index=-1 i=1 for field in $header; do [[ "$field" = "Host" ]] && host_index=$i [[ "$field" = "IsMaster" ]] && is_master_index=$i + [[ "$field" = "QueryPort" ]] && query_port_index=$i ((i = i + 1)) done if [ $host_index -eq -1 ]; then @@ -70,12 +73,17 @@ fe_daemon() { health_log "header not found IsMaster" continue fi - echo "$output" | awk -v is_master="$is_master_index" -v host="$host_index" '{print $is_master $host}' | grep $MY_IP | grep true 2>&1 + if [ $query_port_index -eq -1 ]; then + health_log "header not found QueryPort" + continue + fi + echo "$output" | awk -v query_port="$query_port_index" -v is_master="$is_master_index" -v host="$host_index" '{print $query_port $is_master $host}' | grep $MY_QUERY_PORT | grep $MY_IP | grep true 2>&1 if [ $? -eq 0 ]; then - echo $MY_IP >$MASTER_FE_IP_FILE - if [ "$MASTER_FE_IP" != "$MY_IP" ]; then - health_log "change to master, last master is $MASTER_FE_IP" + echo ${MY_IP}:${MY_QUERY_PORT} >$MASTER_FE_QUERY_ADDR_FILE + if [ "$MASTER_FE_IP" != "$MY_IP" -o "${MASTER_FE_PORT}" != "${MY_QUERY_PORT}" ]; then + health_log "change to master, last master is ${MASTER_FE_IP}:${MASTER_FE_PORT}" MASTER_FE_IP=$MY_IP + MASTER_FE_PORT=$MY_QUERY_PORT fi fi done @@ -122,7 +130,7 @@ start_cloud_fe() { nodes='{ "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", "ip": "'"${MY_IP}"'", - "edit_log_port": "'"${FE_EDITLOG_PORT}"'", + "edit_log_port": "'"${MY_EDITLOG_PORT}"'", "node_type": "'"${node_type}"'" }' @@ -190,6 +198,23 @@ wait_process() { wait_pid $pid } +fetch_master_fe_editlog_port() { + set +e + while true; do + output=$(mysql -P $MY_QUERY_PORT -h $MASTER_FE_IP -u root -N -s --execute "select EditLogPort from frontends() where IsMaster = 'true';") + code=$? + if [ $code -ne 0 ]; then + health_log "get editlog port exec show frontends bad: $output" + sleep 1 + continue + fi + if [ -n "$output" ]; then + MASTER_EDITLOG_PORT=${output} + break + fi + done +} + start_local_fe() { if [ "$MY_ID" = "1" -a ! -f $REGISTER_FILE ]; then touch $REGISTER_FILE @@ -201,7 +226,8 @@ start_local_fe() { else add_local_fe fe_daemon & - run_fe --helper $MASTER_FE_IP:$FE_EDITLOG_PORT + fetch_master_fe_editlog_port + run_fe --helper $MASTER_FE_IP:$MASTER_EDITLOG_PORT fi } diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py index dcb821ddffd..4ff87ef2229 100644 --- a/docker/runtime/doris-compose/utils.py +++ b/docker/runtime/doris-compose/utils.py @@ -285,12 +285,36 @@ def copy_image_directory(image, image_dir, local_dir): entrypoint="cp -r {} /opt/mount/".format(image_dir)) +def get_avail_port(): + with contextlib.closing(socket.socket(socket.AF_INET, + socket.SOCK_STREAM)) as sock: + sock.bind(("", 0)) + _, port = sock.getsockname() + return port + + def is_socket_avail(ip, port): with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: return sock.connect_ex((ip, port)) == 0 +def get_local_ip(): + with contextlib.closing(socket.socket(socket.AF_INET, + socket.SOCK_DGRAM)) as sock: + sock.settimeout(0) + # the ip no need reachable. 
+ # sometime connect to the external network '10.255.255.255' throw exception 'Permissions denied', + # then change to connect to the local network '192.168.0.255' + for ip in (('10.255.255.255'), ('192.168.0.255')): + try: + sock.connect((ip, 1)) + return sock.getsockname()[0] + except Exception as e: + LOG.info(f"get local ip connect {ip} failed: {e}") + return '127.0.0.1' + + def enable_dir_with_rw_perm(dir): if not os.path.exists(dir): return diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 15802a02888..bf43a57fbf7 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -544,6 +544,7 @@ class SuiteCluster { } int START_WAIT_TIMEOUT = 120 + int STOP_WAIT_TIMEOUT = 60 // if not specific fe indices, then start all frontends void startFrontends(int... indices) { @@ -557,13 +558,13 @@ class SuiteCluster { // if not specific fe indices, then stop all frontends void stopFrontends(int... indices) { - runFrontendsCmd(60, 'stop', indices) + runFrontendsCmd(STOP_WAIT_TIMEOUT + 5, "stop --wait-timeout ${STOP_WAIT_TIMEOUT}".toString(), indices) waitHbChanged() } // if not specific be indices, then stop all backends void stopBackends(int... indices) { - runBackendsCmd(60, 'stop', indices) + runBackendsCmd(STOP_WAIT_TIMEOUT + 5, "stop --wait-timeout ${STOP_WAIT_TIMEOUT}".toString(), indices) waitHbChanged() } @@ -580,7 +581,7 @@ class SuiteCluster { // if not specific ms indices, then restart all ms void restartMs(int... indices) { runMsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices) - } + } // if not specific recycler indices, then restart all recyclers void restartRecyclers(int... indices) { @@ -588,7 +589,7 @@ class SuiteCluster { } // if not specific fe indices, then drop all frontends - void dropFrontends(boolean clean=false, int... indices) { + void dropFrontends(boolean clean, int... indices) { def cmd = 'down' if (clean) { cmd += ' --clean' @@ -597,7 +598,7 @@ class SuiteCluster { } // if not specific be indices, then decommission all backends - void decommissionBackends(boolean clean=false, int... indices) { + void decommissionBackends(boolean clean, int... indices) { def cmd = 'down' if (clean) { cmd += ' --clean' @@ -606,7 +607,7 @@ class SuiteCluster { } // if not specific be indices, then drop force all backends - void dropForceBackends(boolean clean=false, int... indices) { + void dropForceBackends(boolean clean, int... 
indices) { def cmd = 'down --drop-force' if (clean) { cmd += ' --clean' diff --git a/regression-test/suites/clone_p0/test_decommission_mtmv.groovy b/regression-test/suites/clone_p0/test_decommission_mtmv.groovy index b29d5c13c94..3efd5bc4497 100644 --- a/regression-test/suites/clone_p0/test_decommission_mtmv.groovy +++ b/regression-test/suites/clone_p0/test_decommission_mtmv.groovy @@ -73,7 +73,7 @@ suite('test_decommission_mtmv', 'docker') { def decommissionBeIdx = 1 def decommissionBe = cluster.getBeByIndex(decommissionBeIdx) - cluster.decommissionBackends(decommissionBeIdx) + cluster.decommissionBackends(true, decommissionBeIdx) def backends = sql_return_maparray 'show backends' assertEquals(3, backends.size()) diff --git a/regression-test/suites/demo_p0/docker_action.groovy b/regression-test/suites/demo_p0/docker_action.groovy index d13c5d13e54..7bea52989b5 100644 --- a/regression-test/suites/demo_p0/docker_action.groovy +++ b/regression-test/suites/demo_p0/docker_action.groovy @@ -36,12 +36,15 @@ import org.apache.doris.regression.suite.ClusterOptions // then run docker suite, otherwise don't run docker suite. // NOTICE: -// 1. No need to use code ` if (isCloudMode()) { return } ` in docker suites, +// 1. Need add 'docker' to suite's group, and don't add 'nonConcurrent' to it; +// 2. In docker closure: +// a. Don't use 'Awaitility.await()...until(f)', but use 'dockerAwaitUntil(..., f)'; +// b. Don't use java Thread, but use regress framework's ThreadAction(see example demo_p0/thread_action.groovy); +// 3. No need to use code ` if (isCloudMode()) { return } ` in docker suites, // instead should use `ClusterOptions.cloudMode = true/false` is enough. // Because when run docker suite without an external doris cluster, if suite use code `isCloudMode()`, it need specific -runMode=cloud/not_cloud. // On the contrary, `ClusterOptions.cloudMode = true/false` no need specific -runMode=cloud/not_cloud when no external doris cluster exists. -// need add 'docker' to suite's group, and don't add 'nonConcurrent' to it suite('docker_action', 'docker') { // run a new docker docker { diff --git a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy index d6c9c71539b..79b1d3c30d1 100644 --- a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy +++ b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy @@ -119,7 +119,7 @@ suite('test_min_load_replica_num_complicate', 'docker') { futures.add(thread { sql '''admin set frontend config ("disable_tablet_scheduler" = "false")''' - cluster.decommissionBackends(clean = true, originBackends.get(0).index) + cluster.decommissionBackends(true, originBackends.get(0).index) cluster.clearFrontendDebugPoints() cluster.clearBackendDebugPoints()