Jackie-Jiang commented on code in PR #12697:
URL: https://github.com/apache/pinot/pull/12697#discussion_r1550684755


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java:
##########
@@ -71,52 +61,22 @@ public int getUnfilteredMessageCount() {
   }
 
   @Override
-  public StreamMessage getMessageAtIndex(int index) {
-    return _messageList.get(index);
-  }
-
-  @Override
-  public int getMessageOffsetAtIndex(int index) {
-    return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
-  }
-
-  @Override
-  public int getMessageLengthAtIndex(int index) {
-    return _messageList.get(index).getValue().length;
-  }
-
-  @Override
-  public long getNextStreamMessageOffsetAtIndex(int index) {
-    throw new UnsupportedOperationException("This method is deprecated");
-  }
-
-  @Override
-  public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int 
index) {
-    return new LongMsgOffset(((KafkaStreamMessage) 
_messageList.get(index)).getNextOffset());
+  public BytesStreamMessage getStreamMessage(int index) {
+    return _messages.get(index);
   }
 
   @Override
   public StreamPartitionMsgOffset getOffsetOfNextBatch() {
-    return new LongMsgOffset(_lastOffset + 1);
-  }
-
-  @Override
-  public RowMetadata getMetadataAtIndex(int index) {
-    return _messageList.get(index).getMetadata();
+    return new LongMsgOffset(_offsetOfNextBatch);
   }
 
   @Override
-  public byte[] getMessageBytesAtIndex(int index) {
-    return _messageList.get(index).getValue();
-  }
-
-  @Override
-  public StreamMessage getStreamMessage(int index) {
-    return _messageList.get(index);
+  public StreamPartitionMsgOffset getFirstMessageOffset() {

Review Comment:
   We perform a null check after reading the value (the logic already exists), 
so this is actually a bug fix. When there is no value consumed, we should 
return `null` instead of negative offset. It is backward compatible.
   Added an `@Nullable` annotation to this method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to