navina commented on code in PR #9224:
URL: https://github.com/apache/pinot/pull/9224#discussion_r971758010
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -43,30 +45,36 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
   }
 
   @Override
-  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+  public MessageBatch<KafkaStreamMessage> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
       StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
+  public MessageBatch<KafkaStreamMessage> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
           endOffset, timeoutMillis);
     }
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
-    List<MessageAndOffsetAndMetadata> filtered = new ArrayList<>(messageAndOffsets.size());
+    List<KafkaStreamMessage> filtered = new ArrayList<>(messageAndOffsets.size());
     long lastOffset = startOffset;
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      String key = messageAndOffset.key();
+      byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
       Bytes message = messageAndOffset.value();
       long offset = messageAndOffset.offset();
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
+          StreamMessageMetadata rowMetadata = null;
+          if (_config.isPopulateMetadata()) {
+            rowMetadata = (StreamMessageMetadata) _rowMetadataExtractor.extract(messageAndOffset);
+          }
           filtered.add(
-              new MessageAndOffsetAndMetadata(message.get(), offset, _rowMetadataExtractor.extract(messageAndOffset)));
+              new KafkaStreamMessage(keyBytes, message.get(), offset, rowMetadata));

Review Comment:
   I don't think there is a good reason behind this; it is just how the code evolved. I can make the record offset part of the metadata.
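   For illustration, a minimal sketch of what folding the offset into the row metadata could look like. `KafkaRowMetadata` is a hypothetical name, and the package path and `StreamMessageMetadata` super-constructor shape are assumptions inferred from the names visible in this diff, not the final API:

   ```java
   // Hypothetical sketch, not the merged Pinot API: carry the Kafka record
   // offset inside the per-row metadata so KafkaStreamMessage no longer needs
   // a standalone offset field. Class name, package path, and the
   // super-constructor signature below are assumptions.
   import org.apache.pinot.spi.stream.StreamMessageMetadata;

   public class KafkaRowMetadata extends StreamMessageMetadata {
     private final long _offset;

     public KafkaRowMetadata(long recordIngestionTimeMs, long offset) {
       super(recordIngestionTimeMs); // assumed super-constructor shape
       _offset = offset;
     }

     /** Offset of the Kafka record this row was read from. */
     public long getOffset() {
       return _offset;
     }
   }
   ```

   The loop above would then build `new KafkaStreamMessage(keyBytes, message.get(), rowMetadata)` and callers would read the offset from the metadata instead of from the message itself.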