This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5dd0c3a06d Adding metrics for pauseless observability (#15384) 5dd0c3a06d is described below commit 5dd0c3a06d8fa67d6512a1f86ea8778b83d8e734 Author: 9aman <35227405+9a...@users.noreply.github.com> AuthorDate: Fri Mar 28 14:59:32 2025 +0530 Adding metrics for pauseless observability (#15384) * Adding metrics to signify: 1. Pauseless is enabled/ disabled on server and controller 2. Build failures during the segment commit protocol * Adding metrics for re-ingestion failure --- .../org/apache/pinot/common/metrics/ControllerGauge.java | 3 +++ .../java/org/apache/pinot/common/metrics/ServerGauge.java | 2 ++ .../java/org/apache/pinot/common/metrics/ServerMeter.java | 6 +++++- .../helix/core/realtime/SegmentCompletionManager.java | 15 +++++++++++++-- .../data/manager/realtime/RealtimeSegmentDataManager.java | 11 +++++++++++ .../pinot/server/api/resources/ReingestionResource.java | 3 +++ 6 files changed, 37 insertions(+), 3 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 6418aca8d1..62452adf40 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -194,6 +194,9 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { // Metric used to track errors during the periodic table retention management RETENTION_MANAGER_ERROR("retentionManagerError", false), + // Gauge to reflect whether pauseless is enabled or not + PAUSELESS_CONSUMPTION_ENABLED("pauselessConsumptionEnabled", false), + // Metric used to track when segments in error state are detected for pauseless table PAUSELESS_SEGMENTS_IN_ERROR_COUNT("pauselessSegmentsInErrorCount", false), diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java index 548ec15a07..bfe0ee1f3c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java @@ -41,6 +41,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge { REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN("bytes", false), REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false), LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true), + // Gauge to reflect whether pauseless is enabled or not + PAUSELESS_CONSUMPTION_ENABLED("pauselessConsumptionEnabled", false), // Upsert metrics UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false), // Dedup metrics diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index d2aa0b3e55..35996ebb0b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -98,6 +98,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { SEGMENT_DOWNLOAD_FAILURES("segments", false), SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES("segments", false), SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES("segments", false), + SEGMENT_BUILD_FAILURE("segments", false), SEGMENT_UPLOAD_FAILURE("segments", false), SEGMENT_UPLOAD_SUCCESS("segments", false), // Emitted only by Server to Deep-store segment uploader. @@ -184,7 +185,10 @@ public enum ServerMeter implements AbstractMetrics.Meter { PREDOWNLOAD_SEGMENT_DOWNLOAD_COUNT("predownloadSegmentCount", true), PREDOWNLOAD_SEGMENT_DOWNLOAD_FAILURE_COUNT("predownloadSegmentFailureCount", true), PREDOWNLOAD_SUCCEED("predownloadSucceed", true), - PREDOWNLOAD_FAILED("predownloadFailed", true); + PREDOWNLOAD_FAILED("predownloadFailed", true), + + // reingestion metrics + SEGMENT_REINGESTION_FAILURE("segments", false); private final String _meterName; private final String _unit; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 7f09efd2ee..f326399023 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.helix.HelixManager; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.LLCSegmentName; @@ -138,8 +139,18 @@ public class SegmentCompletionManager { } if (factoryName == null) { - factoryName = PauselessConsumptionUtils.isPauselessEnabled(tableConfig) - ? _segmentCompletionConfig.getDefaultPauselessFsmScheme() : _segmentCompletionConfig.getDefaultFsmScheme(); + // Create a metric identifier at partition level granularity, similar to server metrics + // in RealtimeSegmentValidationManager + String tableNameAndPartitionGroupId = realtimeTableName + "-" + llcSegmentName.getPartitionGroupId(); + if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) { + factoryName = _segmentCompletionConfig.getDefaultPauselessFsmScheme(); + _controllerMetrics.setValueOfTableGauge(tableNameAndPartitionGroupId, + ControllerGauge.PAUSELESS_CONSUMPTION_ENABLED, 1); + } else { + factoryName = _segmentCompletionConfig.getDefaultFsmScheme(); + _controllerMetrics.setValueOfTableGauge(tableNameAndPartitionGroupId, + ControllerGauge.PAUSELESS_CONSUMPTION_ENABLED, 0); + } } Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 730055a89c..a5ac3c2e95 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -859,6 +859,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // respectively. // Refer to the PR for the new commit protocol: https://github.com/apache/pinot/pull/14741 if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) { + _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.PAUSELESS_CONSUMPTION_ENABLED, 1); if (!startSegmentCommit()) { // If for any reason commit failed, we don't want to be in COMMITTING state when we hold. // Change the state to HOLDING before looping around. @@ -867,6 +868,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { hold(); break; } + } else { + _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.PAUSELESS_CONSUMPTION_ENABLED, 0); } long buildTimeSeconds = response.getBuildTimeSeconds(); buildSegmentForCommit(buildTimeSeconds * 1000L); @@ -1102,6 +1105,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { String errorMessage = "Interrupted while waiting for semaphore"; _segmentLogger.error(errorMessage, e); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e)); + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.SEGMENT_BUILD_FAILURE, 1); return null; } try { @@ -1134,6 +1138,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // Precondition checks fail, the segment build would fail consistently _segmentBuildFailedWithDeterministicError = true; } + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.SEGMENT_BUILD_FAILURE, 1); return null; } final long buildTimeMillis = now() - lockAcquireTimeMillis; @@ -1155,6 +1160,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { + indexDir; _segmentLogger.error(errorMessage, e); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e)); + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.SEGMENT_BUILD_FAILURE, 1); return null; } finally { FileUtils.deleteQuietly(tempSegmentFolder); @@ -1175,6 +1181,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { + segmentTarFile; _segmentLogger.error(errorMessage, e); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e)); + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.SEGMENT_BUILD_FAILURE, 1); return null; } @@ -1184,6 +1191,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { + " under index directory: " + indexDir; _segmentLogger.error(errorMessage); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null)); + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.SEGMENT_BUILD_FAILURE, 1); return null; } File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir); @@ -1192,6 +1200,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { + indexDir; _segmentLogger.error(errorMessage); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null)); + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.SEGMENT_BUILD_FAILURE, 1); return null; } Map<String, File> metadataFiles = new HashMap<>(); @@ -1322,6 +1331,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private void cleanupMetrics() { _serverMetrics.removeTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING); _serverMetrics.removeTableGauge(_clientId, ServerGauge.STREAM_DATA_LOSS); + _serverMetrics.removeTableGauge(_clientId, ServerGauge.PAUSELESS_CONSUMPTION_ENABLED); + _serverMetrics.removeTableMeter(_clientId, ServerMeter.SEGMENT_BUILD_FAILURE); } protected void hold() diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java index c137b52e4c..9b1c93e4de 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java @@ -52,6 +52,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.segment.local.realtime.writer.StatelessRealtimeSegmentWriter; @@ -210,6 +211,8 @@ public class ReingestionResource { tableDataManager.getSegmentBuildSemaphore()); } catch (Exception e) { LOGGER.error("Error during async re-ingestion for job {} (segment={})", jobId, segmentName, e); + _serverInstance.getServerMetrics() + .addMeteredTableValue(realtimeTableName, ServerMeter.SEGMENT_REINGESTION_FAILURE, 1); } finally { _runningJobs.remove(jobId); _reingestingSegments.remove(segmentName); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org