KKcorps commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2059576964


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -93,11 +93,31 @@ private KinesisMessageBatch 
getKinesisMessageBatch(KinesisPartitionGroupOffset s
       return new KinesisMessageBatch(List.of(), startOffset, true);
     }
 
-    // Read records
-    rateLimitRequests();
-    GetRecordsRequest getRecordRequest =
-        
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
-    GetRecordsResponse getRecordsResponse = 
_kinesisClient.getRecords(getRecordRequest);
+    // Read records from kinesis.
+    // Based on getRecords documentation, we might get a response with empty 
records but a non-null nextShardIterator.
+    // This method is also used to accurately determine if we reached end of 
shard. So, we need to use nextShardIterator
+    // and call getRecords again until we get non-empty records or null 
nextShardIterator.
+    // To prevent an infinite loop due to some bug, we will limit the number 
of attempts
+    GetRecordsResponse getRecordsResponse;
+    int attempts = 0;
+    while (true) {

Review Comment:
   No, the reason we also did it was because it was leading to confusion when 
debugging via logs where consumer thread was retrying vs KinesisConsumer object.
   I'd prefer keeping this the old way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to