This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 1ac00ea9830 branch-2.1: [feat](doris compose) Copy latest compose code from master branch (#43464) 1ac00ea9830 is described below commit 1ac00ea983004d6209c0ed68dba300ed11374c87 Author: yujun <yu...@selectdb.com> AuthorDate: Fri Nov 8 09:47:19 2024 +0800 branch-2.1: [feat](doris compose) Copy latest compose code from master branch (#43464) Copy latest code from master branch to support running docker suites without an external doris cluster, enable jvm debug port, ..., etc. --- docker/runtime/doris-compose/Dockerfile | 23 +- docker/runtime/doris-compose/Readme.md | 58 +++- docker/runtime/doris-compose/cluster.py | 225 ++++++++++--- docker/runtime/doris-compose/command.py | 349 +++++++++++++++++---- docker/runtime/doris-compose/database.py | 171 ++++++---- docker/runtime/doris-compose/doris-compose.py | 9 +- .../{requirements.txt => format-code.sh} | 9 +- docker/runtime/doris-compose/requirements.txt | 2 + docker/runtime/doris-compose/resource/common.sh | 13 +- .../runtime/doris-compose/resource/entrypoint.sh | 68 ++++ docker/runtime/doris-compose/resource/init_be.sh | 8 +- .../runtime/doris-compose/resource/init_cloud.sh | 12 +- docker/runtime/doris-compose/resource/init_fe.sh | 43 ++- docker/runtime/doris-compose/utils.py | 23 +- .../org/apache/doris/regression/Config.groovy | 22 +- .../org/apache/doris/regression/suite/Suite.groovy | 4 +- .../doris/regression/suite/SuiteCluster.groovy | 232 +++++++++++--- .../suites/demo_p0/docker_action.groovy | 2 - 18 files changed, 999 insertions(+), 274 deletions(-) diff --git a/docker/runtime/doris-compose/Dockerfile b/docker/runtime/doris-compose/Dockerfile index 73561e6410e..c64b732fe34 100644 --- a/docker/runtime/doris-compose/Dockerfile +++ b/docker/runtime/doris-compose/Dockerfile @@ -29,16 +29,7 @@ ARG JDK_IMAGE=openjdk:17-jdk-slim FROM ${JDK_IMAGE} -RUN <<EOF - if [ -d "/usr/local/openjdk-17" ]; then - ln -s /usr/local/openjdk-17 /usr/local/openjdk - else \ - ln -s /usr/local/openjdk-8 /usr/local/openjdk - fi -EOF - # set environment variables -ENV JAVA_HOME="/usr/local/openjdk" ENV JACOCO_VERSION 0.8.8 RUN mkdir -p /opt/apache-doris/coverage @@ -47,7 +38,7 @@ RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list RUN apt-get clean RUN apt-get update && \ - apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps && \ + apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq procps util-linux gosu && \ ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ dpkg-reconfigure -f noninteractive tzdata && \ apt-get clean @@ -57,16 +48,14 @@ RUN curl -f https://repo1.maven.org/maven2/org/jacoco/jacoco/${JACOCO_VERSION}/j unzip jacoco.zip -d /jacoco # cloud -#COPY cloud/CMakeLists.txt cloud/output* output/ms* /opt/apache-doris/cloud/ -RUN <<EOF - mkdir /opt/apache-doris/fdb - if [ -d /opt/apache-doris/cloud/bin ]; then - sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/cloud/bin/start.sh +# COPY --chmod=777 README.md cloud/output* output/ms* /opt/apache-doris/cloud/ +RUN mkdir /opt/apache-doris/fdb +RUN if [ -d /opt/apache-doris/cloud/bin ]; then \ + sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/cloud/bin/start.sh ; \ fi -EOF # fe and be -COPY output /opt/apache-doris/ +COPY --chmod=777 output /opt/apache-doris/ # in docker, run 'chmod 755 doris_be' first time cost 1min, remove it. 
RUN sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/be/bin/start_be.sh diff --git a/docker/runtime/doris-compose/Readme.md b/docker/runtime/doris-compose/Readme.md index a83fa81e761..c4c4dc0990f 100644 --- a/docker/runtime/doris-compose/Readme.md +++ b/docker/runtime/doris-compose/Readme.md @@ -23,7 +23,16 @@ Use doris compose to create doris docker compose clusters. ## Requirements -1. The doris image should contains: +##### 1. Make sure you have docker permissions + + run: +``` +docker run hello-world +``` + +if have problem with permission denied, then [add-docker-permission](https://docs.docker.com/engine/install/linux-postinstall/). + +##### 2. The doris image should contains ``` /opt/apache-doris/{fe, be, cloud} @@ -32,16 +41,14 @@ Use doris compose to create doris docker compose clusters. if don't create cloud cluster, the image no need to contains the cloud pkg. -if build doris use `sh build.sh --fe --be --cloud`, then its output satisfy with all above, then run command in doris root +if build doris use `sh build.sh --fe --be --cloud`, then its output satisfy with all above, then run command in doris root directory + will generate such a image. ``` docker build -f docker/runtime/doris-compose/Dockerfile -t <image> . ``` -will generate a image. - -2. Install the dependent python library in 'docker/runtime/doris-compose/requirements.txt' - +##### 3. Install the dependent python library in 'docker/runtime/doris-compose/requirements.txt' ``` python -m pip install --user -r docker/runtime/doris-compose/requirements.txt @@ -49,6 +56,20 @@ python -m pip install --user -r docker/runtime/doris-compose/requirements.txt ## Usage +### Notice + +Each cluster will have a directory in '/tmp/doris/{cluster-name}', user can set env LOCAL_DORIS_PATH to change its directory. + +For example, if user export LOCAL_DORIS_PATH=/mydoris, then the cluster's directory is '/mydoris/{cluster-name}'. + +And cluster's directory will contains all its containers's logs and data, like fe-1, fe-2, be-1, ..., etc. + +If there are multiple users run doris-compose on the same machine, suggest don't change LOCAL_DORIS_PATH or they should export the same LOCAL_DORIS_PATH. + +Because when create a new cluster, doris-compose will search the local doris path, and choose a docker network which is different with this path's clusters. + +So if multiple users use different LOCAL_DORIS_PATH, their clusters may have docker network conflict!!! + ### Create a cluster or recreate its containers ``` @@ -65,9 +86,11 @@ add fe/be nodes with the specific image, or update existing nodes with `--fe-id` For create a cloud cluster, steps are as below: + 1. Write cloud s3 store config file, its default path is '/tmp/doris/cloud.ini'. It's defined in environment variable DORIS_CLOUD_CFG_FILE, user can change this env var to change its path. A Example file is locate in 'docker/runtime/doris-compose/resource/cloud.ini.example'. + 2. Use doris compose up command with option '--cloud' to create a new cloud cluster. The simplest way to create a cloud cluster: @@ -127,7 +150,26 @@ Generate regression-conf-custom.groovy to connect to the specific docker cluster steps: -1. Create a new cluster: `python doris-compose.py up my-cluster my-image --add-fe-num 2 --add-be-num 4 --cloud` -2. Generate regression-conf-custom.groovy: `python doris-compose.py config my-cluster <doris-root-path> --connect-follow-fe` +1. Create a new cluster: `python docker/runtime/doris-compose/doris-compose.py up my-cluster my-image --add-fe-num 2 --add-be-num 4 --cloud` +2. 
Generate regression-conf-custom.groovy: `python docker/runtime/doris-compose/doris-compose.py config my-cluster <doris-root-path> --connect-follow-fe` 3. Run regression test: `bash run-regression-test.sh --run -times 1 -parallel 1 -suiteParallel 1 -d cloud/multi_cluster` +## Problem investigation + +#### Log + +Each cluster has logs in /tmp/doris/{cluster-name}/{node-xxx}/log. For each node, doris compose will also print log in /tmp/doris/{cluster-name}/{node-xxx}/log/health.out + +#### Up cluster using non-detach mode + +``` +python docker/runtime/doris-compose/doris-compose.py up ... -no-detach +``` + +## Developer + +Before submitting code, pls format code. + +``` +bash format-code.sh +``` diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index 5381c094cf2..ba834167bd1 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -15,8 +15,10 @@ # specific language governing permissions and limitations # under the License. +import configparser import filelock -import json +import getpass +import hashlib import jsonpickle import os import os.path @@ -24,7 +26,10 @@ import utils DOCKER_DORIS_PATH = "/opt/apache-doris" LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris") -DORIS_SUBNET_START = int(os.getenv("DORIS_SUBNET_START", 128)) + +# an integer between 128 and 191, generally no need to set +DORIS_SUBNET_START = os.getenv("DORIS_SUBNET_START") + LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resource") DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource") @@ -35,6 +40,7 @@ FE_HTTP_PORT = 8030 FE_RPC_PORT = 9020 FE_QUERY_PORT = 9030 FE_EDITLOG_PORT = 9010 +FE_JAVA_DBG_PORT = 5005 BE_PORT = 9060 BE_WEBSVR_PORT = 8040 @@ -49,6 +55,8 @@ ID_LIMIT = 10000 IP_PART4_SIZE = 200 +CLUSTER_ID = "12345678" + LOG = utils.get_logger() @@ -56,6 +64,15 @@ def get_cluster_path(cluster_name): return os.path.join(LOCAL_DORIS_PATH, cluster_name) +def get_node_name(node_type, id): + return "{}-{}".format(node_type, id) + + +def get_node_path(cluster_name, node_type, id): + return os.path.join(get_cluster_path(cluster_name), + get_node_name(node_type, id)) + + def get_compose_file(cluster_name): return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml") @@ -83,11 +100,39 @@ def gen_subnet_prefix16(): except: pass - for i in range(DORIS_SUBNET_START, 192): - for j in range(256): - subnet = "{}.{}".format(i, j) - if not used_subnet.get(subnet, False): - return subnet + subnet_begin = 128 + subnet_end = 192 + + subnet_part_1 = None + subnet_part_2 = None + if DORIS_SUBNET_START: + subnet_part_1 = int(DORIS_SUBNET_START) + subnet_part_2 = 0 + else: + m = hashlib.md5() + m.update(getpass.getuser().encode("utf-8")) + hash_val = int(m.hexdigest(), 16) + # want subnet part ii to be a small num, just less than 100, so don't use 256 here. 
+ small_width = 100 + slot_num = (subnet_end - subnet_begin) * small_width + idx = hash_val % slot_num + if idx < 0: + idx += slot_num + subnet_part_1 = subnet_begin + int(idx / small_width) + subnet_part_2 = idx % small_width + + intervals = [ + [(subnet_part_1, subnet_part_1 + 1), (subnet_part_2, 256)], + [(subnet_part_1 + 1, subnet_end), (0, 256)], + [(subnet_begin, subnet_part_1), (0, 256)], + [(subnet_part_1, subnet_part_1 + 1), (0, subnet_part_2)], + ] + for interval in intervals: + for i in range(interval[0][0], interval[0][1]): + for j in range(interval[1][0], interval[1][1]): + subnet = "{}.{}".format(i, j) + if not used_subnet.get(subnet, False): + return subnet raise Exception("Failed to gen subnet") @@ -207,8 +252,6 @@ class Node(object): path = self.get_path() os.makedirs(path, exist_ok=True) - config = self.get_add_init_config() - # copy config to local conf_dir = os.path.join(path, "conf") if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir): @@ -216,12 +259,15 @@ class Node(object): assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \ "check doris path in image is correct".format(conf_dir) utils.enable_dir_with_rw_perm(conf_dir) + config = self.get_add_init_config() if config: with open(os.path.join(conf_dir, self.conf_file_name()), "a") as f: f.write("\n") + f.write("#### start doris-compose add config ####\n\n") for item in config: f.write(item + "\n") + f.write("\n#### end doris-compose add config ####\n") for sub_dir in self.expose_sub_dirs(): os.makedirs(os.path.join(path, sub_dir), exist_ok=True) @@ -246,11 +292,10 @@ class Node(object): return ["conf", "log"] def get_name(self): - return "{}-{}".format(self.node_type(), self.id) + return get_node_name(self.node_type(), self.id) def get_path(self): - return os.path.join(get_cluster_path(self.cluster.name), - self.get_name()) + return get_node_path(self.cluster.name, self.node_type(), self.id) def get_image(self): return self.meta.image @@ -292,11 +337,18 @@ class Node(object): "DORIS_HOME": os.path.join(self.docker_home_dir()), "STOP_GRACE": 1 if enable_coverage else 0, "IS_CLOUD": 1 if self.cluster.is_cloud else 0, + "SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0, } if self.cluster.is_cloud: envs["META_SERVICE_ENDPOINT"] = self.cluster.get_meta_server_addr() + # run as host user + if not getattr(self.cluster, 'is_root_user', True): + envs["HOST_USER"] = getpass.getuser() + envs["HOST_UID"] = os.getuid() + envs["HOST_GID"] = os.getgid() + if enable_coverage: outfile = "{}/coverage/{}-coverage-{}-{}".format( DOCKER_DORIS_PATH, self.node_type(), self.cluster.name, @@ -311,6 +363,15 @@ class Node(object): return envs + def entrypoint(self): + if self.start_script(): + return [ + "bash", + os.path.join(DOCKER_RESOURCE_PATH, "entrypoint.sh") + ] + self.start_script() + else: + return None + def get_add_init_config(self): return [] @@ -334,10 +395,16 @@ class Node(object): for path in ("/etc/localtime", "/etc/timezone", "/usr/share/zoneinfo") if os.path.exists(path) ] + if self.cluster.coverage_dir: volumes.append("{}:{}/coverage".format(self.cluster.coverage_dir, DOCKER_DORIS_PATH)) + extra_hosts = [ + "{}:{}".format(node.get_name(), node.get_ip()) + for node in self.cluster.get_all_nodes() + ] + content = { "cap_add": ["SYS_PTRACE"], "hostname": self.get_name(), @@ -349,6 +416,7 @@ class Node(object): "ipv4_address": self.get_ip(), } }, + "extra_hosts": extra_hosts, "ports": self.docker_ports(), "ulimits": { "core": -1 @@ -365,37 +433,74 @@ class Node(object): class 
FE(Node): + def init(self): + super().init() + self.init_is_follower() + def get_add_init_config(self): cfg = [] if self.cluster.fe_config: cfg += self.cluster.fe_config if self.cluster.is_cloud: cfg += [ - "cloud_unique_id = " + self.cloud_unique_id(), "meta_service_endpoint = {}".format( self.cluster.get_meta_server_addr()), "", "# For regression-test", "ignore_unsupported_properties_in_cloud_mode = true", "merge_on_write_forced_to_false = true", + "deploy_mode = cloud", ] + if self.cluster.sql_mode_node_mgr: + cfg += [ + "cluster_id = " + CLUSTER_ID, + ] + else: + cfg += [ + "cloud_unique_id = " + self.cloud_unique_id(), + ] + + with open("{}/conf/{}".format(self.get_path(), self.conf_file_name()), + "r") as f: + parser = configparser.ConfigParser() + parser.read_string('[dummy_section]\n' + f.read()) + for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"): + value = parser["dummy_section"].get(key) + if value: + cfg.append( + f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\"" + ) + return cfg + def init_is_follower(self): + if self.cluster.is_cloud and self.cluster.fe_follower: + with open(self._is_follower_path(), "w") as f: + f.write("true") + + def _is_follower_path(self): + return "{}/conf/is_follower".format(self.get_path()) + def docker_env(self): envs = super().docker_env() if self.cluster.is_cloud: envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id() + if os.path.exists(self._is_follower_path()): + envs["IS_FE_FOLLOWER"] = 1 return envs def cloud_unique_id(self): return "sql_server_{}".format(self.id) - def entrypoint(self): - return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")] + def start_script(self): + return ["init_fe.sh"] def docker_ports(self): - return [FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT] + return [ + FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT, + FE_JAVA_DBG_PORT + ] def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "fe") @@ -421,14 +526,26 @@ class BE(Node): cfg += self.cluster.be_config if self.cluster.is_cloud: cfg += [ - "cloud_unique_id = " + self.cloud_unique_id(), - "meta_service_endpoint = {}".format( - self.cluster.get_meta_server_addr()), 'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]', 'enable_file_cache = true', 'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]' .format(self.docker_home_dir()), + "deploy_mode = cloud", ] + + if self.cluster.be_metaservice_endpoint: + cfg += [ + "meta_service_endpoint = {}".format( + self.cluster.get_meta_server_addr()), + ] + if self.cluster.be_cluster_id: + cfg += [ + "cluster_id = " + CLUSTER_ID, + ] + if not self.cluster.sql_mode_node_mgr: + cfg += [ + "cloud_unique_id = " + self.cloud_unique_id(), + ] return cfg def init_cluster_name(self): @@ -477,8 +594,8 @@ class BE(Node): storage_root_path = ";".join(dir_descs) if dir_descs else '""' f.write("\nstorage_root_path = {}\n".format(storage_root_path)) - def entrypoint(self): - return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh")] + def start_script(self): + return ["init_be.sh"] def docker_env(self): envs = super().docker_env() @@ -529,12 +646,8 @@ class MS(CLOUD): cfg += self.cluster.ms_config return cfg - def entrypoint(self): - return [ - "bash", - os.path.join(DOCKER_RESOURCE_PATH, "init_cloud.sh"), - "--meta-service" - ] + def start_script(self): + return ["init_cloud.sh", "--meta-service"] def node_type(self): return Node.TYPE_MS 
@@ -554,11 +667,8 @@ class RECYCLE(CLOUD): cfg += self.cluster.recycle_config return cfg - def entrypoint(self): - return [ - "bash", - os.path.join(DOCKER_RESOURCE_PATH, "init_cloud.sh"), "--recycler" - ] + def start_script(self): + return ["init_cloud.sh", "--recycler"] def node_type(self): return Node.TYPE_RECYCLE @@ -579,8 +689,8 @@ class FDB(Node): with open(os.path.join(local_conf_dir, "fdb.cluster"), "w") as f: f.write(self.cluster.get_fdb_cluster()) - def entrypoint(self): - return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fdb.sh")] + def start_script(self): + return ["init_fdb.sh"] def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "fdb") @@ -597,17 +707,20 @@ class FDB(Node): class Cluster(object): - def __init__(self, name, subnet, image, is_cloud, fe_config, be_config, - ms_config, recycle_config, be_disks, be_cluster, reg_be, - coverage_dir, cloud_store_config): + def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config, + be_config, ms_config, recycle_config, fe_follower, be_disks, + be_cluster, reg_be, coverage_dir, cloud_store_config, + sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id): self.name = name self.subnet = subnet self.image = image self.is_cloud = is_cloud + self.is_root_user = is_root_user self.fe_config = fe_config self.be_config = be_config self.ms_config = ms_config self.recycle_config = recycle_config + self.fe_follower = fe_follower self.be_disks = be_disks self.be_cluster = be_cluster self.reg_be = reg_be @@ -617,18 +730,29 @@ class Cluster(object): node_type: Group(node_type) for node_type in Node.TYPE_ALL } + self.sql_mode_node_mgr = sql_mode_node_mgr + self.be_metaservice_endpoint = be_metaservice_endpoint + self.be_cluster_id = be_cluster_id @staticmethod - def new(name, image, is_cloud, fe_config, be_config, ms_config, - recycle_config, be_disks, be_cluster, reg_be, coverage_dir, - cloud_store_config): - os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) - with filelock.FileLock(os.path.join(LOCAL_DORIS_PATH, "lock")): + def new(name, image, is_cloud, is_root_user, fe_config, be_config, + ms_config, recycle_config, fe_follower, be_disks, be_cluster, + reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cluster_id): + if not os.path.exists(LOCAL_DORIS_PATH): + os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) + os.chmod(LOCAL_DORIS_PATH, 0o777) + lock_file = os.path.join(LOCAL_DORIS_PATH, "lock") + with filelock.FileLock(lock_file): + if os.getuid() == utils.get_path_uid(lock_file): + os.chmod(lock_file, 0o666) subnet = gen_subnet_prefix16() - cluster = Cluster(name, subnet, image, is_cloud, fe_config, - be_config, ms_config, recycle_config, be_disks, - be_cluster, reg_be, coverage_dir, - cloud_store_config) + cluster = Cluster(name, subnet, image, is_cloud, is_root_user, + fe_config, be_config, ms_config, recycle_config, + fe_follower, be_disks, be_cluster, reg_be, + coverage_dir, cloud_store_config, + sql_mode_node_mgr, be_metaservice_endpoint, + be_cluster_id) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() @@ -686,7 +810,14 @@ class Cluster(object): raise Exception("No found {} with id {}".format(node_type, id)) return Node.new(self, node_type, id, meta) - def get_all_nodes(self, node_type): + def get_all_nodes(self, node_type=None): + if node_type is None: + nodes = [] + for nt, group in self.groups.items(): + for id, meta in group.get_all_nodes().items(): + nodes.append(Node.new(self, nt, id, meta)) + 
return nodes + group = self.groups.get(node_type, None) if not group: raise Exception("Unknown node_type: {}".format(node_type)) diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index 87ae862236a..7a2f3f3c195 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -30,6 +30,47 @@ import time LOG = utils.get_logger() +def wait_ready_service(wait_timeout, cluster, fe_ids, be_ids): + + def is_fe_ready_service(ip): + return utils.is_socket_avail(ip, CLUSTER.FE_QUERY_PORT) + + def is_be_ready_service(ip): + return utils.is_socket_avail(ip, CLUSTER.BE_HEARTBEAT_PORT) + + if wait_timeout == 0: + return + if wait_timeout == -1: + wait_timeout = 1000000000 + expire_ts = time.time() + wait_timeout + while True: + db_mgr = database.get_db_mgr(cluster.name, False) + dead_frontends = [] + for id in fe_ids: + fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id) + fe_state = db_mgr.get_fe(id) + if not fe_state or not fe_state.alive or not is_fe_ready_service( + fe.get_ip()): + dead_frontends.append(id) + dead_backends = [] + for id in be_ids: + be = cluster.get_node(CLUSTER.Node.TYPE_BE, id) + be_state = db_mgr.get_be(id) + if not be_state or not be_state.alive or not is_be_ready_service( + be.get_ip()): + dead_backends.append(id) + if not dead_frontends and not dead_backends: + break + if time.time() >= expire_ts: + err = "" + if dead_frontends: + err += "dead fe: " + str(dead_frontends) + ". " + if dead_backends: + err += "dead be: " + str(dead_backends) + ". " + raise Exception(err) + time.sleep(1) + + # return for_all, related_nodes, related_node_num def get_ids_related_nodes(cluster, fe_ids, @@ -92,7 +133,12 @@ class Command(object): def run(self, args): raise Exception("No implemented") - def _add_parser_output_json(self, parser): + def _add_parser_common_args(self, parser): + parser.add_argument("-v", + "--verbose", + default=False, + action=self._get_parser_bool_action(True), + help="verbose logging.") parser.add_argument("--output-json", default=False, action=self._get_parser_bool_action(True), @@ -136,6 +182,18 @@ class Command(object): def _support_boolean_action(self): return sys.version_info.major == 3 and sys.version_info.minor >= 9 + def _print_table(self, header, datas): + if utils.is_enable_log(): + table = prettytable.PrettyTable( + [utils.render_green(field) for field in header]) + for row in datas: + table.add_row(row) + print(table) + return "" + else: + datas.insert(0, header) + return datas + class SimpleCommand(Command): @@ -150,11 +208,12 @@ class SimpleCommand(Command): parser = args_parsers.add_parser(self.command, help=help) parser.add_argument("NAME", help="Specify cluster name.") self._add_parser_ids_args(parser) - self._add_parser_output_json(parser) + self._add_parser_common_args(parser) + return parser def run(self, args): cluster = CLUSTER.Cluster.load(args.NAME) - _, related_nodes, related_node_num = get_ids_related_nodes( + for_all, related_nodes, related_node_num = get_ids_related_nodes( cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id, args.fdb_id) utils.exec_docker_compose_command(cluster.get_compose_file(), @@ -165,6 +224,31 @@ class SimpleCommand(Command): utils.render_green("{} succ, total related node num {}".format( show_cmd, related_node_num))) + if for_all: + related_nodes = cluster.get_all_nodes() + return cluster, related_nodes + + +class NeedStartCommand(SimpleCommand): + + def add_parser(self, args_parsers): + parser = super().add_parser(args_parsers) + 
parser.add_argument( + "--wait-timeout", + type=int, + default=0, + help= + "Specify wait seconds for fe/be ready for service: 0 not wait (default), "\ + "> 0 max wait seconds, -1 wait unlimited." + ) + return parser + + def run(self, args): + cluster, related_nodes = super().run(args) + fe_ids = [node.id for node in related_nodes if node.is_fe()] + be_ids = [node.id for node in related_nodes if node.is_be()] + wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids) + class UpCommand(Command): @@ -180,6 +264,7 @@ class UpCommand(Command): nargs="?", help="Specify docker image.") + self._add_parser_common_args(parser) parser.add_argument( "--cloud", default=False, @@ -187,6 +272,13 @@ class UpCommand(Command): help= "Create cloud cluster, default is false. Only use when creating new cluster." ) + parser.add_argument( + "--root", + default=False, + action=self._get_parser_bool_action(True), + help= + "Run cluster as root user, default is false, it will run as host user." + ) parser.add_argument( "--wait-timeout", @@ -197,8 +289,6 @@ class UpCommand(Command): "> 0 max wait seconds, -1 wait unlimited." ) - self._add_parser_output_json(parser) - group1 = parser.add_argument_group("add new nodes", "add cluster nodes.") group1.add_argument( @@ -245,6 +335,13 @@ class UpCommand(Command): type=str, help="Specify recycle configs for doris_cloud.conf. "\ "Example: --recycle-config \"log_level = warn\".") + group1.add_argument( + "--fe-follower", + default=False, + action=self._get_parser_bool_action(True), + help= + "The new added fe is follower but not observer. Only support in cloud mode." + ) group1.add_argument("--be-disks", nargs="*", default=["HDD=1"], @@ -283,13 +380,50 @@ class UpCommand(Command): group2.add_argument("--force-recreate", default=False, action=self._get_parser_bool_action(True), - help="Recreate containers even if their configuration" \ + help="Recreate containers even if their configuration " \ "and image haven't changed. ") parser.add_argument("--coverage-dir", default="", help="Set code coverage output directory") + parser.add_argument("--sql-mode-node-mgr", + default=False, + action=self._get_parser_bool_action(True), + help="Manager fe be via sql instead of http") + + if self._support_boolean_action(): + parser.add_argument( + "--be-metaservice-endpoint", + default=True, + action=self._get_parser_bool_action(False), + help= + "Do not set BE meta service endpoint in conf. Default is False." + ) + else: + parser.add_argument( + "--no-be-metaservice-endpoint", + dest='be_metaservice_endpoint', + default=True, + action=self._get_parser_bool_action(False), + help= + "Do not set BE meta service endpoint in conf. Default is False." + ) + + if self._support_boolean_action(): + parser.add_argument( + "--be-cluster-id", + default=True, + action=self._get_parser_bool_action(False), + help="Do not set BE cluster ID in conf. Default is False.") + else: + parser.add_argument( + "--no-be-cluster-id", + dest='be_cluster_id', + default=True, + action=self._get_parser_bool_action(False), + help="Do not set BE cluser ID in conf. 
Default is False.") + parser.add_argument( "--fdb-version", type=str, @@ -383,18 +517,21 @@ class UpCommand(Command): args.add_ms_num = 0 args.add_recycle_num = 0 - cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud, - args.fe_config, args.be_config, - args.ms_config, args.recycle_config, - args.be_disks, args.be_cluster, - args.reg_be, args.coverage_dir, - cloud_store_config) + cluster = CLUSTER.Cluster.new( + args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config, + args.be_config, args.ms_config, args.recycle_config, + args.fe_follower, args.be_disks, args.be_cluster, args.reg_be, + args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr, + args.be_metaservice_endpoint, args.be_cluster_id) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) if args.be_cluster and cluster.is_cloud: cluster.be_cluster = args.be_cluster + if cluster.is_cloud: + cluster.fe_follower = args.fe_follower + _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id, @@ -474,32 +611,51 @@ class UpCommand(Command): "Not up cluster cause specific --no-start, related node num {}" .format(related_node_num))) else: - if args.wait_timeout != 0: - if args.wait_timeout == -1: - args.wait_timeout = 1000000000 - expire_ts = time.time() + args.wait_timeout - while True: - db_mgr = database.get_db_mgr(args.NAME, False) - dead_frontends = [] - for id in add_fe_ids: - fe_state = db_mgr.get_fe(id) - if not fe_state or not fe_state.alive: - dead_frontends.append(id) - dead_backends = [] - for id in add_be_ids: - be_state = db_mgr.get_be(id) - if not be_state or not be_state.alive: - dead_backends.append(id) - if not dead_frontends and not dead_backends: + LOG.info("Using SQL mode for node management ? {}".format( + args.sql_mode_node_mgr)) + + # Wait for FE master to be elected + LOG.info("Waiting for FE master to be elected...") + expire_ts = time.time() + 30 + while expire_ts > time.time(): + db_mgr = database.get_db_mgr(args.NAME, False) + for id in add_fe_ids: + fe_state = db_mgr.get_fe(id) + if fe_state is not None and fe_state.alive: break - if time.time() >= expire_ts: - err = "" - if dead_frontends: - err += "dead fe: " + str(dead_frontends) + ". " - if dead_backends: - err += "dead be: " + str(dead_backends) + ". 
" - raise Exception(err) - time.sleep(1) + LOG.info("there is no fe ready") + time.sleep(5) + + if cluster.is_cloud and args.sql_mode_node_mgr: + db_mgr = database.get_db_mgr(args.NAME, False) + master_fe_endpoint = CLUSTER.get_master_fe_endpoint( + cluster.name) + # Add FEs except master_fe + for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE): + fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}" + if fe_endpoint != master_fe_endpoint: + try: + db_mgr.add_fe(fe_endpoint) + LOG.info(f"Added FE {fe_endpoint} successfully.") + except Exception as e: + LOG.error( + f"Failed to add FE {fe_endpoint}: {str(e)}") + + # Add BEs + for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE): + be_endpoint = f"{be.get_ip()}:{CLUSTER.BE_HEARTBEAT_PORT}" + try: + db_mgr.add_be(be_endpoint) + LOG.info(f"Added BE {be_endpoint} successfully.") + except Exception as e: + LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}") + + cloud_store_config = self._get_cloud_store_config() + + db_mgr.create_default_storage_vault(cloud_store_config) + + wait_ready_service(args.wait_timeout, cluster, add_fe_ids, + add_be_ids) LOG.info( utils.render_green( "Up cluster {} succ, related node num {}".format( @@ -576,7 +732,7 @@ class DownCommand(Command): "then apply to all containers.") parser.add_argument("NAME", help="Specify cluster name") self._add_parser_ids_args(parser) - self._add_parser_output_json(parser) + self._add_parser_common_args(parser) parser.add_argument( "--clean", default=False, @@ -606,6 +762,8 @@ class DownCommand(Command): args.fdb_id, ignore_not_exists=True) + LOG.info("down cluster " + args.NAME + " for all " + str(for_all)) + if for_all: if os.path.exists(cluster.get_compose_file()): try: @@ -687,7 +845,6 @@ class ListNode(object): self.created = "" self.alive = "" self.is_master = "" - self.query_port = "" self.tablet_num = "" self.last_heartbeat = "" self.err_msg = "" @@ -702,16 +859,23 @@ class ListNode(object): if detail: query_port = "" http_port = "" + heartbeat_port = "" + edit_log_port = "" + node_path = CLUSTER.get_node_path(self.cluster_name, + self.node_type, self.id) if self.node_type == CLUSTER.Node.TYPE_FE: query_port = CLUSTER.FE_QUERY_PORT http_port = CLUSTER.FE_HTTP_PORT + edit_log_port = CLUSTER.FE_EDITLOG_PORT elif self.node_type == CLUSTER.Node.TYPE_BE: http_port = CLUSTER.BE_WEBSVR_PORT + heartbeat_port = CLUSTER.BE_HEARTBEAT_PORT + elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE: + http_port = CLUSTER.MS_PORT else: pass result += [ - query_port, - http_port, + query_port, http_port, node_path, edit_log_port, heartbeat_port ] return result @@ -721,7 +885,6 @@ class ListNode(object): if fe: self.alive = str(fe.alive).lower() self.is_master = str(fe.is_master).lower() - self.query_port = fe.query_port self.last_heartbeat = fe.last_heartbeat self.err_msg = fe.err_msg elif self.node_type == CLUSTER.Node.TYPE_BE: @@ -812,7 +975,16 @@ cloudUniqueId= "{fe_cloud_unique_id}" print("\nNo write regression custom file.") return + annotation_start = "//---------- Start auto generate by doris-compose.py---------" + annotation_end = "//---------- End auto generate by doris-compose.py---------" + + old_contents = [] + if os.path.exists(regression_conf_custom): + with open(regression_conf_custom, "r") as f: + old_contents = f.readlines() with open(regression_conf_custom, "w") as f: + # write auto gen config + f.write(annotation_start) f.write(base_conf.format(fe_ip=fe_ip)) if cluster.is_cloud: multi_cluster_bes = ",".join([ @@ -831,6 +1003,23 @@ 
cloudUniqueId= "{fe_cloud_unique_id}" multi_cluster_bes=multi_cluster_bes, fe_cloud_unique_id=cluster.get_node( CLUSTER.Node.TYPE_FE, 1).cloud_unique_id())) + f.write(annotation_end + "\n\n") + annotation_end_line_count = -1 + + # write not-auto gen config + in_annotation = False + annotation_end_line_idx = -100 + for line_idx, line in enumerate(old_contents): + line = line.rstrip() + if line == annotation_start: + in_annotation = True + elif line == annotation_end: + in_annotation = False + annotation_end_line_idx = line_idx + elif not in_annotation: + if line or line_idx != annotation_end_line_idx + 1: + f.write(line + "\n") + print("\nWrite succ: " + regression_conf_custom) @@ -845,24 +1034,12 @@ class ListCommand(Command): help= "Specify multiple clusters, if specific, show all their containers." ) - self._add_parser_output_json(parser) + self._add_parser_common_args(parser) parser.add_argument("--detail", default=False, action=self._get_parser_bool_action(True), help="Print more detail fields.") - def _handle_data(self, header, datas): - if utils.is_enable_log(): - table = prettytable.PrettyTable( - [utils.render_green(field) for field in header]) - for row in datas: - table.add_row(row) - print(table) - return "" - else: - datas.insert(0, header) - return datas - def run(self, args): COMPOSE_MISSING = "(missing)" COMPOSE_BAD = "(bad)" @@ -954,7 +1131,7 @@ class ListCommand(Command): CLUSTER.get_master_fe_endpoint(name), is_cloud, "{}{}".format(compose_file, cluster_info["status"]))) - return self._handle_data(header, rows) + return self._print_table(header, rows) header = [ "CLUSTER", "NAME", "IP", "STATUS", "CONTAINER ID", "IMAGE", @@ -965,6 +1142,9 @@ class ListCommand(Command): header += [ "query_port", "http_port", + "path", + "edit_log_port", + "heartbeat_port", ] rows = [] @@ -1038,17 +1218,68 @@ class ListCommand(Command): for node in sorted(nodes, key=get_node_seq): rows.append(node.info(args.detail)) - return self._handle_data(header, rows) + return self._print_table(header, rows) + + +class InfoCommand(Command): + + def add_parser(self, args_parsers): + parser = args_parsers.add_parser( + "info", help="Show info like cloud.ini, port, path, etc") + self._add_parser_common_args(parser) + + def run(self, args): + + header = ["key", "value", "scope"] + cloud_cfg_file_env = os.getenv("DORIS_CLOUD_CFG_FILE") + cloud_cfg_file = cloud_cfg_file_env if cloud_cfg_file_env else "${LOCAL_DORIS_PATH}/cloud.ini" + rows = [ + ("LOCAL_DORIS_PATH", CLUSTER.LOCAL_DORIS_PATH, "env variable"), + ("DORIS_CLOUD_CFG_FILE", cloud_cfg_file, "env variable"), + ("FE_QUERY_PORT", CLUSTER.FE_QUERY_PORT, "constant"), + ("FE_HTTP_PORT", CLUSTER.FE_HTTP_PORT, "constant"), + ("FE_EDITLOG_PORT", CLUSTER.FE_EDITLOG_PORT, "constant"), + ("FE_JAVA_DBG_PORT", CLUSTER.FE_JAVA_DBG_PORT, "constant"), + ("BE_HEARTBEAT_PORT", CLUSTER.BE_HEARTBEAT_PORT, "constant"), + ("BE_WEBSVR_PORT", CLUSTER.BE_WEBSVR_PORT, "constant"), + ("MS_PORT", CLUSTER.MS_PORT, "constant"), + ("RECYCLER_PORT", CLUSTER.MS_PORT, "constant"), + ] + + with open(CLUSTER.CLOUD_CFG_FILE, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + key, value = line.split('=', 1) + rows.append((key.strip(), value.strip(), "cloud.ini")) + + return self._print_table(header, rows) + + +class AddRWPermCommand(Command): + + def add_parser(self, args_parsers): + parser = args_parsers.add_parser( + "add-rw-perm", + help="Add read and write permissions to the cluster files") + parser.add_argument("NAME", help="Specify cluster 
name.") + self._add_parser_common_args(parser) + + def run(self, args): + utils.enable_dir_with_rw_perm(CLUSTER.get_cluster_path(args.NAME)) + return "" ALL_COMMANDS = [ UpCommand("up"), DownCommand("down"), - SimpleCommand("start", "Start the doris containers. "), + NeedStartCommand("start", "Start the doris containers. "), SimpleCommand("stop", "Stop the doris containers. "), - SimpleCommand("restart", "Restart the doris containers. "), + NeedStartCommand("restart", "Restart the doris containers. "), SimpleCommand("pause", "Pause the doris containers. "), SimpleCommand("unpause", "Unpause the doris containers. "), GenConfCommand("config"), + InfoCommand("info"), ListCommand("ls"), + AddRWPermCommand("add-rw-perm"), ] diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py index d29905e94a9..46cdd961c9f 100644 --- a/docker/runtime/doris-compose/database.py +++ b/docker/runtime/doris-compose/database.py @@ -20,16 +20,15 @@ import os.path import pymysql import time import utils +import uuid LOG = utils.get_logger() class FEState(object): - def __init__(self, id, query_port, is_master, alive, last_heartbeat, - err_msg): + def __init__(self, id, is_master, alive, last_heartbeat, err_msg): self.id = id - self.query_port = query_port self.is_master = is_master self.alive = alive self.last_heartbeat = last_heartbeat @@ -54,11 +53,8 @@ class DBManager(object): def __init__(self): self.fe_states = {} self.be_states = {} - self.query_port = -1 self.conn = None - - def set_query_port(self, query_port): - self.query_port = query_port + self.master_fe_ip = "" def get_fe(self, id): return self.fe_states.get(id, None) @@ -66,10 +62,19 @@ class DBManager(object): def get_be(self, id): return self.be_states.get(id, None) - def load_states(self, query_ports): - self._load_fe_states(query_ports) + def load_states(self): + self._load_fe_states() self._load_be_states() + def add_fe(self, fe_endpoint): + try: + sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'" + self._exec_query(sql) + LOG.info(f"Added FE {fe_endpoint} via SQL successfully.") + except Exception as e: + LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}") + raise + def drop_fe(self, fe_endpoint): id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")]) try: @@ -85,6 +90,15 @@ class DBManager(object): return raise e + def add_be(self, be_endpoint): + try: + sql = f"ALTER SYSTEM ADD BACKEND '{be_endpoint}'" + self._exec_query(sql) + LOG.info(f"Added BE {be_endpoint} via SQL successfully.") + except Exception as e: + LOG.error(f"Failed to add BE {be_endpoint} via SQL: {str(e)}") + raise + def drop_be(self, be_endpoint): id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")]) try: @@ -140,103 +154,124 @@ class DBManager(object): time.sleep(5) - def _load_fe_states(self, query_ports): + def create_default_storage_vault(self, cloud_store_config): + try: + # Create storage vault + create_vault_sql = f""" + CREATE STORAGE VAULT IF NOT EXISTS default_vault + PROPERTIES ( + "type" = "S3", + "s3.access_key" = "{cloud_store_config['DORIS_CLOUD_AK']}", + "s3.secret_key" = "{cloud_store_config['DORIS_CLOUD_SK']}", + "s3.endpoint" = "{cloud_store_config['DORIS_CLOUD_ENDPOINT']}", + "s3.bucket" = "{cloud_store_config['DORIS_CLOUD_BUCKET']}", + "s3.region" = "{cloud_store_config['DORIS_CLOUD_REGION']}", + "s3.root.path" = "{str(uuid.uuid4())}", + "provider" = "{cloud_store_config['DORIS_CLOUD_PROVIDER']}" + ); + """ + self._exec_query(create_vault_sql) + LOG.info("Created storage 
vault 'default_vault'") + + # Set as default storage vault + set_default_vault_sql = "SET default_vault as DEFAULT STORAGE VAULT;" + self._exec_query(set_default_vault_sql) + LOG.info("Set 'default_vault' as the default storage vault") + + except Exception as e: + LOG.error(f"Failed to create default storage vault: {str(e)}") + raise + + def _load_fe_states(self): fe_states = {} - alive_master_fe_port = None - for record in self._exec_query(''' - select Host, IsMaster, Alive, LastHeartbeat, ErrMsg - from frontends()'''): - ip, is_master, alive, last_heartbeat, err_msg = record - is_master = utils.is_true(is_master) - alive = utils.is_true(alive) + alive_master_fe_ip = None + for record in self._exec_query("show frontends"): + name = record["Name"] + ip = record["Host"] + role = record["Role"] + is_master = utils.is_true(record["IsMaster"]) + alive = utils.is_true(record["Alive"]) id = CLUSTER.Node.get_id_from_ip(ip) - query_port = query_ports.get(id, "") - last_heartbeat = utils.escape_null(last_heartbeat) - fe = FEState(id, query_port, is_master, alive, last_heartbeat, - err_msg) + last_heartbeat = utils.escape_null(record["LastHeartbeat"]) + err_msg = record["ErrMsg"] + fe = FEState(id, is_master, alive, last_heartbeat, err_msg) fe_states[id] = fe - if is_master and alive and query_port: - alive_master_fe_port = query_port + if is_master and alive: + alive_master_fe_ip = ip + LOG.debug( + "record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}" + .format(name, ip, alive, is_master, role)) + self.fe_states = fe_states - if alive_master_fe_port and alive_master_fe_port != self.query_port: - self.query_port = alive_master_fe_port + if alive_master_fe_ip and alive_master_fe_ip != self.master_fe_ip: + self.master_fe_ip = alive_master_fe_ip self._reset_conn() def _load_be_states(self): be_states = {} - for record in self._exec_query(''' - select BackendId, Host, LastHeartbeat, Alive, SystemDecommissioned, TabletNum, ErrMsg - from backends()'''): - backend_id, ip, last_heartbeat, alive, decommissioned, tablet_num, err_msg = record - backend_id = int(backend_id) - alive = utils.is_true(alive) - decommissioned = utils.is_true(decommissioned) - tablet_num = int(tablet_num) - id = CLUSTER.Node.get_id_from_ip(ip) - last_heartbeat = utils.escape_null(last_heartbeat) + for record in self._exec_query("show backends"): + backend_id = int(record["BackendId"]) + alive = utils.is_true(record["Alive"]) + decommissioned = utils.is_true(record["SystemDecommissioned"]) + tablet_num = int(record["TabletNum"]) + id = CLUSTER.Node.get_id_from_ip(record["Host"]) + last_heartbeat = utils.escape_null(record["LastHeartbeat"]) + err_msg = record["ErrMsg"] be = BEState(id, backend_id, decommissioned, alive, tablet_num, last_heartbeat, err_msg) be_states[id] = be self.be_states = be_states + # return rows, and each row is a record map def _exec_query(self, sql): self._prepare_conn() with self.conn.cursor() as cursor: cursor.execute(sql) - return cursor.fetchall() + fields = [field_md[0] for field_md in cursor.description + ] if cursor.description else [] + return [dict(zip(fields, row)) for row in cursor.fetchall()] def _prepare_conn(self): if self.conn: return - if self.query_port <= 0: - raise Exception("Not set query_port") self._reset_conn() def _reset_conn(self): self.conn = pymysql.connect(user="root", - host="127.0.0.1", + host=self.master_fe_ip, read_timeout=10, - port=self.query_port) + connect_timeout=3, + port=CLUSTER.FE_QUERY_PORT) def get_db_mgr(cluster_name, required_load_succ=True): 
assert cluster_name db_mgr = DBManager() - containers = utils.get_doris_containers(cluster_name).get( - cluster_name, None) - if not containers: + master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name), + "master_fe_ip") + master_fe_ip = None + if os.path.exists(master_fe_ip_file): + with open(master_fe_ip_file, "r") as f: + master_fe_ip = f.read().strip() + + if not master_fe_ip: return db_mgr - alive_fe_ports = {} + + has_alive_fe = False + containers = utils.get_doris_containers(cluster_name).get(cluster_name, []) for container in containers: if utils.is_container_running(container): - _, node_type, id = utils.parse_service_name(container.name) + _, node_type, _ = utils.parse_service_name(container.name) if node_type == CLUSTER.Node.TYPE_FE: - query_port = utils.get_map_ports(container).get( - CLUSTER.FE_QUERY_PORT, None) - if query_port: - alive_fe_ports[id] = query_port - if not alive_fe_ports: + has_alive_fe = True + break + + if not has_alive_fe: return db_mgr - master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name), - "master_fe_ip") - query_port = None - if os.path.exists(master_fe_ip_file): - with open(master_fe_ip_file, "r") as f: - master_fe_ip = f.read() - if master_fe_ip: - master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip) - query_port = alive_fe_ports.get(master_id, None) - if not query_port: - # A new cluster's master is fe-1 - if 1 in alive_fe_ports: - query_port = alive_fe_ports[1] - else: - query_port = list(alive_fe_ports.values())[0] - - db_mgr.set_query_port(query_port) + db_mgr.master_fe_ip = master_fe_ip try: - db_mgr.load_states(alive_fe_ports) + db_mgr.load_states() except Exception as e: if required_load_succ: raise e diff --git a/docker/runtime/doris-compose/doris-compose.py b/docker/runtime/doris-compose/doris-compose.py index 0091b70eae9..a2d3a517553 100644 --- a/docker/runtime/doris-compose/doris-compose.py +++ b/docker/runtime/doris-compose/doris-compose.py @@ -45,6 +45,9 @@ def run(args, disable_log, help): if __name__ == '__main__': args, help = parse_args() + verbose = getattr(args, "verbose", False) + if verbose: + utils.set_log_verbose() disable_log = getattr(args, "output_json", False) if disable_log: utils.set_enable_log(False) @@ -53,13 +56,13 @@ if __name__ == '__main__': try: data = run(args, disable_log, help) if disable_log: - print(utils.pretty_json({"code": 0, "data": data})) + print(utils.pretty_json({"code": 0, "data": data}), flush=True) code = 0 except: err = traceback.format_exc() if disable_log: - print(utils.pretty_json({"code": 1, "err": err})) + print(utils.pretty_json({"code": 1, "err": err}), flush=True) else: - print(err) + print(err, flush=True) code = 1 sys.exit(code) diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/format-code.sh similarity index 90% copy from docker/runtime/doris-compose/requirements.txt copy to docker/runtime/doris-compose/format-code.sh index ac177eddf82..0626662e641 100644 --- a/docker/runtime/doris-compose/requirements.txt +++ b/docker/runtime/doris-compose/format-code.sh @@ -15,10 +15,5 @@ # specific language governing permissions and limitations # under the License. 
-docker -docker-compose -filelock -jsonpickle -prettytable -pymysql -python-dateutil +yapf -i *.py +shfmt -w resource/*.sh diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt index ac177eddf82..2f962ed68d8 100644 --- a/docker/runtime/doris-compose/requirements.txt +++ b/docker/runtime/doris-compose/requirements.txt @@ -22,3 +22,5 @@ jsonpickle prettytable pymysql python-dateutil +#pyyaml==5.4.1 +requests<=2.31.0 diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh index de6ba29865a..40833d01dc6 100644 --- a/docker/runtime/doris-compose/resource/common.sh +++ b/docker/runtime/doris-compose/resource/common.sh @@ -23,7 +23,7 @@ export LOG_FILE=$DORIS_HOME/log/health.out export LOCK_FILE=$DORIS_HOME/status/token health_log() { - echo "$(date +'%Y-%m-%d %H:%M:%S') $@" >>$LOG_FILE + echo "$(date +'%Y-%m-%d %H:%M:%S') $@" | tee -a $LOG_FILE } # concurrent write meta service server will failed due to fdb txn conflict. @@ -120,10 +120,11 @@ wait_pid() { health_log "" health_log "ps -elf\n$(ps -elf)\n" if [ -z $pid ]; then - health_log "pid not exist" + health_log "pid $pid not exist" exit 1 fi + health_log "pid $pid exist" health_log "wait process $pid" while true; do ps -p $pid >/dev/null @@ -132,5 +133,13 @@ wait_pid() { fi sleep 1s done + + health_log "show dmesg -T: " + dmesg -T | tail -n 50 | tee -a $LOG_FILE + + health_log "show ps -elf" + health_log "ps -elf\n$(ps -elf)\n" + health_log "pid $pid not exist" + health_log "wait end" } diff --git a/docker/runtime/doris-compose/resource/entrypoint.sh b/docker/runtime/doris-compose/resource/entrypoint.sh new file mode 100644 index 00000000000..a3cdaaae8f1 --- /dev/null +++ b/docker/runtime/doris-compose/resource/entrypoint.sh @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +DIR=$( + cd $(dirname $0) + pwd +) + +source $DIR/common.sh + +RUN_USER=root + +create_host_user() { + if [ -z ${HOST_USER} ]; then + health_log "no specific run user, run as root" + return + fi + id ${HOST_USER} + if [ $? -eq 0 ]; then + health_log "contain user ${HOST_USER}, no create new user" + RUN_USER=${HOST_USER} + return + fi + id ${HOST_UID} + if [ $? -eq 0 ]; then + health_log "contain uid ${HOST_UID}, no create new user" + return + fi + addgroup --gid ${HOST_GID} ${HOST_USER} + if [ $? -eq 0 ]; then + health_log "create group ${HOST_USER} with gid ${HOST_GID} succ" + else + health_log "create group ${HOST_USER} with gid ${HOST_GID} failed" + return + fi + adduser --disabled-password --shell /bin/bash --gecos "" --uid ${HOST_UID} --gid ${HOST_GID} ${HOST_USER} + if [ $? 
-eq 0 ]; then + health_log "create user ${HOST_USER} with uid ${HOST_UID} succ" + RUN_USER=${HOST_USER} + else + health_log "create user ${HOST_USER} with uid ${HOST_UID} failed" + fi +} + +create_host_user + +if command -v gosu 2>&1 >/dev/null; then + if [ -f ${LOG_FILE} ]; then + chown ${RUN_USER}:${RUN_USER} ${LOG_FILE} + fi + gosu ${RUN_USER} bash ${DIR}/${1} ${@:2} +else + bash ${DIR}/${1} ${@:2} +fi diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh index d9b7953b534..08cc914f6af 100755 --- a/docker/runtime/doris-compose/resource/init_be.sh +++ b/docker/runtime/doris-compose/resource/init_be.sh @@ -48,6 +48,12 @@ add_cloud_be() { return fi + # Check if SQL_MODE_NODE_MGR is set to 1 + if [ "$SQL_MODE_NODE_MGR" -eq 1 ]; then + health_log "SQL_MODE_NODE_MGR is set to 1, skipping cluster creation" + return + fi + cluster_file_name="${DORIS_HOME}/conf/CLUSTER_NAME" cluster_name=$(cat $cluster_file_name) if [ -z $cluster_name ]; then @@ -167,7 +173,7 @@ main() { add_be_to_cluster health_log "run start_be.sh" - bash $DORIS_HOME/bin/start_be.sh --daemon + bash $DORIS_HOME/bin/start_be.sh --daemon | tee -a $DORIS_HOME/log/be.out wait_process } diff --git a/docker/runtime/doris-compose/resource/init_cloud.sh b/docker/runtime/doris-compose/resource/init_cloud.sh index 78152b5330b..18dfc4430e2 100644 --- a/docker/runtime/doris-compose/resource/init_cloud.sh +++ b/docker/runtime/doris-compose/resource/init_cloud.sh @@ -50,6 +50,13 @@ check_init_cloud() { lock_cluster + # Check if SQL_MODE_NODE_MGR is set + if [[ "$SQL_MODE_NODE_MGR" -eq 1 ]]; then + health_log "SQL_MODE_NODE_MGR is set, skipping create_instance" + touch $HAS_CREATE_INSTANCE_FILE + return + fi + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \ -d '{"instance_id":"default_instance_id", "name": "default_instance", @@ -109,9 +116,8 @@ main() { check_init_cloud & - health_log "input args: $ARGS" - - bash bin/start.sh $ARGS --daemon + health_log "run starts.sh with args: $ARGS" + bash bin/start.sh $ARGS --daemon | tee -a $DORIS_HOME/log/doris_cloud.out wait_process } diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh index e532d0d56e1..b69ac3a209e 100755 --- a/docker/runtime/doris-compose/resource/init_fe.sh +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -45,8 +45,8 @@ fe_daemon() { sleep 1 output=$(mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;") code=$? - health_log "$output" if [ $code -ne 0 ]; then + health_log "exec show frontends bad: $output" continue fi header=$(grep IsMaster <<<$output) @@ -81,8 +81,30 @@ fe_daemon() { done } -add_cloud_fe() { +run_fe() { + health_log "run start_fe.sh" + bash $DORIS_HOME/bin/start_fe.sh --daemon $@ | tee -a $DORIS_HOME/log/fe.out +} + +start_cloud_fe() { if [ -f "$REGISTER_FILE" ]; then + fe_daemon & + run_fe + return + fi + + # Check if SQL_MODE_NODE_MGR is set to 1 + if [ "${SQL_MODE_NODE_MGR}" = "1" ]; then + health_log "SQL_MODE_NODE_MGR is set to 1. Skipping add FE." 
+ + touch $REGISTER_FILE + + fe_daemon & + run_fe + + if [ "$MY_ID" == "1" ]; then + echo $MY_IP >$MASTER_FE_IP_FILE + fi return fi @@ -96,6 +118,10 @@ add_cloud_fe() { node_type=FE_OBSERVER fi + if [ "a$IS_FE_FOLLOWER" == "a1" ]; then + node_type=FE_FOLLOWER + fi + nodes='{ "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", "ip": "'"${MY_IP}"'", @@ -139,6 +165,10 @@ add_cloud_fe() { fi touch $REGISTER_FILE + + fe_daemon & + run_fe + if [ "$MY_ID" == "1" ]; then echo $MY_IP >$MASTER_FE_IP_FILE fi @@ -174,19 +204,14 @@ start_local_fe() { if [ -f $REGISTER_FILE ]; then fe_daemon & - bash $DORIS_HOME/bin/start_fe.sh --daemon + run_fe else add_local_fe fe_daemon & - bash $DORIS_HOME/bin/start_fe.sh --helper $MASTER_FE_IP:$FE_EDITLOG_PORT --daemon + run_fe --helper $MASTER_FE_IP:$FE_EDITLOG_PORT fi } -start_cloud_fe() { - add_cloud_fe - bash $DORIS_HOME/bin/start_fe.sh --daemon -} - main() { trap stop_frontend SIGTERM diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py index 54255b597bc..4332ae6cf48 100644 --- a/docker/runtime/doris-compose/utils.py +++ b/docker/runtime/doris-compose/utils.py @@ -15,11 +15,13 @@ # specific language governing permissions and limitations # under the License. +import contextlib import docker -import json +import jsonpickle import logging import os import pwd +import socket import subprocess import time import yaml @@ -56,6 +58,10 @@ def is_enable_log(): return ENABLE_LOG +def set_log_verbose(): + get_logger().setLevel(logging.DEBUG) + + def get_logger(name=None): global LOG if LOG != None: @@ -274,6 +280,12 @@ def copy_image_directory(image, image_dir, local_dir): entrypoint="cp -r {} /opt/mount/".format(image_dir)) +def is_socket_avail(ip, port): + with contextlib.closing(socket.socket(socket.AF_INET, + socket.SOCK_STREAM)) as sock: + return sock.connect_ex((ip, port)) == 0 + + def enable_dir_with_rw_perm(dir): if not os.path.exists(dir): return @@ -291,6 +303,13 @@ def get_path_owner(path): return "" +def get_path_uid(path): + try: + return os.stat(path).st_uid + except: + return "" + + def read_compose_file(file): with open(file, "r") as f: return yaml.safe_load(f.read()) @@ -302,7 +321,7 @@ def write_compose_file(file, compose): def pretty_json(json_data): - return json.dumps(json_data, indent=4, sort_keys=True) + return jsonpickle.dumps(json_data, indent=4) def is_true(val): diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index 0a440bade64..3675bbf4e31 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -308,8 +308,8 @@ class Config { Properties props = cmd.getOptionProperties("conf") config.otherConfigs.putAll(props) - config.tryCreateDbIfNotExist() - config.buildUrlWithDefaultDb() + // mainly auth_xxx cases use defaultDb; these suites should preferably not use defaultDb + config.createDefaultDb() return config } @@ -630,6 +630,24 @@ class Config { return null } + void createDefaultDb() { + String dbName = null + try { + tryCreateDbIfNotExist(defaultDb) + dbName = defaultDb + } catch (Exception e) { + // defaultDb is not needed for most cases. + // when running docker suites without an external fe/be, createDefaultDb will fail, but this exception can be ignored.
+ // In fact, mainly the auth_xxx cases use defaultDb, and they just use jdbcUrl in the connect function. + // And they could avoid using defaultDb too, but modifying all these cases takes a lot of work. + // We had better remove all usage of defaultDb in suites later; every suite should use its own db, not the defaultDb. + log.warn("create default db failed ${defaultDb}".toString()) + } + + jdbcUrl = buildUrlWithDb(jdbcUrl, dbName) + log.info("Reset jdbcUrl to ${jdbcUrl}".toString()) + } + void tryCreateDbIfNotExist(String dbName = defaultDb) { // connect without specify default db try { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 6e392ef2ac4..54f2cdfc722 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -255,13 +255,13 @@ class Suite implements GroovyInterceptable { return } - boolean pipelineIsCloud = isCloudCluster() + boolean pipelineIsCloud = isCloudMode() boolean dockerIsCloud = false if (options.cloudMode == null) { dockerIsCloud = pipelineIsCloud } else { dockerIsCloud = options.cloudMode - if (dockerIsCloud != pipelineIsCloud && options.skipRunWhenPipelineDiff) { + if (dockerIsCloud != pipelineIsCloud) { return } } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 49bfbc18792..856b0e76956 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -17,8 +17,9 @@ package org.apache.doris.regression.suite import org.apache.doris.regression.Config -import org.apache.doris.regression.util.Http import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.Http +import org.apache.doris.regression.util.JdbcUtils import org.apache.doris.regression.util.NodeType import com.google.common.collect.Maps @@ -29,17 +30,29 @@ import groovy.json.JsonSlurper import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import java.util.stream.Collectors +import java.sql.Connection class ClusterOptions { int feNum = 1 int beNum = 3 + Boolean sqlModeNodeMgr = false + Boolean beMetaServiceEndpoint = true + Boolean beClusterId = false + + int waitTimeout = 180 + + // don't add whitespace in feConfigs items, + // for example, ' xx = yy ' is bad, should use 'xx=yy' List<String> feConfigs = [ 'heartbeat_interval_second=5', ] + // don't add whitespace in beConfigs items, + // for example, ' xx = yy ' is bad, should use 'xx=yy' List<String> beConfigs = [ + 'max_sys_mem_available_low_water_mark_bytes=0', // do not check available system memory 'report_disk_state_interval_seconds=2', 'report_random_wait=false', ] @@ -51,9 +64,11 @@ class ClusterOptions { // 3. cloudMode = null, create both cloud and none-cloud cluster, depend on the running pipeline mode. Boolean cloudMode = false - // when cloudMode = true/false, but the running pipeline is diff with cloudMode, - // skip run this docker test or not. - boolean skipRunWhenPipelineDiff = true + // in cloud mode, deployment methods are divided into + // 1. master - multi observers + // 2.
multi followers - multi observers + // default is 1 + Boolean useFollowersMode = false // each be disks, a disks format is: disk_type=disk_num[,disk_capacity] // here disk_type=HDD or SSD, disk capacity is in gb unit. @@ -95,12 +110,14 @@ class ServerNode { String host int httpPort boolean alive + String path static void fromCompose(ServerNode node, ListHeader header, int index, List<Object> fields) { node.index = index node.host = (String) fields.get(header.indexOf('IP')) node.httpPort = (Integer) fields.get(header.indexOf('http_port')) node.alive = fields.get(header.indexOf('alive')) == 'true' + node.path = (String) fields.get(header.indexOf('path')) } static long toLongOrDefault(Object val, long defValue) { @@ -130,10 +147,19 @@ class ServerNode { assert false : 'Unknown node type' } + String getLogFilePath() { + assert false : 'Unknown node type' + } + + String getConfFilePath() { + assert false : 'Unknown node type' + } + } class Frontend extends ServerNode { + int editLogPort int queryPort boolean isMaster @@ -141,6 +167,7 @@ class Frontend extends ServerNode { Frontend fe = new Frontend() ServerNode.fromCompose(fe, header, index, fields) fe.queryPort = (Integer) fields.get(header.indexOf('query_port')) + fe.editLogPort = (Integer) fields.get(header.indexOf('edit_log_port')) fe.isMaster = fields.get(header.indexOf('is_master')) == 'true' return fe } @@ -149,18 +176,29 @@ class Frontend extends ServerNode { return NodeType.FE } + String getLogFilePath() { + return path + '/log/fe.log' + } + + String getConfFilePath() { + return path + '/conf/fe.conf' + } + } class Backend extends ServerNode { + int heartbeatPort long backendId int tabletNum static Backend fromCompose(ListHeader header, int index, List<Object> fields) { Backend be = new Backend() ServerNode.fromCompose(be, header, index, fields) + be.heartbeatPort = (Integer) fields.get(header.indexOf('heartbeat_port')) be.backendId = toLongOrDefault(fields.get(header.indexOf('backend_id')), -1L) be.tabletNum = (int) toLongOrDefault(fields.get(header.indexOf('tablet_num')), 0L) + return be } @@ -168,6 +206,58 @@ class Backend extends ServerNode { return NodeType.BE } + String getLogFilePath() { + return path + '/log/be.INFO' + } + + String getConfFilePath() { + return path + '/conf/be.conf' + } + +} + +class MetaService extends ServerNode { + + static MetaService fromCompose(ListHeader header, int index, List<Object> fields) { + MetaService ms = new MetaService() + ServerNode.fromCompose(ms, header, index, fields) + return ms + } + + NodeType getNodeType() { + return NodeType.MS + } + + String getLogFilePath() { + return path + '/log/meta_service.INFO' + } + + String getConfFilePath() { + return path + '/conf/doris_cloud.conf' + } + +} + +class Recycler extends ServerNode { + + static Recycler fromCompose(ListHeader header, int index, List<Object> fields) { + Recycler rs = new Recycler() + ServerNode.fromCompose(rs, header, index, fields) + return rs + } + + NodeType getNodeType() { + return NodeType.RECYCLER + } + + String getLogFilePath() { + return path + '/log/recycler.INFO' + } + + String getConfFilePath() { + return path + '/conf/doris_cloud.conf' + } + } @Slf4j @@ -179,6 +269,8 @@ class SuiteCluster { final String name final Config config private boolean running + private boolean sqlModeNodeMgr = false + private boolean isCloudMode = false SuiteCluster(String name, Config config) { this.name = name @@ -191,6 +283,8 @@ class SuiteCluster { assert options.feNum > 0 || options.beNum > 0 assert config.image != null && config.image
!= '' + this.isCloudMode = isCloud + def cmd = [ 'up', name, config.image ] @@ -220,7 +314,24 @@ class SuiteCluster { if (isCloud) { cmd += ['--cloud'] } - cmd += ['--wait-timeout', String.valueOf(180)] + + if (isCloud && options.useFollowersMode) { + cmd += ['--fe-follower'] + } + + if (options.sqlModeNodeMgr) { + cmd += ['--sql-mode-node-mgr'] + } + if (!options.beMetaServiceEndpoint) { + cmd += ['--no-be-metaservice-endpoint'] + } + if (!options.beClusterId) { + cmd += ['--no-be-cluster-id'] + } + + cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)] + + sqlModeNodeMgr = options.sqlModeNodeMgr runCmd(cmd.join(' '), -1) @@ -287,25 +398,44 @@ class SuiteCluster { return getBackends().stream().filter(be -> be.alive || !needAlive).collect(Collectors.toList()); } + List<MetaService> getAllMetaservices(boolean needAlive = false) { + return getMetaservices().stream().filter(ms -> ms.alive || !needAlive).collect(Collectors.toList()); + } + + List<MetaService> getAllRecyclers(boolean needAlive = false) { + return getRecyclers().stream().filter(rc -> rc.alive || !needAlive).collect(Collectors.toList()); + } + private List<Frontend> getFrontends() { - List<Frontend> frontends = [] - List<Backend> backends = [] - getAllNodes(frontends, backends) - return frontends + def ret = getAllNodes() + return ret.getV1() } private List<Backend> getBackends() { - List<Frontend> frontends = [] - List<Backend> backends = [] - getAllNodes(frontends, backends) - return backends + def ret = getAllNodes() + return ret.getV2() + } + + private List<MetaService> getMetaservices() { + def ret = getAllNodes() + return ret.getV3() } - private void getAllNodes(List<Frontend> frontends, List<Backend> backends) { + private List<Recycler> getRecyclers() { + def ret = getAllNodes() + return ret.getV4() + } + + private Tuple4<List<Frontend>, List<Backend>, List<MetaService>, List<Recycler>> getAllNodes() { + List<Frontend> frontends = [] + List<Backend> backends = [] + List<MetaService> metaservices = [] + List<Recycler> recyclers = [] def cmd = 'ls ' + name + ' --detail' def data = runCmd(cmd) assert data instanceof List def rows = (List<List<Object>>) data + logger.info('get all nodes {}', rows) def header = new ListHeader(rows.get(0)) for (int i = 1; i < rows.size(); i++) { def row = (List<Object>) rows.get(i) @@ -316,34 +446,49 @@ class SuiteCluster { } else if (name.startsWith('fe-')) { int index = name.substring('fe-'.length()) as int frontends.add(Frontend.fromCompose(header, index, row)) - } else if (name.startsWith('ms-') || name.startsWith('recycle-') || name.startsWith('fdb-')) { - // TODO: handle these nodes + } else if (name.startsWith('ms-')) { + int index = name.substring('ms-'.length()) as int + metaservices.add(MetaService.fromCompose(header, index, row)) + } else if (name.startsWith('recycle-')) { + int index = name.substring('recycle-'.length()) as int + recyclers.add(Recycler.fromCompose(header, index, row)) + } else if (name.startsWith('fdb-')) { + // current not used } else { assert false : 'Unknown node type with name: ' + name } } + return new Tuple4(frontends, backends, metaservices, recyclers) } - List<Integer> addFrontend(int num) throws Exception { - def result = add(num, 0) + List<Integer> addFrontend(int num, boolean followerMode=false) throws Exception { + def result = add(num, 0, null, followerMode) return result.first } - List<Integer> addBackend(int num) throws Exception { - def result = add(0, num) + List<Integer> addBackend(int num, String ClusterName='') throws Exception { + def 
result = add(0, num, ClusterName) return result.second } - Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum) throws Exception { + // ATTN: clusterName just used for cloud mode, 1 cluster has n bes + // ATTN: followerMode just used for cloud mode + Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String clusterName, boolean followerMode=false) throws Exception { assert feNum > 0 || beNum > 0 def sb = new StringBuilder() sb.append('up ' + name + ' ') if (feNum > 0) { sb.append('--add-fe-num ' + feNum + ' ') + if (followerMode) { + sb.append('--fe-follower' + ' ') + } } if (beNum > 0) { sb.append('--add-be-num ' + beNum + ' ') + if (clusterName != null && !clusterName.isEmpty()) { + sb.append(' --be-cluster ' + clusterName + ' ') + } } sb.append('--wait-timeout 60') @@ -373,40 +518,42 @@ class SuiteCluster { return running } + boolean isCloudMode() { + return this.isCloudMode + } + + int START_WAIT_TIMEOUT = 120 + // if not specific fe indices, then start all frontends void startFrontends(int... indices) { - runFrontendsCmd('start', indices) - waitHbChanged() + runFrontendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices) } // if not specific be indices, then start all backends void startBackends(int... indices) { - runBackendsCmd('start', indices) - waitHbChanged() + runBackendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices) } // if not specific fe indices, then stop all frontends void stopFrontends(int... indices) { - runFrontendsCmd('stop', indices) + runFrontendsCmd(60, 'stop', indices) waitHbChanged() } // if not specific be indices, then stop all backends void stopBackends(int... indices) { - runBackendsCmd('stop', indices) + runBackendsCmd(60, 'stop', indices) waitHbChanged() } // if not specific fe indices, then restart all frontends void restartFrontends(int... indices) { - runFrontendsCmd('restart', indices) - waitHbChanged() + runFrontendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices) } // if not specific be indices, then restart all backends void restartBackends(int... indices) { - runBackendsCmd('restart', indices) - waitHbChanged() + runBackendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices) } // if not specific fe indices, then drop all frontends @@ -415,7 +562,7 @@ class SuiteCluster { if (clean) { cmd += ' --clean' } - runFrontendsCmd(cmd, indices) + runFrontendsCmd(60, cmd, indices) } // if not specific be indices, then decommission all backends @@ -433,7 +580,7 @@ class SuiteCluster { if (clean) { cmd += ' --clean' } - runBackendsCmd(cmd, indices) + runBackendsCmd(60, cmd, indices) } void checkFeIsAlive(int index, boolean isAlive) { @@ -468,27 +615,28 @@ class SuiteCluster { } } + void addRWPermToAllFiles() { + def cmd = 'add-rw-perm ' + name + runCmd(cmd) + } + private void waitHbChanged() { // heart beat interval is 5s Thread.sleep(7000) } - private void runFrontendsCmd(String op, int... indices) { + private void runFrontendsCmd(int timeoutSecond, String op, int... indices) { def cmd = op + ' ' + name + ' --fe-id ' + indices.join(' ') - runCmd(cmd) + runCmd(cmd, timeoutSecond) } - private void runBackendsCmd(Integer timeoutSecond = null, String op, int... indices) { + private void runBackendsCmd(int timeoutSecond, String op, int... 
indices) { def cmd = op + ' ' + name + ' --be-id ' + indices.join(' ') - if (timeoutSecond == null) { - runCmd(cmd) - } else { - runCmd(cmd, timeoutSecond) - } + runCmd(cmd, timeoutSecond) } private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception { - def fullCmd = String.format('python %s %s --output-json', config.dorisComposePath, cmd) + def fullCmd = String.format('python -W ignore %s %s --output-json', config.dorisComposePath, cmd) logger.info('Run doris compose cmd: {}', fullCmd) def proc = fullCmd.execute() def outBuf = new StringBuilder() diff --git a/regression-test/suites/demo_p0/docker_action.groovy b/regression-test/suites/demo_p0/docker_action.groovy index 6d62d6ea7be..d59c0f43774 100644 --- a/regression-test/suites/demo_p0/docker_action.groovy +++ b/regression-test/suites/demo_p0/docker_action.groovy @@ -55,8 +55,6 @@ suite('docker_action') { options2.beNum = 1 // create cloud cluster options2.cloudMode = true - //// cloud docker only run in cloud pipeline, but enable it run in none-cloud pipeline - // options2.skipRunWhenPipelineDiff = false // run another docker, create a cloud cluster docker(options2) { // cloud cluster will ignore replication_num, always set to 1. so create table succ even has 1 be.
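
For reviewers who want to try the new knobs locally, here is a minimal, illustrative sketch (not part of this commit) of a regression suite that exercises the ClusterOptions fields added above. It assumes the standard suite/docker DSL of the regression framework and that the docker closure can reach the running SuiteCluster through a framework-provided `cluster` handle; the suite name and the compute cluster name are made up for the example.

// Hypothetical suite showing the new doris-compose docker options; the option and
// method names below are taken from ClusterOptions and SuiteCluster in this diff.
suite('docker_compose_options_demo') {
    def options = new ClusterOptions()
    options.feNum = 2
    options.beNum = 3
    options.cloudMode = true           // create a cloud-mode compose cluster
    options.useFollowersMode = true    // multi followers - multi observers deployment
    options.sqlModeNodeMgr = true      // skip create_instance/add-node; nodes register via SQL node management
    options.waitTimeout = 300          // give the containers more time to become ready
    options.beConfigs += ['report_random_wait=false']

    docker(options) {
        sql 'SELECT 1'
        // scale out while the suite is running
        cluster.addFrontend(1, true)        // one more follower FE (cloud mode only)
        cluster.addBackend(2, 'compute_x')  // two more BEs in a named compute cluster
    }
}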