navina commented on code in PR #9224:
URL: https://github.com/apache/pinot/pull/9224#discussion_r971758010


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -43,30 +45,36 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
   }
 
   @Override
-  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+  public MessageBatch<KafkaStreamMessage> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
       StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
+  public MessageBatch<KafkaStreamMessage> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
           endOffset, timeoutMillis);
     }
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
-    List<MessageAndOffsetAndMetadata> filtered = new ArrayList<>(messageAndOffsets.size());
+    List<KafkaStreamMessage> filtered = new ArrayList<>(messageAndOffsets.size());
     long lastOffset = startOffset;
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      String key = messageAndOffset.key();
+      byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
       Bytes message = messageAndOffset.value();
       long offset = messageAndOffset.offset();
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
        if (message != null) {
+          StreamMessageMetadata rowMetadata = null;
+          if (_config.isPopulateMetadata()) {
+            rowMetadata = (StreamMessageMetadata) _rowMetadataExtractor.extract(messageAndOffset);
+          }
           filtered.add(
-              new MessageAndOffsetAndMetadata(message.get(), offset, _rowMetadataExtractor.extract(messageAndOffset)));
+              new KafkaStreamMessage(keyBytes, message.get(), offset, rowMetadata));

Review Comment:
   I don't think there is a good reason behind this; it's just how the code evolved. I can make the record offset part of the metadata.
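   As a rough sketch of that suggestion, the record offset could live inside the metadata object, so KafkaStreamMessage would not need a standalone offset field. Everything below is illustrative: OffsetAwareMessageMetadata and the reduced KafkaStreamMessage constructor are hypothetical names, not the actual Pinot API.

   // Hypothetical metadata holder that carries the Kafka record offset.
   public class OffsetAwareMessageMetadata {
     private final long _recordOffset;          // offset of the record within the partition
     private final long _recordIngestionTimeMs; // record timestamp reported by Kafka

     public OffsetAwareMessageMetadata(long recordOffset, long recordIngestionTimeMs) {
       _recordOffset = recordOffset;
       _recordIngestionTimeMs = recordIngestionTimeMs;
     }

     public long getRecordOffset() {
       return _recordOffset;
     }

     public long getRecordIngestionTimeMs() {
       return _recordIngestionTimeMs;
     }
   }

   // In the consumer loop above, the construction would then become
   // (assuming KafkaStreamMessage grows a constructor without the offset):
   OffsetAwareMessageMetadata rowMetadata =
       new OffsetAwareMessageMetadata(offset, messageAndOffset.timestamp());
   filtered.add(new KafkaStreamMessage(keyBytes, message.get(), rowMetadata));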


