junrao commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2071876211
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -274,6 +276,32 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
}
}
+ /**
+ * This function updates the cached fetch offset metadata to null corresponding to the share partition's fetch offset.
+ * This is required in the case when a topic partition that has local log fetch during tryComplete changes to remote
Review Comment:
has local log fetch during tryComplete changes to => has local log fetch during tryComplete, but changes to
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -272,13 +276,39 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
}
}
+ /**
+ * This function updates the cached fetch offset metadata to null corresponding to the share partition's fetch offset.
+ * This is required in the case when a topic partition that has local log fetch during tryComplete changes to remote
+ * storage fetch in onComplete. In this situation, if the cached fetchOffsetMetadata got updated in tryComplete, then
+ * we will enter a state where each share fetch request for this topic partition from client will use the cached
+ * fetchOffsetMetadata in tryComplete and return an empty response to the client from onComplete.
+ * Hence, we require to set offsetMetadata to null for this fetch offset, which would cause tryComplete to update
+ * fetchOffsetMetadata and thereby we will identify this partition for remote storage fetch.
+ * @param topicPartitionData - Map containing the fetch offset for the topic partitions.
+ * @param replicaManagerReadResponse - Map containing the readFromLog response from replicaManager for the topic partitions.
+ */
+ private void updateFetchOffsetMetadataForRemoteFetchPartitions(
Review Comment:
updateFetchOffsetMetadataForRemoteFetchPartitions => resetFetchOffsetMetadataForRemoteFetchPartitions?
Also, should we call this from completeRemoteStorageShareFetchRequest() too for the locally read partitions returning delayedRemoteStorageFetch?
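A minimal, self-contained sketch of the reset described in this javadoc, using the suggested name resetFetchOffsetMetadataForRemoteFetchPartitions. It does not use Kafka's real classes: CachedFetchOffset, ReadResult and the String partition keys are hypothetical stand-ins (a Long stands in for LogOffsetMetadata, a boolean for delayedRemoteStorageFetch). Only partitions whose read turned into a remote storage fetch get their cached metadata cleared, so the next tryComplete has to recompute it instead of reusing the stale local-log value.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class ResetFetchOffsetMetadataSketch {

    // Hypothetical stand-in for a share partition's cache; Long stands in for LogOffsetMetadata.
    static final class CachedFetchOffset {
        private long fetchOffset = -1L;
        private Long offsetMetadata; // null means "nothing cached"

        Optional<Long> fetchOffsetMetadata(long nextFetchOffset) {
            // A cache hit needs both a cached value and a matching fetch offset.
            if (offsetMetadata != null && fetchOffset == nextFetchOffset) {
                return Optional.of(offsetMetadata);
            }
            return Optional.empty();
        }

        void updateFetchOffsetMetadata(long nextFetchOffset, Long metadata) {
            this.fetchOffset = nextFetchOffset;
            this.offsetMetadata = metadata;
        }
    }

    // Hypothetical stand-in for a readFromLog result: true if the read must go to remote storage.
    static final class ReadResult {
        final boolean delayedRemoteStorageFetch;

        ReadResult(boolean delayedRemoteStorageFetch) {
            this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
        }
    }

    // Clears cached metadata only for partitions whose read turned into a remote storage fetch.
    static <K> void resetFetchOffsetMetadataForRemoteFetchPartitions(
            Map<K, Long> topicPartitionData,
            Map<K, ReadResult> readResponse,
            Map<K, CachedFetchOffset> cache) {
        for (Map.Entry<K, ReadResult> entry : readResponse.entrySet()) {
            if (!entry.getValue().delayedRemoteStorageFetch) {
                continue; // locally served partitions keep their cached metadata
            }
            Long fetchOffset = topicPartitionData.get(entry.getKey());
            CachedFetchOffset cached = cache.get(entry.getKey());
            if (fetchOffset != null && cached != null) {
                cached.updateFetchOffsetMetadata(fetchOffset, null);
            }
        }
    }

    public static void main(String[] args) {
        CachedFetchOffset tp0 = new CachedFetchOffset();
        tp0.updateFetchOffsetMetadata(100L, 4096L); // cached during tryComplete
        Map<String, CachedFetchOffset> cache = new HashMap<>();
        cache.put("topicA-0", tp0);

        Map<String, Long> topicPartitionData = new LinkedHashMap<>();
        topicPartitionData.put("topicA-0", 100L);
        Map<String, ReadResult> readResponse = new LinkedHashMap<>();
        readResponse.put("topicA-0", new ReadResult(true)); // onComplete finds the data is now remote

        resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, readResponse, cache);
        System.out.println(tp0.fetchOffsetMetadata(100L).isPresent()); // prints "false"
    }
}
```

Because the helper only looks at the read results it is handed, the same call could also be made from completeRemoteStorageShareFetchRequest() for the locally read partitions, if that is the behavior the PR settles on.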
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -248,6 +250,8 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
// updated in a different tryComplete thread.
responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);
+ updateFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);
Review Comment:
This is an existing issue. Here, we add the partition to responseData if entry.getValue().info().delayedRemoteStorageFetch.isEmpty() is true. However, in completeRemoteStorageShareFetchRequest(), we skip it.
```
for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) {
    if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
        shareFetchPartitionData.add(
```
It would be useful to make them consistent.
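One way to keep the two paths consistent is to route both through a single predicate. This is a hypothetical sketch, not Kafka code: ReadResult models only the delayedRemoteStorageFetch field, the String keys stand in for TopicIdPartition, and it assumes the local-only filter used in the quoted completeRemoteStorageShareFetchRequest() loop is the intended semantics, which is for the PR authors to confirm.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class ConsistentLocalReadFilterSketch {

    // Hypothetical stand-in for LogReadResult; only the field discussed in the review is modeled.
    static final class ReadResult {
        final Optional<String> delayedRemoteStorageFetch; // present => must be served from remote storage

        ReadResult(Optional<String> delayedRemoteStorageFetch) {
            this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
        }
    }

    // Single predicate shared by both code paths, so their filtering cannot drift apart.
    static boolean servedFromLocalLog(ReadResult result) {
        return result.delayedRemoteStorageFetch.isEmpty();
    }

    // Keeps only the partitions answerable from the local log, preserving iteration order.
    static <K> Map<K, ReadResult> locallyReadPartitions(Map<K, ReadResult> responseData) {
        Map<K, ReadResult> filtered = new LinkedHashMap<>();
        for (Map.Entry<K, ReadResult> entry : responseData.entrySet()) {
            if (servedFromLocalLog(entry.getValue())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, ReadResult> responseData = new LinkedHashMap<>();
        responseData.put("topicA-0", new ReadResult(Optional.empty()));
        responseData.put("topicA-1", new ReadResult(Optional.of("remote")));
        // Both the local-log path and the remote-completion path would call the same helper.
        System.out.println(locallyReadPartitions(responseData).keySet()); // prints "[topicA-0]"
    }
}
```

If the intended behavior turns out to be the opposite (keeping such partitions in both places), only servedFromLocalLog needs to change, and both call sites follow.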