This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new fe182c71488 branch-2.1: [fix](bug) Fix dead lock in Tablet Stat Mgr (#47242) fe182c71488 is described below commit fe182c71488afe265a0d3971aef4444709043880 Author: xy720 <22125576+xy...@users.noreply.github.com> AuthorDate: Tue Jan 21 20:45:37 2025 +0800 branch-2.1: [fix](bug) Fix dead lock in Tablet Stat Mgr (#47242) --- .../main/java/org/apache/doris/common/Config.java | 4 ++ .../org/apache/doris/catalog/TabletStatMgr.java | 57 +++++++++++++++++++--- .../java/org/apache/doris/metric/MetricRepo.java | 6 +++ 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index de749bc08be..bbfa2f30e3a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -283,6 +283,10 @@ public class Config extends ConfigBase { "Queue size to store heartbeat task in heartbeat_mgr"}) public static int heartbeat_mgr_blocking_queue_size = 1024; + @ConfField(masterOnly = true, description = {"TabletStatMgr线程数", + "Num of thread to update tablet stat"}) + public static int tablet_stat_mgr_threads_num = -1; + @ConfField(masterOnly = true, description = {"Agent任务线程池的线程数", "Num of thread to handle agent task in agent task thread-pool"}) public static int max_agent_task_threads_num = 4096; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 7224d66ad44..4b3843274ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -20,7 +20,11 @@ package org.apache.doris.catalog; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.common.ClientPool; import 
org.apache.doris.common.Config; +import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Status; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; @@ -33,7 +37,9 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /* * TabletStatMgr is for collecting tablet(replica) statistics from backends. @@ -42,7 +48,13 @@ import java.util.concurrent.ForkJoinPool; public class TabletStatMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(TabletStatMgr.class); - private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); + private final ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool( + Config.tablet_stat_mgr_threads_num > 0 + ? 
Config.tablet_stat_mgr_threads_num + : Runtime.getRuntime().availableProcessors(), + 1024, "tablet-stat-mgr", true); + + private MarkedCountDownLatch<Long, Backend> updateTabletStatsLatch = null; public TabletStatMgr() { super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000); @@ -52,9 +64,13 @@ public class TabletStatMgr extends MasterDaemon { protected void runAfterCatalogReady() { ImmutableMap<Long, Backend> backends = Env.getCurrentSystemInfo().getIdToBackend(); long start = System.currentTimeMillis(); - taskPool.submit(() -> { - // no need to get tablet stat if backend is not alive - backends.values().stream().filter(Backend::isAlive).parallel().forEach(backend -> { + // no need to get tablet stat if backend is not alive + List<Backend> aliveBackends = backends.values().stream().filter(Backend::isAlive) + .collect(Collectors.toList()); + updateTabletStatsLatch = new MarkedCountDownLatch<>(aliveBackends.size()); + aliveBackends.forEach(backend -> { + updateTabletStatsLatch.addMark(backend.getId(), backend); + executor.submit(() -> { BackendService.Client client = null; TNetworkAddress address = null; boolean ok = false; @@ -67,8 +83,10 @@ public class TabletStatMgr extends MasterDaemon { result.getTabletsStatsSize()); } updateTabletStat(backend.getId(), result); + updateTabletStatsLatch.markedCountDown(backend.getId(), backend); ok = true; } catch (Throwable e) { + updateTabletStatsLatch.markedCountDownWithStatus(backend.getId(), backend, Status.CANCELLED); LOG.warn("task exec error. backend[{}]", backend.getId(), e); } @@ -82,7 +100,8 @@ public class TabletStatMgr extends MasterDaemon { LOG.warn("client pool recyle error. backend[{}]", backend.getId(), e); } }); - }).join(); + }); + waitForTabletStatUpdate(); if (LOG.isDebugEnabled()) { LOG.debug("finished to get tablet stat of all backends. 
cost: {} ms", (System.currentTimeMillis() - start)); @@ -204,6 +223,32 @@ public class TabletStatMgr extends MasterDaemon { (System.currentTimeMillis() - start)); } + public void waitForTabletStatUpdate() { + boolean ok = true; + try { + if (!updateTabletStatsLatch.await(600, TimeUnit.SECONDS)) { + LOG.info("timeout waiting {} update tablet stats tasks finish after {} seconds.", + updateTabletStatsLatch.getCount(), 600); + ok = false; + } + } catch (InterruptedException e) { + LOG.warn("InterruptedException, {}", this, e); + } + if (!ok || !updateTabletStatsLatch.getStatus().ok()) { + List<Long> unfinishedBackendIds = updateTabletStatsLatch.getLeftMarks().stream() + .map(Map.Entry::getKey).collect(Collectors.toList()); + Status status = Status.TIMEOUT; + if (!updateTabletStatsLatch.getStatus().ok()) { + status = updateTabletStatsLatch.getStatus(); + } + LOG.warn("Failed to update tablet stats reason: {}, unfinished backends: {}", + status.getErrorMsg(), unfinishedBackendIds); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_UPDATE_TABLET_STAT_FAILED.increase(1L); + } + } + } + private void updateTabletStat(Long beId, TTabletStatResult result) { TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); if (result.isSetTabletStatList()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index f029b4f6255..eed713cc708 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -90,6 +90,8 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_CACHE_HIT_SQL; public static LongCounterMetric COUNTER_CACHE_HIT_PARTITION; + public static LongCounterMetric COUNTER_UPDATE_TABLET_STAT_FAILED; + public static LongCounterMetric COUNTER_EDIT_LOG_WRITE; public static LongCounterMetric COUNTER_EDIT_LOG_READ; public static LongCounterMetric COUNTER_EDIT_LOG_CURRENT; @@ 
-454,6 +456,10 @@ public final class MetricRepo { "counter of failed transactions"); COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed")); DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED); + COUNTER_UPDATE_TABLET_STAT_FAILED = new LongCounterMetric("update_tablet_stat_failed", MetricUnit.REQUESTS, + "counter of failed to update tablet stat"); + COUNTER_UPDATE_TABLET_STAT_FAILED.addLabel(new MetricLabel("type", "failed")); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_UPDATE_TABLET_STAT_FAILED); HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram( MetricRegistry.name("txn", "exec", "latency", "ms")); HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org