This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new d483293d98d  [feat](doris compose) Add host mode for add remote fe (#48839)

d483293d98d is described below

commit d483293d98d3b677727e1590d352dd6484cd411b
Author: yujun <yu...@selectdb.com>
AuthorDate: Mon Mar 10 11:31:12 2025 +0800

    [feat](doris compose) Add host mode for add remote fe (#48839)

    ### What problem does this PR solve?

    Add a host network mode. In this mode, containers use the host IP and
    host ports; the ports are picked automatically, so users do not need to
    specify them, and a remote FE can then join the cluster. For simplicity,
    host mode requires adding a remote FE, so this PR adds the option
    --remote-master-fe (the remote FE address as ip:query_port). When a user
    creates a new cluster and specifies this option, the cluster uses host
    mode (a usage sketch follows the diff below).
---
 docker/runtime/doris-compose/cluster.py            | 325 ++++++++++------
 docker/runtime/doris-compose/command.py            | 407 ++++++++++++++-------
 docker/runtime/doris-compose/database.py           | 114 ++++--
 docker/runtime/doris-compose/resource/common.sh    |  20 +-
 docker/runtime/doris-compose/resource/init_be.sh   |  11 +-
 .../runtime/doris-compose/resource/init_cloud.sh   |  16 +-
 docker/runtime/doris-compose/resource/init_fe.sh   |  44 ++-
 docker/runtime/doris-compose/utils.py              |  19 +
 8 files changed, 650 insertions(+), 306 deletions(-)

diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index d3b08d71226..a25abbf9c3c 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -82,6 +82,10 @@ def get_status_path(cluster_name): return os.path.join(get_cluster_path(cluster_name), "status") +def get_master_fe_addr_path(cluster_name): + return get_status_path(cluster_name) + "/master_fe_query_addr" + + def get_all_cluster_names(): if not os.path.exists(LOCAL_DORIS_PATH): return [] @@ -138,16 +142,16 @@ def gen_subnet_prefix16(): raise Exception("Failed to gen subnet") -def get_master_fe_endpoint(cluster_name, wait_master_fe_ip_file=False): +def get_master_fe_endpoint(cluster_name, wait_master_fe_query_addr_file=False): cluster_path = get_cluster_path(cluster_name) if os.path.exists(cluster_path): - master_fe_ip_file = "{}/status/master_fe_ip".format(cluster_path) - max_retries = 10 if wait_master_fe_ip_file else 0 + master_fe_query_addr_file = get_master_fe_addr_path(cluster_name) + max_retries = 10 if wait_master_fe_query_addr_file else 0 i = 0 while True: - if os.path.exists(master_fe_ip_file): - with open(master_fe_ip_file, "r") as f: - return "{}:{}".format(f.read().strip(), FE_QUERY_PORT) + if os.path.exists(master_fe_query_addr_file): + with open(master_fe_query_addr_file, "r") as f: + return f.read().strip() i += 1 if i < max_retries: time.sleep(1) @@ -156,8 +160,12 @@ def get_master_fe_endpoint(cluster_name, wait_master_fe_ip_file=False): try: cluster = Cluster.load(cluster_name) LOG.info("master file not exist, master ip get from node 1") - return "{}:{}".format( - cluster.get_node(Node.TYPE_FE, 1).get_ip(), FE_QUERY_PORT) + if cluster.is_host_network(): + return cluster.remote_master_fe + else: + master_fe = cluster.get_node(Node.TYPE_FE, 1) + return "{}:{}".format(master_fe.get_ip(), + master_fe.meta["ports"]["query_port"]) except: return "" @@ -180,12 +188,6 @@ def get_node_seq(node_type, id): return seq -class NodeMeta(object): - - def __init__(self, image): - self.image = image - - class Group(object): def __init__(self, node_type): @@ -194,7 +196,7 @@ class Group(object): self.next_id = 1 def add(self, id, node_meta): - assert
node_meta.image + assert node_meta["image"] if not id: id = self.next_id self.next_id += 1 @@ -229,6 +231,15 @@ class Group(object): self.nodes = nodes +class NodeNetInfo(object): + + def __init__(self, type, id, ip, ports): + self.type = type + self.id = id + self.ip = ip + self.ports = ports + + class Node(object): TYPE_FE = "fe" TYPE_BE = "be" @@ -257,9 +268,19 @@ class Node(object): else: raise Exception("Unknown node type {}".format(node_type)) + # only run once at create the node, later restart or upgrade image will not run def init(self): + self.init_ports() self.init_conf() + def init_ports(self): + if self.cluster.is_host_network(): + self.meta["ports"] = dict( + (port_name, utils.get_avail_port()) + for port_name in self.get_default_named_ports().keys()) + else: + self.meta["ports"] = self.get_default_named_ports() + def init_conf(self): path = self.get_path() os.makedirs(path, exist_ok=True) @@ -310,15 +331,28 @@ class Node(object): return get_node_path(self.cluster.name, self.node_type(), self.id) def get_image(self): - return self.meta.image + return self.meta["image"] def set_image(self, image): - self.meta.image = image + self.meta["image"] = image def get_ip(self): - seq = get_node_seq(self.node_type(), self.id) - return "{}.{}.{}".format(self.cluster.subnet, int(seq / IP_PART4_SIZE), - seq % IP_PART4_SIZE) + if self.cluster.is_host_network(): + # this is a remote node + if self.meta.get("is_remote", False): + return self.cluster.remote_master_fe.split(":")[0] + else: + return self.cluster.local_network_ip + else: + seq = get_node_seq(self.node_type(), self.id) + return "{}.{}.{}".format(self.cluster.subnet, + int(seq / IP_PART4_SIZE), + seq % IP_PART4_SIZE) + + def get_default_named_ports(self): + # port_name : default_port + # the port_name come from fe.conf, be.conf, cloud.conf, etc + return {} @staticmethod def get_id_from_ip(ip): @@ -343,9 +377,6 @@ class Node(object): "MY_IP": self.get_ip(), "MY_ID": self.id, "MY_TYPE": self.node_type(), - "FE_QUERY_PORT": FE_QUERY_PORT, - "FE_EDITLOG_PORT": FE_EDITLOG_PORT, - "BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT, "DORIS_HOME": os.path.join(self.docker_home_dir()), "STOP_GRACE": 1 if enable_coverage else 0, "IS_CLOUD": 1 if self.cluster.is_cloud else 0, @@ -356,7 +387,7 @@ class Node(object): envs["META_SERVICE_ENDPOINT"] = self.cluster.get_meta_server_addr() # run as host user - if not getattr(self.cluster, 'is_root_user', True): + if not self.cluster.is_root_user: envs["HOST_USER"] = getpass.getuser() envs["HOST_UID"] = os.getuid() envs["HOST_GID"] = os.getgid() @@ -385,10 +416,17 @@ class Node(object): return None def get_add_init_config(self): - return [] + cfg = [] + if self.cluster.is_host_network(): + cfg.append(f"priority_networks = {self.cluster.local_network_ip}") + cfg += [ + f"{port_name} = {port}" + for port_name, port in self.meta["ports"].items() + ] + return cfg def docker_ports(self): - raise Exception("No implemented") + return list(self.get_default_named_ports().values()) def docker_home_dir(self): raise Exception("No implemented") @@ -412,24 +450,11 @@ class Node(object): volumes.append("{}:{}/coverage".format(self.cluster.coverage_dir, DOCKER_DORIS_PATH)) - extra_hosts = [ - "{}:{}".format(node.get_name(), node.get_ip()) - for node in self.cluster.get_all_nodes() - ] - content = { "cap_add": ["SYS_PTRACE"], - "hostname": self.get_name(), "container_name": self.service_name(), "environment": self.docker_env(), "image": self.get_image(), - "networks": { - utils.with_doris_prefix(self.cluster.name): { - 
"ipv4_address": self.get_ip(), - } - }, - "extra_hosts": extra_hosts, - "ports": self.docker_ports(), "ulimits": { "core": -1 }, @@ -437,6 +462,22 @@ class Node(object): "volumes": volumes, } + if self.cluster.is_host_network(): + content["network_mode"] = "host" + content["ports"] = self.docker_ports() + else: + content["hostname"] = self.get_name() + content["networks"] = { + utils.with_doris_prefix(self.cluster.name): { + "ipv4_address": self.get_ip(), + } + } + content["extra_hosts"] = [ + "{}:{}".format(node.get_name(), node.get_ip()) + for node in self.cluster.get_all_nodes() + ] + content["ports"] = self.docker_ports() + if self.entrypoint(): content["entrypoint"] = self.entrypoint() @@ -446,21 +487,19 @@ class Node(object): class FE(Node): def init(self): + # for cloud mode, fe is follower or observer, default is observer + self.meta[ + "is_cloud_follower"] = self.cluster.is_cloud and self.cluster.fe_follower super().init() - self.init_is_follower() def get_add_init_config(self): - cfg = [] + cfg = super().get_add_init_config() if self.cluster.fe_config: cfg += self.cluster.fe_config if self.cluster.is_cloud: cfg += [ "meta_service_endpoint = {}".format( self.cluster.get_meta_server_addr()), - "", - "# For regression-test", - "ignore_unsupported_properties_in_cloud_mode = true", - "merge_on_write_forced_to_false = true", "deploy_mode = cloud", ] @@ -475,46 +514,53 @@ class FE(Node): with open("{}/conf/{}".format(self.get_path(), self.conf_file_name()), "r") as f: + java_debug_port = self.meta["ports"]["java_debug_port"] parser = configparser.ConfigParser() parser.read_string('[dummy_section]\n' + f.read()) + section = parser['dummy_section'] for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"): - value = parser["dummy_section"].get(key) + value = section.get(key) if value: value = value.strip().strip('"') - cfg.append( - f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\"" - ) + if key == "JAVA_OPTS": + # java 8 + cfg.append( + f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address={java_debug_port}\"" + ) + else: + # JAVA_OPTS_FOR_JDK_17 + # >= java 9 + cfg.append( + f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{java_debug_port}\"" + ) return cfg - def init_is_follower(self): - if self.cluster.is_cloud and self.cluster.fe_follower: - with open(self._is_follower_path(), "w") as f: - f.write("true") - - def _is_follower_path(self): - return "{}/conf/is_follower".format(self.get_path()) - def docker_env(self): envs = super().docker_env() if self.cluster.is_cloud: envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id() - if os.path.exists(self._is_follower_path()): - envs["IS_FE_FOLLOWER"] = 1 + if self.meta["is_cloud_follower"]: + envs["is_fe_follower"] = 1 + envs["MY_QUERY_PORT"] = self.meta["ports"]["query_port"] + envs["MY_EDITLOG_PORT"] = self.meta["ports"]["edit_log_port"] return envs + def get_default_named_ports(self): + return { + "http_port": FE_HTTP_PORT, + "rpc_port": FE_RPC_PORT, + "query_port": FE_QUERY_PORT, + "edit_log_port": FE_EDITLOG_PORT, + "java_debug_port": FE_JAVA_DBG_PORT, + } + def cloud_unique_id(self): return "sql_server_{}".format(self.id) def start_script(self): return ["init_fe.sh"] - def docker_ports(self): - return [ - FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT, - FE_JAVA_DBG_PORT - ] - def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "fe") @@ -530,11 +576,11 @@ class BE(Node): def init(self): super().init() if 
self.cluster.is_cloud: - self.init_cluster_name() + self.meta["cluster_name"] = self.cluster.be_cluster self.init_disk(self.cluster.be_disks) def get_add_init_config(self): - cfg = [] + cfg = super().get_add_init_config() if self.cluster.be_config: cfg += self.cluster.be_config if self.cluster.is_cloud: @@ -561,14 +607,6 @@ class BE(Node): ] return cfg - def init_cluster_name(self): - with open("{}/conf/CLUSTER_NAME".format(self.get_path()), "w") as f: - f.write(self.cluster.be_cluster) - - def get_cluster_name(self): - with open("{}/conf/CLUSTER_NAME".format(self.get_path()), "r") as f: - return f.read().strip() - def init_disk(self, be_disks): path = self.get_path() dirs = [] @@ -612,20 +650,28 @@ class BE(Node): def docker_env(self): envs = super().docker_env() + envs["MY_HEARTBEAT_PORT"] = self.meta["ports"][ + "heartbeat_service_port"] if self.cluster.is_cloud: envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id() envs["REG_BE_TO_MS"] = 1 if self.cluster.reg_be else 0 + envs["CLUSTER_NAME"] = self.meta["cluster_name"] return envs + def get_default_named_ports(self): + return { + "be_port": BE_PORT, + "webserver_port": BE_WEBSVR_PORT, + "heartbeat_service_port": BE_HEARTBEAT_PORT, + "brpc_port": BE_BRPC_PORT, + } + def cloud_unique_id(self): return "compute_node_{}".format(self.id) def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "be") - def docker_ports(self): - return [BE_WEBSVR_PORT, BE_BRPC_PORT, BE_HEARTBEAT_PORT, BE_PORT] - def node_type(self): return Node.TYPE_BE @@ -636,13 +682,17 @@ class BE(Node): class CLOUD(Node): def get_add_init_config(self): - return ["fdb_cluster = " + self.cluster.get_fdb_cluster()] + cfg = super().get_add_init_config() + cfg.append("fdb_cluster = " + self.cluster.get_fdb_cluster()) + return cfg def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "cloud") - def docker_ports(self): - return [MS_PORT] + def get_default_named_ports(self): + return { + "brpc_listen_port": MS_PORT, + } def conf_file_name(self): for file in os.listdir(os.path.join(self.get_path(), "conf")): @@ -689,13 +739,17 @@ class RECYCLE(CLOUD): class FDB(Node): + def get_add_init_config(self): + return [] + def copy_conf_to_local(self, local_conf_dir): os.makedirs(local_conf_dir, exist_ok=True) with open(os.path.join(LOCAL_RESOURCE_PATH, "fdb.conf"), "r") as read_file: with open(os.path.join(local_conf_dir, self.conf_file_name()), "w") as f: - publish_addr = "{}:{}".format(self.get_ip(), FDB_PORT) + publish_addr = "{}:{}".format(self.get_ip(), + self.meta["ports"]["fdb_port"]) f.write(read_file.read().replace("${PUBLISH-ADDRESS}", publish_addr)) @@ -708,8 +762,8 @@ class FDB(Node): def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "fdb") - def docker_ports(self): - return [FDB_PORT] + def get_default_named_ports(self): + return {"fdb_port": FDB_PORT} def node_type(self): return Node.TYPE_FDB @@ -721,9 +775,10 @@ class FDB(Node): class Cluster(object): def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config, - be_config, ms_config, recycle_config, fe_follower, be_disks, - be_cluster, reg_be, coverage_dir, cloud_store_config, - sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id): + be_config, ms_config, recycle_config, remote_master_fe, + local_network_ip, fe_follower, be_disks, be_cluster, reg_be, + coverage_dir, cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cluster_id): self.name = name self.subnet = subnet self.image = image @@ -733,6 +788,8 @@ class Cluster(object): self.be_config = be_config 
self.ms_config = ms_config self.recycle_config = recycle_config + self.remote_master_fe = remote_master_fe + self.local_network_ip = local_network_ip self.fe_follower = fe_follower self.be_disks = be_disks self.be_cluster = be_cluster @@ -743,15 +800,19 @@ class Cluster(object): node_type: Group(node_type) for node_type in Node.TYPE_ALL } + if self.remote_master_fe: + # preserve fe id = 1 for the remote master:fe + self.groups[Node.TYPE_FE].next_id = 2 self.sql_mode_node_mgr = sql_mode_node_mgr self.be_metaservice_endpoint = be_metaservice_endpoint self.be_cluster_id = be_cluster_id @staticmethod def new(name, image, is_cloud, is_root_user, fe_config, be_config, - ms_config, recycle_config, fe_follower, be_disks, be_cluster, - reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr, - be_metaservice_endpoint, be_cluster_id): + ms_config, recycle_config, remote_master_fe, local_network_ip, + fe_follower, be_disks, be_cluster, reg_be, coverage_dir, + cloud_store_config, sql_mode_node_mgr, be_metaservice_endpoint, + be_cluster_id): if not os.path.exists(LOCAL_DORIS_PATH): os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) os.chmod(LOCAL_DORIS_PATH, 0o777) @@ -759,13 +820,13 @@ class Cluster(object): with filelock.FileLock(lock_file): if os.getuid() == utils.get_path_uid(lock_file): os.chmod(lock_file, 0o666) - subnet = gen_subnet_prefix16() + subnet = gen_subnet_prefix16() if not remote_master_fe else "" cluster = Cluster(name, subnet, image, is_cloud, is_root_user, fe_config, be_config, ms_config, recycle_config, - fe_follower, be_disks, be_cluster, reg_be, - coverage_dir, cloud_store_config, - sql_mode_node_mgr, be_metaservice_endpoint, - be_cluster_id) + remote_master_fe, local_network_ip, fe_follower, + be_disks, be_cluster, reg_be, coverage_dir, + cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cluster_id) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() @@ -795,6 +856,21 @@ class Cluster(object): def _get_meta_file(name): return os.path.join(get_cluster_path(name), "meta") + def is_host_network(self): + return getattr(self, "remote_master_fe", "") + + def get_remote_fe_node(self): + if not self.is_host_network(): + return None + meta = { + "image": "", + "is_remote": True, + "ports": { + "query_port": int(self.remote_master_fe.split(":")[1]) + }, + } + return Node.new(self, Node.TYPE_FE, 1, meta) + def get_image(self): return self.image @@ -805,7 +881,7 @@ class Cluster(object): if node_type == Node.TYPE_FDB: continue for _, node_meta in group.nodes.items(): - node_meta.image = image + node_meta["image"] = image def get_path(self): return get_cluster_path(self.name) @@ -816,28 +892,48 @@ class Cluster(object): raise Exception("Unknown node_type: {}".format(node_type)) return group - def get_node(self, node_type, id): + def get_all_node_net_infos(self): + return [ + NodeNetInfo(node.node_type(), node.id, node.get_ip(), + node.meta["ports"]) + for node in self.get_all_nodes(None, True) + ] + + def get_node(self, node_type, id, include_remote=False): group = self.get_group(node_type) meta = group.get_node(id) if not meta: + if include_remote and node_type == Node.TYPE_FE: + node = self.get_remote_fe_node() + if node and node.id == id: + return node raise Exception("No found {} with id {}".format(node_type, id)) return Node.new(self, node_type, id, meta) - def get_all_nodes(self, node_type=None): + def get_all_nodes(self, node_type=None, include_remote=False): if node_type is None: nodes = [] for nt, 
group in self.groups.items(): for id, meta in group.get_all_nodes().items(): nodes.append(Node.new(self, nt, id, meta)) + if include_remote: + node = self.get_remote_fe_node() + if node: + nodes.append(node) return nodes group = self.groups.get(node_type, None) if not group: raise Exception("Unknown node_type: {}".format(node_type)) - return [ + nodes = [ Node.new(self, node_type, id, meta) for id, meta in group.get_all_nodes().items() ] + if include_remote: + node = self.get_remote_fe_node() + if node: + nodes.append(node) + return nodes def get_all_nodes_num(self): num = 0 @@ -846,7 +942,8 @@ class Cluster(object): return num def add(self, node_type, id=None): - node_meta = NodeMeta(self.image) + node_meta = {} + node_meta["image"] = self.image id = self.get_group(node_type).add(id, node_meta) node = self.get_node(node_type, id) if not os.path.exists(node.get_path()): @@ -855,15 +952,19 @@ class Cluster(object): return node def get_fdb_cluster(self): - return "123456:123456@{}:{}".format( - self.get_node(Node.TYPE_FDB, 1).get_ip(), FDB_PORT) + fdb = self.get_node(Node.TYPE_FDB, 1) + return "123456:123456@{}:{}".format(fdb.get_ip(), + fdb.meta["ports"]["fdb_port"]) def get_meta_server_addr(self): - return "{}:{}".format(self.get_node(Node.TYPE_MS, 1).get_ip(), MS_PORT) + meta_server = self.get_node(Node.TYPE_MS, 1) + return "{}:{}".format(meta_server.get_ip(), + meta_server.meta["ports"]["brpc_listen_port"]) def get_recycle_addr(self): - return "{}:{}".format( - self.get_node(Node.TYPE_RECYCLE, 1).get_ip(), MS_PORT) + recycler = self.get_node(Node.TYPE_RECYCLE, 1) + return "{}:{}".format(recycler.get_ip(), + recycler.meta["ports"]["brpc_listen_port"]) def remove(self, node_type, id): group = self.get_group(node_type) @@ -882,21 +983,21 @@ class Cluster(object): for node_type in self.groups.keys(): for node in self.get_all_nodes(node_type): services[node.service_name()] = node.compose() - compose = { "version": "3", - "networks": { + "services": services, + } + if not self.is_host_network(): + compose["networks"] = { utils.with_doris_prefix(self.name): { "driver": "bridge", "ipam": { "config": [{ "subnet": "{}.0.0/16".format(self.subnet), }] - } - } - }, - "services": services, - } + }, + }, + } utils.write_compose_file(self.get_compose_file(), compose) diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index 150162ff074..479077f4182 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -31,33 +31,27 @@ LOG = utils.get_logger() def wait_ready_service(wait_timeout, cluster, fe_ids, be_ids): - - def is_fe_ready_service(ip): - return utils.is_socket_avail(ip, CLUSTER.FE_QUERY_PORT) - - def is_be_ready_service(ip): - return utils.is_socket_avail(ip, CLUSTER.BE_HEARTBEAT_PORT) - if wait_timeout == 0: return if wait_timeout == -1: wait_timeout = 1000000000 expire_ts = time.time() + wait_timeout while True: - db_mgr = database.get_db_mgr(cluster.name, False) + db_mgr = database.get_db_mgr(cluster.name, + cluster.get_all_node_net_infos(), False) dead_frontends = [] for id in fe_ids: fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id) fe_state = db_mgr.get_fe(id) - if not fe_state or not fe_state.alive or not is_fe_ready_service( - fe.get_ip()): + if not fe_state or not fe_state.alive or not utils.is_socket_avail( + fe.get_ip(), fe.meta["ports"]["query_port"]): dead_frontends.append(id) dead_backends = [] for id in be_ids: be = cluster.get_node(CLUSTER.Node.TYPE_BE, id) be_state = db_mgr.get_be(id) - if not 
be_state or not be_state.alive or not is_be_ready_service( - be.get_ip()): + if not be_state or not be_state.alive or not utils.is_socket_avail( + be.get_ip(), be.meta["ports"]["webserver_port"]): dead_backends.append(id) if not dead_frontends and not dead_backends: break @@ -114,8 +108,6 @@ def get_ids_related_nodes(cluster, for node_type, ids in type_ids: nodes.extend(get_ids_related_nodes_with_type(node_type, ids)) - related_node_num = len(nodes) - return len(nodes) == cluster.get_all_nodes_num(), nodes, len(nodes) @@ -247,7 +239,8 @@ class NeedStartCommand(SimpleCommand): cluster, related_nodes = super().run(args) fe_ids = [node.id for node in related_nodes if node.is_fe()] be_ids = [node.id for node in related_nodes if node.is_be()] - wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids) + if not cluster.is_host_network(): + wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids) class UpCommand(Command): @@ -335,6 +328,7 @@ class UpCommand(Command): type=str, help="Specify recycle configs for doris_cloud.conf. "\ "Example: --recycle-config \"log_level = warn\".") + group1.add_argument( "--fe-follower", default=False, @@ -392,6 +386,21 @@ class UpCommand(Command): action=self._get_parser_bool_action(True), help="Manager fe be via sql instead of http") + parser.add_argument( + "--remote-master-fe", + type=str, + help= + "Specify remote master fe address with ip:query_port, and all the container use host network. " \ + "Only use when creating new cluster." + ) + + parser.add_argument( + "--local-network-ip", + type=str, + help= "Specify local network ip, no need specify, will auto chose a proper ip. "\ + "Only use when creating new cluster and specify --remote-master-fe." + ) + if self._support_boolean_action(): parser.add_argument( "--be-metaservice-endpoint", @@ -422,7 +431,7 @@ class UpCommand(Command): dest='be_cluster_id', default=True, action=self._get_parser_bool_action(False), - help="Do not set BE cluser ID in conf. Default is False.") + help="Do not set BE cluster ID in conf. 
Default is False.") parser.add_argument( "--fdb-version", @@ -466,6 +475,7 @@ class UpCommand(Command): raise Exception("Need specific not empty cluster name") for_all = True add_fdb_num = 0 + is_new_cluster = False try: cluster = CLUSTER.Cluster.load(args.NAME) @@ -483,6 +493,7 @@ class UpCommand(Command): for_all = False except: # a new cluster + is_new_cluster = True if not args.IMAGE: raise Exception("New cluster must specific image") from None if args.fe_id != None: @@ -499,7 +510,7 @@ class UpCommand(Command): args.recycle_id = None if args.add_fe_num is None: - args.add_fe_num = 3 + args.add_fe_num = 0 if args.remote_master_fe else 3 if args.add_be_num is None: args.add_be_num = 3 @@ -517,11 +528,26 @@ class UpCommand(Command): args.add_ms_num = 0 args.add_recycle_num = 0 + if args.remote_master_fe: + if not args.local_network_ip: + args.local_network_ip = utils.get_local_ip() + parts = args.remote_master_fe.split(":") + if len(parts) != 2: + raise Exception( + f"invalid --remote-master-fe-addr {args.remote_master_fe}, should be 'ip:query_port'" + ) + if not parts[0]: + args.remote_master_fe = args.local_network_ip + ":" + parts[ + 1] + if args.cloud: + args.sql_mode_node_mgr = True + cluster = CLUSTER.Cluster.new( args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config, args.be_config, args.ms_config, args.recycle_config, - args.fe_follower, args.be_disks, args.be_cluster, args.reg_be, - args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr, + args.remote_master_fe, args.local_network_ip, args.fe_follower, + args.be_disks, args.be_cluster, args.reg_be, args.coverage_dir, + cloud_store_config, args.sql_mode_node_mgr, args.be_metaservice_endpoint, args.be_cluster_id) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) @@ -605,32 +631,75 @@ class UpCommand(Command): .format(related_node_num))) else: LOG.info("Using SQL mode for node management ? {}".format( - args.sql_mode_node_mgr)) - - # Wait for FE master to be elected - LOG.info("Waiting for FE master to be elected...") - expire_ts = time.time() + 30 - while expire_ts > time.time(): - ready = False - db_mgr = database.get_db_mgr(args.NAME, False) - for id in add_fe_ids: - fe_state = db_mgr.get_fe(id) - if fe_state is not None and fe_state.alive: - ready = True + cluster.sql_mode_node_mgr)) + + if cluster.remote_master_fe: + if is_new_cluster: + if not cluster.is_cloud: + with open( + CLUSTER.get_master_fe_addr_path(cluster.name), + "w") as f: + f.write(cluster.remote_master_fe) + else: + cloud_config = "\n".join([ + f"meta_service_endpoint = {cluster.get_meta_server_addr()}", + "deploy_mode = cloud", + f"cluster_id = {CLUSTER.CLUSTER_ID}", + ]) + ans = input( + f"\nAdd remote fe {cluster.remote_master_fe} fe.conf with follow config: \n\n" \ + f"{cloud_config}\n\nConfirm ? y/n: ") + if ans != 'y': + LOG.info( + "Up cluster failed due to not confirm write the above config." 
+ ) + return + + LOG.info("Waiting connect to remote FE...") + expire_ts = time.time() + 3600 * 5 + parts = cluster.remote_master_fe.split(":") + fe_ip = parts[0] + fe_port = int(parts[1]) + ready = False + while expire_ts > time.time(): + if utils.is_socket_avail(fe_ip, fe_port): + ready = True + break + if not ready: + raise Exception( + "Cannot connect to remote master fe: " + + cluster.remote_master_fe) + + LOG.info("After connect to remote FE...") + else: + # Wait for FE master to be elected + LOG.info("Waiting for FE master to be elected...") + expire_ts = time.time() + 30 + while expire_ts > time.time(): + ready = False + db_mgr = database.get_db_mgr( + args.NAME, cluster.get_all_node_net_infos(), False) + for id in add_fe_ids: + fe_state = db_mgr.get_fe(id) + if fe_state is not None and fe_state.alive: + ready = True + break + if ready: break - if ready: - break - LOG.info("there is no fe ready") - time.sleep(1) - LOG.info("after Waiting for FE master to be elected...") - if cluster.is_cloud and args.sql_mode_node_mgr: - db_mgr = database.get_db_mgr(args.NAME, False) + LOG.info("there is no fe ready") + time.sleep(1) + LOG.info("after Waiting for FE master to be elected...") + if cluster.is_cloud and cluster.sql_mode_node_mgr: + db_mgr = database.get_db_mgr(args.NAME, + cluster.get_all_node_net_infos(), + False) master_fe_endpoint = CLUSTER.get_master_fe_endpoint( cluster.name, True) # Add FEs except master_fe for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE): - fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}" - if fe_endpoint != master_fe_endpoint: + fe_querypoint = f"{fe.get_ip()}:{fe.meta['ports']['query_port']}" + fe_endpoint = f"{fe.get_ip()}:{fe.meta['ports']['edit_log_port']}" + if fe_querypoint != master_fe_endpoint: try: db_mgr.add_fe( fe_endpoint, "FOLLOWER" @@ -642,19 +711,19 @@ class UpCommand(Command): # Add BEs for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE): - be_endpoint = f"{be.get_ip()}:{CLUSTER.BE_HEARTBEAT_PORT}" + be_endpoint = f"{be.get_ip()}:{be.meta['ports']['heartbeat_service_port']}" try: db_mgr.add_be(be_endpoint) LOG.info(f"Added BE {be_endpoint} successfully.") except Exception as e: LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}") + if is_new_cluster: + cloud_store_config = self._get_cloud_store_config() + db_mgr.create_default_storage_vault(cloud_store_config) - cloud_store_config = self._get_cloud_store_config() - - db_mgr.create_default_storage_vault(cloud_store_config) - - wait_ready_service(args.wait_timeout, cluster, add_fe_ids, - add_be_ids) + if not cluster.is_host_network(): + wait_ready_service(args.wait_timeout, cluster, add_fe_ids, + add_be_ids) LOG.info( utils.render_green( "Up cluster {} succ, related node num {}".format( @@ -753,51 +822,62 @@ class DownCommand(Command): "it will send dropp to fe, otherwise send decommission to fe.") def run(self, args): + cluster_name = args.NAME cluster = None + try: - cluster = CLUSTER.Cluster.load(args.NAME) - except: - return "Cluster not exists or load failed" - for_all, related_nodes, related_node_num = get_ids_related_nodes( - cluster, - args.fe_id, - args.be_id, - args.ms_id, - args.recycle_id, - args.fdb_id, - ignore_not_exists=True) + cluster = CLUSTER.Cluster.load(cluster_name) + for_all, related_nodes, related_node_num = get_ids_related_nodes( + cluster, + args.fe_id, + args.be_id, + args.ms_id, + args.recycle_id, + args.fdb_id, + ignore_not_exists=True) + except Exception as e: + for_all = not args.fe_id and not args.be_id and not args.ms_id and not args.recycle_id + 
related_nodes = [] + related_node_num = 0 + if not for_all: + raise e - LOG.info("down cluster " + args.NAME + " for all " + str(for_all)) + LOG.info("down cluster " + args.NAME + " for all " + + str(for_all).lower()) if for_all: - if os.path.exists(cluster.get_compose_file()): + compose_file = CLUSTER.get_compose_file(cluster_name) + if os.path.exists(compose_file): try: utils.exec_docker_compose_command( - cluster.get_compose_file(), "down", - ["-v", "--remove-orphans"]) + compose_file, "down", ["-v", "--remove-orphans"]) except Exception as e: LOG.warn("down cluster has exception: " + str(e)) try: - utils.remove_docker_network(cluster.name) + utils.remove_docker_network(cluster_name) except Exception as e: LOG.warn("remove network has exception: " + str(e)) if args.clean: - utils.enable_dir_with_rw_perm(cluster.get_path()) - shutil.rmtree(cluster.get_path()) + cluster_path = CLUSTER.get_cluster_path(cluster_name) + if os.path.exists(cluster_path): + utils.enable_dir_with_rw_perm(cluster_path) + shutil.rmtree(cluster_path) LOG.info( utils.render_yellow( "Clean cluster data cause has specific --clean")) else: - db_mgr = database.get_db_mgr(cluster.name) + db_mgr = database.get_db_mgr(cluster.name, + cluster.get_all_node_net_infos()) for node in related_nodes: if node.is_fe(): - fe_endpoint = "{}:{}".format(node.get_ip(), - CLUSTER.FE_EDITLOG_PORT) + fe_endpoint = "{}:{}".format( + node.get_ip(), node.meta["ports"]["edit_log_port"]) db_mgr.drop_fe(fe_endpoint) elif node.is_be(): - be_endpoint = "{}:{}".format(node.get_ip(), - CLUSTER.BE_HEARTBEAT_PORT) + be_endpoint = "{}:{}".format( + node.get_ip(), + node.meta["ports"]["heartbeat_service_port"]) if args.drop_force: db_mgr.drop_be(be_endpoint) else: @@ -831,7 +911,7 @@ class DownCommand(Command): LOG.info( utils.render_green( "Down cluster {} succ, related node num {}".format( - args.NAME, related_node_num))) + cluster_name, related_node_num))) return "down cluster succ" @@ -853,6 +933,11 @@ class ListNode(object): self.tablet_num = "" self.last_heartbeat = "" self.err_msg = "" + self.query_port = "" + self.http_port = "" + self.heartbeat_port = "" + self.edit_log_port = "" + self.heartbeat_port = "" def info(self, detail): result = [ @@ -862,29 +947,20 @@ class ListNode(object): self.last_heartbeat, self.err_msg ] if detail: - query_port = "" - http_port = "" - heartbeat_port = "" - edit_log_port = "" node_path = CLUSTER.get_node_path(self.cluster_name, self.node_type, self.id) - if self.node_type == CLUSTER.Node.TYPE_FE: - query_port = CLUSTER.FE_QUERY_PORT - http_port = CLUSTER.FE_HTTP_PORT - edit_log_port = CLUSTER.FE_EDITLOG_PORT - elif self.node_type == CLUSTER.Node.TYPE_BE: - http_port = CLUSTER.BE_WEBSVR_PORT - heartbeat_port = CLUSTER.BE_HEARTBEAT_PORT - elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE: - http_port = CLUSTER.MS_PORT - else: - pass result += [ - query_port, http_port, node_path, edit_log_port, heartbeat_port + self.query_port, self.http_port, node_path, self.edit_log_port, + self.heartbeat_port ] return result - def update_db_info(self, db_mgr): + def update_db_info(self, cluster, db_mgr): + try: + node = cluster.get_node(self.node_type, self.id, True) + except: + node = None + ports = node.meta["ports"] if node else {} if self.node_type == CLUSTER.Node.TYPE_FE: fe = db_mgr.get_fe(self.id) if fe: @@ -892,6 +968,9 @@ class ListNode(object): self.is_master = str(fe.is_master).lower() self.last_heartbeat = fe.last_heartbeat self.err_msg = fe.err_msg + self.query_port = 
fe.query_port + self.http_port = fe.http_port + self.edit_log_port = fe.edit_log_port elif self.node_type == CLUSTER.Node.TYPE_BE: self.backend_id = -1 be = db_mgr.get_be(self.id) @@ -901,6 +980,15 @@ class ListNode(object): self.tablet_num = be.tablet_num self.last_heartbeat = be.last_heartbeat self.err_msg = be.err_msg + self.http_port = be.http_port + self.heartbeat_port = be.heartbeat_service_port + elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE: + if ports: + self.http_port = ports.get("brpc_listen_port", -1) + if node and node.meta.get("is_remote", False): + self.ip = node.get_ip() + self.container_id = "<remote>" + self.image = "<remote>" class GenConfCommand(Command): @@ -929,12 +1017,12 @@ class GenConfCommand(Command): def run(self, args): base_conf = ''' -jdbcUrl = "jdbc:mysql://{fe_ip}:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" -targetJdbcUrl = "jdbc:mysql://{fe_ip}:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" -feSourceThriftAddress = "{fe_ip}:9020" -feTargetThriftAddress = "{fe_ip}:9020" +jdbcUrl = "jdbc:mysql://{fe_ip}:{query_port}/?useLocalSessionState=true&allowLoadLocalInfile=true" +targetJdbcUrl = "jdbc:mysql://{fe_ip}:{query_port}/?useLocalSessionState=true&allowLoadLocalInfile=true" +feSourceThriftAddress = "{fe_ip}:{rpc_port}" +feTargetThriftAddress = "{fe_ip}:{rpc_port}" syncerAddress = "{fe_ip}:9190" -feHttpAddress = "{fe_ip}:8030" +feHttpAddress = "{fe_ip}:{http_port}" ''' cloud_conf = ''' @@ -954,21 +1042,24 @@ cloudUniqueId= "{fe_cloud_unique_id}" args.NAME, CLUSTER.LOCAL_DORIS_PATH)) return - master_fe_ip = master_fe_ip_ep[:master_fe_ip_ep.find(':')] + db_mgr = database.get_db_mgr(args.NAME, + cluster.get_all_node_net_infos(), False) fe_ip = "" - if not args.connect_follow_fe: - fe_ip = master_fe_ip - else: - for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE): - if fe.get_ip() == master_fe_ip: - continue - else: - fe_ip = fe.get_ip() - break - if not fe_ip: - raise Exception( - "Not found follow fe, pls add a follow fe use command `up <your-cluster> --add-fe-num 1`" - ) + rpc_port = 0 + query_port = 0 + http_port = 0 + for fe in db_mgr.get_all_fe().values(): + if not fe.alive: + continue + if fe.is_master == (not args.connect_follow_fe): + fe_ip = fe.ip + rpc_port = fe.rpc_port + query_port = fe.query_port + http_port = fe.http_port + break + if not fe_ip: + role = "follower" if args.connect_follow_fe else "master" + raise Exception(f"Not found an alive {role} fe") relative_custom_file_path = "regression-test/conf/regression-conf-custom.groovy" regression_conf_custom = os.path.join(args.DORIS_ROOT_PATH, @@ -990,26 +1081,31 @@ cloudUniqueId= "{fe_cloud_unique_id}" with open(regression_conf_custom, "w") as f: # write auto gen config f.write(annotation_start) - f.write(base_conf.format(fe_ip=fe_ip)) + f.write( + base_conf.format(fe_ip=fe_ip, + rpc_port=rpc_port, + query_port=query_port, + http_port=http_port)) if cluster.is_cloud: multi_cluster_bes = ",".join([ - "{}:{}:{}:{}:{}".format(be.get_ip(), - CLUSTER.BE_HEARTBEAT_PORT, - CLUSTER.BE_WEBSVR_PORT, - be.cloud_unique_id(), - CLUSTER.BE_BRPC_PORT) + "{}:{}:{}:{}:{}".format( + be.get_ip(), + be.meta["ports"]["heartbeat_service_port"], + be.meta["ports"]["webserver_port"], + be.cloud_unique_id(), be.meta["ports"]["brpc_port"]) for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE) ]) + master_fe = cluster.get_remote_fe_node( + ) if cluster.is_host_network() else cluster.get_node( + CLUSTER.Node.TYPE_FE, 1) f.write( cloud_conf.format( 
fe_ip=fe_ip, ms_endpoint=cluster.get_meta_server_addr(), recycle_endpoint=cluster.get_recycle_addr(), multi_cluster_bes=multi_cluster_bes, - fe_cloud_unique_id=cluster.get_node( - CLUSTER.Node.TYPE_FE, 1).cloud_unique_id())) + fe_cloud_unique_id=master_fe.cloud_unique_id())) f.write(annotation_end + "\n\n") - annotation_end_line_count = -1 # write not-auto gen config in_annotation = False @@ -1045,12 +1141,31 @@ class ListCommand(Command): action=self._get_parser_bool_action(True), help="Print more detail fields.") + def _hint_cluster_bad(self, cluster_name): + cluster_path = CLUSTER.get_cluster_path(cluster_name) + if not os.path.exists(cluster_path): + LOG.info( + utils.render_yellow( + f"Not exits cluster directory in '{CLUSTER.LOCAL_DORIS_PATH}'" + )) + elif not os.path.exists(CLUSTER.Cluster._get_meta_file(cluster_name)): + LOG.error( + utils.render_red( + f"Not exits cluster meta file in '{cluster_path}'")) + else: + try: + CLUSTER.Cluster.load(cluster_name) + except: + LOG.error(utils.render_red("meta file is bad or incompatible with current doris-compose.py. " \ + "Run command `down --clean` to destroy it then recreate a new one")) + def run(self, args): COMPOSE_MISSING = "(missing)" COMPOSE_BAD = "(bad)" COMPOSE_GOOD = "" SERVICE_DEAD = "dead" + SERVICE_RUNNING = "running" class ComposeService(object): @@ -1061,6 +1176,12 @@ class ListCommand(Command): def parse_cluster_compose_file(cluster_name): compose_file = CLUSTER.get_compose_file(cluster_name) + try: + cluster = CLUSTER.Cluster.load(cluster_name) + ip_for_host_mode = cluster.local_network_ip if cluster.is_host_network( + ) else "" + except: + ip_for_host_mode = "" if not os.path.exists(compose_file): return COMPOSE_MISSING, {} try: @@ -1073,7 +1194,7 @@ class ListCommand(Command): return COMPOSE_GOOD, { service: ComposeService( - service, + service, ip_for_host_mode if ip_for_host_mode else list(service_conf["networks"].values())[0] ["ipv4_address"], service_conf["image"]) for service, service_conf in services.items() @@ -1108,7 +1229,7 @@ class ListCommand(Command): TYPE_COMPOSESERVICE = type(ComposeService("", "", "")) if not args.NAME: header = ("CLUSTER", "OWNER", "STATUS", "MASTER FE", "CLOUD", - "CONFIG FILES") + "NETWORK MODE", "CONFIG FILES") rows = [] for name in sorted(clusters.keys()): cluster_info = clusters[name] @@ -1126,16 +1247,20 @@ class ListCommand(Command): compose_file = CLUSTER.get_compose_file(name) is_cloud = "" + network_mode = "" try: cluster = CLUSTER.Cluster.load(name) is_cloud = "true" if cluster.is_cloud else "false" + network_mode = "host" if cluster.is_host_network( + ) else "bridge" except: pass - rows.append((name, owner, show_status, - CLUSTER.get_master_fe_endpoint(name), is_cloud, - "{}{}".format(compose_file, - cluster_info["status"]))) + rows.append( + (name, owner, show_status, + CLUSTER.get_master_fe_endpoint(name), is_cloud, + network_mode, "{}{}".format(compose_file, + cluster_info["status"]))) return self._print_table(header, rows) header = [ @@ -1157,8 +1282,15 @@ class ListCommand(Command): fe_ids = {} be_ids = {} services = clusters[cluster_name]["services"] - db_mgr = database.get_db_mgr(cluster_name, False) + cluster = None + try: + cluster = CLUSTER.Cluster.load(cluster_name) + except: + pass + db_mgr = database.get_db_mgr( + cluster_name, + cluster.get_all_node_net_infos() if cluster else [], False) nodes = [] for service_name, container in services.items(): _, node_type, id = utils.parse_service_name(container.name) @@ -1166,7 +1298,7 @@ class ListCommand(Command): 
node.cluster_name = cluster_name node.node_type = node_type node.id = id - node.update_db_info(db_mgr) + node.update_db_info(cluster, db_mgr) nodes.append(node) if node_type == CLUSTER.Node.TYPE_FE: @@ -1182,12 +1314,15 @@ class ListCommand(Command): node.created = dateutil.parser.parse( container.attrs.get("Created")).astimezone().strftime( "%Y-%m-%d %H:%M:%S") - node.ip = list( - container.attrs["NetworkSettings"] - ["Networks"].values())[0]["IPAMConfig"]["IPv4Address"] - node.image = ",".join(container.image.tags) + if cluster and cluster.is_host_network(): + node.ip = cluster.local_network_ip + else: + node.ip = list( + container.attrs["NetworkSettings"]["Networks"]. + values())[0]["IPAMConfig"]["IPv4Address"] + node.image = container.attrs["Config"]["Image"] if not node.image: - node.image = container.attrs["Config"]["Image"] + node.image = ",".join(container.image.tags) node.container_id = container.short_id node.status = container.status if node.container_id and \ @@ -1203,8 +1338,8 @@ class ListCommand(Command): node.cluster_name = cluster_name node.node_type = CLUSTER.Node.TYPE_FE node.id = id - node.status = SERVICE_DEAD - node.update_db_info(db_mgr) + node.status = SERVICE_RUNNING if fe.alive else SERVICE_DEAD + node.update_db_info(cluster, db_mgr) nodes.append(node) for id, be in db_mgr.be_states.items(): if be_ids.get(id, False): @@ -1213,8 +1348,8 @@ class ListCommand(Command): node.cluster_name = cluster_name node.node_type = CLUSTER.Node.TYPE_BE node.id = id - node.status = SERVICE_DEAD - node.update_db_info(db_mgr) + node.status = SERVICE_RUNNING if be.alive else SERVICE_DEAD + node.update_db_info(cluster, db_mgr) nodes.append(node) def get_node_seq(node): @@ -1223,7 +1358,11 @@ class ListCommand(Command): for node in sorted(nodes, key=get_node_seq): rows.append(node.info(args.detail)) - return self._print_table(header, rows) + ret = self._print_table(header, rows) + if len(args.NAME) == 1 and len(rows) == 0: + self._hint_cluster_bad(args.NAME[0]) + + return ret class InfoCommand(Command): diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py index 9c0b2f66f83..50c052dc740 100644 --- a/docker/runtime/doris-compose/database.py +++ b/docker/runtime/doris-compose/database.py @@ -27,35 +27,49 @@ LOG = utils.get_logger() class FEState(object): - def __init__(self, id, is_master, alive, last_heartbeat, err_msg, role): + def __init__(self, id, ip, is_master, alive, last_heartbeat, err_msg, role, + query_port, rpc_port, http_port, edit_log_port): self.id = id + self.ip = ip self.is_master = is_master self.alive = alive self.last_heartbeat = last_heartbeat self.err_msg = err_msg self.role = role + self.query_port = query_port + self.rpc_port = rpc_port + self.http_port = http_port + self.edit_log_port = edit_log_port class BEState(object): - def __init__(self, id, backend_id, decommissioned, alive, tablet_num, - last_heartbeat, err_msg): + def __init__(self, id, ip, backend_id, decommissioned, alive, tablet_num, + last_heartbeat, err_msg, http_port, heartbeat_service_port): self.id = id + self.ip = ip self.backend_id = backend_id self.decommissioned = decommissioned self.alive = alive self.tablet_num = tablet_num self.last_heartbeat = last_heartbeat self.err_msg = err_msg + self.http_port = http_port + self.heartbeat_service_port = heartbeat_service_port class DBManager(object): - def __init__(self): + def __init__(self, all_node_net_infos): + self.all_node_net_infos = all_node_net_infos self.fe_states = {} self.be_states = {} self.conn = 
None - self.master_fe_ip = "" + self.fe_ip = "" + self.fe_port = -1 + + def get_all_fe(self): + return self.fe_states def get_fe(self, id): return self.fe_states.get(id, None) @@ -187,26 +201,42 @@ class DBManager(object): def _load_fe_states(self): fe_states = {} alive_master_fe_ip = None + alive_master_fe_port = None for record in self._exec_query("show frontends"): name = record["Name"] ip = record["Host"] role = record["Role"] is_master = utils.is_true(record["IsMaster"]) alive = utils.is_true(record["Alive"]) - id = CLUSTER.Node.get_id_from_ip(ip) + query_port = int(record["QueryPort"]) + http_port = int(record["HttpPort"]) + rpc_port = int(record["RpcPort"]) + edit_log_port = int(record["EditLogPort"]) + id = None + for net_info in self.all_node_net_infos: + if net_info.ip == ip and net_info.ports.get("query_port", + -1) == query_port: + id = net_info.id + break + if not id: + id = CLUSTER.Node.get_id_from_ip(ip) last_heartbeat = utils.escape_null(record["LastHeartbeat"]) err_msg = record["ErrMsg"] - fe = FEState(id, is_master, alive, last_heartbeat, err_msg, role) + fe = FEState(id, ip, is_master, alive, last_heartbeat, err_msg, + role, query_port, rpc_port, http_port, edit_log_port) fe_states[id] = fe if is_master and alive: alive_master_fe_ip = ip + alive_master_fe_port = query_port LOG.debug( - "record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}" - .format(name, ip, alive, is_master, role)) + "record of show frontends, name {}, ip {}, port {}, alive {}, is_master {}, role {}" + .format(name, ip, query_port, alive, is_master, role)) self.fe_states = fe_states - if alive_master_fe_ip and alive_master_fe_ip != self.master_fe_ip: - self.master_fe_ip = alive_master_fe_ip + if alive_master_fe_ip and (alive_master_fe_ip != self.fe_ip + or alive_master_fe_port != self.fe_port): + self.fe_ip = alive_master_fe_ip + self.fe_port = alive_master_fe_port self._reset_conn() def _load_be_states(self): @@ -216,11 +246,23 @@ class DBManager(object): alive = utils.is_true(record["Alive"]) decommissioned = utils.is_true(record["SystemDecommissioned"]) tablet_num = int(record["TabletNum"]) - id = CLUSTER.Node.get_id_from_ip(record["Host"]) + ip = record["Host"] + heartbeat_service_port = int(record["HeartbeatPort"]) + http_port = int(record["HttpPort"]) + id = None + for net_info in self.all_node_net_infos: + if net_info.ip == ip and net_info.ports.get( + "heartbeat_service_port", + -1) == heartbeat_service_port: + id = net_info.id + break + if not id: + id = CLUSTER.Node.get_id_from_ip(ip) last_heartbeat = utils.escape_null(record["LastHeartbeat"]) err_msg = record["ErrMsg"] - be = BEState(id, backend_id, decommissioned, alive, tablet_num, - last_heartbeat, err_msg) + be = BEState(id, ip, backend_id, decommissioned, alive, tablet_num, + last_heartbeat, err_msg, http_port, + heartbeat_service_port) be_states[id] = be self.be_states = be_states @@ -254,25 +296,30 @@ class DBManager(object): def _reset_conn(self): self.conn = pymysql.connect(user="root", - host=self.master_fe_ip, + host=self.fe_ip, read_timeout=10, connect_timeout=3, - port=CLUSTER.FE_QUERY_PORT) + port=self.fe_port) -def get_db_mgr(cluster_name, required_load_succ=True): +def get_db_mgr(cluster_name, all_node_net_infos, required_load_succ=True): assert cluster_name - db_mgr = DBManager() - master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name), - "master_fe_ip") - master_fe_ip = None - if os.path.exists(master_fe_ip_file): - with open(master_fe_ip_file, "r") as f: - master_fe_ip = 
f.read().strip() - - if not master_fe_ip: + db_mgr = DBManager(all_node_net_infos) + master_fe_query_addr_file = CLUSTER.get_master_fe_addr_path(cluster_name) + master_fe_query_addr = None + if os.path.exists(master_fe_query_addr_file): + with open(master_fe_query_addr_file, "r") as f: + master_fe_query_addr = f.read().strip() + + if not master_fe_query_addr: return db_mgr + pos = master_fe_query_addr.find(':') + master_fe_ip = master_fe_query_addr[:pos] + master_fe_port = int(master_fe_query_addr[pos + 1:]) + + chose_fe_ip = "" + chose_fe_port = -1 alive_fe = None cluster = CLUSTER.Cluster.load(cluster_name) containers = utils.get_doris_containers(cluster_name).get(cluster_name, []) @@ -283,13 +330,22 @@ def get_db_mgr(cluster_name, required_load_succ=True): node = cluster.get_node(node_type, id) if not alive_fe: alive_fe = node - if node.get_ip() == master_fe_ip: + if node.get_ip() == master_fe_ip and node.meta["ports"][ + "query_port"] == master_fe_port: alive_fe = node break - if not alive_fe: + if alive_fe: + chose_fe_ip = alive_fe.get_ip() + chose_fe_port = alive_fe.meta["ports"]["query_port"] + elif utils.is_socket_avail(master_fe_ip, master_fe_port): + chose_fe_ip = master_fe_ip + chose_fe_port = master_fe_port + else: + LOG.debug("no available alive fe") return db_mgr - db_mgr.master_fe_ip = alive_fe.get_ip() + db_mgr.fe_ip = chose_fe_ip + db_mgr.fe_port = chose_fe_port try: db_mgr.load_states() except Exception as e: diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh index 40833d01dc6..2c53ca587a5 100644 --- a/docker/runtime/doris-compose/resource/common.sh +++ b/docker/runtime/doris-compose/resource/common.sh @@ -16,11 +16,13 @@ # under the License. export MASTER_FE_IP="" -export MASTER_FE_IP_FILE=$DORIS_HOME/status/master_fe_ip +export MASTER_FE_PORT="" +export MASTER_FE_QUERY_ADDR_FILE=$DORIS_HOME/status/master_fe_query_addr export HAS_INIT_FDB_FILE=${DORIS_HOME}/status/has_init_fdb export HAS_CREATE_INSTANCE_FILE=$DORIS_HOME/status/has_create_instance export LOG_FILE=$DORIS_HOME/log/health.out export LOCK_FILE=$DORIS_HOME/status/token +export MY_TYPE_ID="${MY_TYPE}-${MY_ID}" health_log() { echo "$(date +'%Y-%m-%d %H:%M:%S') $@" | tee -a $LOG_FILE @@ -32,7 +34,7 @@ lock_cluster() { health_log "start acquire token" while true; do if [ -f $LOCK_FILE ]; then - if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + if [ "a$(cat $LOCK_FILE)" == "a${MY_TYPE_ID}" ]; then health_log "rm $LOCK_FILE generate by myself" rm $LOCK_FILE continue @@ -57,12 +59,12 @@ lock_cluster() { fi if [ ! -f $LOCK_FILE ]; then - echo $MY_IP >$LOCK_FILE + echo ${MY_TYPE_ID} >$LOCK_FILE fi sleep 0.1 - if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + if [ "a$(cat $LOCK_FILE)" == "a${MY_TYPE_ID}" ]; then break fi @@ -77,16 +79,18 @@ unlock_cluster() { return fi - if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + if [ "a$(cat $LOCK_FILE)" == "a${MY_TYPE_ID}" ]; then rm $LOCK_FILE fi } wait_master_fe_ready() { while true; do - MASTER_FE_IP=$(cat $MASTER_FE_IP_FILE) - if [ -n "$MASTER_FE_IP" ]; then - health_log "master fe ${MASTER_FE_IP} has ready." + master_fe_query_addr=$(cat $MASTER_FE_QUERY_ADDR_FILE) + if [ -n "$master_fe_query_addr" ]; then + MASTER_FE_IP=$(echo ${master_fe_query_addr} | cut -d ":" -f 1) + MASTER_FE_PORT=$(echo ${master_fe_query_addr} | cut -d ":" -f 2) + health_log "master fe ${master_fe_query_addr} has ready." break fi health_log "master fe has not ready." 
diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh index 08cc914f6af..67b967e1d54 100755 --- a/docker/runtime/doris-compose/resource/init_be.sh +++ b/docker/runtime/doris-compose/resource/init_be.sh @@ -28,13 +28,13 @@ add_local_be() { wait_master_fe_ready while true; do - #lsof -i:$BE_HEARTBEAT_PORT + #lsof -i:$MY_HEARTBEAT_PORT #if [ $? -ne 0 ]; then # sleep 1 # continue #fi - output=$(mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$BE_HEARTBEAT_PORT';" 2>&1) + output=$(mysql -P $MASTER_FE_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$MY_HEARTBEAT_PORT';" 2>&1) res=$? health_log "$output" [ $res -eq 0 ] && break @@ -54,10 +54,9 @@ add_cloud_be() { return fi - cluster_file_name="${DORIS_HOME}/conf/CLUSTER_NAME" - cluster_name=$(cat $cluster_file_name) + cluster_name="${CLUSTER_NAME}" if [ -z $cluster_name ]; then - health_log "Empty cluster name, it should specific in file ${cluster_file_name}" + health_log "Empty cluster name, should specific with --be-cluster" exit 1 fi @@ -68,7 +67,7 @@ add_cloud_be() { nodes='{ "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", "ip": "'"${MY_IP}"'", - "heartbeat_port": "'"${BE_HEARTBEAT_PORT}"'" + "heartbeat_port": "'"${MY_HEARTBEAT_PORT}"'" }' lock_cluster diff --git a/docker/runtime/doris-compose/resource/init_cloud.sh b/docker/runtime/doris-compose/resource/init_cloud.sh index 18dfc4430e2..22883ab2e40 100644 --- a/docker/runtime/doris-compose/resource/init_cloud.sh +++ b/docker/runtime/doris-compose/resource/init_cloud.sh @@ -38,11 +38,18 @@ wait_fdb_ready() { } check_init_cloud() { + if [ "$MY_TYPE" != "ms" -o "$MY_ID" != "1" ]; then + return + fi + if [ -f $HAS_CREATE_INSTANCE_FILE ]; then return fi - if [ "$MY_TYPE" != "ms" -o "$MY_ID" != "1" ]; then + # Check if SQL_MODE_NODE_MGR is set + if [[ "$SQL_MODE_NODE_MGR" -eq 1 ]]; then + health_log "SQL_MODE_NODE_MGR is set, skipping create_instance" + touch $HAS_CREATE_INSTANCE_FILE return fi @@ -50,13 +57,6 @@ check_init_cloud() { lock_cluster - # Check if SQL_MODE_NODE_MGR is set - if [[ "$SQL_MODE_NODE_MGR" -eq 1 ]]; then - health_log "SQL_MODE_NODE_MGR is set, skipping create_instance" - touch $HAS_CREATE_INSTANCE_FILE - return - fi - output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \ -d '{"instance_id":"default_instance_id", "name": "default_instance", diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh index a58723db1d7..415e66b5c8f 100755 --- a/docker/runtime/doris-compose/resource/init_fe.sh +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -23,12 +23,13 @@ DIR=$( source $DIR/common.sh REGISTER_FILE=$DORIS_HOME/status/fe-$MY_IP-register +MASTER_EDITLOG_PORT="" add_local_fe() { wait_master_fe_ready while true; do - output=$(mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$FE_EDITLOG_PORT';" 2>&1) + output=$(mysql -P $MASTER_FE_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$MY_EDITLOG_PORT';" 2>&1) res=$? health_log "${output}\n" [ $res -eq 0 ] && break @@ -43,10 +44,10 @@ fe_daemon() { set +e while true; do sleep 1 - output=$(mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;") + output=$(mysql -P $MY_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;") code=$? 
if [ $code -ne 0 ]; then - health_log "exec show frontends bad: $output" + health_log "daemon get frontends exec show frontends bad: $output" continue fi header=$(grep IsMaster <<<$output) @@ -56,10 +57,12 @@ fe_daemon() { fi host_index=-1 is_master_index=-1 + query_port_index=-1 i=1 for field in $header; do [[ "$field" = "Host" ]] && host_index=$i [[ "$field" = "IsMaster" ]] && is_master_index=$i + [[ "$field" = "QueryPort" ]] && query_port_index=$i ((i = i + 1)) done if [ $host_index -eq -1 ]; then @@ -70,12 +73,17 @@ fe_daemon() { health_log "header not found IsMaster" continue fi - echo "$output" | awk -v is_master="$is_master_index" -v host="$host_index" '{print $is_master $host}' | grep $MY_IP | grep true 2>&1 + if [ $query_port_index -eq -1 ]; then + health_log "header not found QueryPort" + continue + fi + echo "$output" | awk -v query_port="$query_port_index" -v is_master="$is_master_index" -v host="$host_index" '{print $query_port $is_master $host}' | grep $MY_QUERY_PORT | grep $MY_IP | grep true 2>&1 if [ $? -eq 0 ]; then - echo $MY_IP >$MASTER_FE_IP_FILE - if [ "$MASTER_FE_IP" != "$MY_IP" ]; then - health_log "change to master, last master is $MASTER_FE_IP" + echo ${MY_IP}:${MY_QUERY_PORT} >$MASTER_FE_QUERY_ADDR_FILE + if [ "$MASTER_FE_IP" != "$MY_IP" -o "${MASTER_FE_PORT}" != "${MY_QUERY_PORT}" ]; then + health_log "change to master, last master is ${MASTER_FE_IP}:${MASTER_FE_PORT}" MASTER_FE_IP=$MY_IP + MASTER_FE_PORT=$MY_QUERY_PORT fi fi done @@ -122,7 +130,7 @@ start_cloud_fe() { nodes='{ "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", "ip": "'"${MY_IP}"'", - "edit_log_port": "'"${FE_EDITLOG_PORT}"'", + "edit_log_port": "'"${MY_EDITLOG_PORT}"'", "node_type": "'"${node_type}"'" }' @@ -190,6 +198,23 @@ wait_process() { wait_pid $pid } +fetch_master_fe_editlog_port() { + set +e + while true; do + output=$(mysql -P $MY_QUERY_PORT -h $MASTER_FE_IP -u root -N -s --execute "select EditLogPort from frontends() where IsMaster = 'true';") + code=$? + if [ $code -ne 0 ]; then + health_log "get editlog port exec show frontends bad: $output" + sleep 1 + continue + fi + if [ -n "$output" ]; then + MASTER_EDITLOG_PORT=${output} + break + fi + done +} + start_local_fe() { if [ "$MY_ID" = "1" -a ! 
-f $REGISTER_FILE ]; then touch $REGISTER_FILE @@ -201,7 +226,8 @@ start_local_fe() { else add_local_fe fe_daemon & - run_fe --helper $MASTER_FE_IP:$FE_EDITLOG_PORT + fetch_master_fe_editlog_port + run_fe --helper $MASTER_FE_IP:$MASTER_EDITLOG_PORT fi } diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py index dcb821ddffd..47c3fe45f4b 100644 --- a/docker/runtime/doris-compose/utils.py +++ b/docker/runtime/doris-compose/utils.py @@ -285,12 +285,31 @@ def copy_image_directory(image, image_dir, local_dir): entrypoint="cp -r {} /opt/mount/".format(image_dir)) +def get_avail_port(): + with contextlib.closing(socket.socket(socket.AF_INET, + socket.SOCK_STREAM)) as sock: + sock.bind(("", 0)) + _, port = sock.getsockname() + return port + + def is_socket_avail(ip, port): with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: return sock.connect_ex((ip, port)) == 0 +def get_local_ip(): + with contextlib.closing(socket.socket(socket.AF_INET, + socket.SOCK_DGRAM)) as sock: + sock.settimeout(0) + try: + sock.connect(('10.255.255.255', 1)) + return sock.getsockname()[0] + except Exception: + return '127.0.0.1' + + def enable_dir_with_rw_perm(dir): if not os.path.exists(dir): return --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
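
For convenience, a brief usage sketch of the new host network mode, based on the --remote-master-fe and --local-network-ip options and the changed defaults added in command.py above. The doris-compose.py entry point and the ls subcommand name follow the tool's usual conventions, and the cluster name, image, and addresses are illustrative assumptions, not taken from this commit:

    # Create a new cluster in host network mode: every container shares the
    # host network, each node records auto-picked free ports in its "ports"
    # meta, and the cluster joins the given remote master FE. Note that with
    # --remote-master-fe, --add-fe-num defaults to 0 instead of 3.
    python docker/runtime/doris-compose/doris-compose.py up my-cluster my-doris-image \
        --remote-master-fe 10.16.10.6:9030 --add-be-num 3

    # If the ip part is left empty (":9030"), the auto-chosen local network
    # IP is filled in; pass --local-network-ip to override the auto choice.

    # Show the nodes; --detail now prints each node's recorded ports rather
    # than the fixed default ports.
    python docker/runtime/doris-compose/doris-compose.py ls my-cluster --detail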