richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772989592



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
##########
@@ -18,61 +18,53 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
 
   @Override
-  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
-      int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    final Iterable<ConsumerRecord<String, Bytes>> messageAndOffsetIterable =
-        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
-    return new KafkaMessageBatch(messageAndOffsetIterable);
-  }
-
-  private Iterable<ConsumerRecord<String, Bytes>> buildOffsetFilteringIterable(
-      final List<ConsumerRecord<String, Bytes>> messageAndOffsets, final long startOffset, final long endOffset) {
-    return Iterables.filter(messageAndOffsets, input -> {
-      // Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
-      return input != null && input.value() != null && input.offset() >= startOffset && (endOffset > input.offset()
-          || endOffset == -1);
-    });
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    super.close();
+    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
+    List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());
+    long lastOffset = startOffset;
+    for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      if (messageAndOffset != null) {
+        Bytes message = messageAndOffset.value();
+        long offset = messageAndOffset.offset();
+        if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
+          if (message != null) {
+            filtered.add(new MessageAndOffset(message.get(), offset));
+          }
+          lastOffset = offset;
+        }
+      }
+    }
+    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);

Review comment:
   I don’t think that’s an accurate reading of the code: lastOffset takes the value of the last message *in the requested offset range*, even if that message is a tombstone, so in general it is not equal to start + 1 unless every polled message was outside the requested range. It’s also not clear to me why the offset range filtering exists; I don’t think the offset range check can fail.
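   To make that concrete, here is a minimal, self-contained sketch of how I read the loop. It is not the real code: `Rec` is a made-up stand-in for `ConsumerRecord<String, Bytes>`, and the offsets are hypothetical sample values, but the filtering logic mirrors the loop above. A tombstone inside the requested range still advances lastOffset; it just contributes nothing to the filtered list.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   public class LastOffsetSketch {
   
     // Stand-in for ConsumerRecord<String, Bytes>: an offset plus a nullable payload.
     static final class Rec {
       final long _offset;
       final byte[] _value;
   
       Rec(long offset, byte[] value) {
         _offset = offset;
         _value = value;
       }
     }
   
     public static void main(String[] args) {
       long startOffset = 5;
       long endOffset = Long.MAX_VALUE; // what a null endMsgOffset maps to
       List<Rec> polled = Arrays.asList(
           new Rec(5, new byte[]{1}),
           new Rec(6, new byte[]{2}),
           new Rec(7, null)); // tombstone, but still inside the requested range
   
       List<Rec> filtered = new ArrayList<>(polled.size());
       long lastOffset = startOffset;
       for (Rec rec : polled) {
         long offset = rec._offset;
         if (offset >= startOffset && (endOffset > offset || endOffset == -1)) {
           if (rec._value != null) {
             filtered.add(rec); // only non-tombstones are kept
           }
           lastOffset = offset; // but lastOffset advances for the tombstone too
         }
       }
   
       // prints "2 7": two messages survive filtering, yet lastOffset is the
       // tombstone's offset, not startOffset + 1
       System.out.println(filtered.size() + " " + lastOffset);
     }
   }
   ```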
   
   @xiangfu0 can you comment why the offset range check exists since I believe 
you added it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


