This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-untar-failure-server-meter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ef93477b20f84ffe57e0ee39481d3c759d5f86b7
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Jul 29 13:38:55 2020 -0700

    Add untar failure server meter
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  1 +
 .../server/starter/helix/HelixServerStarter.java   | 10 +++----
 .../starter/helix/SegmentFetcherAndLoader.java     | 32 ++++++++++++++--------
 3 files changed, 26 insertions(+), 17 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 3438415..0a8a30e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -61,6 +61,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_MISSING_SEGMENTS("segments", false),
   RELOAD_FAILURES("segments", false),
   REFRESH_FAILURES("segments", false),
+  UNTAR_FAILURES("segments", false),
 
   // Netty connection metrics
   NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index e86e697..09b18ce 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -159,13 +159,13 @@ public class HelixServerStarter implements 
ServiceStartable {
         
_serverConf.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, 
false) ? NetUtil
             .getHostnameOrAddress() : NetUtil.getHostAddress());
     _port = _serverConf.getProperty(KEY_OF_SERVER_NETTY_PORT, 
DEFAULT_SERVER_NETTY_PORT);
-    
+
     _instanceId = 
Optional.ofNullable(_serverConf.getProperty(CONFIG_OF_INSTANCE_ID))
 
         // InstanceId is not configured. Fallback to an auto generated config.
         .orElseGet(this::initializeDefaultInstanceId);
   }
-  
+
   private String initializeDefaultInstanceId() {
     String instanceId = PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port;
 
@@ -367,8 +367,9 @@ public class HelixServerStarter implements ServiceStartable 
{
         
.init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
     ServerConf serverInstanceConfig = 
DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
     _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
+    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
-    SegmentFetcherAndLoader fetcherAndLoader = new 
SegmentFetcherAndLoader(_serverConf, instanceDataManager);
+    SegmentFetcherAndLoader fetcherAndLoader = new 
SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager, fetcherAndLoader);
     _helixManager.getStateMachineEngine()
@@ -400,7 +401,6 @@ public class HelixServerStarter implements ServiceStartable 
{
     _adminApiApplication.start(adminApiPort);
     setAdminApiPort(adminApiPort);
 
-    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     // Register message handler factory
     SegmentMessageHandlerFactory messageHandlerFactory =
         new SegmentMessageHandlerFactory(fetcherAndLoader, 
instanceDataManager, serverMetrics);
@@ -643,7 +643,7 @@ public class HelixServerStarter implements ServiceStartable 
{
     properties.put(KEY_OF_SERVER_NETTY_PORT, port);
     properties.put(CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port 
+ "/index");
     properties.put(CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test" 
+ port + "/segmentTar");
-    
+
     HelixServerStarter serverStarter =
         new HelixServerStarter("quickstart", "localhost:2191", new 
PinotConfiguration(properties));
     serverStarter.start();
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
index 85256af..e5f3c35 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
@@ -28,6 +28,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
@@ -53,10 +55,12 @@ public class SegmentFetcherAndLoader {
   private static final String ENCODED_SUFFIX = ".enc";
 
   private final InstanceDataManager _instanceDataManager;
+  private final ServerMetrics _serverMetrics;
 
-  public SegmentFetcherAndLoader(PinotConfiguration config, 
InstanceDataManager instanceDataManager)
+  public SegmentFetcherAndLoader(PinotConfiguration config, 
InstanceDataManager instanceDataManager, ServerMetrics serverMetrics)
       throws Exception {
     _instanceDataManager = instanceDataManager;
+    _serverMetrics = serverMetrics;
 
     PinotConfiguration pinotFSConfig = 
config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
     PinotConfiguration segmentFetcherFactoryConfig =
@@ -202,18 +206,22 @@ public class SegmentFetcherAndLoader {
           .info("Downloaded tarred segment: {} for table: {} from: {} to: {}, 
file length: {}", segmentName, tableName,
               uri, tempTarFile, tempTarFile.length());
 
-      // If an exception is thrown when untarring, it means the tar file is 
broken OR not found after the retry.
-      // Thus, there's no need to retry again.
-      File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, 
tempSegmentDir).get(0);
-
-      File indexDir = new File(new 
File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
-      if (indexDir.exists()) {
-        LOGGER.info("Deleting existing index directory for segment: {} for 
table: {}", segmentName, tableName);
-        FileUtils.deleteDirectory(indexDir);
+      try {
+        // If an exception is thrown when untarring, it means the tar file is 
broken OR not found after the retry.
+        // Thus, there's no need to retry again.
+        File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, 
tempSegmentDir).get(0);
+        File indexDir = new File(new 
File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
+        if (indexDir.exists()) {
+          LOGGER.info("Deleting existing index directory for segment: {} for 
table: {}", segmentName, tableName);
+          FileUtils.deleteDirectory(indexDir);
+        }
+        FileUtils.moveDirectory(tempIndexDir, indexDir);
+        LOGGER.info("Successfully downloaded segment: {} for table: {} to: 
{}", segmentName, tableName, indexDir);
+        return indexDir.getAbsolutePath();
+      } catch (Exception e) {
+        _serverMetrics.addMeteredTableValue(tableName, 
ServerMeter.UNTAR_FAILURES, 1L);
+        throw e;
       }
-      FileUtils.moveDirectory(tempIndexDir, indexDir);
-      LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", 
segmentName, tableName, indexDir);
-      return indexDir.getAbsolutePath();
     } finally {
       FileUtils.deleteQuietly(tempDir);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to