This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 19a00cf547b [feature](doris compose) start node wait for ready service (#42266)
19a00cf547b is described below

commit 19a00cf547bb9c76ed2ccbab2a1d9ea4b85402ec
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Mon Oct 28 11:28:19 2024 +0800

    [feature](doris compose) start node wait for ready service (#42266)

    When SuiteCluster calls start/restart on frontends and backends, it will
    wait for the nodes to be ready for service.
---
 docker/runtime/doris-compose/command.py            | 102 +++++++++++++++------
 docker/runtime/doris-compose/utils.py              |   8 ++
 .../org/apache/doris/clone/TabletScheduler.java    |   9 +-
 .../apache/doris/journal/bdbje/BDBJEJournal.java   |   4 +-
 .../doris/regression/suite/SuiteCluster.groovy     |  34 +++----
 .../test_fe_tablet_same_backend.groovy             |   8 +-
 6 files changed, 107 insertions(+), 58 deletions(-)

diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
index cffc76df7c7..469437c28b2 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -30,6 +30,47 @@ import time
 
 LOG = utils.get_logger()
 
 
+def wait_ready_service(wait_timeout, cluster, fe_ids, be_ids):
+
+    def is_fe_ready_service(ip):
+        return utils.is_socket_avail(ip, CLUSTER.FE_QUERY_PORT)
+
+    def is_be_ready_service(ip):
+        return utils.is_socket_avail(ip, CLUSTER.BE_HEARTBEAT_PORT)
+
+    if wait_timeout == 0:
+        return
+    if wait_timeout == -1:
+        wait_timeout = 1000000000
+    expire_ts = time.time() + wait_timeout
+    while True:
+        db_mgr = database.get_db_mgr(cluster.name, False)
+        dead_frontends = []
+        for id in fe_ids:
+            fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id)
+            fe_state = db_mgr.get_fe(id)
+            if not fe_state or not fe_state.alive or not is_fe_ready_service(
+                    fe.get_ip()):
+                dead_frontends.append(id)
+        dead_backends = []
+        for id in be_ids:
+            be = cluster.get_node(CLUSTER.Node.TYPE_BE, id)
+            be_state = db_mgr.get_be(id)
+            if not be_state or not be_state.alive or not is_be_ready_service(
+                    be.get_ip()):
+                dead_backends.append(id)
+        if not dead_frontends and not dead_backends:
+            break
+        if time.time() >= expire_ts:
+            err = ""
+            if dead_frontends:
+                err += "dead fe: " + str(dead_frontends) + ". "
+            if dead_backends:
+                err += "dead be: " + str(dead_backends) + ". "
+            raise Exception(err)
+        time.sleep(1)
+
+
 # return for_all, related_nodes, related_node_num
 def get_ids_related_nodes(cluster,
                           fe_ids,
@@ -156,10 +197,11 @@ class SimpleCommand(Command):
         parser.add_argument("NAME", help="Specify cluster name.")
         self._add_parser_ids_args(parser)
         self._add_parser_common_args(parser)
+        return parser
 
     def run(self, args):
         cluster = CLUSTER.Cluster.load(args.NAME)
-        _, related_nodes, related_node_num = get_ids_related_nodes(
+        for_all, related_nodes, related_node_num = get_ids_related_nodes(
             cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
             args.fdb_id)
         utils.exec_docker_compose_command(cluster.get_compose_file(),
@@ -170,6 +212,31 @@ class SimpleCommand(Command):
             utils.render_green("{} succ, total related node num {}".format(
                 show_cmd, related_node_num)))
 
+        if for_all:
+            related_nodes = cluster.get_all_nodes()
+        return cluster, related_nodes
+
+
+class NeedStartCommand(SimpleCommand):
+
+    def add_parser(self, args_parsers):
+        parser = super().add_parser(args_parsers)
+        parser.add_argument(
+            "--wait-timeout",
+            type=int,
+            default=0,
+            help=
+            "Specify wait seconds for fe/be ready for service: 0 not wait (default), "\
+            "> 0 max wait seconds, -1 wait unlimited."
+        )
+        return parser
+
+    def run(self, args):
+        cluster, related_nodes = super().run(args)
+        fe_ids = [node.id for node in related_nodes if node.is_fe()]
+        be_ids = [node.id for node in related_nodes if node.is_be()]
+        wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids)
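
A note on the timeout convention used by wait_ready_service and the new
--wait-timeout flag: 0 means don't wait (the default), -1 means wait
indefinitely, and any positive value is a cap in seconds. A minimal,
self-contained sketch of that polling pattern (the `check` callable and the
error message are illustrative placeholders, not code from this patch):

    import time

    def wait_until(check, wait_timeout):
        # doris-compose convention: 0 = don't wait, -1 = unlimited,
        # > 0 = wait at most that many seconds.
        if wait_timeout == 0:
            return
        if wait_timeout == -1:
            wait_timeout = 1000000000  # effectively unlimited
        expire_ts = time.time() + wait_timeout
        while not check():  # `check` is a placeholder readiness probe
            if time.time() >= expire_ts:
                raise Exception("service not ready before timeout")
            time.sleep(1)

The real function additionally reports which fe/be ids are still dead when it
gives up, which is what makes a timed-out regression run actionable.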
 
 
 class UpCommand(Command):
 
@@ -568,33 +635,8 @@ class UpCommand(Command):
 
             db_mgr.create_default_storage_vault(cloud_store_config)
 
-        if args.wait_timeout != 0:
-            if args.wait_timeout == -1:
-                args.wait_timeout = 1000000000
-            expire_ts = time.time() + args.wait_timeout
-            while True:
-                db_mgr = database.get_db_mgr(args.NAME, False)
-                dead_frontends = []
-                for id in add_fe_ids:
-                    fe_state = db_mgr.get_fe(id)
-                    if not fe_state or not fe_state.alive:
-                        dead_frontends.append(id)
-                dead_backends = []
-                for id in add_be_ids:
-                    be_state = db_mgr.get_be(id)
-                    if not be_state or not be_state.alive:
-                        dead_backends.append(id)
-                if not dead_frontends and not dead_backends:
-                    break
-                if time.time() >= expire_ts:
-                    err = ""
-                    if dead_frontends:
-                        err += "dead fe: " + str(dead_frontends) + ". "
-                    if dead_backends:
-                        err += "dead be: " + str(dead_backends) + ". "
-                    raise Exception(err)
-                time.sleep(1)
-
+        wait_ready_service(args.wait_timeout, cluster, add_fe_ids,
+                           add_be_ids)
         LOG.info(
             utils.render_green(
                 "Up cluster {} succ, related node num {}".format(
@@ -1216,9 +1258,9 @@ class GetCloudIniCommand(Command):
 ALL_COMMANDS = [
     UpCommand("up"),
     DownCommand("down"),
-    SimpleCommand("start", "Start the doris containers. "),
+    NeedStartCommand("start", "Start the doris containers. "),
     SimpleCommand("stop", "Stop the doris containers. "),
-    SimpleCommand("restart", "Restart the doris containers. "),
+    NeedStartCommand("restart", "Restart the doris containers. "),
     SimpleCommand("pause", "Pause the doris containers. "),
     SimpleCommand("unpause", "Unpause the doris containers. "),
     GetCloudIniCommand("get-cloud-ini"),
diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py
index 735947e86bd..bfe71dbbbfb 100644
--- a/docker/runtime/doris-compose/utils.py
+++ b/docker/runtime/doris-compose/utils.py
@@ -15,11 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import contextlib
 import docker
 import json
 import logging
 import os
 import pwd
+import socket
 import subprocess
 import time
 import yaml
@@ -278,6 +280,12 @@ def copy_image_directory(image, image_dir, local_dir):
         entrypoint="cp -r {} /opt/mount/".format(image_dir))
 
 
+def is_socket_avail(ip, port):
+    with contextlib.closing(socket.socket(socket.AF_INET,
+                                          socket.SOCK_STREAM)) as sock:
+        return sock.connect_ex((ip, port)) == 0
+
+
 def enable_dir_with_rw_perm(dir):
     if not os.path.exists(dir):
         return
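
The new utils.is_socket_avail helper is intentionally cheap: connect_ex
returns an errno instead of raising, so probing a closed port costs one
failed TCP handshake and no exception handling. A self-contained way to watch
the probe behave (the throwaway loopback listener below is illustrative, not
part of the patch):

    import contextlib
    import socket

    # Start a throwaway listener; port 0 asks the OS for any free port.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    _, port = server.getsockname()

    # Same probe pattern as utils.is_socket_avail: connect_ex == 0 means
    # something is accepting connections on that port.
    with contextlib.closing(socket.socket(socket.AF_INET,
                                          socket.SOCK_STREAM)) as sock:
        print(sock.connect_ex(("127.0.0.1", port)) == 0)  # True

    server.close()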
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index ed2a8335db7..bb88afd2b6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -85,6 +85,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * TabletScheduler saved the tablets produced by TabletChecker and try to schedule them.
@@ -1535,9 +1536,13 @@ public class TabletScheduler extends MasterDaemon {
         List<BePathLoadStatPair> allFitPaths =
                 !allFitPathsSameMedium.isEmpty() ? allFitPathsSameMedium : allFitPathsDiffMedium;
         if (allFitPaths.isEmpty()) {
+            List<String> backendsInfo = Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values().stream()
+                    .filter(be -> be.getLocationTag() == tag)
+                    .map(Backend::getDetailsForCreateReplica)
+                    .collect(Collectors.toList());
             throw new SchedException(Status.UNRECOVERABLE, String.format("unable to find dest path for new replica"
-                    + " for replica allocation { %s } with tag %s storage medium %s",
-                    tabletCtx.getReplicaAlloc(), tag, tabletCtx.getStorageMedium()));
+                    + " for replica allocation { %s } with tag %s storage medium %s, backends on this tag is: %s",
+                    tabletCtx.getReplicaAlloc(), tag, tabletCtx.getStorageMedium(), backendsInfo));
         }
 
         BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 924f10b5ea1..1c1bcf6354c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -163,12 +163,12 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
             MetricRepo.HISTO_JOURNAL_BATCH_DATA_SIZE.update(dataSize);
         }
 
-        if (entitySize > 32) {
+        if (entitySize > Config.batch_edit_log_max_item_num) {
             LOG.warn("write bdb journal batch is too large, batch size {}, the first journal id {}, "
                     + "data size {}", entitySize, firstId, dataSize);
         }
 
-        if (dataSize > 640 * 1024) { // 640KB
+        if (dataSize > Config.batch_edit_log_max_byte_size) {
             LOG.warn("write bdb journal batch data is too large, data size {}, the first journal id {}, "
                     + "batch size {}", dataSize, firstId, entitySize);
         }
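
The BDBJEJournal change above swaps two hard-coded batch limits for FE config
knobs; the removed literals suggest the defaults are 32 entries and 640 KB.
The warn-above-threshold pattern, sketched generically in Python (the names
below only mirror the removed literals; the real knobs are
Config.batch_edit_log_max_item_num and Config.batch_edit_log_max_byte_size in
the FE):

    import logging

    LOG = logging.getLogger("journal")

    # Configurable thresholds; defaults match the old hard-coded values.
    batch_edit_log_max_item_num = 32
    batch_edit_log_max_byte_size = 640 * 1024  # 640KB

    def warn_if_batch_too_large(entity_size, data_size, first_id):
        # Warn (but do not fail) when a batched journal write looks oversized.
        if entity_size > batch_edit_log_max_item_num:
            LOG.warning("journal batch too large: %d entries, first id %d",
                        entity_size, first_id)
        if data_size > batch_edit_log_max_byte_size:
            LOG.warning("journal batch data too large: %d bytes, first id %d",
                        data_size, first_id)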
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 2aaece2c678..159e622f454 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -522,40 +522,38 @@ class SuiteCluster {
         return this.isCloudMode
     }
 
+    int START_WAIT_TIMEOUT = 120
+
     // if not specific fe indices, then start all frontends
     void startFrontends(int... indices) {
-        runFrontendsCmd('start', indices)
-        waitHbChanged()
+        runFrontendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific be indices, then start all backends
     void startBackends(int... indices) {
-        runBackendsCmd('start', indices)
-        waitHbChanged()
+        runBackendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific fe indices, then stop all frontends
     void stopFrontends(int... indices) {
-        runFrontendsCmd('stop', indices)
+        runFrontendsCmd(60, 'stop', indices)
         waitHbChanged()
     }
 
     // if not specific be indices, then stop all backends
    void stopBackends(int... indices) {
-        runBackendsCmd('stop', indices)
+        runBackendsCmd(60, 'stop', indices)
         waitHbChanged()
     }
 
     // if not specific fe indices, then restart all frontends
     void restartFrontends(int... indices) {
-        runFrontendsCmd('restart', indices)
-        waitHbChanged()
+        runFrontendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific be indices, then restart all backends
     void restartBackends(int... indices) {
-        runBackendsCmd('restart', indices)
-        waitHbChanged()
+        runBackendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific fe indices, then drop all frontends
@@ -564,7 +562,7 @@ class SuiteCluster {
         if (clean) {
             cmd += ' --clean'
         }
-        runFrontendsCmd(cmd, indices)
+        runFrontendsCmd(60, cmd, indices)
     }
 
     // if not specific be indices, then decommission all backends
@@ -582,7 +580,7 @@ class SuiteCluster {
         if (clean) {
             cmd += ' --clean'
         }
-        runBackendsCmd(cmd, indices)
+        runBackendsCmd(60, cmd, indices)
     }
 
     void checkFeIsAlive(int index, boolean isAlive) {
@@ -622,18 +620,14 @@ class SuiteCluster {
         Thread.sleep(7000)
     }
 
-    private void runFrontendsCmd(String op, int... indices) {
+    private void runFrontendsCmd(int timeoutSecond, String op, int... indices) {
         def cmd = op + ' ' + name + ' --fe-id ' + indices.join(' ')
-        runCmd(cmd)
+        runCmd(cmd, timeoutSecond)
     }
 
-    private void runBackendsCmd(Integer timeoutSecond = null, String op, int... indices) {
+    private void runBackendsCmd(int timeoutSecond, String op, int... indices) {
         def cmd = op + ' ' + name + ' --be-id ' + indices.join(' ')
-        if (timeoutSecond == null) {
-            runCmd(cmd)
-        } else {
-            runCmd(cmd, timeoutSecond)
-        }
+        runCmd(cmd, timeoutSecond)
     }
 
     private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception {
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
index 9f345b4828c..973492bd4b5 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
@@ -98,7 +98,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
     }
 
     def checkAllTable = { isAllBeAliveOrDeadLong ->
-        dockerAwaitUntil(5) {
+        dockerAwaitUntil(60) {
             checkAllTableImpl(isAllBeAliveOrDeadLong, true)
         }
         checkAllTableImpl(isAllBeAliveOrDeadLong, false)
@@ -128,7 +128,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
     checkAllTable(true)
 
     cluster.stopBackends(choseDeadBeIndex)
-    dockerAwaitUntil(10) {
+    dockerAwaitUntil(60) {
         def chosenBe = cluster.getBeByIndex(choseDeadBeIndex)
         !chosenBe.alive
     }
@@ -144,13 +144,13 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
 
     def choseRestartFeIndex = cluster.getOneFollowerFe().index
     cluster.stopFrontends(choseRestartFeIndex)
-    dockerAwaitUntil(10) {
+    dockerAwaitUntil(60) {
         def chosenFe = cluster.getFeByIndex(choseRestartFeIndex)
         !chosenFe.alive
     }
 
     cluster.startFrontends(choseRestartFeIndex)
-    dockerAwaitUntil(10) {
+    dockerAwaitUntil(60) {
         def chosenFe = cluster.getFeByIndex(choseRestartFeIndex)
         chosenFe.alive
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org