This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-untar-failure-server-meter in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ef93477b20f84ffe57e0ee39481d3c759d5f86b7 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Wed Jul 29 13:38:55 2020 -0700 Add untar failure server meter --- .../apache/pinot/common/metrics/ServerMeter.java | 1 + .../server/starter/helix/HelixServerStarter.java | 10 +++---- .../starter/helix/SegmentFetcherAndLoader.java | 32 ++++++++++++++-------- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 3438415..0a8a30e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -61,6 +61,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { NUM_MISSING_SEGMENTS("segments", false), RELOAD_FAILURES("segments", false), REFRESH_FAILURES("segments", false), + UNTAR_FAILURES("segments", false), // Netty connection metrics NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true), diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java index e86e697..09b18ce 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java @@ -159,13 +159,13 @@ public class HelixServerStarter implements ServiceStartable { _serverConf.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtil .getHostnameOrAddress() : NetUtil.getHostAddress()); _port = _serverConf.getProperty(KEY_OF_SERVER_NETTY_PORT, DEFAULT_SERVER_NETTY_PORT); - + _instanceId = Optional.ofNullable(_serverConf.getProperty(CONFIG_OF_INSTANCE_ID)) // InstanceId is not configured. Fallback to an auto generated config. .orElseGet(this::initializeDefaultInstanceId); } - + private String initializeDefaultInstanceId() { String instanceId = PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port; @@ -367,8 +367,9 @@ public class HelixServerStarter implements ServiceStartable { .init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER)); ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf); _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager); + ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); - SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager); + SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics); StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader); _helixManager.getStateMachineEngine() @@ -400,7 +401,6 @@ public class HelixServerStarter implements ServiceStartable { _adminApiApplication.start(adminApiPort); setAdminApiPort(adminApiPort); - ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); // Register message handler factory SegmentMessageHandlerFactory messageHandlerFactory = new SegmentMessageHandlerFactory(fetcherAndLoader, instanceDataManager, serverMetrics); @@ -643,7 +643,7 @@ public class HelixServerStarter implements ServiceStartable { properties.put(KEY_OF_SERVER_NETTY_PORT, port); properties.put(CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port + "/index"); properties.put(CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test" + port + "/segmentTar"); - + HelixServerStarter serverStarter = new HelixServerStarter("quickstart", "localhost:2191", new PinotConfiguration(properties)); serverStarter.start(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java index 85256af..e5f3c35 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java @@ -28,6 +28,8 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.common.Utils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; @@ -53,10 +55,12 @@ public class SegmentFetcherAndLoader { private static final String ENCODED_SUFFIX = ".enc"; private final InstanceDataManager _instanceDataManager; + private final ServerMetrics _serverMetrics; - public SegmentFetcherAndLoader(PinotConfiguration config, InstanceDataManager instanceDataManager) + public SegmentFetcherAndLoader(PinotConfiguration config, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics) throws Exception { _instanceDataManager = instanceDataManager; + _serverMetrics = serverMetrics; PinotConfiguration pinotFSConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY); PinotConfiguration segmentFetcherFactoryConfig = @@ -202,18 +206,22 @@ public class SegmentFetcherAndLoader { .info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, tableName, uri, tempTarFile, tempTarFile.length()); - // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry. - // Thus, there's no need to retry again. - File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0); - - File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName); - if (indexDir.exists()) { - LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName); - FileUtils.deleteDirectory(indexDir); + try { + // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry. + // Thus, there's no need to retry again. + File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0); + File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName); + if (indexDir.exists()) { + LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName); + FileUtils.deleteDirectory(indexDir); + } + FileUtils.moveDirectory(tempIndexDir, indexDir); + LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir); + return indexDir.getAbsolutePath(); + } catch (Exception e) { + _serverMetrics.addMeteredTableValue(tableName, ServerMeter.UNTAR_FAILURES, 1L); + throw e; } - FileUtils.moveDirectory(tempIndexDir, indexDir); - LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir); - return indexDir.getAbsolutePath(); } finally { FileUtils.deleteQuietly(tempDir); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org