apoorvmittal10 commented on code in PR #19928:
URL: https://github.com/apache/kafka/pull/19928#discussion_r2136306887
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
         // then we should check if there is a pending share fetch request for the topic-partition and complete it.
         // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
         // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
             replicaManager.completeDelayedShareFetchRequest(
-                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            // As the DelayedShareFetch operation is watched over multiple keys, the same operation might be
+            // completed while still holding references to fetched data. Hence, if the operation is not
+            // removed from the other watched keys then there can be a memory leak. The removal of the
+            // operation depends on the purge task of DelayedOperationPurgatory, so the leak can
+            // also be mitigated by setting a smaller value for the configuration
+            // {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+            // However, it's best to trigger the check on all the keys that are being watched, which
+            // should free the memory for the completed operation.
+            replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));
Review Comment:
I have added the test results
[here](https://github.com/apache/kafka/pull/19928#issuecomment-2956678794).
I also monitored memory consumption in jconsole, which looks stable; there is no degradation when not reading from remote storage.
Note that even without this fix, the issue cannot be reproduced when a producer runs in parallel with the share consumer, because produce requests also trigger the purgatory to check the watch keys for each topic-partition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]