This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fc8ee6a09c Metrics for Segment Upload/Download (#15585) fc8ee6a09c is described below commit fc8ee6a09c4084262b2ba31968f677f3409746dd Author: Praveen <praveenkchagan...@gmail.com> AuthorDate: Wed Apr 23 13:42:22 2025 -0700 Metrics for Segment Upload/Download (#15585) * Segment metrics * Upload guage * only controller metrics * cleanup * review comments 2 --- .../pinot/common/metrics/ControllerGauge.java | 11 ++- .../pinot/common/metrics/ControllerMeter.java | 6 +- .../pinot/common/metrics/ControllerTimer.java | 7 +- .../resources/LLCSegmentCompletionHandlers.java | 10 +++ .../PinotSegmentUploadDownloadRestletResource.java | 11 ++- .../controller/api/resources/ResourceUtils.java | 90 +++++++++++++++++++++- .../pinot/controller/api/upload/ZKOperator.java | 11 ++- 7 files changed, 137 insertions(+), 9 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 3ae4db405a..51ff09387c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -207,7 +207,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT("pauselessSegmentsInUnrecoverableErrorCount", false), // ZK JUTE max buffer size in bytes - ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true); + ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true), + + // Bytes to be read from deep store + DEEP_STORE_READ_BYTES_IN_PROGRESS("deepStoreReadBytesInProgress", true), + // Count of deep store segment downloads that are currently in progress + DEEP_STORE_READ_OPS_IN_PROGRESS("deepStoreReadOpsInProgress", true), + // Bytes to be written to deep store + DEEP_STORE_WRITE_BYTES_IN_PROGRESS("deepStoreWriteBytesInProgress", true), + // Count of deep store 
segment writes that are currently in progress + DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true); private final String _gaugeName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java index 850588cb45..5f934561ad 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java @@ -69,7 +69,11 @@ public enum ControllerMeter implements AbstractMetrics.Meter { IDEAL_STATE_UPDATE_FAILURE("IdealStateUpdateFailure", false), IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false), IDEAL_STATE_UPDATE_SUCCESS("IdealStateUpdateSuccess", false), - SEGMENT_SIZE_AUTO_REDUCTION("SegmentSizeAutoReduction", false); + SEGMENT_SIZE_AUTO_REDUCTION("SegmentSizeAutoReduction", false), + // Total Bytes read from deep store + DEEP_STORE_READ_BYTES_COMPLETED("deepStoreReadBytesCompleted", true), + // Total Bytes written to deep store + DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true); private final String _brokerMeterName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java index c56b46343c..23d4807627 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java @@ -30,7 +30,12 @@ public enum ControllerTimer implements AbstractMetrics.Timer { CRON_SCHEDULER_JOB_EXECUTION_TIME_MS("cronSchedulerJobExecutionTimeMs", false), IDEAL_STATE_UPDATE_TIME_MS("IdealStateUpdateTimeMs", false), // How long it took the server to start. 
- STARTUP_SUCCESS_DURATION_MS("startupSuccessDurationMs", true); + STARTUP_SUCCESS_DURATION_MS("startupSuccessDurationMs", true), + // Time taken to read the segment from deep store + DEEP_STORE_SEGMENT_READ_TIME_MS("deepStoreSegmentReadTimeMs", true), + // Time taken to write the segment to deep store + DEEP_STORE_SEGMENT_WRITE_TIME_MS("deepStoreSegmentWriteTimeMs", true); + private final String _timerName; private final boolean _global; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java index 6e3011e666..6df9c1c63a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java @@ -39,6 +39,7 @@ import javax.ws.rs.core.MediaType; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.TarCompressionUtils; @@ -71,6 +72,9 @@ public class LLCSegmentCompletionHandlers { @Inject SegmentCompletionManager _segmentCompletionManager; + @Inject + ControllerMetrics _controllerMetrics; + @VisibleForTesting public static String getScheme() { return SCHEME; @@ -227,7 +231,13 @@ public class LLCSegmentCompletionHandlers { URI segmentFileURI = URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), rawTableName, URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName))); + // Emit metrics related to deep-store upload operation + long startTimeMs = 
System.currentTimeMillis(); + long segmentSizeBytes = localTempFile.length(); + ResourceUtils.emitPreSegmentUploadMetrics(_controllerMetrics, rawTableName, segmentSizeBytes); PinotFSFactory.create(segmentFileURI.getScheme()).copyFromLocalFile(localTempFile, segmentFileURI); + ResourceUtils.emitPostSegmentUploadMetrics(_controllerMetrics, rawTableName, startTimeMs, segmentSizeBytes); + SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params() .withStreamPartitionMsgOffset(requestParams.getStreamPartitionMsgOffset()) .withSegmentLocation(segmentFileURI.toString()) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index deada81d92..f160c5c60a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -214,8 +214,15 @@ public class PinotSegmentUploadDownloadRestletResource { segmentFile = org.apache.pinot.common.utils.FileUtils.concatAndValidateFile(tableDir, segmentName + "-" + UUID.randomUUID(), "Invalid segment name: %s", segmentName); - + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + // Emit deep-store download metrics; the post-download emitter derives the elapsed time from the start timestamp + long deepStoreDownloadStartTimeMs = System.currentTimeMillis(); + long segmentSizeInBytes = segmentFile.length(); + ResourceUtils.emitPreSegmentDownloadMetrics(_controllerMetrics, rawTableName, segmentSizeInBytes); pinotFS.copyToLocalFile(remoteSegmentFileURI, segmentFile); + ResourceUtils.emitPostSegmentDownloadMetrics(_controllerMetrics, rawTableName, + deepStoreDownloadStartTimeMs, segmentSizeInBytes); + // Streaming in 
the tmp file and delete it afterward. builder.entity((StreamingOutput) output -> { try { @@ -233,6 +240,7 @@ public class PinotSegmentUploadDownloadRestletResource { private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType, @Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, Request request) { + long segmentUploadStartTimeMs = System.currentTimeMillis(); if (StringUtils.isNotEmpty(tableName)) { TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName); if (tableTypeFromTableName != null && tableTypeFromTableName != tableType) { @@ -412,7 +420,6 @@ public class PinotSegmentUploadDownloadRestletResource { zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection, allowRefresh, headers); - return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType); } catch (WebApplicationException e) { throw e; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java index ab091c8138..3d6fce7448 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java @@ -19,10 +19,16 @@ package org.apache.pinot.controller.api.resources; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import org.apache.pinot.common.exception.TableNotFoundException; +import 
org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.ControllerTimer; import org.apache.pinot.controller.api.access.AccessControl; import org.apache.pinot.controller.api.access.AccessControlFactory; import org.apache.pinot.controller.api.access.AccessControlUtils; @@ -34,13 +40,20 @@ import org.apache.pinot.spi.config.table.TableType; import org.glassfish.grizzly.http.server.Request; import org.slf4j.Logger; - public class ResourceUtils { + private ResourceUtils() { } + // Shared static variable + private static AtomicLong _deepStoreWriteOpsInProgress = new AtomicLong(0); + private static AtomicLong _deepStoreWriteBytesInProgress = new AtomicLong(0); + private static AtomicLong _deepStoreReadOpsInProgress = new AtomicLong(0); + private static AtomicLong _deepStoreReadBytesInProgress = new AtomicLong(0); + public static List<String> getExistingTableNamesWithType(PinotHelixResourceManager pinotHelixResourceManager, - String tableName, @Nullable TableType tableType, Logger logger) { + String tableName, @Nullable TableType tableType, + Logger logger) { try { return pinotHelixResourceManager.getExistingTableNamesWithType(tableName, tableType); } catch (TableNotFoundException e) { @@ -78,4 +91,77 @@ public class ResourceUtils { throw new ControllerApplicationException(logger, "Permission denied", Response.Status.FORBIDDEN); } } + + public static void emitPreSegmentUploadMetrics(ControllerMetrics controllerMetrics, String rawTableName, + long segmentSizeInBytes) { + long writeCount = _deepStoreWriteOpsInProgress.incrementAndGet(); + controllerMetrics.setOrUpdateTableGauge(rawTableName, ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS, + writeCount); + controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS, writeCount); + + long segmentBytesUploading = 
_deepStoreWriteBytesInProgress.addAndGet(segmentSizeInBytes); + controllerMetrics.setOrUpdateTableGauge(rawTableName, ControllerGauge.DEEP_STORE_WRITE_BYTES_IN_PROGRESS, + segmentBytesUploading); + controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_BYTES_IN_PROGRESS, segmentBytesUploading); + } + + public static void emitPostSegmentUploadMetrics(ControllerMetrics controllerMetrics, String rawTableName, + long startTimeMs, long segmentSizeInBytes) { + long writeCount = _deepStoreWriteOpsInProgress.decrementAndGet(); + controllerMetrics.setOrUpdateTableGauge(rawTableName, ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS, + writeCount); + controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS, writeCount); + + long segmentBytesUploading = _deepStoreWriteBytesInProgress.addAndGet(-segmentSizeInBytes); + // Remaining in-flight bytes go on the BYTES gauge; the OPS gauge was already updated with writeCount above + controllerMetrics.setOrUpdateTableGauge(rawTableName, ControllerGauge.DEEP_STORE_WRITE_BYTES_IN_PROGRESS, + segmentBytesUploading); + controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_BYTES_IN_PROGRESS, segmentBytesUploading); + + long durationMs = System.currentTimeMillis() - startTimeMs; + controllerMetrics.addTimedTableValue(rawTableName, ControllerTimer.DEEP_STORE_SEGMENT_WRITE_TIME_MS, durationMs, + TimeUnit.MILLISECONDS); + controllerMetrics.addTimedValue(ControllerTimer.DEEP_STORE_SEGMENT_WRITE_TIME_MS, durationMs, + TimeUnit.MILLISECONDS); + + controllerMetrics.addMeteredTableValue(rawTableName, ControllerMeter.DEEP_STORE_WRITE_BYTES_COMPLETED, + segmentSizeInBytes); + controllerMetrics.addMeteredGlobalValue(ControllerMeter.DEEP_STORE_WRITE_BYTES_COMPLETED, segmentSizeInBytes); + } + + public static void emitPreSegmentDownloadMetrics(ControllerMetrics controllerMetrics, String rawTableName, + long segmentSizeInBytes) { + long readCount = _deepStoreReadOpsInProgress.incrementAndGet(); + controllerMetrics.setOrUpdateTableGauge(rawTableName, ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS, + 
readCount); + controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS, readCount); + + long segmentBytesDownloading = _deepStoreReadBytesInProgress.addAndGet(segmentSizeInBytes); + controllerMetrics.setOrUpdateTableGauge(rawTableName, ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS, + segmentBytesDownloading); + controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS, + segmentBytesDownloading); + } + + public static void emitPostSegmentDownloadMetrics(ControllerMetrics controllerMetrics, String rawTableName, + long startTimeMs, long segmentSizeInBytes) { + long readCount = _deepStoreReadOpsInProgress.decrementAndGet(); + controllerMetrics.setOrUpdateTableGauge(rawTableName, ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS, readCount); + controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS, readCount); + + long segmentBytesDownloading = _deepStoreReadBytesInProgress.addAndGet(-segmentSizeInBytes); + controllerMetrics.setOrUpdateTableGauge(rawTableName, ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS, + segmentBytesDownloading); + controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS, + segmentBytesDownloading); + + long durationMs = System.currentTimeMillis() - startTimeMs; + controllerMetrics.addTimedTableValue(rawTableName, ControllerTimer.DEEP_STORE_SEGMENT_READ_TIME_MS, + durationMs, TimeUnit.MILLISECONDS); + controllerMetrics.addTimedValue(ControllerTimer.DEEP_STORE_SEGMENT_READ_TIME_MS, durationMs, + TimeUnit.MILLISECONDS); + + controllerMetrics.addMeteredTableValue(rawTableName, ControllerMeter.DEEP_STORE_READ_BYTES_COMPLETED, + segmentSizeInBytes); + controllerMetrics.addMeteredGlobalValue(ControllerMeter.DEEP_STORE_READ_BYTES_COMPLETED, segmentSizeInBytes); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index 8486853650..8ff39b9def 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -40,10 +40,12 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.api.resources.ResourceUtils; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -771,16 +773,21 @@ public class ZKOperator { } else { // In push types other than METADATA, local segmentFile contains the complete segment. 
// Move local segment to final location - copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI); + copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI, tableNameWithType); LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType, finalSegmentLocationURI); } } - private void copyFromSegmentFileToDeepStore(File segmentFile, URI finalSegmentLocationURI) + private void copyFromSegmentFileToDeepStore(File segmentFile, URI finalSegmentLocationURI, String tableNameWithType) throws Exception { LOGGER.info("Copying segment from: {} to: {}", segmentFile.getAbsolutePath(), finalSegmentLocationURI); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + long segmentSizeInBytes = segmentFile.length(); + long startTimeMs = System.currentTimeMillis(); + ResourceUtils.emitPreSegmentUploadMetrics(_controllerMetrics, rawTableName, segmentSizeInBytes); PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile, finalSegmentLocationURI); + ResourceUtils.emitPostSegmentUploadMetrics(_controllerMetrics, rawTableName, startTimeMs, segmentSizeInBytes); } private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI finalSegmentLocationURI) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org