adixitconfluent commented on code in PR #16969:
URL: https://github.com/apache/kafka/pull/16969#discussion_r1739745572
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -564,16 +592,29 @@ void maybeProcessFetchQueue() {
);
} else {
sharePartition.releaseFetchLock();
- log.info("Record lock partition limit exceeded for
SharePartition with key {}, " +
- "cannot acquire more records", sharePartitionKey);
}
}
});
- if (topicPartitionData.isEmpty()) {
- // No locks for share partitions could be acquired, so we
complete the request and
- // will re-fetch for the client in next poll.
+ if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) {
+ // If there are no partitions to fetch then complete the
future with an empty map.
shareFetchPartitionData.future.complete(Collections.emptyMap());
+ // Release the lock so that other threads can process the
queue.
+ releaseProcessFetchQueueLock();
+ if (!fetchQueue.isEmpty())
+ maybeProcessFetchQueue();
+ return;
+ }
+ if (topicPartitionData.isEmpty()) {
+ // No locks for any of the share partitions in the fetch
request could be acquired.
+ Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
+ shareFetchPartitionData.partitionMaxBytes.keySet().forEach(
+ topicIdPartition -> delayedShareFetchWatchKeys.add(
+ new DelayedShareFetchKey(topicIdPartition,
shareFetchPartitionData.groupId)));
+
+ // Add the share fetch to the delayed share fetch purgatory to
process the fetch request.
+ addDelayedShareFetch(new
DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap),
Review Comment:
IIUC, we do not want that on adding an entry to the purgatory, it should try
to acquire the partition level lock to check if we can acquire records in the
partition because we have already tried to acquire the lock while processing
the fetch queue and failed .
There is no method in `DelayedOperationPurgatory` by which we can do an
operation like add an operation and keys associated to it and just watch it
without trying. So, I have introduced a variable `isTryingForFirstTime` in
`DelayedShareFetch` where once you add an entry to the purgatory the
corresponding tryComplete won't acquire the partition lock.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]