noob-se7en commented on code in PR #16783:
URL: https://github.com/apache/pinot/pull/16783#discussion_r2352626023


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -412,79 +489,82 @@ public long getPartitionIngestionTimeMs(int partitionId) {
    */
   public long getPartitionIngestionDelayMs(int partitionId) {
     IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-    return ingestionInfo != null ? 
getIngestionDelayMs(ingestionInfo._ingestionTimeMs) : 0;
+    long ingestionTimeMs = 0;
+    if ((ingestionInfo != null) && (ingestionInfo._ingestionTimeMs > 0)) {
+      ingestionTimeMs = ingestionInfo._ingestionTimeMs;
+    }
+    // Compute aged delay for current partition
+    long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
+    // Correct to zero for any time shifts due to NTP or time reset.
+    return Math.max(agedIngestionDelayMs, 0);
   }
 
-  /*
-   * Method to get end to end ingestion delay for a given partition.
-   *
-   * @param partitionId partition for which we are retrieving the delay
+  /**
+   * Computes the ingestion lag for the given partition based on offset 
difference.
+   * <p>
+   * The lag is calculated as the difference between the latest upstream offset
+   * and the current consuming offset. Only {@link LongMsgOffset} types are 
supported.
    *
-   * @return End to end ingestion delay in milliseconds for the given 
partition ID.
+   * @param partitionId partition for which the ingestion lag is computed
+   * @return offset lag for the given partition
    */
-  public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
-    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-    return ingestionInfo != null ? 
getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs) : 0;
-  }
-
   public long getPartitionIngestionOffsetLag(int partitionId) {
-    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-    if (ingestionInfo == null) {
-      return 0;
-    }
-    StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
-    StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
-    if (currentOffset == null || latestOffset == null) {
+    StreamPartitionMsgOffset latestOffset = 
_partitionIdToLatestOffset.get(partitionId);
+    if (latestOffset == null) {
       return 0;
     }
-    // TODO: Support other types of offsets
-    if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof 
LongMsgOffset)) {
-      return 0;
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    long currentOffset = 0;
+    if (ingestionInfo != null) {
+      assert ingestionInfo._currentOffset instanceof LongMsgOffset;
+      currentOffset = ((LongMsgOffset) 
(ingestionInfo._currentOffset)).getOffset();
     }
-    return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) 
currentOffset).getOffset();
+    assert latestOffset instanceof LongMsgOffset;
+    return Math.max(0, ((LongMsgOffset) latestOffset).getOffset() - 
currentOffset);
   }
 
-  // Get the consuming offset for a given partition
+  /**
+   * Retrieves the latest offset consumed for the given partition.
+   *
+   * @param partitionId partition for which the consuming offset was retrieved
+   * @return consuming offset value for the given partition, or {@code 0} if 
no ingestion info is available
+   */
   public long getPartitionIngestionConsumingOffset(int partitionId) {
     IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
     if (ingestionInfo == null) {
       return 0;
     }
     StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
-    if (currentOffset == null) {
-      return 0;
-    }
-    // TODO: Support other types of offsets
-    if (!(currentOffset instanceof LongMsgOffset)) {
-      return 0;
-    }
+    assert currentOffset != null;
+    assert currentOffset instanceof LongMsgOffset;
     return ((LongMsgOffset) currentOffset).getOffset();
   }
 
-  // Get the latest offset in upstream data source for a given partition
-  public long getPartitionIngestionUpstreamOffset(int partitionId) {
-    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-    if (ingestionInfo == null) {
-      return 0;
-    }
-    StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+  /**
+   * Retrieves the latest offset in the upstream data source for the given 
partition.
+   *
+   * @param partitionId partition for which the latest upstream offset is 
retrieved
+   * @return latest offset value for the given partition, or {@code 0} if not 
available
+   */
+  public long getLatestPartitionOffset(int partitionId) {
+    StreamPartitionMsgOffset latestOffset = 
_partitionIdToLatestOffset.get(partitionId);
     if (latestOffset == null) {
       return 0;
     }
-    // TODO: Support other types of offsets
-    if (!(latestOffset instanceof LongMsgOffset)) {
-      return 0;
-    }
+    assert latestOffset instanceof LongMsgOffset;
     return ((LongMsgOffset) latestOffset).getOffset();

Review Comment:
   This method is only used when the stream supports offset lag. In that case, 
   latestOffset will always be a LongMsgOffset, because currently only the Kafka 
   stream supports offset lag and its offset type is LongMsgOffset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to