Jackie-Jiang commented on code in PR #12697:
URL: https://github.com/apache/pinot/pull/12697#discussion_r1550684755

##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java:
##########
@@ -71,52 +61,22 @@ public int getUnfilteredMessageCount() {
   }
 
   @Override
-  public StreamMessage getMessageAtIndex(int index) {
-    return _messageList.get(index);
-  }
-
-  @Override
-  public int getMessageOffsetAtIndex(int index) {
-    return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
-  }
-
-  @Override
-  public int getMessageLengthAtIndex(int index) {
-    return _messageList.get(index).getValue().length;
-  }
-
-  @Override
-  public long getNextStreamMessageOffsetAtIndex(int index) {
-    throw new UnsupportedOperationException("This method is deprecated");
-  }
-
-  @Override
-  public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
-    return new LongMsgOffset(((KafkaStreamMessage) _messageList.get(index)).getNextOffset());
+  public BytesStreamMessage getStreamMessage(int index) {
+    return _messages.get(index);
   }
 
   @Override
   public StreamPartitionMsgOffset getOffsetOfNextBatch() {
-    return new LongMsgOffset(_lastOffset + 1);
-  }
-
-  @Override
-  public RowMetadata getMetadataAtIndex(int index) {
-    return _messageList.get(index).getMetadata();
+    return new LongMsgOffset(_offsetOfNextBatch);
   }
 
   @Override
-  public byte[] getMessageBytesAtIndex(int index) {
-    return _messageList.get(index).getValue();
-  }
-
-  @Override
-  public StreamMessage getStreamMessage(int index) {
-    return _messageList.get(index);
+  public StreamPartitionMsgOffset getFirstMessageOffset() {

Review Comment:
   We perform a null check after reading the value (the logic already exists), so this is actually a bug fix. When no value is consumed, we should return `null` instead of a negative offset. It is backward compatible. Added an `@Nullable` annotation to this method.
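
   To illustrate the contract described in the comment above, here is a minimal, self-contained sketch. It is not the Pinot code: `FirstOffsetSketch`, `Batch`, and `describe` are hypothetical names, offsets are modeled as plain `Long` values rather than `StreamPartitionMsgOffset`, and the `@Nullable` annotation is only noted in a comment so the example compiles without extra dependencies.

```java
import java.util.List;

public class FirstOffsetSketch {

  // Hypothetical stand-in for a message batch; not the real KafkaMessageBatch.
  static final class Batch {
    private final List<Long> _offsets;

    Batch(List<Long> offsets) {
      _offsets = offsets;
    }

    // Would be annotated @Nullable in real code: null means nothing was consumed,
    // rather than signalling that with a negative offset.
    Long getFirstMessageOffset() {
      return _offsets.isEmpty() ? null : _offsets.get(0);
    }
  }

  // Caller-side null check, standing in for the check the comment says already exists.
  static String describe(Batch batch) {
    Long first = batch.getFirstMessageOffset();
    return first == null ? "no messages consumed" : "first offset = " + first;
  }

  public static void main(String[] args) {
    System.out.println(describe(new Batch(List.of())));
    System.out.println(describe(new Batch(List.of(42L, 43L))));
  }
}
```

   Under these assumptions, a caller that already checks for `null` behaves the same for non-empty batches and no longer has to interpret a negative offset for empty ones.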