This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 29c560f523 Move offset validation logic to consumer classes (#13015)
29c560f523 is described below

commit 29c560f523bab4529e6685c7df061105d8dc3df1
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Thu May 23 19:02:09 2024 +0530

    Move offset validation logic to consumer classes (#13015)
    
    * Enhance Kinesis consumer
    
    * Simplify the handling
    
    * Address comments
    
    * Move offset validation logic to consumer classes
    
    * Add missing message interface to message batch
    
    * fix linting
    
    * remove unused interface
    
    * Cleanup and refactoring
    
    * lint fixes
    
    ---------
    
    Co-authored-by: Xiaotian (Jackie) Jiang <jackie....@gmail.com>
    Co-authored-by: Kartik Khare <kharekar...@kartiks-macbook-pro.tail8a064.ts.net>
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../realtime/RealtimeSegmentDataManager.java        | 21 ++++++++-------------
 .../plugin/stream/kafka20/KafkaMessageBatch.java    |  9 ++++++++-
 .../stream/kafka20/KafkaPartitionLevelConsumer.java |  5 ++++-
 .../plugin/stream/kinesis/KinesisConsumer.java      |  1 -
 .../org/apache/pinot/spi/stream/MessageBatch.java   |  8 ++++++++
 5 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 01fffced36..b441f086de 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -468,10 +468,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
         throw t;
       }
 
-      StreamPartitionMsgOffset batchFirstOffset = messageBatch.getFirstMessageOffset();
-      if (batchFirstOffset != null) {
-        validateStartOffset(_currentOffset, batchFirstOffset);
-      }
+      reportDataLoss(messageBatch);
 
       boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
 
@@ -922,18 +919,16 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   }
 
   /**
-   * Checks if the begin offset of the stream partition has been fast-forwarded.
-   * batchFirstOffset should be less than or equal to startOffset.
-   * If batchFirstOffset is greater, then some messages were not received.
+   * Checks and reports if the consumer is going through data loss.
    *
-   * @param startOffset The offset of the first message desired, inclusive.
-   * @param batchFirstOffset The offset of the first message in the batch.
+   * @param messageBatch Message batch to validate
    */
-  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
-    if (batchFirstOffset.compareTo(startOffset) > 0) {
+  private void reportDataLoss(MessageBatch messageBatch) {
+    if (messageBatch.hasDataLoss()) {
       _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
-      String message =
-          "startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+      String message = String.format("Message loss detected in stream partition: %s for table: %s startOffset: %s "
+              + "batchFirstOffset: %s", _partitionGroupId, _tableNameWithType, _startOffset,
+          messageBatch.getFirstMessageOffset());
       _segmentLogger.error(message);
       _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), message, null));
     }
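
The net effect of this hunk is that the manager no longer compares offsets itself; the consumer marks the batch, and the manager only reports. A minimal sketch of the new control flow (simplified; the fetch call shown here is illustrative context, not part of this diff):

    // The consumer plugin builds the batch and decides whether data was lost.
    MessageBatch messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, fetchTimeoutMs);
    // The manager only reports: when messageBatch.hasDataLoss() is true, it
    // bumps ServerMeter.STREAM_DATA_LOSS, logs the offsets, and records a
    // SegmentErrorInfo against the segment.
    reportDataLoss(messageBatch);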
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index 3f137b54af..1e3361ba00 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -33,6 +33,7 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> {
   private final long _offsetOfNextBatch;
   private final long _firstOffset;
   private final StreamMessageMetadata _lastMessageMetadata;
+  private final boolean _hasDataLoss;
 
   /**
    * @param messages the messages, which may be smaller than {@see unfilteredMessageCount}
@@ -43,12 +44,13 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> {
    *                            delay when a batch has all messages filtered.
    */
   public KafkaMessageBatch(List<BytesStreamMessage> messages, int unfilteredMessageCount, long offsetOfNextBatch,
-      long firstOffset, @Nullable StreamMessageMetadata lastMessageMetadata) {
+      long firstOffset, @Nullable StreamMessageMetadata lastMessageMetadata, boolean hasDataLoss) {
     _messages = messages;
     _unfilteredMessageCount = unfilteredMessageCount;
     _offsetOfNextBatch = offsetOfNextBatch;
     _firstOffset = firstOffset;
     _lastMessageMetadata = lastMessageMetadata;
+    _hasDataLoss = hasDataLoss;
   }
 
   @Override
@@ -82,4 +84,9 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> {
   public StreamMessageMetadata getLastMessageMetadata() {
     return _lastMessageMetadata;
   }
+
+  @Override
+  public boolean hasDataLoss() {
+    return _hasDataLoss;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 36a74c1e65..03731df079 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -61,6 +61,7 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
       }
       _consumer.seek(_topicPartition, startOffset);
     }
+
    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
    List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition);
     List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
@@ -84,7 +85,9 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
         lastMessageMetadata = messageMetadata;
       }
     }
-    return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata);
+
+    return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata,
+        firstOffset > startOffset);
   }
 
  private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) {
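
The new trailing argument carries the check that previously lived in RealtimeSegmentDataManager.validateStartOffset: if the first offset actually returned is ahead of the offset that was requested, the intervening messages can no longer be consumed (for example, they were deleted by retention). A standalone sketch with made-up values:

    long startOffset = 100L;  // offset the caller asked to resume from
    long firstOffset = 150L;  // offset of the first record Kafka actually returned
    boolean hasDataLoss = firstOffset > startOffset;  // true: offsets 100-149 were lost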
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index a8e40c87a8..e7bb76797a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -66,7 +66,6 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset;
     String shardId = startOffset.getShardId();
     String startSequenceNumber = startOffset.getSequenceNumber();
-
     // Get the shard iterator
     String shardIterator;
     if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 2f00c82657..9a2eae1c30 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -96,6 +96,14 @@ public interface MessageBatch<T> {
     return false;
   }
 
+  /**
+   * Returns {@code true} if the current batch has data loss.
+   * This is useful to determine if there were gaps in the stream.
+   */
+  default boolean hasDataLoss() {
+    return false;
+  }
+
   @Deprecated
   default T getMessageAtIndex(int index) {
     throw new UnsupportedOperationException();
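
Because hasDataLoss() is a default method returning false, existing MessageBatch implementations are unaffected; a stream plugin opts in by overriding it, as KafkaMessageBatch does above. A hypothetical sketch (the class and constructor are illustrative, and the rest of the MessageBatch contract is omitted):

    public class MyStreamMessageBatch implements MessageBatch<byte[]> {
      private final boolean _hasDataLoss;

      public MyStreamMessageBatch(long requestedStartOffset, long firstReturnedOffset) {
        // Flag a gap whenever the stream could not serve the requested offset.
        _hasDataLoss = firstReturnedOffset > requestedStartOffset;
      }

      @Override
      public boolean hasDataLoss() {
        return _hasDataLoss;
      }

      // ... remaining MessageBatch<byte[]> methods omitted ...
    }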

