This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d7411c5d38 Fix ramping delay caused by long lasting sequence of unfiltered messa… (#10418)
d7411c5d38 is described below

commit d7411c5d381e0c44df09092af023f5cd113e6584
Author: Juan Gomez <jugo...@linkedin.com>
AuthorDate: Fri Mar 31 22:31:28 2023 -0700

    Fix ramping delay caused by long lasting sequence of unfiltered messa… (#10418)
---
 .../manager/realtime/IngestionDelayTracker.java    | 12 ++++-
 .../realtime/LLRealtimeSegmentDataManager.java     | 51 +++++++++++++++-------
 .../plugin/stream/kafka20/KafkaMessageBatch.java   | 16 ++++++-
 .../kafka20/KafkaPartitionLevelConsumer.java       |  5 ++-
 .../org/apache/pinot/spi/stream/MessageBatch.java  | 14 ++++++
 5 files changed, 78 insertions(+), 20 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 6e11297fd0..57ca21def1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -214,12 +214,20 @@ public class IngestionDelayTracker {
       // Do not update the ingestion delay metrics during server startup period
       return;
     }
+    if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {
+      // If the stream does not return a valid ingestion timestamp, don't publish a metric
+      return;
+    }
     IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId,
         new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
     if (previousMeasure == null) {
       // The first time we start tracking a partition, we should start tracking it via a metric
-      _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
-          ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionGroupId));
+      // Only publish the metric if supported by the underlying stream. If not supported, the stream
+      // returns Long.MIN_VALUE
+      if (ingestionTimeMs >= 0) {
+        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
+            () -> getPartitionIngestionDelayMs(partitionGroupId));
+      }
       if (firstStreamIngestionTimeMs >= 0) {
         // Only publish this metric when creation time is supported by the underlying stream
         // When this timestamp is not supported, it always returns the value Long.MIN_VALUE
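
Both guards above rely on a sentinel convention: a stream that cannot supply an
ingestion timestamp reports Long.MIN_VALUE, so any negative value means "do not
publish a delay metric". A minimal, self-contained sketch of that check (the
class and method names are illustrative, not Pinot APIs):

    public class SentinelCheckSketch {
      // Streams that cannot supply an ingestion timestamp report Long.MIN_VALUE,
      // so only non-negative values are valid for metric publication.
      static boolean shouldPublishDelayMetric(long ingestionTimeMs) {
        return ingestionTimeMs >= 0;
      }

      public static void main(String[] args) {
        System.out.println(shouldPublishDelayMetric(System.currentTimeMillis())); // true
        System.out.println(shouldPublishDelayMetric(Long.MIN_VALUE));             // false
      }
    }
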
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6236babece..4f3428a5fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -527,9 +527,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     int indexedMessageCount = 0;
     int streamMessageCount = 0;
     boolean canTakeMore = true;
+    boolean hasTransformedRows = false;
 
     TransformPipeline.Result reusedResult = new TransformPipeline.Result();
     boolean prematureExit = false;
+    RowMetadata msgMetadata = null;
+
     for (int index = 0; index < messageCount; index++) {
       prematureExit = _shouldStop || endCriteriaReached();
       if (prematureExit) {
@@ -562,7 +565,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
-      RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
       if (decodedRow.getException() != null) {
         // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
         // decode error
@@ -591,7 +594,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
              _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
                  reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter);
         }
-        for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
+        List<GenericRow> transformedRows = reusedResult.getTransformedRows();
+        if (transformedRows.size() > 0) {
+          hasTransformedRows = true;
+        }
+        for (GenericRow transformedRow : transformedRows) {
           try {
             canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
             indexedMessageCount++;
@@ -614,18 +621,31 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _numRowsConsumed++;
       streamMessageCount++;
     }
-    updateIngestionDelay(indexedMessageCount);
+
+    if (indexedMessageCount > 0) {
+      // Record ingestion delay for this partition using the metadata of the last message we processed
+      updateIngestionDelay(_lastRowMetadata);
+    } else if (!hasTransformedRows && (msgMetadata != null)) {
+      // If all messages were filtered by the transformation pipeline, we still attempt to update the ingestion
+      // delay using the metadata of the last message we processed, if any.
+      updateIngestionDelay(msgMetadata);
+    }
+
     updateCurrentDocumentCountMetrics();
     if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
       _hasMessagesFetched = true;
+      if (messageCount == 0) {
+        // If we received events from the stream but all were filtered, we attempt to estimate the ingestion
+        // delay from the metadata of the last filtered message received.
+        updateIngestionDelay(messagesAndOffsets.getLastMessageMetadata());
+      }
       if (streamMessageCount > 0 && _segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("Indexed {} messages ({} messages read from 
stream) current offset {}",
             indexedMessageCount, streamMessageCount, _currentOffset);
       }
     } else if (!prematureExit) {
       // Record Pinot ingestion delay as zero since we are up-to-date and there are no new events
-      long currentTimeMs = System.currentTimeMillis();
-      _realtimeTableDataManager.updateIngestionDelay(currentTimeMs, currentTimeMs, _partitionGroupId);
+      setIngestionDelayToZero();
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", 
idlePipeSleepTimeMillis);
       }
@@ -1566,18 +1586,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
   }
 
+  private void updateIngestionDelay(RowMetadata metadata) {
+    if (metadata != null) {
+      _realtimeTableDataManager.updateIngestionDelay(metadata.getRecordIngestionTimeMs(),
+          metadata.getFirstStreamRecordIngestionTimeMs(), _partitionGroupId);
+    }
+  }
+
   /*
-   * Updates the ingestion delay if messages were processed, using the timestamp of the last consumed event.
-   *
-   * @param indexedMessagesCount
+   * Sets the ingestion delay to zero in situations where we are caught up processing events.
    */
-  private void updateIngestionDelay(int indexedMessageCount) {
-    if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
-      // Record Ingestion delay for this partition
-      _realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
-          _lastRowMetadata.getFirstStreamRecordIngestionTimeMs(),
-          _partitionGroupId);
-    }
+  private void setIngestionDelayToZero() {
+    long currentTimeMs = System.currentTimeMillis();
+    _realtimeTableDataManager.updateIngestionDelay(currentTimeMs, currentTimeMs, _partitionGroupId);
   }
 
  // This should be done during commit? We may not always commit when we build a segment....
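
Condensed, the patched consumption loop now chooses an ingestion-delay source in
one of three ways (rows indexed, all rows dropped by transforms, all rows
filtered at the stream level) and treats a truly empty batch as "caught up". A
rough, self-contained sketch of that decision tree (all names below are
hypothetical stand-ins, not the actual Pinot classes):

    public class DelayDecisionSketch {
      interface Metadata { long ingestionTimeMs(); }

      // lastProcessed: metadata of the last decoded message (may be null);
      // lastFiltered: batch-level metadata of the last stream-filtered message (may be null).
      static long pickDelayTimestamp(int indexedCount, boolean hasTransformedRows, Metadata lastProcessed,
          Metadata lastFiltered, int unfilteredCount, int messageCount) {
        if (indexedCount > 0 && lastProcessed != null) {
          return lastProcessed.ingestionTimeMs();  // rows were indexed
        }
        if (!hasTransformedRows && lastProcessed != null) {
          return lastProcessed.ingestionTimeMs();  // everything dropped by transforms
        }
        if (unfilteredCount > 0 && messageCount == 0 && lastFiltered != null) {
          return lastFiltered.ingestionTimeMs();   // everything filtered by the stream
        }
        return System.currentTimeMillis();         // empty batch: caught up, delay is ~zero
      }
    }
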
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index 1852c8bdc8..dbc3e8d2a6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -24,6 +24,7 @@ import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
 
@@ -31,16 +32,29 @@ public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
   private final List<StreamMessage<byte[]>> _messageList;
   private final int _unfilteredMessageCount;
   private final long _lastOffset;
+  private final StreamMessageMetadata _lastMessageMetadata;
 
   /**
    * @param unfilteredMessageCount how many messages were received from the topic before being filtered
    * @param lastOffset the offset of the last message in the batch
    * @param batch the messages, which may be fewer than {@code unfilteredMessageCount}
+   * @param lastMessageMetadata metadata for the last filtered message in the batch, useful for estimating ingestion
+   *                            delay when a batch has all messages filtered.
    */
-  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch) {
+  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch,
+      StreamMessageMetadata lastMessageMetadata) {
     _messageList = batch;
     _lastOffset = lastOffset;
     _unfilteredMessageCount = unfilteredMessageCount;
+    _lastMessageMetadata = lastMessageMetadata;
+  }
+
+  /**
+   * Returns the metadata for the last filtered message if any, null otherwise.
+   */
+  @Override
+  public StreamMessageMetadata getLastMessageMetadata() {
+    return _lastMessageMetadata;
   }
 
   @Override
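
Because the new accessor may legitimately return null (for example, on an empty
batch), callers need a guard before reading timestamps. A small usage sketch
(the helper class is hypothetical; the two Pinot types and their getters appear
in this diff):

    import org.apache.pinot.spi.stream.MessageBatch;
    import org.apache.pinot.spi.stream.StreamMessageMetadata;

    final class LastMetadataSketch {
      // Returns the ingestion time of the batch's last (possibly filtered) message,
      // or Long.MIN_VALUE when the batch exposes no metadata at all.
      static long lastIngestionTimeMs(MessageBatch<?> batch) {
        StreamMessageMetadata metadata = batch.getLastMessageMetadata();
        return metadata != null ? metadata.getRecordIngestionTimeMs() : Long.MIN_VALUE;
      }
    }
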
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 46504bbc0d..ff90f4b1a3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -70,15 +70,16 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
     List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
     long lastOffset = startOffset;
+    StreamMessageMetadata rowMetadata = null;
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
       long offset = messageAndOffset.offset();
       _lastFetchedOffset = offset;
       if (offset >= startOffset && (endOffset > offset || endOffset < 0)) {
         Bytes message = messageAndOffset.value();
+        rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
         if (message != null) {
           String key = messageAndOffset.key();
           byte[] keyBytes = key != null ? key.getBytes(StandardCharsets.UTF_8) : null;
-          StreamMessageMetadata rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
           filtered.add(new KafkaStreamMessage(keyBytes, message.get(), rowMetadata));
         } else if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Tombstone message at offset: {}", offset);
@@ -89,6 +90,6 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
             endOffset);
       }
     }
-    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);
+    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered, rowMetadata);
   }
 }
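
The consumer-side subtlety is that metadata extraction now happens before the
tombstone (null payload) check, so even a batch consisting only of tombstones
yields a usable last-message timestamp. A toy, self-contained model of that
ordering (the types are stand-ins, not Kafka or Pinot classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class TombstoneOrderingSketch {
      static class Msg {
        final String payload;  // null models a Kafka tombstone
        final long timestampMs;
        Msg(String payload, long timestampMs) { this.payload = payload; this.timestampMs = timestampMs; }
      }

      public static void main(String[] args) {
        List<Msg> fetched = Arrays.asList(new Msg("a", 100L), new Msg(null, 200L));
        List<Msg> kept = new ArrayList<>();
        Msg lastMetadata = null;
        for (Msg m : fetched) {
          lastMetadata = m;       // captured before the null-payload check
          if (m.payload != null) {
            kept.add(m);          // tombstones are skipped here
          }
        }
        // One message kept, yet the last timestamp (200) still comes from the tombstone.
        System.out.println(kept.size() + " kept, last ts = " + lastMetadata.timestampMs);
      }
    }
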
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index d22a46af67..7ae4226e47 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 
@@ -116,4 +117,17 @@ public interface MessageBatch<T> {
   default boolean isEndOfPartitionGroup() {
     return false;
   }
+
+  /**
+   * This is useful when determining the ingestion delay for a message batch. Retaining metadata for the last
+   * filtered message in a batch enables us to estimate the ingestion delay for the batch.
+   * Note that a batch can be fully filtered, and we can still retain the metadata for the last filtered message to
+   * facilitate computing ingestion delay in the face of a fully filtered batch.
+   *
+   * @return null by default.
+   */
+  @Nullable
+  default StreamMessageMetadata getLastMessageMetadata() {
+    return null;
+  }
 }
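
For plugin authors, overriding the new default mirrors what the Kafka batch
above does. A minimal sketch (the class and field are hypothetical, and the
rest of the MessageBatch contract is elided for brevity):

    import javax.annotation.Nullable;
    import org.apache.pinot.spi.stream.StreamMessageMetadata;

    // Hypothetical plugin batch; only the new accessor is shown. A real
    // implementation would also provide the remaining MessageBatch<T> methods.
    public class MyStreamMessageBatch /* implements MessageBatch<...> */ {
      private final StreamMessageMetadata _lastMessageMetadata;

      public MyStreamMessageBatch(@Nullable StreamMessageMetadata lastMessageMetadata) {
        _lastMessageMetadata = lastMessageMetadata;
      }

      @Nullable
      public StreamMessageMetadata getLastMessageMetadata() {
        // Metadata of the last message seen (filtered or not), or null when the
        // stream cannot supply it; callers fall back gracefully on null.
        return _lastMessageMetadata;
      }
    }
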

