junrao commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2070639999
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -668,21 +677,24 @@ private void processRemoteFetchOrException(
    private boolean maybeCompletePendingRemoteFetch() {
        boolean canComplete = false;
-       TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
-       try {
-           replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
-       } catch (KafkaStorageException e) { // Case a
-           log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-           canComplete = true;
-       } catch (UnknownTopicOrPartitionException e) { // Case b
-           log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-           canComplete = true;
-       } catch (NotLeaderOrFollowerException e) { // Case c
-           log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-           canComplete = true;
+       for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
Review Comment:
Could we change the comment for case d to say "All remote storage read
requests completed"?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -291,11 +293,11 @@ public boolean tryComplete() {
            // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
            // those topic partitions.
            LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
-           // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
-           Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
Review Comment:
There seems to be an existing issue with remote fetch. Consider the following.
1. tryComplete() is called on 1 partition. The fetch offset is still in the local log and readFromLog() returns a local fetchOffsetMetadata, which is then cached in sharePartition. The partition doesn't satisfy minBytes yet, so tryComplete() returns false.
2. tryComplete() is called again on that partition. Since fetchOffsetMetadata is cached in sharePartition, readFromLog() is not called. Now, the partition satisfies minBytes.
3. onComplete() is called. The cached fetchOffsetMetadata is now only available in remote storage, and readFromLog() returns an empty Records and a non-empty `logReadResult.info().delayedRemoteStorageFetch`. Since there is no logic to handle remote fetch in completeLocalLogShareFetchRequest(), an empty response is sent to the client.
4. The client fetches with the same offset, and steps 2 and 3 are repeated.
If a client gets into this situation, it will never make progress. Is this correct?
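The stall described in the four steps above can be sketched as a toy model. This is a hypothetical simplification, not the actual Kafka classes: the class, fields, and return values below are invented for illustration, with the cached offset metadata reduced to an `Optional<Long>` and the "data tiered to remote storage" condition reduced to a boolean.

```java
import java.util.Optional;

public class RemoteFetchStallSketch {
    // Simplified stand-in for the fetchOffsetMetadata cached in sharePartition.
    static Optional<Long> cachedFetchOffsetMetadata = Optional.empty();
    // Simplified stand-in for "the cached offset is now only in remote storage".
    static boolean offsetOnlyInRemoteStorage = false;

    // Step 1: the first tryComplete() reads locally and caches the metadata.
    static void firstTryComplete() {
        cachedFetchOffsetMetadata = Optional.of(100L);
    }

    // Steps 2-3: the cache makes later passes skip readFromLog(); if the data
    // has since been tiered, the local read in onComplete() yields no records,
    // and there is no remote-fetch fallback, so zero bytes go to the client.
    static int onCompleteBytesReturned() {
        if (cachedFetchOffsetMetadata.isPresent() && offsetOnlyInRemoteStorage) {
            return 0; // empty response: no remote-fetch handling on this path
        }
        return 10; // pretend a local read returned some records
    }

    public static void main(String[] args) {
        firstTryComplete();
        offsetOnlyInRemoteStorage = true; // segment tiered between passes
        // Step 4: every retry uses the same offset and gets the same result.
        for (int attempt = 0; attempt < 3; attempt++) {
            System.out.println("attempt " + attempt + " returned "
                + onCompleteBytesReturned() + " bytes");
        }
    }
}
```

The point of the sketch is that nothing in the loop ever invalidates the cached metadata or routes the request to remote storage, so the client retries the same offset indefinitely.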
##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1287,13 +1303,15 @@ public void testRemoteStorageFetchTryCompleteThrowsException() {
        assertTrue(delayedShareFetch.isCompleted());
        // The future of shareFetch completes.
        assertTrue(shareFetch.isCompleted());
+       // The remoteFetchTask created for tp1 is cancelled successfully.
+       assertTrue(remoteFetchTask.isCancelled());
        assertFalse(future.isCompletedExceptionally());
-       assertEquals(Set.of(tp1), future.join().keySet());
+       assertEquals(Set.of(tp1, tp2), future.join().keySet());
        // Exception occurred and was handled.
-       Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
-       // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence.
+       Mockito.verify(exceptionHandler, times(2)).accept(any(), any());
+       // Verify the locks are released for both local and remote read topic partitions tp0, tp1 and tp2 because of exception occurrence.
Review Comment:
both => all
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]