This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9225ec0144 Add a tracker for end-to-end consumption delay of events. (#10121)
9225ec0144 is described below

commit 9225ec01449ee1b894ffaaa01e7e2e3d9e914cb1
Author: Juan Gomez <jugo...@linkedin.com>
AuthorDate: Mon Jan 23 15:33:04 2023 -0800

    Add a tracker for end-to-end consumption delay of events. (#10121)
---
 .../apache/pinot/common/metrics/ServerGauge.java   |  5 +-
 .../manager/realtime/IngestionDelayTracker.java    | 59 ++++++++++++++----
 .../realtime/LLRealtimeSegmentDataManager.java     |  4 +-
 .../manager/realtime/RealtimeTableDataManager.java |  4 +-
 .../realtime/IngestionDelayTrackerTest.java        | 69 +++++++++++++++++-----
 .../org/apache/pinot/spi/stream/RowMetadata.java   | 18 +++++-
 .../pinot/spi/stream/StreamMessageMetadata.java    | 21 +++++--
 7 files changed, 141 insertions(+), 39 deletions(-)
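
At a glance, the patch below keeps two timestamps per consuming partition and derives a gauge from each: REALTIME_INGESTION_DELAY_MS measures against the time a record reached the stream Pinot consumes from, while the new END_TO_END_REALTIME_INGESTION_DELAY_MS measures against the time the record entered the first stream of the ingestion pipeline, and is only published when the stream reports that timestamp. A minimal sketch of that bookkeeping, using simplified stand-in names (DelayTrackerSketch, Timestamps) rather than the classes touched in the diff:

import java.time.Clock;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the bookkeeping in IngestionDelayTracker; not the real class.
class DelayTrackerSketch {
  // Two timestamps are kept per partition, mirroring the new IngestionTimestamps wrapper.
  private static class Timestamps {
    final long _ingestionTimeMs;            // last-stream ingestion time
    final long _firstStreamIngestionTimeMs; // first-stream ingestion time, Long.MIN_VALUE if unknown

    Timestamps(long ingestionTimeMs, long firstStreamIngestionTimeMs) {
      _ingestionTimeMs = ingestionTimeMs;
      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
    }
  }

  private final Map<Integer, Timestamps> _partitionToTimestamps = new ConcurrentHashMap<>();
  private final Clock _clock = Clock.systemUTC();

  void update(int partitionId, long ingestionTimeMs, long firstStreamIngestionTimeMs) {
    _partitionToTimestamps.put(partitionId, new Timestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
  }

  // Backs REALTIME_INGESTION_DELAY_MS: age of the latest record relative to the tracker clock.
  long ingestionDelayMs(int partitionId) {
    Timestamps ts = _partitionToTimestamps.get(partitionId);
    return (ts == null || ts._ingestionTimeMs < 0) ? 0 : _clock.millis() - ts._ingestionTimeMs;
  }

  // Backs END_TO_END_REALTIME_INGESTION_DELAY_MS: meaningful only when the stream supplied the
  // first-stream timestamp (a negative value such as Long.MIN_VALUE means "not supported").
  long endToEndDelayMs(int partitionId) {
    Timestamps ts = _partitionToTimestamps.get(partitionId);
    return (ts == null || ts._firstStreamIngestionTimeMs < 0) ? 0 : _clock.millis() - ts._firstStreamIngestionTimeMs;
  }
}

In the actual tracker the end-to-end gauge is only installed on the first update that carries a supported timestamp; the sketch simply reports 0 in that case.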

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 7dd52e90ea..f3d0fa95eb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -46,8 +46,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
   CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
   JVM_HEAP_USED_BYTES("bytes", true),
-  // Ingestion delay metric
-  REALTIME_INGESTION_DELAY_MS("milliseconds", false);
+  // Ingestion delay metrics
+  REALTIME_INGESTION_DELAY_MS("milliseconds", false),
+  END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false);
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 6452866195..6e11297fd0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -76,6 +76,15 @@ import org.slf4j.LoggerFactory;
 
 public class IngestionDelayTracker {
 
+  // Class to wrap supported timestamps collected for an ingested event
+  private static class IngestionTimestamps {
+    IngestionTimestamps(long ingestionTimesMs, long firstStreamIngestionTimeMs) {
+      _ingestionTimeMs = ingestionTimesMs;
+      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
+    }
+    private final long _ingestionTimeMs;
+    private final long _firstStreamIngestionTimeMs;
+  }
   // Sleep interval for timer thread that triggers read of ideal state
   private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
   // Once a partition is marked for verification, we wait 10 minutes to pull its ideal state.
@@ -85,7 +94,7 @@ public class IngestionDelayTracker {
   private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
 
   // HashMap used to store ingestion time measures for all partitions active for the current table.
-  private final Map<Integer, Long> _partitionToIngestionTimeMsMap = new ConcurrentHashMap<>();
+  private final Map<Integer, IngestionTimestamps> _partitionToIngestionTimestampsMap = new ConcurrentHashMap<>();
   // We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not
   // go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading
   // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
@@ -141,9 +150,9 @@ public class IngestionDelayTracker {
    *
    * @param ingestionTimeMs original ingestion time in milliseconds.
    */
-  private long getIngestionDelayMs(Long ingestionTimeMs) {
-    if (ingestionTimeMs == null) {
-      return 0; // return 0 when not initialized
+  private long getIngestionDelayMs(long ingestionTimeMs) {
+    if (ingestionTimeMs < 0) {
+      return 0;
     }
     // Compute aged delay for current partition
     long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
@@ -159,7 +168,7 @@ public class IngestionDelayTracker {
    * @param partitionGroupId partition ID which we should stop tracking.
    */
   private void removePartitionId(int partitionGroupId) {
-    _partitionToIngestionTimeMsMap.remove(partitionGroupId);
+    _partitionToIngestionTimestampsMap.remove(partitionGroupId);
     // If we are removing a partition we should stop reading its ideal state.
     _partitionsMarkedForVerification.remove(partitionGroupId);
    _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
@@ -196,21 +205,28 @@ public class IngestionDelayTracker {
   * Called by LLRealTimeSegmentDataManagers to post ingestion time updates to this tracker class.
    *
    * @param ingestionTimeMs ingestion time being recorded.
+   * @param firstStreamIngestionTimeMs time the event was ingested in the first stage of the ingestion pipeline.
    * @param partitionGroupId partition ID for which this ingestion time is being recorded.
    */
-  public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId) {
+  public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) {
     // Store new measure and wipe old one for this partition
-    // TODO: see if we can install gauges after the server is ready.
     if (!_isServerReadyToServeQueries.get()) {
       // Do not update the ingestion delay metrics during server startup period
       return;
     }
-    Long previousMeasure = _partitionToIngestionTimeMsMap.put(partitionGroupId,
-        ingestionTimeMs);
+    IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId,
+        new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
     if (previousMeasure == null) {
      // First time we start tracking a partition we should start tracking it via metric
       _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
          ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionGroupId));
+      if (firstStreamIngestionTimeMs >= 0) {
+        // Only publish this metric when creation time is supported by the underlying stream
+        // When this timestamp is not supported it always returns the value Long.MIN_VALUE
+        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
+            ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
+            () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
+      }
     }
     // If we are consuming we do not need to track this partition for removal.
     _partitionsMarkedForVerification.remove(partitionGroupId);
@@ -283,8 +299,27 @@ public class IngestionDelayTracker {
    */
   public long getPartitionIngestionDelayMs(int partitionGroupId) {
    // Not protected as this will only be invoked when metric is installed which happens after server ready
-    Long currentMeasure = _partitionToIngestionTimeMsMap.get(partitionGroupId);
-    return getIngestionDelayMs(currentMeasure);
+    IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId);
+    if (currentMeasure == null) { // Guard just in case we read the metric without initializing it
+      return 0;
+    }
+    return getIngestionDelayMs(currentMeasure._ingestionTimeMs);
+  }
+
+  /*
+   * Method to get end to end ingestion delay for a given partition.
+   *
+   * @param partitionGroupId partition for which we are retrieving the delay
+   *
+   * @return End to end ingestion delay in milliseconds for the given partition ID.
+   */
+  public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) {
+    // Not protected as this will only be invoked when metric is installed which happens after server ready
+    IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId);
+    if (currentMeasure == null) { // Guard just in case we read the metric without initializing it
+      return 0;
+    }
+    return getIngestionDelayMs(currentMeasure._firstStreamIngestionTimeMs);
   }
 
   /*
@@ -299,7 +334,7 @@ public class IngestionDelayTracker {
       return;
     }
     // Remove partitions so their related metrics get uninstalled.
-    for (Map.Entry<Integer, Long> entry : _partitionToIngestionTimeMsMap.entrySet()) {
+    for (Map.Entry<Integer, IngestionTimestamps> entry : _partitionToIngestionTimestampsMap.entrySet()) {
       removePartitionId(entry.getKey());
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 45570b59a4..aa83cda832 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -615,7 +615,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       }
     } else if (!prematureExit) {
       // Record Pinot ingestion delay as zero since we are up-to-date and no new events
-      _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), _partitionGroupId);
+      long currentTimeMs = System.currentTimeMillis();
+      _realtimeTableDataManager.updateIngestionDelay(currentTimeMs, currentTimeMs, _partitionGroupId);
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", 
idlePipeSleepTimeMillis);
       }
@@ -1571,6 +1572,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
       // Record Ingestion delay for this partition
       _realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
+          _lastRowMetadata.getFirstStreamRecordIngestionTimeMs(),
           _partitionGroupId);
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 4ae54b6bbc..5f5a47207c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -240,8 +240,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
    * @param ingestionTimeMs Ingestion delay being reported.
    * @param partitionGroupId Partition ID for which delay is being updated.
    */
-  public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId) {
-    _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs, partitionGroupId);
+  public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) {
+    _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs, firstStreamIngestionTimeMs, partitionGroupId);
   }
 
   /*
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index e2d19fc0ac..177fe33269 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -85,29 +85,37 @@ public class IngestionDelayTrackerTest {
 
     // Test we follow a single partition up and down
     for (long i = 0; i <= maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+          clock.millis() - (i + 1));
     }
 
     // Test tracking down a measure for a given partition
     for (long i = maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+          clock.millis() - (i + 1));
     }
 
     // Make the current partition maximum
-    ingestionDelayTracker.updateIngestionDelay(maxTestDelay, partition0);
+    ingestionDelayTracker.updateIngestionDelay(maxTestDelay, maxTestDelay, partition0);
 
     // Bring up partition1 delay up and verify values
     for (long i = 0; i <= 2 * maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition1);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
+          clock.millis() - (i + 1));
     }
 
     // Bring down values of partition1 and verify values
     for (long i = 2 * maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition1);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
+          clock.millis() - (i + 1));
     }
 
     ingestionDelayTracker.shutdown();
@@ -132,24 +140,34 @@ public class IngestionDelayTrackerTest {
     ZoneId zoneId = ZoneId.systemDefault();
     Clock clock = Clock.fixed(now, zoneId);
     ingestionDelayTracker.setClock(clock);
-    ingestionDelayTracker.updateIngestionDelay(clock.millis() - partition0Delay0, partition0);
+    ingestionDelayTracker.updateIngestionDelay((clock.millis() - partition0Delay0),
+        (clock.millis() - partition0Delay0), partition0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay0);
     // Advance clock and test aging
     Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms));
     ingestionDelayTracker.setClock(offsetClock);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay0 + partition0Offset0Ms));
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+        (partition0Delay0 + partition0Offset0Ms));
 
-    ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() - partition0Delay1, partition0);
+    ingestionDelayTracker.updateIngestionDelay((offsetClock.millis() - partition0Delay1),
+        (offsetClock.millis() - partition0Delay1), partition0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay1);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay1);
+
     // Add some offset to the last sample and make sure we age that measure properly
     offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition0Offset1Ms));
     ingestionDelayTracker.setClock(offsetClock);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay1 + partition0Offset1Ms));
 
-    ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() - partition1Delay0, partition1);
+    ingestionDelayTracker.updateIngestionDelay((offsetClock.millis() - partition1Delay0),
+        (offsetClock.millis() - partition1Delay0), partition1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), partition1Delay0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), partition1Delay0);
+
     // Add some offset to the last sample and make sure we age that measure properly
     offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition1Offset0Ms));
     ingestionDelayTracker.setClock(offsetClock);
@@ -173,26 +191,45 @@ public class IngestionDelayTrackerTest {
 
     // Record a number of partitions with delay equal to partition id
     for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
-      ingestionDelayTracker.updateIngestionDelay(clock.millis() - partitionGroupId, partitionGroupId);
+      ingestionDelayTracker.updateIngestionDelay((clock.millis() - partitionGroupId),
+          (clock.millis() - partitionGroupId), partitionGroupId);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
+          partitionGroupId);
     }
-    // Verify that as we remove partitions the next available maximum takes over
     for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) {
       ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
     }
     for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
       // Untracked partitions must return 0
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), 0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId), 0);
     }
   }
 
   @Test
-  public void testTickInactivePartitions() {
-    Assert.assertTrue(true);
-  }
+  public void testShutdown() {
+    final long maxTestDelay = 100;
 
-  @Test
-  public void testMarkPartitionForConfirmation() {
-    Assert.assertTrue(true);
+    IngestionDelayTracker ingestionDelayTracker = createTracker();
+    // Use fixed clock so samples don't age
+    Instant now = Instant.now();
+    ZoneId zoneId = ZoneId.systemDefault();
+    Clock clock = Clock.fixed(now, zoneId);
+    ingestionDelayTracker.setClock(clock);
+
+    // Test Shutdown with partitions active
+    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
+      ingestionDelayTracker.updateIngestionDelay((clock.millis() - partitionGroupId),
+          (clock.millis() - partitionGroupId), partitionGroupId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
+          partitionGroupId);
+    }
+    ingestionDelayTracker.shutdown();
+
+    // Test shutdown with no partitions
+    ingestionDelayTracker = createTracker();
+    ingestionDelayTracker.shutdown();
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
index 4c4f17792e..8a5eac3981 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
@@ -39,8 +39,8 @@ public interface RowMetadata {
 
   /**
   * Returns the timestamp associated with the record. This typically refers to the time it was ingested into the
-   * upstream source. In some cases, it may be the time at which the record was created, aka event time (eg. in kafka,
-   * a topic may be configured to use record `CreateTime` instead of `LogAppendTime`).
+   * (last) upstream source. In some cases, it may be the time at which the record was created, aka event time
+   * (eg. in kafka, a topic may be configured to use record `CreateTime` instead of `LogAppendTime`).
    *
    * Expected to be used for stream-based sources.
    *
@@ -49,6 +49,20 @@ public interface RowMetadata {
    */
   long getRecordIngestionTimeMs();
 
+  /**
+   * When supported by the underlying stream, this method returns the timestamp in milliseconds associated with
+   * the ingestion of the record in the first stream.
+   *
+   * Complex ingestion pipelines may be composed of multiple streams:
+   * (EventCreation) -> {First Stream} -> ... -> {Last Stream}
+   *
+   * @return timestamp (epoch in milliseconds) when the row was initially ingested upstream for the first
+   *         time, or Long.MIN_VALUE if not supported by the underlying stream.
+   */
+  default long getFirstStreamRecordIngestionTimeMs() {
+    return Long.MIN_VALUE;
+  }
+
   /**
    * Returns the stream message headers
    *
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index 2b0860690a..ac67249441 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -30,23 +30,31 @@ import org.apache.pinot.spi.data.readers.GenericRow;
  */
 public class StreamMessageMetadata implements RowMetadata {
   private final long _recordIngestionTimeMs;
+  private final long _firstStreamRecordIngestionTimeMs;
   private final GenericRow _headers;
   private final Map<String, String> _metadata;
 
   public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) {
-    this(recordIngestionTimeMs, headers, Collections.emptyMap());
+    this(recordIngestionTimeMs, Long.MIN_VALUE, headers, Collections.emptyMap());
   }
 
+  public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
+      Map<String, String> metadata) {
+    this(recordIngestionTimeMs, Long.MIN_VALUE, headers, metadata);
+  }
   /**
    * Construct the stream based message/row message metadata
    *
-   * @param recordIngestionTimeMs  the time that the message was ingested by the stream provider
+   * @param recordIngestionTimeMs  the time that the message was ingested by the stream provider.
    *                         use Long.MIN_VALUE if not applicable
+   * @param firstStreamRecordIngestionTimeMs the time that the message was ingested by the first stream provider
+   *                         in the ingestion pipeline. use Long.MIN_VALUE if not applicable
    * @param metadata
    */
-  public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
-      Map<String, String> metadata) {
+  public StreamMessageMetadata(long recordIngestionTimeMs, long firstStreamRecordIngestionTimeMs,
+      @Nullable GenericRow headers, Map<String, String> metadata) {
     _recordIngestionTimeMs = recordIngestionTimeMs;
+    _firstStreamRecordIngestionTimeMs = firstStreamRecordIngestionTimeMs;
     _headers = headers;
     _metadata = metadata;
   }
@@ -56,6 +64,11 @@ public class StreamMessageMetadata implements RowMetadata {
     return _recordIngestionTimeMs;
   }
 
+  @Override
+  public long getFirstStreamRecordIngestionTimeMs() {
+    return _firstStreamRecordIngestionTimeMs;
+  }
+
   @Override
   public GenericRow getHeaders() {
     return _headers;
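
For illustration only, not part of this commit: a stream connector that knows both timestamps could feed the new four-argument StreamMessageMetadata constructor roughly as sketched below, while plugins that keep using the older constructors fall back to Long.MIN_VALUE via the default getFirstStreamRecordIngestionTimeMs() and therefore never emit the end-to-end gauge. The UpstreamRecord type and its fields are hypothetical.

import java.util.Collections;
import org.apache.pinot.spi.stream.StreamMessageMetadata;

// Hypothetical connector-side helper; UpstreamRecord and its fields are invented for illustration.
final class MetadataExample {
  static final class UpstreamRecord {
    final long lastStreamTimeMs;  // when the topic Pinot consumes received the record
    final long firstStreamTimeMs; // when the first stream in the pipeline received it, or -1 if unknown

    UpstreamRecord(long lastStreamTimeMs, long firstStreamTimeMs) {
      this.lastStreamTimeMs = lastStreamTimeMs;
      this.firstStreamTimeMs = firstStreamTimeMs;
    }
  }

  static StreamMessageMetadata toMetadata(UpstreamRecord record) {
    long firstStreamTimeMs = record.firstStreamTimeMs > 0 ? record.firstStreamTimeMs : Long.MIN_VALUE;
    // Long.MIN_VALUE keeps END_TO_END_REALTIME_INGESTION_DELAY_MS from being published for this partition.
    return new StreamMessageMetadata(record.lastStreamTimeMs, firstStreamTimeMs, null, Collections.emptyMap());
  }
}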


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
