adixitconfluent commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2072854744


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -248,14 +250,18 @@ private void 
processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
                 // updated in a different tryComplete thread.
                 responseData = combineLogReadResponse(topicPartitionData, 
localPartitionsAlreadyFetched);
 
+            
resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, 
responseData);
+
             List<ShareFetchPartitionData> shareFetchPartitionDataList = new 
ArrayList<>();
-            responseData.forEach((topicIdPartition, logReadResult) ->
-                shareFetchPartitionDataList.add(new ShareFetchPartitionData(
-                    topicIdPartition,
-                    topicPartitionData.get(topicIdPartition),
-                    logReadResult.toFetchPartitionData(false)
-                ))
-            );
+            responseData.forEach((topicIdPartition, logReadResult) -> {
+                if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) {

Review Comment:
   While completing a share fetch request for local log partitions in 
`onComplete`, we ignore any topic partitions whose `logReadResult` carries 
non-empty `delayedRemoteStorageFetch` data. Completing them would require 
creating `RemoteFetch` objects and waiting for those to complete in the 
purgatory, which is not possible in `onComplete`.
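
   To make the filtering concrete, here is a minimal, self-contained sketch of 
the idea, not the actual `DelayedShareFetch` code. `LocalOnlyFilterSketch`, 
`ReadResult` and `localOnly` are hypothetical names, and partitions are 
represented by plain strings; only the `delayedRemoteStorageFetch.isEmpty()` 
check mirrors the diff above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; these are not Kafka's classes.
final class LocalOnlyFilterSketch {

    // Simplified stand-in for a log read result.
    static final class ReadResult {
        final String records;
        // Present when the data has to be fetched from remote storage.
        final Optional<String> delayedRemoteStorageFetch;

        ReadResult(String records, Optional<String> delayedRemoteStorageFetch) {
            this.records = records;
            this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
        }
    }

    // Keeps only the entries that can be completed from the local log, i.e.
    // those with no pending remote storage fetch. Insertion order is preserved.
    static Map<String, ReadResult> localOnly(Map<String, ReadResult> responseData) {
        Map<String, ReadResult> completable = new LinkedHashMap<>();
        responseData.forEach((partition, result) -> {
            if (result.delayedRemoteStorageFetch.isEmpty()) {
                completable.put(partition, result);
            }
        });
        return completable;
    }
}
```

   In this sketch the entries that need a remote fetch are simply dropped from 
the returned map, which corresponds to ignoring them in `onComplete`.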


