Jackie-Jiang commented on code in PR #12697:
URL: https://github.com/apache/pinot/pull/12697#discussion_r1550681223


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -47,53 +50,60 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
   }
 
   @Override
-  public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
-      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
-    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
-    final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
-    return fetchMessages(startOffset, endOffset, timeoutMillis);
-  }
-
-  public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long startOffset, long endOffset,
-      int timeoutMillis) {
+  public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+    long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Polling partition: {}, startOffset: {}, endOffset: {} timeout: {}ms", _topicPartition, startOffset,
-          endOffset, timeoutMillis);
+      LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, timeoutMs);
     }
     if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Seeking to offset: {}", startOffset);
       }
       _consumer.seek(_topicPartition, startOffset);
     }
-    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
-    List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
-    long firstOffset = startOffset;
-    long lastOffset = startOffset;
-    StreamMessageMetadata rowMetadata = null;
-    if (!messageAndOffsets.isEmpty()) {
-      firstOffset = messageAndOffsets.get(0).offset();
-    }
-    for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
-      long offset = messageAndOffset.offset();
-      _lastFetchedOffset = offset;
-      if (offset >= startOffset && (endOffset > offset || endOffset < 0)) {
-        Bytes message = messageAndOffset.value();
-        rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
+    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
+    List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition);
+    List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
+    long firstOffset = -1;
+    long offsetOfNextBatch = startOffset;
+    StreamMessageMetadata lastMessageMetadata = null;
+    if (!records.isEmpty()) {
+      firstOffset = records.get(0).offset();
+      _lastFetchedOffset = records.get(records.size() - 1).offset();
+      offsetOfNextBatch = _lastFetchedOffset + 1;

Review Comment:
   This is verified. If this value is not set properly, the consumed rows won't match the expected rows.
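For context, the invariant this comment verifies can be illustrated with a minimal standalone sketch. This is not Pinot's actual code: `Batch`, `OffsetHandoffSketch`, and the simulated poll are hypothetical stand-ins. It shows why `offsetOfNextBatch` must be `lastFetchedOffset + 1` when the poll returns records (and must stay at `startOffset` when it doesn't): the driver loop feeds that value back in as the next start offset, so the seek-avoidance check (`lastFetchedOffset == startOffset - 1`) holds and no records are skipped or re-read.

```java
import java.util.List;

public class OffsetHandoffSketch {

  /** Stand-in for KafkaMessageBatch: just the offsets the driver loop needs. */
  record Batch(List<Long> offsets, long offsetOfNextBatch) {}

  static long lastFetchedOffset = -1; // mirrors _lastFetchedOffset in the consumer

  /** Simulated fetch over a fixed "partition" holding offsets 0..9. */
  static Batch fetchMessages(long startOffset) {
    // Seek only when the previous batch did not end right before startOffset,
    // mirroring: if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1)
    boolean needSeek = lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1;
    System.out.println("fetch from " + startOffset + (needSeek ? " (seek)" : " (no seek)"));

    // Pretend the poll returned up to 3 records starting at startOffset.
    List<Long> records = java.util.stream.LongStream
        .range(startOffset, Math.min(startOffset + 3, 10))
        .boxed().toList();

    long offsetOfNextBatch = startOffset; // empty poll: next batch retries the same offset
    if (!records.isEmpty()) {
      lastFetchedOffset = records.get(records.size() - 1);
      offsetOfNextBatch = lastFetchedOffset + 1; // the invariant under review
    }
    return new Batch(records, offsetOfNextBatch);
  }

  public static void main(String[] args) {
    long start = 0;
    while (start < 10) {
      Batch batch = fetchMessages(start);
      start = batch.offsetOfNextBatch(); // feed the handoff offset back in
    }
  }
}
```

Running the sketch seeks once at offset 0 and then fetches 3, 6, and 9 with no further seeks; if `offsetOfNextBatch` were left at `startOffset` on a non-empty batch, the loop would re-read the same records forever, and if it overshot, rows would be silently dropped.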
########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java: ########## @@ -47,53 +50,60 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i } @Override - public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset, - StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) { - final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); - final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset(); - return fetchMessages(startOffset, endOffset, timeoutMillis); - } - - public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long startOffset, long endOffset, - int timeoutMillis) { + public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { + long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Polling partition: {}, startOffset: {}, endOffset: {} timeout: {}ms", _topicPartition, startOffset, - endOffset, timeoutMillis); + LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, timeoutMs); } if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Seeking to offset: {}", startOffset); } _consumer.seek(_topicPartition, startOffset); } - ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis)); - List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition); - List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size()); - long firstOffset = startOffset; - long lastOffset = startOffset; - StreamMessageMetadata rowMetadata = null; - if (!messageAndOffsets.isEmpty()) { - firstOffset = messageAndOffsets.get(0).offset(); - } - for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) { - long offset = messageAndOffset.offset(); - _lastFetchedOffset = offset; - if (offset >= startOffset && (endOffset > offset || endOffset < 0)) { - Bytes message = messageAndOffset.value(); - rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset); + ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs)); + List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition); + List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size()); + long firstOffset = -1; + long offsetOfNextBatch = startOffset; + StreamMessageMetadata lastMessageMetadata = null; + if (!records.isEmpty()) { + firstOffset = records.get(0).offset(); + _lastFetchedOffset = records.get(records.size() - 1).offset(); + offsetOfNextBatch = _lastFetchedOffset + 1; Review Comment: This is verified. If this value is not properly set, the consumption won't match the expected rows -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org