KKcorps commented on code in PR #15563: URL: https://github.com/apache/pinot/pull/15563#discussion_r2054663147
########## pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java: ########## @@ -93,11 +93,31 @@ private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset s return new KinesisMessageBatch(List.of(), startOffset, true); } - // Read records - rateLimitRequests(); - GetRecordsRequest getRecordRequest = - GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build(); - GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest); + // Read records from kinesis. + // Based on getRecords documentation, we might get a response with empty records but a non-null nextShardIterator. + // This method is also used to accurately determine if we reached end of shard. So, we need to use nextShardIterator + // and call getRecords again until we get non-empty records or null nextShardIterator. + // To prevent an infinite loop due to some bug, we will limit the number of attempts + GetRecordsResponse getRecordsResponse; + int attempts = 0; + while (true) { Review Comment: Let's remove this. We had this while loop code a while back but chose to remove it in #12806 we want the consumer thread to handle empty record response -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org