noob-se7en commented on code in PR #16783:
URL: https://github.com/apache/pinot/pull/16783#discussion_r2390047303
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -412,79 +489,82 @@ public long getPartitionIngestionTimeMs(int partitionId) {
*/
public long getPartitionIngestionDelayMs(int partitionId) {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- return ingestionInfo != null ?
getIngestionDelayMs(ingestionInfo._ingestionTimeMs) : 0;
+ long ingestionTimeMs = 0;
+ if ((ingestionInfo != null) && (ingestionInfo._ingestionTimeMs > 0)) {
+ ingestionTimeMs = ingestionInfo._ingestionTimeMs;
+ }
+ // Compute aged delay for current partition
+ long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
+ // Correct to zero for any time shifts due to NTP or time reset.
+ return Math.max(agedIngestionDelayMs, 0);
}
- /*
- * Method to get end to end ingestion delay for a given partition.
- *
- * @param partitionId partition for which we are retrieving the delay
+ /**
+ * Computes the ingestion lag for the given partition based on offset
difference.
+ * <p>
+ * The lag is calculated as the difference between the latest upstream offset
+ * and the current consuming offset. Only {@link LongMsgOffset} types are
supported.
*
- * @return End to end ingestion delay in milliseconds for the given
partition ID.
+ * @param partitionId partition for which the ingestion lag is computed
+ * @return offset lag for the given partition
*/
- public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
- IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- return ingestionInfo != null ?
getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs) : 0;
- }
-
public long getPartitionIngestionOffsetLag(int partitionId) {
- IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- if (ingestionInfo == null) {
- return 0;
- }
- StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
- StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
- if (currentOffset == null || latestOffset == null) {
+ StreamPartitionMsgOffset latestOffset =
_partitionIdToLatestOffset.get(partitionId);
+ if (latestOffset == null) {
return 0;
}
- // TODO: Support other types of offsets
- if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof
LongMsgOffset)) {
- return 0;
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ long currentOffset = 0;
+ if (ingestionInfo != null) {
Review Comment:
update: There was 1 edge case on this condition and I have fixed this.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -412,79 +489,82 @@ public long getPartitionIngestionTimeMs(int partitionId) {
*/
public long getPartitionIngestionDelayMs(int partitionId) {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- return ingestionInfo != null ?
getIngestionDelayMs(ingestionInfo._ingestionTimeMs) : 0;
+ long ingestionTimeMs = 0;
+ if ((ingestionInfo != null) && (ingestionInfo._ingestionTimeMs > 0)) {
+ ingestionTimeMs = ingestionInfo._ingestionTimeMs;
+ }
+ // Compute aged delay for current partition
+ long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
+ // Correct to zero for any time shifts due to NTP or time reset.
+ return Math.max(agedIngestionDelayMs, 0);
}
- /*
- * Method to get end to end ingestion delay for a given partition.
- *
- * @param partitionId partition for which we are retrieving the delay
+ /**
+ * Computes the ingestion lag for the given partition based on offset
difference.
+ * <p>
+ * The lag is calculated as the difference between the latest upstream offset
+ * and the current consuming offset. Only {@link LongMsgOffset} types are
supported.
*
- * @return End to end ingestion delay in milliseconds for the given
partition ID.
+ * @param partitionId partition for which the ingestion lag is computed
+ * @return offset lag for the given partition
*/
- public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
- IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- return ingestionInfo != null ?
getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs) : 0;
- }
-
public long getPartitionIngestionOffsetLag(int partitionId) {
- IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- if (ingestionInfo == null) {
- return 0;
- }
- StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
- StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
- if (currentOffset == null || latestOffset == null) {
+ StreamPartitionMsgOffset latestOffset =
_partitionIdToLatestOffset.get(partitionId);
+ if (latestOffset == null) {
return 0;
}
- // TODO: Support other types of offsets
- if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof
LongMsgOffset)) {
- return 0;
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ long currentOffset = 0;
+ if (ingestionInfo != null) {
+ assert ingestionInfo._currentOffset instanceof LongMsgOffset;
+ currentOffset = ((LongMsgOffset)
(ingestionInfo._currentOffset)).getOffset();
}
- return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset)
currentOffset).getOffset();
+ assert latestOffset instanceof LongMsgOffset;
+ return Math.max(0, ((LongMsgOffset) latestOffset).getOffset() -
currentOffset);
}
- // Get the consuming offset for a given partition
+ /**
+ * Retrieves the latest offset consumed for the given partition.
+ *
+ * @param partitionId partition for which the consuming offset was retrieved
+ * @return consuming offset value for the given partition, or {@code 0} if
no ingestion info is available
+ */
public long getPartitionIngestionConsumingOffset(int partitionId) {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
if (ingestionInfo == null) {
return 0;
}
StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
- if (currentOffset == null) {
- return 0;
- }
- // TODO: Support other types of offsets
- if (!(currentOffset instanceof LongMsgOffset)) {
- return 0;
- }
+ assert currentOffset != null;
Review Comment:
update: There was 1 edge case on this condition and I have fixed this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]