This is an automated email from the ASF dual-hosted git repository. jiaguo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ed431a174e Add instance level consumer dir usage metric (#14430) ed431a174e is described below commit ed431a174ed110a213c65094538797e9cfb3bbe3 Author: Sabrina Zhao <yifz...@linkedin.com> AuthorDate: Thu Nov 14 09:13:33 2024 -0800 Add instance level consumer dir usage metric (#14430) * add instance level consumer dir usage metric * address comments * address comment * trigger test --- .../apache/pinot/common/metrics/ServerGauge.java | 3 ++- .../core/data/manager/InstanceDataManager.java | 5 +++++ .../manager/realtime/RealtimeTableDataManager.java | 20 ++++++++++++-------- .../server/starter/helix/BaseServerStarter.java | 21 +++++++++++++++++++++ .../starter/helix/HelixInstanceDataManager.java | 13 +++++++++++++ 5 files changed, 53 insertions(+), 9 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java index fa9f372823..b999e7b8e4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java @@ -76,7 +76,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge { // Needed to track if valid doc id snapshots are present for faster restarts UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false), UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false), - REALTIME_INGESTION_OFFSET_LAG("offsetLag", false); + REALTIME_INGESTION_OFFSET_LAG("offsetLag", false), + REALTIME_CONSUMER_DIR_USAGE("bytes", true); private final String _gaugeName; private final String _unit; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 73f497582e..95a135f1e4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -194,4 +194,9 @@ public interface InstanceDataManager { * @param isServerReadyToServeQueries supplier to retrieve state of server. */ void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServerReadyToServeQueries); + + /** + * Returns consumer directory paths on the instance + */ + List<File> getConsumerDirPaths(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index b4b33baa02..34e2366966 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -369,6 +369,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } public String getConsumerDir() { + File consumerDir = getConsumerDirPath(); + if (!consumerDir.exists()) { + if (!consumerDir.mkdirs()) { + _logger.error("Failed to create consumer directory {}", consumerDir.getAbsolutePath()); + } + } + + return consumerDir.getAbsolutePath(); + } + + public File getConsumerDirPath() { String consumerDirPath = _instanceDataManagerConfig.getConsumerDir(); File consumerDir; // If a consumer directory has been configured, use it to create a per-table path under the consumer dir. @@ -379,14 +390,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { consumerDirPath = _tableDataDir + File.separator + CONSUMERS_DIR; consumerDir = new File(consumerDirPath); } - - if (!consumerDir.exists()) { - if (!consumerDir.mkdirs()) { - _logger.error("Failed to create consumer directory {}", consumerDir.getAbsolutePath()); - } - } - - return consumerDir.getAbsolutePath(); + return consumerDir; } public boolean isDedupEnabled() { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 98f700c277..6ee02db860 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -21,6 +21,7 @@ package org.apache.pinot.server.starter.helix; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -33,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; @@ -52,6 +54,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.Utils; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SystemResourceInfo; @@ -130,6 +133,7 @@ import org.slf4j.LoggerFactory; public abstract class BaseServerStarter implements ServiceStartable { private static final Logger LOGGER = LoggerFactory.getLogger(BaseServerStarter.class); + private static final long CONSUMER_DIRECTORY_EXCEPTION_VALUE = -1L; protected String _helixClusterName; protected String _zkAddress; protected PinotConfiguration _serverConf; @@ -707,6 +711,23 @@ public abstract class BaseServerStarter implements ServiceStartable { _serverQueriesDisabledTracker = new ServerQueriesDisabledTracker(_helixClusterName, _instanceId, _helixManager, serverMetrics); _serverQueriesDisabledTracker.start(); + + // Add metrics for consumer directory usage + serverMetrics.setOrUpdateGlobalGauge(ServerGauge.REALTIME_CONSUMER_DIR_USAGE, () -> { + List<File> instanceConsumerDirs = instanceDataManager.getConsumerDirPaths(); + long totalSize = 0; + try { + for (File consumerDir : instanceConsumerDirs) { + if (consumerDir.exists()) { + totalSize += FileUtils.sizeOfDirectory(consumerDir); + } + } + return totalSize; + } catch (Exception e) { + LOGGER.warn("Failed to gather size info for consumer directories", e); + return CONSUMER_DIRECTORY_EXCEPTION_VALUE; + } + }); } /** diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 4cf21a61fb..72009cb183 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -52,6 +52,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.core.util.SegmentRefreshSemaphore; @@ -180,6 +181,18 @@ public class HelixInstanceDataManager implements InstanceDataManager { } } + @Override + public List<File> getConsumerDirPaths() { + List<File> consumerDirs = new ArrayList<>(); + for (TableDataManager tableDataManager : _tableDataManagerMap.values()) { + if (tableDataManager instanceof RealtimeTableDataManager) { + File consumerDir = ((RealtimeTableDataManager) tableDataManager).getConsumerDirPath(); + consumerDirs.add(consumerDir); + } + } + return consumerDirs; + } + @Override public String getInstanceId() { return _instanceId; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org