This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 9225ec0144  Add a tracker for end-to-end consumption delay of events. (#10121)
9225ec0144 is described below

commit 9225ec01449ee1b894ffaaa01e7e2e3d9e914cb1
Author: Juan Gomez <jugo...@linkedin.com>
AuthorDate: Mon Jan 23 15:33:04 2023 -0800

    Add a tracker for end-to-end consumption delay of events. (#10121)
---
 .../apache/pinot/common/metrics/ServerGauge.java    |  5 +-
 .../manager/realtime/IngestionDelayTracker.java     | 59 ++++++++++++++----
 .../realtime/LLRealtimeSegmentDataManager.java      |  4 +-
 .../manager/realtime/RealtimeTableDataManager.java  |  4 +-
 .../realtime/IngestionDelayTrackerTest.java         | 69 +++++++++++++++++-----
 .../org/apache/pinot/spi/stream/RowMetadata.java    | 18 +++++-
 .../pinot/spi/stream/StreamMessageMetadata.java     | 21 +++++--
 7 files changed, 141 insertions(+), 39 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 7dd52e90ea..f3d0fa95eb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -46,8 +46,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
   CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
   JVM_HEAP_USED_BYTES("bytes", true),
-  // Ingestion delay metric
-  REALTIME_INGESTION_DELAY_MS("milliseconds", false);
+  // Ingestion delay metrics
+  REALTIME_INGESTION_DELAY_MS("milliseconds", false),
+  END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false);
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 6452866195..6e11297fd0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -76,6 +76,15 @@ import org.slf4j.LoggerFactory;
 public class IngestionDelayTracker {
+  // Class to wrap supported timestamps collected for an ingested event
+  private static class IngestionTimestamps {
+    IngestionTimestamps(long ingestionTimesMs, long firstStreamIngestionTimeMs) {
+      _ingestionTimeMs = ingestionTimesMs;
+      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
+    }
+    private final long _ingestionTimeMs;
+    private final long _firstStreamIngestionTimeMs;
+  }
   // Sleep interval for timer thread that triggers read of ideal state
   private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
   // Once a partition is marked for verification, we wait 10 minutes to pull its ideal state.
@@ -85,7 +94,7 @@ public class IngestionDelayTracker {
   private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
   // HashMap used to store ingestion time measures for all partitions active for the current table.
-  private final Map<Integer, Long> _partitionToIngestionTimeMsMap = new ConcurrentHashMap<>();
+  private final Map<Integer, IngestionTimestamps> _partitionToIngestionTimestampsMap = new ConcurrentHashMap<>();
   // We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not
   // go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading
   // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
@@ -141,9 +150,9 @@ public class IngestionDelayTracker {
    *
    * @param ingestionTimeMs original ingestion time in milliseconds.
    */
-  private long getIngestionDelayMs(Long ingestionTimeMs) {
-    if (ingestionTimeMs == null) {
-      return 0; // return 0 when not initialized
+  private long getIngestionDelayMs(long ingestionTimeMs) {
+    if (ingestionTimeMs < 0) {
+      return 0;
     }
     // Compute aged delay for current partition
     long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
@@ -159,7 +168,7 @@ public class IngestionDelayTracker {
    * @param partitionGroupId partition ID which we should stop tracking.
    */
   private void removePartitionId(int partitionGroupId) {
-    _partitionToIngestionTimeMsMap.remove(partitionGroupId);
+    _partitionToIngestionTimestampsMap.remove(partitionGroupId);
     // If we are removing a partition we should stop reading its ideal state.
     _partitionsMarkedForVerification.remove(partitionGroupId);
     _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
@@ -196,21 +205,28 @@ public class IngestionDelayTracker {
    * Called by LLRealTimeSegmentDataManagers to post ingestion time updates to this tracker class.
    *
    * @param ingestionTimeMs ingestion time being recorded.
+   * @param firstStreamIngestionTimeMs time the event was ingested in the first stage of the ingestion pipeline.
    * @param partitionGroupId partition ID for which this ingestion time is being recorded.
    */
-  public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId) {
+  public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) {
     // Store new measure and wipe old one for this partition
-    // TODO: see if we can install gauges after the server is ready.
     if (!_isServerReadyToServeQueries.get()) {
       // Do not update the ingestion delay metrics during server startup period
       return;
     }
-    Long previousMeasure = _partitionToIngestionTimeMsMap.put(partitionGroupId,
-        ingestionTimeMs);
+    IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId,
+        new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
     if (previousMeasure == null) {
       // First time we start tracking a partition we should start tracking it via metric
       _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
          () -> getPartitionIngestionDelayMs(partitionGroupId));
+      if (firstStreamIngestionTimeMs >= 0) {
+        // Only publish this metric when creation time is supported by the underlying stream
+        // When this timestamp is not supported it always returns the value Long.MIN_VALUE
+        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
+            ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
+            () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
+      }
     }
     // If we are consuming we do not need to track this partition for removal.
     _partitionsMarkedForVerification.remove(partitionGroupId);
@@ -283,8 +299,27 @@
    */
   public long getPartitionIngestionDelayMs(int partitionGroupId) {
     // Not protected as this will only be invoked when metric is installed which happens after server ready
-    Long currentMeasure = _partitionToIngestionTimeMsMap.get(partitionGroupId);
-    return getIngestionDelayMs(currentMeasure);
+    IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId);
+    if (currentMeasure == null) { // Guard just in case we read the metric without initializing it
+      return 0;
+    }
+    return getIngestionDelayMs(currentMeasure._ingestionTimeMs);
+  }
+
+  /*
+   * Method to get end to end ingestion delay for a given partition.
+   *
+   * @param partitionGroupId partition for which we are retrieving the delay
+   *
+   * @return End to end ingestion delay in milliseconds for the given partition ID.
+   */
+  public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) {
+    // Not protected as this will only be invoked when metric is installed which happens after server ready
+    IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId);
+    if (currentMeasure == null) { // Guard just in case we read the metric without initializing it
+      return 0;
+    }
+    return getIngestionDelayMs(currentMeasure._firstStreamIngestionTimeMs);
   }
 
   /*
@@ -299,7 +334,7 @@
       return;
     }
     // Remove partitions so their related metrics get uninstalled.
-    for (Map.Entry<Integer, Long> entry : _partitionToIngestionTimeMsMap.entrySet()) {
+    for (Map.Entry<Integer, IngestionTimestamps> entry : _partitionToIngestionTimestampsMap.entrySet()) {
      removePartitionId(entry.getKey());
    }
  }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 45570b59a4..aa83cda832 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -615,7 +615,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         }
       } else if (!prematureExit) {
         // Record Pinot ingestion delay as zero since we are up-to-date and no new events
-        _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), _partitionGroupId);
+        long currentTimeMs = System.currentTimeMillis();
+        _realtimeTableDataManager.updateIngestionDelay(currentTimeMs, currentTimeMs, _partitionGroupId);
         if (_segmentLogger.isDebugEnabled()) {
           _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
         }
@@ -1571,6 +1572,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
       // Record Ingestion delay for this partition
       _realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
+          _lastRowMetadata.getFirstStreamRecordIngestionTimeMs(),
           _partitionGroupId);
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 4ae54b6bbc..5f5a47207c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -240,8 +240,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
    * @param ingestionTimeMs Ingestion delay being reported.
    * @param partitionGroupId Partition ID for which delay is being updated.
    */
-  public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId) {
-    _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs, partitionGroupId);
+  public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) {
+    _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs, firstStreamIngestionTimeMs, partitionGroupId);
   }
 
   /*
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index e2d19fc0ac..177fe33269 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -85,29 +85,37 @@ public class IngestionDelayTrackerTest {
 
     // Test we follow a single partition up and down
     for (long i = 0; i <= maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+          clock.millis() - (i + 1));
     }
     // Test tracking down a measure for a given partition
     for (long i = maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+          clock.millis() - (i + 1));
     }
     // Make the current partition maximum
-    ingestionDelayTracker.updateIngestionDelay(maxTestDelay, partition0);
+    ingestionDelayTracker.updateIngestionDelay(maxTestDelay, maxTestDelay, partition0);
 
     // Bring up partition1 delay up and verify values
     for (long i = 0; i <= 2 * maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition1);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
+          clock.millis() - (i + 1));
     }
 
     // Bring down values of partition1 and verify values
    for (long i = 2 * maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition1);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
+          clock.millis() - (i + 1));
     }
 
     ingestionDelayTracker.shutdown();
@@ -132,24 +140,34 @@ public class IngestionDelayTrackerTest {
     ZoneId zoneId = ZoneId.systemDefault();
     Clock clock = Clock.fixed(now, zoneId);
     ingestionDelayTracker.setClock(clock);
 
-    ingestionDelayTracker.updateIngestionDelay(clock.millis() - partition0Delay0, partition0);
+    ingestionDelayTracker.updateIngestionDelay((clock.millis() - partition0Delay0),
+        (clock.millis() - partition0Delay0), partition0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay0);
     // Advance clock and test aging
     Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms));
     ingestionDelayTracker.setClock(offsetClock);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay0 + partition0Offset0Ms));
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+        (partition0Delay0 + partition0Offset0Ms));
 
-    ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() - partition0Delay1, partition0);
+    ingestionDelayTracker.updateIngestionDelay((offsetClock.millis() - partition0Delay1),
+        (offsetClock.millis() - partition0Delay1), partition0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay1);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay1);
+    // Add some offset to the last sample and make sure we age that measure properly
     offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition0Offset1Ms));
     ingestionDelayTracker.setClock(offsetClock);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay1 + partition0Offset1Ms));
 
-    ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() - partition1Delay0, partition1);
+    ingestionDelayTracker.updateIngestionDelay((offsetClock.millis() - partition1Delay0),
+        (offsetClock.millis() - partition1Delay0), partition1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), partition1Delay0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), partition1Delay0);
+    // Add some offset to the last sample and make sure we age that measure properly
     offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition1Offset0Ms));
     ingestionDelayTracker.setClock(offsetClock);
@@ -173,26 +191,45 @@ public class IngestionDelayTrackerTest {
 
     // Record a number of partitions with delay equal to partition id
     for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
-      ingestionDelayTracker.updateIngestionDelay(clock.millis() - partitionGroupId, partitionGroupId);
+      ingestionDelayTracker.updateIngestionDelay((clock.millis() - partitionGroupId),
+          (clock.millis() - partitionGroupId), partitionGroupId);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
+          partitionGroupId);
     }
-    // Verify that as we remove partitions the next available maximum takes over
     for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) {
       ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
     }
     for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
       // Untracked partitions must return 0
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), 0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId), 0);
     }
   }
 
   @Test
-  public void testTickInactivePartitions() {
-    Assert.assertTrue(true);
-  }
+  public void testShutdown() {
+    final long maxTestDelay = 100;
 
-  @Test
-  public void testMarkPartitionForConfirmation() {
-    Assert.assertTrue(true);
+    IngestionDelayTracker ingestionDelayTracker = createTracker();
+    // Use fixed clock so samples don't age
+    Instant now = Instant.now();
+    ZoneId zoneId = ZoneId.systemDefault();
+    Clock clock = Clock.fixed(now, zoneId);
+    ingestionDelayTracker.setClock(clock);
+
+    // Test Shutdown with partitions active
+    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
+      ingestionDelayTracker.updateIngestionDelay((clock.millis() - partitionGroupId),
+          (clock.millis() - partitionGroupId), partitionGroupId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
+          partitionGroupId);
+    }
+    ingestionDelayTracker.shutdown();
+
+    // Test shutdown with no partitions
+    ingestionDelayTracker = createTracker();
+    ingestionDelayTracker.shutdown();
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
index 4c4f17792e..8a5eac3981 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
@@ -39,8 +39,8 @@ public interface RowMetadata {
 
   /**
    * Returns the timestamp associated with the record. This typically refers to the time it was ingested into the
-   * upstream source. In some cases, it may be the time at which the record was created, aka event time (eg. in kafka,
-   * a topic may be configured to use record `CreateTime` instead of `LogAppendTime`).
+   * (last) upstream source. In some cases, it may be the time at which the record was created, aka event time
+   * (eg. in kafka, a topic may be configured to use record `CreateTime` instead of `LogAppendTime`).
    *
    * Expected to be used for stream-based sources.
    *
@@ -49,6 +49,20 @@ public interface RowMetadata {
    */
   long getRecordIngestionTimeMs();
 
+  /**
+   * When supported by the underlying stream, this method returns the timestamp in milliseconds associated with
+   * the ingestion of the record in the first stream.
+   *
+   * Complex ingestion pipelines may be composed of multiple streams:
+   *   (EventCreation) -> {First Stream} -> ... -> {Last Stream}
+   *
+   * @return timestamp (epoch in milliseconds) when the row was initially ingested upstream for the first
+   *         time Long.MIN_VALUE if not supported by the underlying stream.
+   */
+  default long getFirstStreamRecordIngestionTimeMs() {
+    return Long.MIN_VALUE;
+  }
+
   /**
    * Returns the stream message headers
    *
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index 2b0860690a..ac67249441 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -30,23 +30,31 @@ import org.apache.pinot.spi.data.readers.GenericRow;
  */
 public class StreamMessageMetadata implements RowMetadata {
   private final long _recordIngestionTimeMs;
+  private final long _firstStreamRecordIngestionTimeMs;
   private final GenericRow _headers;
   private final Map<String, String> _metadata;
 
   public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) {
-    this(recordIngestionTimeMs, headers, Collections.emptyMap());
+    this(recordIngestionTimeMs, Long.MIN_VALUE, headers, Collections.emptyMap());
   }
 
+  public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
+      Map<String, String> metadata) {
+    this(recordIngestionTimeMs, Long.MIN_VALUE, headers, metadata);
+  }
   /**
    * Construct the stream based message/row message metadata
    *
-   * @param recordIngestionTimeMs the time that the message was ingested by the stream provider
+   * @param recordIngestionTimeMs the time that the message was ingested by the stream provider.
    *                              use Long.MIN_VALUE if not applicable
+   * @param firstStreamRecordIngestionTimeMs the time that the message was ingested by the first stream provider
+   *                                         in the ingestion pipeline. use Long.MIN_VALUE if not applicable
    * @param metadata
    */
-  public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
-      Map<String, String> metadata) {
+  public StreamMessageMetadata(long recordIngestionTimeMs, long firstStreamRecordIngestionTimeMs,
+      @Nullable GenericRow headers, Map<String, String> metadata) {
     _recordIngestionTimeMs = recordIngestionTimeMs;
+    _firstStreamRecordIngestionTimeMs = firstStreamRecordIngestionTimeMs;
     _headers = headers;
     _metadata = metadata;
   }
@@ -56,6 +64,11 @@ public class StreamMessageMetadata implements RowMetadata {
     return _recordIngestionTimeMs;
   }
 
+  @Override
+  public long getFirstStreamRecordIngestionTimeMs() {
+    return _firstStreamRecordIngestionTimeMs;
+  }
+
   @Override
   public GenericRow getHeaders() {
     return _headers;
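
For readers wiring a stream connector to the new metric, the sketch below shows how the two timestamps introduced here fit together. It is an illustrative example, not part of the commit: the class EndToEndDelayExample, the delayMs helper, and the literal timestamps are made up, while the StreamMessageMetadata constructors and RowMetadata accessors are the ones added above. The delayMs helper mirrors, in simplified form, how IngestionDelayTracker.getIngestionDelayMs treats a negative timestamp such as Long.MIN_VALUE as "not supported" and reports a delay of 0.

import java.time.Clock;
import java.util.Collections;

import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamMessageMetadata;

public class EndToEndDelayExample {

  // Simplified version of the tracker's delay math: negative timestamps
  // (e.g. Long.MIN_VALUE) mean "not supported" and are reported as 0.
  static long delayMs(Clock clock, long ingestionTimeMs) {
    if (ingestionTimeMs < 0) {
      return 0;
    }
    return clock.millis() - ingestionTimeMs;
  }

  public static void main(String[] args) {
    Clock clock = Clock.systemUTC();

    // Hypothetical timestamps: the record entered the first upstream stream 7s ago
    // and the stream Pinot consumes from 2s ago.
    long firstStreamIngestionTimeMs = clock.millis() - 7_000;
    long lastStreamIngestionTimeMs = clock.millis() - 2_000;

    // A connector that knows both timestamps uses the new four-argument constructor.
    RowMetadata withEndToEnd = new StreamMessageMetadata(lastStreamIngestionTimeMs,
        firstStreamIngestionTimeMs, null, Collections.emptyMap());

    // A connector that cannot supply the first-stream timestamp keeps using the old
    // constructor, which records Long.MIN_VALUE for it.
    RowMetadata withoutEndToEnd = new StreamMessageMetadata(lastStreamIngestionTimeMs, null);

    System.out.println("ingestion delay: "
        + delayMs(clock, withEndToEnd.getRecordIngestionTimeMs()) + " ms");            // ~2000 ms
    System.out.println("end-to-end delay: "
        + delayMs(clock, withEndToEnd.getFirstStreamRecordIngestionTimeMs()) + " ms"); // ~7000 ms
    System.out.println("end-to-end delay (unsupported): "
        + delayMs(clock, withoutEndToEnd.getFirstStreamRecordIngestionTimeMs()) + " ms"); // 0 ms
  }
}

Because the existing constructors default the first-stream timestamp to Long.MIN_VALUE, connectors that do not report it are unaffected, and END_TO_END_REALTIME_INGESTION_DELAY_MS is only published for partitions whose stream supplies a non-negative value.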