This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 19a00cf547b [feature](doris compose) start node wait for ready service (#42266)
19a00cf547b is described below

commit 19a00cf547bb9c76ed2ccbab2a1d9ea4b85402ec
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Mon Oct 28 11:28:19 2024 +0800

    [feature](doris compose) start node wait for ready service (#42266)
    
    When SuiteCluster calls start/restart on frontends and backends, it
    now waits for those nodes to be ready for service.
---
 docker/runtime/doris-compose/command.py            | 102 +++++++++++++++------
 docker/runtime/doris-compose/utils.py              |   8 ++
 .../org/apache/doris/clone/TabletScheduler.java    |   9 +-
 .../apache/doris/journal/bdbje/BDBJEJournal.java   |   4 +-
 .../doris/regression/suite/SuiteCluster.groovy     |  34 +++----
 .../test_fe_tablet_same_backend.groovy             |   8 +-
 6 files changed, 107 insertions(+), 58 deletions(-)
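
For context, the new --wait-timeout flag below is consumed by the start and
restart subcommands. A hypothetical invocation (the doris-compose.py entry
script name and the cluster name are assumptions, not part of this patch):

    python doris-compose.py start my-cluster --fe-id 1 2 --wait-timeout 120

With --wait-timeout 120 the command blocks until every started FE answers on
its query port and every started BE on its heartbeat port, failing after 120
seconds otherwise.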

diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
index cffc76df7c7..469437c28b2 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -30,6 +30,47 @@ import time
 LOG = utils.get_logger()
 
 
+def wait_ready_service(wait_timeout, cluster, fe_ids, be_ids):
+
+    def is_fe_ready_service(ip):
+        return utils.is_socket_avail(ip, CLUSTER.FE_QUERY_PORT)
+
+    def is_be_ready_service(ip):
+        return utils.is_socket_avail(ip, CLUSTER.BE_HEARTBEAT_PORT)
+
+    if wait_timeout == 0:
+        return
+    if wait_timeout == -1:
+        wait_timeout = 1000000000
+    expire_ts = time.time() + wait_timeout
+    while True:
+        db_mgr = database.get_db_mgr(cluster.name, False)
+        dead_frontends = []
+        for id in fe_ids:
+            fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id)
+            fe_state = db_mgr.get_fe(id)
+            if not fe_state or not fe_state.alive or not is_fe_ready_service(
+                    fe.get_ip()):
+                dead_frontends.append(id)
+        dead_backends = []
+        for id in be_ids:
+            be = cluster.get_node(CLUSTER.Node.TYPE_BE, id)
+            be_state = db_mgr.get_be(id)
+            if not be_state or not be_state.alive or not is_be_ready_service(
+                    be.get_ip()):
+                dead_backends.append(id)
+        if not dead_frontends and not dead_backends:
+            break
+        if time.time() >= expire_ts:
+            err = ""
+            if dead_frontends:
+                err += "dead fe: " + str(dead_frontends) + ". "
+            if dead_backends:
+                err += "dead be: " + str(dead_backends) + ". "
+            raise Exception(err)
+        time.sleep(1)
+
+
 # return for_all, related_nodes, related_node_num
 def get_ids_related_nodes(cluster,
                           fe_ids,
@@ -156,10 +197,11 @@ class SimpleCommand(Command):
         parser.add_argument("NAME", help="Specify cluster name.")
         self._add_parser_ids_args(parser)
         self._add_parser_common_args(parser)
+        return parser
 
     def run(self, args):
         cluster = CLUSTER.Cluster.load(args.NAME)
-        _, related_nodes, related_node_num = get_ids_related_nodes(
+        for_all, related_nodes, related_node_num = get_ids_related_nodes(
             cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
             args.fdb_id)
         utils.exec_docker_compose_command(cluster.get_compose_file(),
@@ -170,6 +212,31 @@ class SimpleCommand(Command):
             utils.render_green("{} succ, total related node num {}".format(
                 show_cmd, related_node_num)))
 
+        if for_all:
+            related_nodes = cluster.get_all_nodes()
+        return cluster, related_nodes
+
+
+class NeedStartCommand(SimpleCommand):
+
+    def add_parser(self, args_parsers):
+        parser = super().add_parser(args_parsers)
+        parser.add_argument(
+            "--wait-timeout",
+            type=int,
+            default=0,
+            help="Specify how many seconds to wait for fe/be to be ready "
+            "for service: 0 means do not wait (default), > 0 means wait at "
+            "most that many seconds, -1 means wait without limit."
+        )
+        return parser
+
+    def run(self, args):
+        cluster, related_nodes = super().run(args)
+        fe_ids = [node.id for node in related_nodes if node.is_fe()]
+        be_ids = [node.id for node in related_nodes if node.is_be()]
+        wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids)
+
 
 class UpCommand(Command):
 
@@ -568,33 +635,8 @@ class UpCommand(Command):
 
                 db_mgr.create_default_storage_vault(cloud_store_config)
 
-            if args.wait_timeout != 0:
-                if args.wait_timeout == -1:
-                    args.wait_timeout = 1000000000
-                expire_ts = time.time() + args.wait_timeout
-                while True:
-                    db_mgr = database.get_db_mgr(args.NAME, False)
-                    dead_frontends = []
-                    for id in add_fe_ids:
-                        fe_state = db_mgr.get_fe(id)
-                        if not fe_state or not fe_state.alive:
-                            dead_frontends.append(id)
-                    dead_backends = []
-                    for id in add_be_ids:
-                        be_state = db_mgr.get_be(id)
-                        if not be_state or not be_state.alive:
-                            dead_backends.append(id)
-                    if not dead_frontends and not dead_backends:
-                        break
-                    if time.time() >= expire_ts:
-                        err = ""
-                        if dead_frontends:
-                            err += "dead fe: " + str(dead_frontends) + ". "
-                        if dead_backends:
-                            err += "dead be: " + str(dead_backends) + ". "
-                        raise Exception(err)
-                    time.sleep(1)
-
+            wait_ready_service(args.wait_timeout, cluster, add_fe_ids,
+                               add_be_ids)
             LOG.info(
                 utils.render_green(
                     "Up cluster {} succ, related node num {}".format(
@@ -1216,9 +1258,9 @@ class GetCloudIniCommand(Command):
 ALL_COMMANDS = [
     UpCommand("up"),
     DownCommand("down"),
-    SimpleCommand("start", "Start the doris containers. "),
+    NeedStartCommand("start", "Start the doris containers. "),
     SimpleCommand("stop", "Stop the doris containers. "),
-    SimpleCommand("restart", "Restart the doris containers. "),
+    NeedStartCommand("restart", "Restart the doris containers. "),
     SimpleCommand("pause", "Pause the doris containers. "),
     SimpleCommand("unpause", "Unpause the doris containers. "),
     GetCloudIniCommand("get-cloud-ini"),
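
The wait_ready_service helper added above is a poll-until-deadline loop. A
minimal standalone sketch of the same semantics (0 = do not wait, -1 = wait
without limit, otherwise a deadline in seconds), with a generic probe callback
standing in for the FE/BE checks; all names here are illustrative, not part of
the patch:

    import time

    def wait_until_ready(probe, wait_timeout):
        # probe() returns the list of ids that are not yet ready.
        if wait_timeout == 0:
            return
        if wait_timeout == -1:
            wait_timeout = 1000000000  # effectively unlimited
        expire_ts = time.time() + wait_timeout
        while True:
            not_ready = probe()
            if not not_ready:
                return
            if time.time() >= expire_ts:
                raise Exception("not ready: " + str(not_ready))
            time.sleep(1)

For example, wait_until_ready(lambda: [], 10) returns at once, while a probe
that never empties raises after roughly 10 seconds.
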
diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py
index 735947e86bd..bfe71dbbbfb 100644
--- a/docker/runtime/doris-compose/utils.py
+++ b/docker/runtime/doris-compose/utils.py
@@ -15,11 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import contextlib
 import docker
 import json
 import logging
 import os
 import pwd
+import socket
 import subprocess
 import time
 import yaml
@@ -278,6 +280,12 @@ def copy_image_directory(image, image_dir, local_dir):
         entrypoint="cp -r  {}  /opt/mount/".format(image_dir))
 
 
+def is_socket_avail(ip, port):
+    with contextlib.closing(socket.socket(socket.AF_INET,
+                                          socket.SOCK_STREAM)) as sock:
+        return sock.connect_ex((ip, port)) == 0
+
+
 def enable_dir_with_rw_perm(dir):
     if not os.path.exists(dir):
         return
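
socket.connect_ex used above returns 0 when the TCP handshake succeeds (an
errno value otherwise, instead of raising), which makes it a cheap readiness
probe. A self-contained sketch of the same technique, adding an explicit
connect timeout that the committed helper leaves at the socket default; the
host and port below are illustrative (9030 is the usual FE query port):

    import contextlib
    import socket

    def is_socket_avail(ip, port, timeout=1.0):
        # 0 from connect_ex means the port accepted a TCP connection.
        with contextlib.closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(timeout)  # avoid hanging on filtered ports
            return sock.connect_ex((ip, port)) == 0

    print(is_socket_avail("127.0.0.1", 9030))
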
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index ed2a8335db7..bb88afd2b6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -85,6 +85,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
 * TabletScheduler saved the tablets produced by TabletChecker and try to schedule them.
@@ -1535,9 +1536,13 @@ public class TabletScheduler extends MasterDaemon {
         List<BePathLoadStatPair> allFitPaths =
                 !allFitPathsSameMedium.isEmpty() ? allFitPathsSameMedium : allFitPathsDiffMedium;
         if (allFitPaths.isEmpty()) {
+            List<String> backendsInfo = Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values().stream()
+                    .filter(be -> be.getLocationTag() == tag)
+                    .map(Backend::getDetailsForCreateReplica)
+                    .collect(Collectors.toList());
+            throw new SchedException(Status.UNRECOVERABLE, String.format("unable to find dest path for new replica"
-                    + " for replica allocation { %s } with tag %s storage medium %s",
-                    tabletCtx.getReplicaAlloc(), tag, tabletCtx.getStorageMedium()));
+                    + " for replica allocation { %s } with tag %s storage medium %s, backends on this tag are: %s",
+                    tabletCtx.getReplicaAlloc(), tag, tabletCtx.getStorageMedium(), backendsInfo));
         }
 
         BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths);
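
The TabletScheduler change above enriches the scheduling error with the state
of every backend carrying the tag, so a placement failure is diagnosable from
the message alone. The same pattern in miniature, sketched in Python with
hypothetical backend objects (the tag and details fields are assumptions):

    def no_dest_error(backends, tag):
        # Collect per-backend details for every backend on the tag,
        # mirroring Backend::getDetailsForCreateReplica in the Java change.
        info = [b.details for b in backends if b.tag == tag]
        return Exception("unable to find dest path for new replica with tag "
                         + str(tag) + ", backends on this tag are: " + str(info))
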
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 924f10b5ea1..1c1bcf6354c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -163,12 +163,12 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
                     MetricRepo.HISTO_JOURNAL_BATCH_DATA_SIZE.update(dataSize);
                 }
 
-                if (entitySize > 32) {
+                if (entitySize > Config.batch_edit_log_max_item_num) {
                     LOG.warn("write bdb journal batch is too large, batch size 
{}, the first journal id {}, "
                             + "data size {}", entitySize, firstId, dataSize);
                 }
 
-                if (dataSize > 640 * 1024) {  // 640KB
+                if (dataSize > Config.batch_edit_log_max_byte_size) {  // default 640KB
                     LOG.warn("write bdb journal batch data is too large, data 
size {}, the first journal id {}, "
                             + "batch size {}", dataSize, firstId, entitySize);
                 }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 2aaece2c678..159e622f454 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -522,40 +522,38 @@ class SuiteCluster {
         return this.isCloudMode
     }
 
+    int START_WAIT_TIMEOUT = 120
+
     // if not specific fe indices, then start all frontends
     void startFrontends(int... indices) {
-        runFrontendsCmd('start', indices)
-        waitHbChanged()
+        runFrontendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific be indices, then start all backends
     void startBackends(int... indices) {
-        runBackendsCmd('start', indices)
-        waitHbChanged()
+        runBackendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific fe indices, then stop all frontends
     void stopFrontends(int... indices) {
-        runFrontendsCmd('stop', indices)
+        runFrontendsCmd(60, 'stop', indices)
         waitHbChanged()
     }
 
     // if not specific be indices, then stop all backends
     void stopBackends(int... indices) {
-        runBackendsCmd('stop', indices)
+        runBackendsCmd(60, 'stop', indices)
         waitHbChanged()
     }
 
     // if not specific fe indices, then restart all frontends
     void restartFrontends(int... indices) {
-        runFrontendsCmd('restart', indices)
-        waitHbChanged()
+        runFrontendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific be indices, then restart all backends
     void restartBackends(int... indices) {
-        runBackendsCmd('restart', indices)
-        waitHbChanged()
+        runBackendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific fe indices, then drop all frontends
@@ -564,7 +562,7 @@ class SuiteCluster {
         if (clean) {
             cmd += ' --clean'
         }
-        runFrontendsCmd(cmd, indices)
+        runFrontendsCmd(60, cmd, indices)
     }
 
     // if not specific be indices, then decommission all backends
@@ -582,7 +580,7 @@ class SuiteCluster {
         if (clean) {
             cmd += ' --clean'
         }
-        runBackendsCmd(cmd, indices)
+        runBackendsCmd(60, cmd, indices)
     }
 
     void checkFeIsAlive(int index, boolean isAlive) {
@@ -622,18 +620,14 @@ class SuiteCluster {
         Thread.sleep(7000)
     }
 
-    private void runFrontendsCmd(String op, int... indices) {
+    private void runFrontendsCmd(int timeoutSecond, String op, int... indices) {
         def cmd = op + ' ' + name + ' --fe-id ' + indices.join(' ')
-        runCmd(cmd)
+        runCmd(cmd, timeoutSecond)
     }
 
-    private void runBackendsCmd(Integer timeoutSecond = null, String op, int... indices) {
+    private void runBackendsCmd(int timeoutSecond, String op, int... indices) {
         def cmd = op + ' ' + name + ' --be-id ' + indices.join(' ')
-        if (timeoutSecond == null) {
-            runCmd(cmd)
-        } else {
-            runCmd(cmd, timeoutSecond)
-        }
+        runCmd(cmd, timeoutSecond)
     }
 
     private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception {
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
index 9f345b4828c..973492bd4b5 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
@@ -98,7 +98,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
     }
 
     def checkAllTable = { isAllBeAliveOrDeadLong ->
-        dockerAwaitUntil(5) {
+        dockerAwaitUntil(60) {
             checkAllTableImpl(isAllBeAliveOrDeadLong, true)
         }
         checkAllTableImpl(isAllBeAliveOrDeadLong, false)
@@ -128,7 +128,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
         checkAllTable(true)
 
         cluster.stopBackends(choseDeadBeIndex)
-        dockerAwaitUntil(10) {
+        dockerAwaitUntil(60) {
             def chosenBe = cluster.getBeByIndex(choseDeadBeIndex)
             !chosenBe.alive
         }
@@ -144,13 +144,13 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
 
         def choseRestartFeIndex = cluster.getOneFollowerFe().index
         cluster.stopFrontends(choseRestartFeIndex)
-        dockerAwaitUntil(10) {
+        dockerAwaitUntil(60) {
             def chosenFe = cluster.getFeByIndex(choseRestartFeIndex)
             !chosenFe.alive
         }
 
         cluster.startFrontends(choseRestartFeIndex)
-        dockerAwaitUntil(10) {
+        dockerAwaitUntil(60) {
             def chosenFe = cluster.getFeByIndex(choseRestartFeIndex)
             chosenFe.alive
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org