krishan1390 commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2055316372


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -475,7 +475,10 @@ protected boolean consumeLoop()
               messageBatch.getMessageCount(), messageBatch.getUnfilteredMessageCount(),
               messageBatch.isEndOfPartitionGroup());
         }
-        _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
+        // We need to check for both endOfPartitionGroup and messageCount == 0, because
+        // endOfPartitionGroup can be true even if this is the last batch of messages (has been observed for kinesis)
+        // To process the last batch of messages, we need to set _endOfPartitionGroup to false in such a case
+        _endOfPartitionGroup = messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup();

Review Comment:
   This was observed after a shard was split, when we paused and resumed the table with the smallest offset. The server doesn't process the last batch of messages and commits the segment anyway.
   
   The controller then determines that the shard has not been completely consumed and starts a new segment for the parent shard. The server goes through the same loop, again skips the last batch of messages, and commits a segment with 0 docs. This cycle repeats.
   
   Note that this is intermittent and depends on how Kinesis responds (sometimes Kinesis responds with isEndOfPartitionGroup false, too).
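To make the failure mode above concrete, here is a minimal Java sketch of a simplified consume loop (requires Java 16+ for records). It assumes, rather than takes from Pinot's code, that once the end-of-partition flag is set for a batch, the loop stops and commits before indexing that batch; `Batch`, `consume()` and `useFixedCheck` are hypothetical names. With the old check, the non-empty final batch is dropped, and a restarted segment that sees only that batch indexes nothing (the 0-doc commit); with the check from the diff, the batch is indexed and the loop stops on the following empty batch.

```java
import java.util.ArrayList;
import java.util.List;

public class EndOfPartitionSketch {

  /** Hypothetical stand-in for a fetched batch; not Pinot's MessageBatch interface. */
  public record Batch(List<String> messages, boolean endOfPartitionGroup) {
    int getMessageCount() {
      return messages.size();
    }
  }

  /**
   * Simplified consume loop. Assumption: when the end-of-partition flag is set for a batch,
   * the loop stops (and the segment is committed) before that batch's messages are indexed.
   */
  public static List<String> consume(List<Batch> batches, boolean useFixedCheck) {
    List<String> indexed = new ArrayList<>();
    for (Batch batch : batches) {
      boolean endOfPartitionGroup = useFixedCheck
          // Check from the diff: only an empty batch can end the partition group.
          ? batch.getMessageCount() == 0 && batch.endOfPartitionGroup()
          // Old check: trust the flag even if the batch still has messages.
          : batch.endOfPartitionGroup();
      if (endOfPartitionGroup) {
        break;
      }
      indexed.addAll(batch.messages());
    }
    return indexed;
  }

  public static void main(String[] args) {
    // The final non-empty batch of the parent shard is already flagged as end of shard.
    List<Batch> batches = List.of(
        new Batch(List.of("m1", "m2"), false),
        new Batch(List.of("m3"), true),
        new Batch(List.of(), true));
    System.out.println(consume(batches, false)); // [m1, m2]
    System.out.println(consume(batches, true));  // [m1, m2, m3]
    // A restarted segment that only sees the flagged batch indexes nothing under the old check.
    System.out.println(consume(List.of(new Batch(List.of("m3"), true)), false)); // []
  }
}
```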




Review Comment:
   Although functionally we could move this check into KinesisMessageBatch, I think a function named "isEndOfPartitionGroup()" should return true in this case, where the message count is > 0 but the batch contains the last message in the shard.
   
   We would also need to duplicate the check in PulsarMessageBatch, because it is susceptible to the same bug.
   
   I am fine with moving it if you still think it should be moved. Let me know.
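A hedged sketch of the placement argument: if the check stays in the shared consume loop, each plugin's batch keeps the meaning "this batch reaches the end of the partition group" even when it still holds messages, and no plugin has to repeat the message-count test. `StreamBatch`, `KinesisLikeBatch`, `PulsarLikeBatch` and `reachedEndOfPartitionGroup()` are hypothetical names, not Pinot's actual MessageBatch API (requires Java 16+ for records).

```java
import java.util.List;

public class SharedEndOfPartitionCheck {

  /** Hypothetical, trimmed-down batch contract shared by all stream plugins. */
  public interface StreamBatch {
    int getMessageCount();

    /** True when this batch reaches the end of the partition group, even if it still holds messages. */
    boolean isEndOfPartitionGroup();
  }

  /** Hypothetical Kinesis-style batch: just reports what the stream said. */
  public record KinesisLikeBatch(List<String> records, boolean endOfShard) implements StreamBatch {
    public int getMessageCount() {
      return records.size();
    }

    public boolean isEndOfPartitionGroup() {
      return endOfShard;
    }
  }

  /** Hypothetical Pulsar-style batch: same contract, no duplicated message-count logic. */
  public record PulsarLikeBatch(List<byte[]> messages, boolean endOfTopic) implements StreamBatch {
    public int getMessageCount() {
      return messages.size();
    }

    public boolean isEndOfPartitionGroup() {
      return endOfTopic;
    }
  }

  /** The single place the condition from the diff lives, applied to every plugin's batches. */
  public static boolean reachedEndOfPartitionGroup(StreamBatch batch) {
    return batch.getMessageCount() == 0 && batch.isEndOfPartitionGroup();
  }

  public static void main(String[] args) {
    StreamBatch lastKinesisBatch = new KinesisLikeBatch(List.of("r1"), true);
    StreamBatch emptyPulsarBatch = new PulsarLikeBatch(List.of(), true);
    // The batch still reports end-of-partition, but the loop keeps going to index "r1".
    System.out.println(lastKinesisBatch.isEndOfPartitionGroup());     // true
    System.out.println(reachedEndOfPartitionGroup(lastKinesisBatch)); // false
    System.out.println(reachedEndOfPartitionGroup(emptyPulsarBatch)); // true
  }
}
```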



##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -93,11 +93,31 @@ private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset s
       return new KinesisMessageBatch(List.of(), startOffset, true);
     }
 
-    // Read records
-    rateLimitRequests();
-    GetRecordsRequest getRecordRequest =
-        GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
-    GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
+    // Read records from kinesis.
+    // Based on getRecords documentation, we might get a response with empty records but a non-null nextShardIterator.
+    // This method is also used to accurately determine if we reached end of shard. So, we need to use nextShardIterator
+    // and call getRecords again until we get non-empty records or null nextShardIterator.
+    // To prevent an infinite loop due to some bug, we will limit the number of attempts
+    GetRecordsResponse getRecordsResponse;
+    int attempts = 0;
+    while (true) {

Review Comment:
   Thanks. As I understand it, the change that removed the while loop was primarily meant to address "Return the message batch immediately without combining multiple of them", by removing the "messages.size() >= _config.getNumMaxRecordsToFetch()" condition.
   
   That behaviour doesn't change here: we still won't combine multiple batches.
   
   The new while loop only handles the case where the message batch is empty and the shard iterator is not null. It keeps iterating until it gets either a null shard iterator or a non-empty message batch.
   
   This seems like a good change, right? We could instead make the change in the client method consumedEndOfShard(), but I think adding it to getKinesisMessageBatch() is more appropriate, so that the case is handled for other use cases too.
   
   What do you suggest now?
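A minimal sketch of the bounded loop described above, written against a hypothetical `PageFetcher` interface rather than the AWS SDK's Kinesis client (`Page`, `Batch`, `PageFetcher`, `readBatch` and `maxAttempts` are illustrative names; rate limiting and offset tracking are omitted; requires Java 16+ for records). It follows the next shard iterator only while the response is empty and the iterator is non-null, and it returns a single page as-is, so pages are never combined into one batch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class ShardReadSketch {

  /** Hypothetical result of one GetRecords-style call; nextShardIterator is null once the shard is closed and fully read. */
  public record Page(List<String> records, String nextShardIterator) {}

  /** Hypothetical stand-in for the Kinesis client call. */
  public interface PageFetcher {
    Page fetch(String shardIterator);
  }

  /** Hypothetical batch: the records of a single page plus whether the end of the shard was reached. */
  public record Batch(List<String> records, boolean endOfShard) {}

  public static Batch readBatch(PageFetcher fetcher, String shardIterator, int maxAttempts) {
    String iterator = shardIterator;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Page page = fetcher.fetch(iterator);
      boolean endOfShard = page.nextShardIterator() == null;
      if (!page.records().isEmpty() || endOfShard) {
        // Return this page on its own; pages are never merged into one batch.
        return new Batch(page.records(), endOfShard);
      }
      // Empty page with a non-null iterator: follow the iterator and try again.
      iterator = page.nextShardIterator();
    }
    // Attempt budget exhausted: return an empty batch that does not claim end-of-shard.
    return new Batch(List.of(), false);
  }

  public static void main(String[] args) {
    Deque<Page> pages = new ArrayDeque<>(List.of(
        new Page(List.of(), "it-2"),
        new Page(List.of(), "it-3"),
        new Page(List.of("r1", "r2"), null)));
    Batch batch = readBatch(iterator -> pages.poll(), "it-1", 5);
    System.out.println(batch); // Batch[records=[r1, r2], endOfShard=true]
  }
}
```

Returning an empty batch with endOfShard=false when the attempt budget runs out is one possible choice in this sketch: the caller can simply fetch again later instead of wrongly concluding that the shard is finished.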



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -994,15 +994,17 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig)
   @VisibleForTesting
   Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
     Set<Integer> partitionIds = new HashSet<>();
-    boolean allPartitionIdsFetched = true;
+    boolean allPartitionIdsFetched = false;
     for (int i = 0; i < streamConfigs.size(); i++) {
       final int index = i;
       try {
         partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
             .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index))
             .collect(Collectors.toSet()));
+        allPartitionIdsFetched = true;

Review Comment:
   Thanks, good catch. Fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

