This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new fe182c71488 branch-2.1: [fix](bug) Fix dead lock in Tablet Stat Mgr 
(#47242)
fe182c71488 is described below

commit fe182c71488afe265a0d3971aef4444709043880
Author: xy720 <22125576+xy...@users.noreply.github.com>
AuthorDate: Tue Jan 21 20:45:37 2025 +0800

    branch-2.1: [fix](bug) Fix dead lock in Tablet Stat Mgr (#47242)
---
 .../main/java/org/apache/doris/common/Config.java  |  4 ++
 .../org/apache/doris/catalog/TabletStatMgr.java    | 57 +++++++++++++++++++---
 .../java/org/apache/doris/metric/MetricRepo.java   |  6 +++
 3 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index de749bc08be..bbfa2f30e3a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -283,6 +283,10 @@ public class Config extends ConfigBase {
             "Queue size to store heartbeat task in heartbeat_mgr"})
     public static int heartbeat_mgr_blocking_queue_size = 1024;
 
+    // Size of the thread pool that TabletStatMgr uses to pull tablet stats from backends.
+    // A non-positive value (the default -1) makes TabletStatMgr fall back to
+    // Runtime.getRuntime().availableProcessors() when it creates the pool.
+    @ConfField(masterOnly = true, description = {"TabletStatMgr线程数",
+            "Num of thread to update tablet stat"})
+    public static int tablet_stat_mgr_threads_num = -1;
+
     @ConfField(masterOnly = true, description = {"Agent任务线程池的线程数",
             "Num of thread to handle agent task in agent task thread-pool"})
     public static int max_agent_task_threads_num = 4096;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 7224d66ad44..4b3843274ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -20,7 +20,11 @@ package org.apache.doris.catalog;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -33,7 +37,9 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /*
  * TabletStatMgr is for collecting tablet(replica) statistics from backends.
@@ -42,7 +48,13 @@ import java.util.concurrent.ForkJoinPool;
 public class TabletStatMgr extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(TabletStatMgr.class);
 
-    private ForkJoinPool taskPool = new 
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+    private final ExecutorService executor = 
ThreadPoolManager.newDaemonFixedThreadPool(
+            Config.tablet_stat_mgr_threads_num > 0
+                    ? Config.tablet_stat_mgr_threads_num
+                    : Runtime.getRuntime().availableProcessors(),
+            1024, "tablet-stat-mgr", true);
+
+    private MarkedCountDownLatch<Long, Backend> updateTabletStatsLatch = null;
 
     public TabletStatMgr() {
         super("tablet stat mgr", Config.tablet_stat_update_interval_second * 
1000);
@@ -52,9 +64,13 @@ public class TabletStatMgr extends MasterDaemon {
     protected void runAfterCatalogReady() {
         ImmutableMap<Long, Backend> backends = 
Env.getCurrentSystemInfo().getIdToBackend();
         long start = System.currentTimeMillis();
-        taskPool.submit(() -> {
-            // no need to get tablet stat if backend is not alive
-            
backends.values().stream().filter(Backend::isAlive).parallel().forEach(backend 
-> {
+        // no need to get tablet stat if backend is not alive
+        List<Backend> aliveBackends = 
backends.values().stream().filter(Backend::isAlive)
+                .collect(Collectors.toList());
+        updateTabletStatsLatch = new 
MarkedCountDownLatch<>(aliveBackends.size());
+        aliveBackends.forEach(backend -> {
+            updateTabletStatsLatch.addMark(backend.getId(), backend);
+            executor.submit(() -> {
                 BackendService.Client client = null;
                 TNetworkAddress address = null;
                 boolean ok = false;
@@ -67,8 +83,10 @@ public class TabletStatMgr extends MasterDaemon {
                                 result.getTabletsStatsSize());
                     }
                     updateTabletStat(backend.getId(), result);
+                    updateTabletStatsLatch.markedCountDown(backend.getId(), 
backend);
                     ok = true;
                 } catch (Throwable e) {
+                    
updateTabletStatsLatch.markedCountDownWithStatus(backend.getId(), backend, 
Status.CANCELLED);
                     LOG.warn("task exec error. backend[{}]", backend.getId(), 
e);
                 }
 
@@ -82,7 +100,8 @@ public class TabletStatMgr extends MasterDaemon {
                     LOG.warn("client pool recyle error. backend[{}]", 
backend.getId(), e);
                 }
             });
-        }).join();
+        });
+        waitForTabletStatUpdate();
         if (LOG.isDebugEnabled()) {
             LOG.debug("finished to get tablet stat of all backends. cost: {} 
ms",
                     (System.currentTimeMillis() - start));
@@ -204,6 +223,32 @@ public class TabletStatMgr extends MasterDaemon {
                 (System.currentTimeMillis() - start));
     }
 
+    /**
+     * Blocks until every task registered on {@code updateTabletStatsLatch} has counted down,
+     * or until the wait times out after 600 seconds or the waiting thread is interrupted.
+     *
+     * <p>If the wait does not complete normally, or the latch carries a non-ok status, a warning
+     * listing the backends still marked on the latch is logged and
+     * {@code COUNTER_UPDATE_TABLET_STAT_FAILED} is increased (only once MetricRepo is initialized).
+     * Returns immediately when no latch has been created yet.
+     */
+    public void waitForTabletStatUpdate() {
+        // Read the field once so the whole method works against one latch instance.
+        final MarkedCountDownLatch<Long, Backend> latch = updateTabletStatsLatch;
+        if (latch == null) {
+            // The field starts out null: no update round has been started, nothing to wait for.
+            return;
+        }
+        boolean ok = true;
+        boolean interrupted = false;
+        try {
+            if (!latch.await(600, TimeUnit.SECONDS)) {
+                LOG.info("timeout waiting {} update tablet stats tasks finish after {} seconds.",
+                        latch.getCount(), 600);
+                ok = false;
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("InterruptedException, {}", this, e);
+            // An interrupted wait means the tasks may still be running, so the round must not be
+            // reported as successful. Restore the interrupt flag for code further up the stack.
+            ok = false;
+            interrupted = true;
+            Thread.currentThread().interrupt();
+        }
+        if (!ok || !latch.getStatus().ok()) {
+            List<Long> unfinishedBackendIds = latch.getLeftMarks().stream()
+                    .map(Map.Entry::getKey).collect(Collectors.toList());
+            // Default reason when no task reported an error: the wait was cut short.
+            Status status = interrupted ? Status.CANCELLED : Status.TIMEOUT;
+            if (!latch.getStatus().ok()) {
+                // A task recorded its own failure status on the latch; prefer that one.
+                status = latch.getStatus();
+            }
+            LOG.warn("Failed to update tablet stats reason: {}, unfinished backends: {}",
+                    status.getErrorMsg(), unfinishedBackendIds);
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_UPDATE_TABLET_STAT_FAILED.increase(1L);
+            }
+        }
+    }
+
     private void updateTabletStat(Long beId, TTabletStatResult result) {
         TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
         if (result.isSetTabletStatList()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index f029b4f6255..eed713cc708 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -90,6 +90,8 @@ public final class MetricRepo {
     public static LongCounterMetric COUNTER_CACHE_HIT_SQL;
     public static LongCounterMetric COUNTER_CACHE_HIT_PARTITION;
 
+    public static LongCounterMetric COUNTER_UPDATE_TABLET_STAT_FAILED;
+
     public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
     public static LongCounterMetric COUNTER_EDIT_LOG_READ;
     public static LongCounterMetric COUNTER_EDIT_LOG_CURRENT;
@@ -454,6 +456,10 @@ public final class MetricRepo {
                 "counter of failed transactions");
         COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed"));
         DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED);
+        COUNTER_UPDATE_TABLET_STAT_FAILED = new 
LongCounterMetric("update_tablet_stat_failed", MetricUnit.REQUESTS,
+            "counter of failed to update tablet stat");
+        COUNTER_UPDATE_TABLET_STAT_FAILED.addLabel(new MetricLabel("type", 
"failed"));
+        DORIS_METRIC_REGISTER.addMetrics(COUNTER_UPDATE_TABLET_STAT_FAILED);
         HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram(
             MetricRegistry.name("txn", "exec", "latency", "ms"));
         HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to