This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 29c560f523 Move offset validation logic to consumer classes (#13015) 29c560f523 is described below commit 29c560f523bab4529e6685c7df061105d8dc3df1 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Thu May 23 19:02:09 2024 +0530 Move offset validation logic to consumer classes (#13015) * Enhance Kinesis consumer * Simplify the handling * Address comments * Move offset validation logic to consumer classes * Add missing message interface to message batch * fix linting * remove unused interface * Cleanup and refactoring * lint fixes --------- Co-authored-by: Xiaotian (Jackie) Jiang <jackie....@gmail.com> Co-authored-by: Kartik Khare <kharekar...@kartiks-macbook-pro.tail8a064.ts.net> Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local> --- .../realtime/RealtimeSegmentDataManager.java | 21 ++++++++------------- .../plugin/stream/kafka20/KafkaMessageBatch.java | 9 ++++++++- .../stream/kafka20/KafkaPartitionLevelConsumer.java | 5 ++++- .../plugin/stream/kinesis/KinesisConsumer.java | 1 - .../org/apache/pinot/spi/stream/MessageBatch.java | 8 ++++++++ 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 01fffced36..b441f086de 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -468,10 +468,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { throw t; } - StreamPartitionMsgOffset batchFirstOffset = messageBatch.getFirstMessageOffset(); - if (batchFirstOffset != null) { - validateStartOffset(_currentOffset, batchFirstOffset); - } + reportDataLoss(messageBatch); boolean 
endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis); @@ -922,18 +919,16 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } /** - * Checks if the begin offset of the stream partition has been fast-forwarded. - * batchFirstOffset should be less than or equal to startOffset. - * If batchFirstOffset is greater, then some messages were not received. + * Checks and reports if the consumer is going through data loss. * - * @param startOffset The offset of the first message desired, inclusive. - * @param batchFirstOffset The offset of the first message in the batch. + * @param messageBatch Message batch to validate */ - private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) { - if (batchFirstOffset.compareTo(startOffset) > 0) { + private void reportDataLoss(MessageBatch messageBatch) { + if (messageBatch.hasDataLoss()) { _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L); - String message = - "startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")"; + String message = String.format("Message loss detected in stream partition: %s for table: %s startOffset: %s " + + "batchFirstOffset: %s", _partitionGroupId, _tableNameWithType, _startOffset, + messageBatch.getFirstMessageOffset()); _segmentLogger.error(message); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), message, null)); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java index 3f137b54af..1e3361ba00 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java +++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java @@ -33,6 +33,7 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> { private final long _offsetOfNextBatch; private final long _firstOffset; private final StreamMessageMetadata _lastMessageMetadata; + private final boolean _hasDataLoss; /** * @param messages the messages, which may be smaller than {@see unfilteredMessageCount} @@ -43,12 +44,13 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> { * delay when a batch has all messages filtered. */ public KafkaMessageBatch(List<BytesStreamMessage> messages, int unfilteredMessageCount, long offsetOfNextBatch, - long firstOffset, @Nullable StreamMessageMetadata lastMessageMetadata) { + long firstOffset, @Nullable StreamMessageMetadata lastMessageMetadata, boolean hasDataLoss) { _messages = messages; _unfilteredMessageCount = unfilteredMessageCount; _offsetOfNextBatch = offsetOfNextBatch; _firstOffset = firstOffset; _lastMessageMetadata = lastMessageMetadata; + _hasDataLoss = hasDataLoss; } @Override @@ -82,4 +84,9 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> { public StreamMessageMetadata getLastMessageMetadata() { return _lastMessageMetadata; } + + @Override + public boolean hasDataLoss() { + return _hasDataLoss; + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index 36a74c1e65..03731df079 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -61,6 +61,7 
@@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa } _consumer.seek(_topicPartition, startOffset); } + ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs)); List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition); List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size()); @@ -84,7 +85,9 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa lastMessageMetadata = messageMetadata; } } - return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata); + + return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata, + firstOffset > startOffset); } private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index a8e40c87a8..e7bb76797a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -66,7 +66,6 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset; String shardId = startOffset.getShardId(); String startSequenceNumber = startOffset.getSequenceNumber(); - // Get the shard iterator String shardIterator; if (startSequenceNumber.equals(_nextStartSequenceNumber)) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index 2f00c82657..9a2eae1c30 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -96,6 +96,14 @@ public interface MessageBatch<T> { return false; } + /** + * Returns {@code true} if the current batch has data loss. + * This is useful to determine if there were gaps in the stream. + */ + default boolean hasDataLoss() { + return false; + } + @Deprecated default T getMessageAtIndex(int index) { throw new UnsupportedOperationException(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org