krishan1390 commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2055316372
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -475,7 +475,10 @@ protected boolean consumeLoop()
           messageBatch.getMessageCount(), messageBatch.getUnfilteredMessageCount(),
           messageBatch.isEndOfPartitionGroup());
     }
-    _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
+    // We need to check both endOfPartitionGroup and messageCount == 0, because
+    // endOfPartitionGroup can be true even when the batch still contains messages (observed with Kinesis).
+    // To make sure the last batch of messages gets processed, we set _endOfPartitionGroup to false in that case.
+    _endOfPartitionGroup = messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup();

Review Comment:
   This was observed after a shard split, when we pause and resume the table with the smallest offset. The server doesn't process the last batch of messages and commits the segment. The controller then determines that the shard is not completely consumed and starts a new segment for the parent shard. The server goes through the same loop, again skips the last batch of messages, and commits a segment with 0 docs. This cycle repeats. Note that the issue is intermittent, depending on how Kinesis responds (sometimes Kinesis responds with isEndOfPartitionGroup=false as well).

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -475,7 +475,10 @@ protected boolean consumeLoop()
           messageBatch.getMessageCount(), messageBatch.getUnfilteredMessageCount(),
           messageBatch.isEndOfPartitionGroup());
     }
-    _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
+    // We need to check both endOfPartitionGroup and messageCount == 0, because
+    // endOfPartitionGroup can be true even when the batch still contains messages (observed with Kinesis).
+    // To make sure the last batch of messages gets processed, we set _endOfPartitionGroup to false in that case.
+    _endOfPartitionGroup = messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup();

Review Comment:
   Although functionally we could move this into KinesisMessageBatch, I think a function named "isEndOfPartitionGroup()" should still return true in the case where the message count is > 0 but the batch contains the last messages in the shard. We would also need to duplicate the check in PulsarMessageBatch, because it is susceptible to the same bug. I am fine with moving it if you still think it should be moved. Let me know.

##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -93,11 +93,31 @@ private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset s
       return new KinesisMessageBatch(List.of(), startOffset, true);
     }
-    // Read records
-    rateLimitRequests();
-    GetRecordsRequest getRecordRequest =
-        GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
-    GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
+    // Read records from Kinesis.
+    // Per the getRecords documentation, we might get a response with empty records but a non-null nextShardIterator.
+    // This method is also used to determine accurately whether we have reached the end of the shard, so we need to
+    // use nextShardIterator and call getRecords again until we get non-empty records or a null nextShardIterator.
+    // To prevent an infinite loop due to some bug, we limit the number of attempts.
+    GetRecordsResponse getRecordsResponse;
+    int attempts = 0;
+    while (true) {

Review Comment:
   Thanks. I see that the change to remove the while loop was primarily to address "Return the message batch immediately without combining multiple of them", by removing the "messages.size() >= _config.getNumMaxRecordsToFetch()" condition. That behaviour doesn't change here, and we still won't combine multiple batches. The new while loop only handles the case where the message batch is empty and the shard iterator is not null: it iterates until it finds a null shard iterator or a non-empty message batch. That seems like a good change, right? Although we could make the change in the client method consumedEndOfShard(), I think adding it to getKinesisMessageBatch() is more appropriate, so that the case is handled for other callers too. What do you suggest now?
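   The quoted diff is truncated at `while (true) {`. For reference, here is a minimal sketch of the bounded-retry pattern the new comments describe, written against the AWS SDK v2 getRecords API. The attempt bound, the helper name, and the omission of Pinot's request rate limiting are assumptions for illustration, not the PR's actual code:

   import software.amazon.awssdk.services.kinesis.KinesisClient;
   import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
   import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;

   // Hypothetical sketch of the bounded fetch loop under discussion.
   class KinesisFetchLoopSketch {
     private static final int MAX_GET_RECORDS_ATTEMPTS = 5; // assumed bound; the PR may use a config value

     static GetRecordsResponse fetchUntilNonEmptyOrEndOfShard(
         KinesisClient kinesisClient, String shardIterator, int maxRecordsPerFetch) {
       int attempts = 0;
       while (true) {
         // (Pinot's rate limiting of successive getRecords calls is omitted for brevity.)
         GetRecordsRequest request = GetRecordsRequest.builder()
             .shardIterator(shardIterator)
             .limit(maxRecordsPerFetch)
             .build();
         GetRecordsResponse response = kinesisClient.getRecords(request);
         attempts++;
         // Stop on a non-empty batch, on end of shard (null nextShardIterator),
         // or once the attempt bound is reached, to guard against an infinite loop.
         if (!response.records().isEmpty() || response.nextShardIterator() == null
             || attempts >= MAX_GET_RECORDS_ATTEMPTS) {
           return response;
         }
         // Empty batch but the shard is not exhausted: advance the iterator and retry.
         shardIterator = response.nextShardIterator();
       }
     }
   }

   The design point under discussion is that an empty getRecords response must not be mistaken for end of shard: the loop advances nextShardIterator on empty batches, and only a null nextShardIterator signals that the shard is fully consumed.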
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -994,15 +994,17 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig)
   @VisibleForTesting
   Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
     Set<Integer> partitionIds = new HashSet<>();
-    boolean allPartitionIdsFetched = true;
+    boolean allPartitionIdsFetched = false;
     for (int i = 0; i < streamConfigs.size(); i++) {
       final int index = i;
       try {
         partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
             .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index))
             .collect(Collectors.toSet()));
+        allPartitionIdsFetched = true;

Review Comment:
   Thanks, good catch. Fixed it.
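   For completeness, a minimal, hypothetical sketch of why the flag's initialization matters; the fetcher interface and the early return are simplified stand-ins, not PinotLLCRealtimeSegmentManager's actual logic:

   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   class PartitionIdFlagSketch {
     // Stand-in for the per-stream partition-id lookup, which may throw on a stream error.
     interface PartitionIdFetcher {
       Set<Integer> fetch(int streamIndex) throws Exception;
     }

     static boolean fetchPartitionIds(List<String> streamConfigs, PartitionIdFetcher fetcher,
         Set<Integer> partitionIds) {
       // Start at false: with the old `= true` initialization, an exception thrown by the
       // very first fetch could leave the flag incorrectly reporting success.
       boolean allPartitionIdsFetched = false;
       for (int i = 0; i < streamConfigs.size(); i++) {
         try {
           partitionIds.addAll(fetcher.fetch(i));
           allPartitionIdsFetched = true; // set only after a fetch actually succeeded
         } catch (Exception e) {
           // Simplified: the real code logs here and can fall back to the ideal state.
           return false;
         }
       }
       return allPartitionIdsFetched;
     }

     public static void main(String[] args) {
       Set<Integer> ids = new HashSet<>();
       boolean ok = fetchPartitionIds(List.of("streamA"), i -> {
         throw new Exception("simulated stream error");
       }, ids);
       System.out.println("allPartitionIdsFetched=" + ok); // false, instead of the pre-fix true
     }
   }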