KKcorps commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2054663147


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -93,11 +93,31 @@ private KinesisMessageBatch 
getKinesisMessageBatch(KinesisPartitionGroupOffset s
       return new KinesisMessageBatch(List.of(), startOffset, true);
     }
 
-    // Read records
-    rateLimitRequests();
-    GetRecordsRequest getRecordRequest =
-        
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
-    GetRecordsResponse getRecordsResponse = 
_kinesisClient.getRecords(getRecordRequest);
+    // Read records from kinesis.
+    // Based on getRecords documentation, we might get a response with empty 
records but a non-null nextShardIterator.
+    // This method is also used to accurately determine if we reached end of 
shard. So, we need to use nextShardIterator
+    // and call getRecords again until we get non-empty records or null 
nextShardIterator.
+    // To prevent an infinite loop due to some bug, we will limit the number 
of attempts
+    GetRecordsResponse getRecordsResponse;
+    int attempts = 0;
+    while (true) {

Review Comment:
   Let's remove this. We had this while loop code a while back but chose to 
remove it in #12806 
   we want the consumer thread to handle empty record response



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to