adixitconfluent commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2071250949
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -291,11 +293,11 @@ public boolean tryComplete() {
                // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
                // those topic partitions.
                LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
-                // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
-                Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
Review Comment:
Hi @junrao, thanks for pointing out this problem. I think this situation can
indeed occur. To handle it, I propose updating the cached share partition's
fetch offset metadata to `null` for this offset during `onComplete`, while
completing the local log share fetch request. Something like the code below; I
have added this in my latest commit. Let me know if this looks good. Thanks!
```java
private void updateFetchOffsetMetadataForRemoteFetchPartitions(
    LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
    LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
) {
    replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
        if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
            // Invalidate the cached fetch offset metadata for this offset so
            // the next fetch attempt re-reads it from the log.
            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
            sharePartition.updateFetchOffsetMetadata(
                topicPartitionData.get(topicIdPartition),
                null
            );
        }
    });
}
```
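
For context, the effect of caching `null` can be illustrated with a small, self-contained sketch. The `OffsetMetadataCache` class below is hypothetical and only stands in for a share partition's fetch-offset-metadata cache, not Kafka's actual `SharePartition` API: a present entry with a `null` value signals that the metadata must be re-populated from the log on the next fetch attempt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a share partition's fetch-offset-metadata cache.
class OffsetMetadataCache {
    // Maps fetch offset -> cached log offset metadata. A key mapped to null
    // means "metadata was invalidated and must be re-read from the log".
    private final Map<Long, String> cache = new HashMap<>();

    void updateFetchOffsetMetadata(long offset, String metadata) {
        cache.put(offset, metadata);
    }

    // Returns empty when no usable metadata is cached (absent or invalidated),
    // forcing the caller to fall back to reading from the log.
    Optional<String> fetchOffsetMetadata(long offset) {
        return Optional.ofNullable(cache.get(offset));
    }
}
```

Invalidating with `null` (rather than removing the entry) mirrors the proposal above: the next `tryComplete` pass finds no usable offset metadata for that fetch offset and goes back to `replicaManager.readFromLog`.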