navina commented on code in PR #10418:
URL: https://github.com/apache/pinot/pull/10418#discussion_r1144142308


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java:
##########
@@ -116,4 +116,12 @@ default StreamPartitionMsgOffset getOffsetOfNextBatch() {
   default boolean isEndOfPartitionGroup() {
     return false;
   }
+
+  /**
+   * We need this to determine ingestion delay when we receive only null messages (Tombstone messages)
+   * @return last metadata for a null message received by the string
+   */
+  default public StreamMessageMetadata getLastTombstoneMetadata() {

Review Comment:
   Better not to use the term "tombstone" here, as this interface applies to 
all stream plugins. 
   
   Is this the metadata for the last filtered message or the last unfiltered 
message? 
   
   I don't think an API this specific to a single scenario is very useful. 
Can we instead change the semantics of `MessageBatch` so that a batch may 
filter out the data for every record it contains but still keep the metadata 
around?
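
To make the suggestion concrete, here is a minimal sketch of what "filtered batch that keeps its metadata" could look like. It is a simplified stand-in, not Pinot's `MessageBatch` interface: the class name, the fields, and `hasMetadataOnly()` are all hypothetical and exist only for illustration.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for a message batch. This is NOT Pinot's
// MessageBatch interface; every name below is an assumption for illustration.
final class FilteredBatchSketch {
  private final List<String> messages;          // records that survived filtering
  private final int unfilteredMessageCount;     // records read from the stream before filtering
  private final long lastRecordIngestionTimeMs; // ingestion time of the last record read, filtered or not

  FilteredBatchSketch(List<String> messages, int unfilteredMessageCount, long lastRecordIngestionTimeMs) {
    this.messages = messages;
    this.unfilteredMessageCount = unfilteredMessageCount;
    this.lastRecordIngestionTimeMs = lastRecordIngestionTimeMs;
  }

  int getMessageCount() {
    return messages.size();
  }

  int getUnfilteredMessageCount() {
    return unfilteredMessageCount;
  }

  long getLastRecordIngestionTimeMs() {
    return lastRecordIngestionTimeMs;
  }

  // True when nothing is left to index but the batch still carries metadata
  // from the records that were read and filtered out.
  boolean hasMetadataOnly() {
    return messages.isEmpty() && unfilteredMessageCount > 0;
  }
}
```

With something along these lines, a consumer could still read the last ingestion time from a batch whose message count is zero, without a method tied to one stream plugin's terminology.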



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1574,12 +1586,24 @@ private void createPartitionMetadataProvider(String reason) {
   private void updateIngestionDelay(int indexedMessageCount) {
     if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
       // Record Ingestion delay for this partition
-      _realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
-          _lastRowMetadata.getFirstStreamRecordIngestionTimeMs(),
-          _partitionGroupId);
+      updateIngestionDelay(_lastRowMetadata);
     }
   }
 
+  private void updateIngestionDelay(RowMetadata metadata) {
+    _realtimeTableDataManager.updateIngestionDelay(metadata.getRecordIngestionTimeMs(),
+        metadata.getFirstStreamRecordIngestionTimeMs(),

Review Comment:
   Should we update the ingestion delay metric when either 
`getRecordIngestionTimeMs` or `getFirstStreamRecordIngestionTimeMs` is not 
valid? It looks like the default value is `Long.MIN_VALUE`.
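
One possible guard, sketched as a standalone helper. It assumes `Long.MIN_VALUE` is the "unset" sentinel for both timestamps and that the update should be skipped when either one is unset. Both are assumptions to confirm against `RowMetadata`, and the class and method names are hypothetical.

```java
// Hypothetical helper; not part of the PR or of Pinot.
final class IngestionDelayGuard {
  // Assumption: Long.MIN_VALUE marks a timestamp that was never populated.
  static final long UNSET_TIME_MS = Long.MIN_VALUE;

  static boolean isValid(long timeMs) {
    return timeMs != UNSET_TIME_MS;
  }

  // Assumed policy: only report ingestion delay when both timestamps are populated.
  static boolean shouldUpdateIngestionDelay(long recordIngestionTimeMs,
      long firstStreamRecordIngestionTimeMs) {
    return isValid(recordIngestionTimeMs) && isValid(firstStreamRecordIngestionTimeMs);
  }
}
```

If the first-stream timestamp is legitimately optional for some streams, the stricter `&&` would be the wrong policy, and each timestamp would need its own check before the corresponding metric is emitted.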



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

