This is an automated email from the ASF dual-hosted git repository.

jiaguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ed431a174e Add instance level consumer dir usage metric (#14430)
ed431a174e is described below

commit ed431a174ed110a213c65094538797e9cfb3bbe3
Author: Sabrina Zhao <yifz...@linkedin.com>
AuthorDate: Thu Nov 14 09:13:33 2024 -0800

    Add instance level consumer dir usage metric (#14430)
    
    * add instance level consumer dir usage metric
    
    * address comments
    
    * address comment
    
    * trigger test
---
 .../apache/pinot/common/metrics/ServerGauge.java    |  3 ++-
 .../core/data/manager/InstanceDataManager.java      |  5 +++++
 .../manager/realtime/RealtimeTableDataManager.java  | 20 ++++++++++++--------
 .../server/starter/helix/BaseServerStarter.java     | 21 +++++++++++++++++++++
 .../starter/helix/HelixInstanceDataManager.java     | 13 +++++++++++++
 5 files changed, 53 insertions(+), 9 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index fa9f372823..b999e7b8e4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -76,7 +76,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   // Needed to track if valid doc id snapshots are present for faster restarts
   UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false),
   UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false),
-  REALTIME_INGESTION_OFFSET_LAG("offsetLag", false);
+  REALTIME_INGESTION_OFFSET_LAG("offsetLag", false),
+  REALTIME_CONSUMER_DIR_USAGE("bytes", true);
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 73f497582e..95a135f1e4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -194,4 +194,9 @@ public interface InstanceDataManager {
    * @param isServerReadyToServeQueries supplier to retrieve state of server.
    */
   void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServerReadyToServeQueries);
+
+  /**
+   * Returns consumer directory paths on the instance
+   */
+  List<File> getConsumerDirPaths();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index b4b33baa02..34e2366966 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -369,6 +369,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   }
 
   public String getConsumerDir() {
+    File consumerDir = getConsumerDirPath();
+    if (!consumerDir.exists()) {
+      if (!consumerDir.mkdirs()) {
+        _logger.error("Failed to create consumer directory {}", consumerDir.getAbsolutePath());
+      }
+    }
+
+    return consumerDir.getAbsolutePath();
+  }
+
+  public File getConsumerDirPath() {
     String consumerDirPath = _instanceDataManagerConfig.getConsumerDir();
     File consumerDir;
     // If a consumer directory has been configured, use it to create a per-table path under the consumer dir.
@@ -379,14 +390,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       consumerDirPath = _tableDataDir + File.separator + CONSUMERS_DIR;
       consumerDir = new File(consumerDirPath);
     }
-
-    if (!consumerDir.exists()) {
-      if (!consumerDir.mkdirs()) {
-        _logger.error("Failed to create consumer directory {}", consumerDir.getAbsolutePath());
-      }
-    }
-
-    return consumerDir.getAbsolutePath();
+    return consumerDir;
   }
 
   public boolean isDedupEnabled() {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 98f700c277..6ee02db860 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -21,6 +21,7 @@ package org.apache.pinot.server.starter.helix;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
@@ -52,6 +54,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SystemResourceInfo;
@@ -130,6 +133,7 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseServerStarter implements ServiceStartable {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseServerStarter.class);
 
+  private static final long CONSUMER_DIRECTORY_EXCEPTION_VALUE = -1L;
   protected String _helixClusterName;
   protected String _zkAddress;
   protected PinotConfiguration _serverConf;
@@ -707,6 +711,23 @@ public abstract class BaseServerStarter implements ServiceStartable {
     _serverQueriesDisabledTracker =
         new ServerQueriesDisabledTracker(_helixClusterName, _instanceId, _helixManager, serverMetrics);
     _serverQueriesDisabledTracker.start();
+
+    // Add metrics for consumer directory usage
+    serverMetrics.setOrUpdateGlobalGauge(ServerGauge.REALTIME_CONSUMER_DIR_USAGE, () -> {
+      List<File> instanceConsumerDirs = instanceDataManager.getConsumerDirPaths();
+      long totalSize = 0;
+      try {
+        for (File consumerDir : instanceConsumerDirs) {
+          if (consumerDir.exists()) {
+            totalSize += FileUtils.sizeOfDirectory(consumerDir);
+          }
+        }
+        return totalSize;
+      } catch (Exception e) {
+        LOGGER.warn("Failed to gather size info for consumer directories", e);
+        return CONSUMER_DIRECTORY_EXCEPTION_VALUE;
+      }
+    });
   }
 
   /**
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 4cf21a61fb..72009cb183 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -52,6 +52,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.util.SegmentRefreshSemaphore;
@@ -180,6 +181,18 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     }
   }
 
+  @Override
+  public List<File> getConsumerDirPaths() {
+    List<File> consumerDirs = new ArrayList<>();
+    for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
+      if (tableDataManager instanceof RealtimeTableDataManager) {
+        File consumerDir = ((RealtimeTableDataManager) tableDataManager).getConsumerDirPath();
+        consumerDirs.add(consumerDir);
+      }
+    }
+    return consumerDirs;
+  }
+
   @Override
   public String getInstanceId() {
     return _instanceId;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to