navina commented on code in PR #10418:
URL: https://github.com/apache/pinot/pull/10418#discussion_r1146899073


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java:
##########
@@ -116,4 +116,14 @@ default StreamPartitionMsgOffset getOffsetOfNextBatch() {
   default boolean isEndOfPartitionGroup() {
     return false;
   }
+
+  /**
+   * This is useful while determining ingestion delay for a message batch. Retaining metadata for last message in
+   * a batch can enable us to estimate the ingestion delay for the batch.

Review Comment:
   Can you also add an example of what the batch may look like?
   Also mention that it is possible for the batch to be empty (all data is filtered), in which case the row metadata may still be non-null to facilitate ingestion delay computation.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java:
##########
@@ -116,4 +116,14 @@ default StreamPartitionMsgOffset getOffsetOfNextBatch() {
   default boolean isEndOfPartitionGroup() {
     return false;
   }
+
+  /**
+   * This is useful while determining ingestion delay for a message batch. Retaining metadata for last message in
+   * a batch can enable us to estimate the ingestion delay for the batch.
+   *
+   * @return null by default.
+   */
+  default public StreamMessageMetadata getLastMessageMetadata() {

Review Comment:
   nit: Add `@Nullable` annotation to this method.
   
   Can we do better than returning `null`? Maybe, by default, check the message at the last index and, if it exists, return its metadata. If there are no messages, return `null`. Do you think this is better?
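
   To make the suggestion concrete, here is a minimal sketch of such a default. `Batch` and `Metadata` are hypothetical, simplified stand-ins for `MessageBatch` and `StreamMessageMetadata`, and I am assuming the batch exposes a message count and per-index metadata lookup; this is not the actual Pinot code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LastMessageMetadataSketch {
  // Hypothetical stand-in for StreamMessageMetadata; only the ingestion time is modeled.
  static final class Metadata {
    final long ingestionTimeMs;

    Metadata(long ingestionTimeMs) {
      this.ingestionTimeMs = ingestionTimeMs;
    }
  }

  // Hypothetical, simplified stand-in for MessageBatch.
  interface Batch {
    int getMessageCount();

    Metadata getMetadataAtIndex(int index);

    // Default: metadata of the message at the last index, or null if the batch has no messages.
    default Metadata getLastMessageMetadata() {
      int count = getMessageCount();
      return count > 0 ? getMetadataAtIndex(count - 1) : null;
    }
  }

  // Builds a batch backed by a list of per-message metadata (illustration only).
  static Batch batchOf(List<Metadata> rows) {
    return new Batch() {
      @Override
      public int getMessageCount() {
        return rows.size();
      }

      @Override
      public Metadata getMetadataAtIndex(int index) {
        return rows.get(index);
      }
    };
  }

  public static void main(String[] args) {
    Batch empty = batchOf(Collections.emptyList());
    Batch two = batchOf(Arrays.asList(new Metadata(100L), new Metadata(250L)));
    System.out.println(empty.getLastMessageMetadata() == null);
    System.out.println(two.getLastMessageMetadata().ingestionTimeMs);
  }
}
```

   Note that a default like this still returns `null` when every message was filtered out, so a stream implementation that wants to report a delay for fully filtered batches would still need to override it.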



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -214,12 +214,20 @@ public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestion
       // Do not update the ingestion delay metrics during server startup period
       return;
     }
+    if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {

Review Comment:
   Should this conditional be an `||` instead of `&&`?
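
   For reference, a small sketch of where the two operators differ. The method names are hypothetical and only mirror the two parameters in the diff; which behavior is intended is exactly the open question.

```java
public class DelayGuardSketch {
  // Mirrors the condition in the diff: true only when BOTH timestamps are negative.
  static boolean bothNegative(long ingestionTimeMs, long firstStreamIngestionTimeMs) {
    return (ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0);
  }

  // The alternative being asked about: true when EITHER timestamp is negative.
  static boolean eitherNegative(long ingestionTimeMs, long firstStreamIngestionTimeMs) {
    return (ingestionTimeMs < 0) || (firstStreamIngestionTimeMs < 0);
  }

  public static void main(String[] args) {
    // The two guards disagree only when exactly one timestamp is negative.
    System.out.println(bothNegative(-1L, 1000L));
    System.out.println(eitherNegative(-1L, 1000L));
  }
}
```

   With `&&`, a call where only one of the two timestamps is negative passes the guard; with `||`, it does not.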



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -618,14 +618,18 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
     updateCurrentDocumentCountMetrics();
     if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
       _hasMessagesFetched = true;
+      if (messageCount == 0) {
+        // If we did not get any events but got some unfiltered messages, we attempt to estimate the ingestion

Review Comment:
   Can we rephrase this comment? It is super confusing. Something like:
   `If we received events from stream but all were filtered, we attempt to ...`?
   
   Better yet, use `batch.getMessageCount() == 0`.
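
   A rough sketch of the case the comment is describing, assuming `getUnfilteredMessageCount()` counts what was fetched from the stream and `getMessageCount()` counts what survived filtering. `Batch` and `allMessagesFiltered` are hypothetical names used for illustration only.

```java
public class FilteredBatchSketch {
  // Hypothetical, simplified stand-in for MessageBatch.
  interface Batch {
    int getMessageCount();

    int getUnfilteredMessageCount();
  }

  // True when the stream returned events but every one of them was filtered out.
  static boolean allMessagesFiltered(Batch batch) {
    return batch.getUnfilteredMessageCount() > 0 && batch.getMessageCount() == 0;
  }

  // Builds a batch with fixed counts (illustration only).
  static Batch batch(int messageCount, int unfilteredMessageCount) {
    return new Batch() {
      @Override
      public int getMessageCount() {
        return messageCount;
      }

      @Override
      public int getUnfilteredMessageCount() {
        return unfilteredMessageCount;
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(allMessagesFiltered(batch(0, 5)));
    System.out.println(allMessagesFiltered(batch(3, 5)));
  }
}
```

   Reading the count off the batch itself keeps the check next to the data it describes, rather than relying on a local variable computed earlier.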



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

