Jackie-Jiang commented on code in PR #12697:
URL: https://github.com/apache/pinot/pull/12697#discussion_r1550681223


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -47,53 +50,60 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
   }
 
   @Override
-  public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
-      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
-    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
-    final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
-    return fetchMessages(startOffset, endOffset, timeoutMillis);
-  }
-
-  public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long startOffset, long endOffset,
-      int timeoutMillis) {
+  public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+    long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Polling partition: {}, startOffset: {}, endOffset: {} timeout: {}ms", _topicPartition, startOffset,
-          endOffset, timeoutMillis);
+      LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, timeoutMs);
     }
     if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Seeking to offset: {}", startOffset);
       }
       _consumer.seek(_topicPartition, startOffset);
     }
-    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
-    List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
-    long firstOffset = startOffset;
-    long lastOffset = startOffset;
-    StreamMessageMetadata rowMetadata = null;
-    if (!messageAndOffsets.isEmpty()) {
-      firstOffset = messageAndOffsets.get(0).offset();
-    }
-    for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
-      long offset = messageAndOffset.offset();
-      _lastFetchedOffset = offset;
-      if (offset >= startOffset && (endOffset > offset || endOffset < 0)) {
-        Bytes message = messageAndOffset.value();
-        rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
+    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
+    List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition);
+    List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
+    long firstOffset = -1;
+    long offsetOfNextBatch = startOffset;
+    StreamMessageMetadata lastMessageMetadata = null;
+    if (!records.isEmpty()) {
+      firstOffset = records.get(0).offset();
+      _lastFetchedOffset = records.get(records.size() - 1).offset();
+      offsetOfNextBatch = _lastFetchedOffset + 1;

Review Comment:
   This is verified. If this value is not set correctly, the consumed rows won't match the expected rows.
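   
   To make the invariant concrete, here is a hypothetical driver loop (not Pinot's actual consuming code) sketching how the returned offset of the next batch seeds the following fetch. The accessor names on `KafkaMessageBatch` (`getMessageCount()`, `getStreamMessage(int)`, `getOffsetOfNextBatch()`) are assumptions based on this PR's `MessageBatch` refactoring:
   
   ```java
   import org.apache.pinot.spi.stream.BytesStreamMessage;
   import org.apache.pinot.spi.stream.LongMsgOffset;
   import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
   
   // Sketch only: consumer construction, shutdown, and row indexing are omitted.
   static void consumeLoop(KafkaPartitionLevelConsumer consumer) {
     StreamPartitionMsgOffset start = new LongMsgOffset(0L);
     while (true) {
       KafkaMessageBatch batch = consumer.fetchMessages(start, 5000);
       for (int i = 0; i < batch.getMessageCount(); i++) {
         BytesStreamMessage message = batch.getStreamMessage(i);
         // ... decode and index the row ...
       }
       // The line verified above: if offsetOfNextBatch were anything other
       // than _lastFetchedOffset + 1, the next fetch would either re-read the
       // last row (duplicate rows) or seek past an unread one (lost rows).
       start = batch.getOffsetOfNextBatch();
     }
   }
   ```
   
   This is also why the `_lastFetchedOffset != startOffset - 1` check pays off: when the caller always passes back the returned offset of the next batch, consecutive fetches are contiguous and the redundant `seek()` is skipped.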



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

