navina commented on code in PR #10418:
URL: https://github.com/apache/pinot/pull/10418#discussion_r1146899073
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java:
##########
@@ -116,4 +116,14 @@ default StreamPartitionMsgOffset getOffsetOfNextBatch() {
   default boolean isEndOfPartitionGroup() {
     return false;
   }
+
+  /**
+   * This is useful while determining ingestion delay for a message batch. Retaining metadata for last message in
+   * a batch can enable us to estimate the ingestion delay for the batch.

Review Comment:
Can you also add an example of what the batch may look like? Also mention that the batch may be empty (all data was filtered), in which case the row metadata may still be non-null to facilitate ingestion delay computation.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java:
##########
@@ -116,4 +116,14 @@ default StreamPartitionMsgOffset getOffsetOfNextBatch() {
   default boolean isEndOfPartitionGroup() {
     return false;
   }
+
+  /**
+   * This is useful while determining ingestion delay for a message batch. Retaining metadata for last message in
+   * a batch can enable us to estimate the ingestion delay for the batch.
+   *
+   * @return null by default.
+   */
+  default public StreamMessageMetadata getLastMessageMetadata() {

Review Comment:
nit: Add the `@Nullable` annotation to this method.

Can we do better than returning `null`? Maybe, by default, check the message at the last index and, if it exists, return its metadata; if there are no messages, return null. Do you think this is better?

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -214,12 +214,20 @@ public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestion
       // Do not update the ingestion delay metrics during server startup period
       return;
     }
+    if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {

Review Comment:
Should this conditional be `||` instead of `&&`?
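The default suggested in the `getLastMessageMetadata()` comment on `MessageBatch.java` above could be sketched roughly as follows. This is a minimal, self-contained Java sketch, not Pinot code: `Batch`, `Metadata`, `getMetadataAtIndex`, `ListBatch`, and `FullyFilteredBatch` are hypothetical stand-ins for `MessageBatch` and `StreamMessageMetadata`, and `@Nullable` is left out only to keep the sketch JDK-only. The interface default returns the last message's metadata, or `null` for an empty batch; a batch whose messages were all filtered overrides it to keep the last consumed message's metadata, which covers the empty-batch case raised in the first comment.

```java
import java.util.List;

public class LastMessageMetadataSketch {

  /** Hypothetical stand-in for StreamMessageMetadata: only a record ingestion time. */
  record Metadata(long recordIngestionTimeMs) { }

  /** Hypothetical, trimmed-down stand-in for MessageBatch. */
  interface Batch {
    int getMessageCount();

    /** Assumed accessor: metadata of the message at the given index. */
    Metadata getMetadataAtIndex(int index);

    /**
     * Suggested default: metadata of the last message if there is one, otherwise null.
     * (The review also asks for @Nullable here; omitted to avoid a non-JDK dependency.)
     */
    default Metadata getLastMessageMetadata() {
      int count = getMessageCount();
      return count > 0 ? getMetadataAtIndex(count - 1) : null;
    }
  }

  /** Batch backed by a plain list of message metadata; relies on the default method. */
  static final class ListBatch implements Batch {
    private final List<Metadata> _metadata;

    ListBatch(List<Metadata> metadata) {
      _metadata = metadata;
    }

    @Override
    public int getMessageCount() {
      return _metadata.size();
    }

    @Override
    public Metadata getMetadataAtIndex(int index) {
      return _metadata.get(index);
    }
  }

  /** Batch in which every message was filtered out, but the last consumed message's metadata is kept. */
  static final class FullyFilteredBatch implements Batch {
    private final Metadata _lastConsumed;

    FullyFilteredBatch(Metadata lastConsumed) {
      _lastConsumed = lastConsumed;
    }

    @Override
    public int getMessageCount() {
      return 0;
    }

    @Override
    public Metadata getMetadataAtIndex(int index) {
      throw new IndexOutOfBoundsException(index);
    }

    // Override so an empty batch can still report metadata for ingestion delay computation.
    @Override
    public Metadata getLastMessageMetadata() {
      return _lastConsumed;
    }
  }

  public static void main(String[] args) {
    Batch batch = new ListBatch(List.of(new Metadata(1000L), new Metadata(2000L)));
    System.out.println(batch.getLastMessageMetadata()); // Metadata[recordIngestionTimeMs=2000]
    System.out.println(new ListBatch(List.of()).getLastMessageMetadata()); // null
    System.out.println(new FullyFilteredBatch(new Metadata(3000L)).getLastMessageMetadata()); // Metadata[recordIngestionTimeMs=3000]
  }
}
```

With a default like this, only batch implementations that drop messages (for example, through filtering) would need to override the method; other implementations would get a usable value without extra code.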
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -618,14 +618,18 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
         updateCurrentDocumentCountMetrics();
         if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
           _hasMessagesFetched = true;
+          if (messageCount == 0) {
+            // If we did not get any events but got some unfiltered messages, we attempt to estimate the ingestion

Review Comment:
Can we rephrase this comment? It is super confusing. For example: `If we received events from stream but all were filtered, we attempt to ...`
Better yet, use `batch.getMessageCount() == 0`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org