adixitconfluent commented on code in PR #16969:
URL: https://github.com/apache/kafka/pull/16969#discussion_r1751801277


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -560,88 +588,58 @@ void maybeProcessFetchQueue() {
                         maybeCompleteInitializationWithException(sharePartitionKey, shareFetchPartitionData.future, throwable);
                         return;
                     }
-                    // Fetch messages for the partition only if it is active.
-                    int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0);
-                    // Add the share partition to the list of partitions to be fetched only if we can
-                    // acquire the fetch lock on it.
-                    if (sharePartition.maybeAcquireFetchLock()) {
-                        // If the share partition is already at capacity, we should not attempt to fetch.

Review Comment:
   Hi @apoorvmittal10, yes, I think we need some refactoring between the
SharePartitionManager and DelayedShareFetch code to address share partition
initialization. I have created a JIRA,
https://issues.apache.org/jira/browse/KAFKA-17510, and we can address this in
future PRs since it does not affect the reliability of the share groups code in
AK.



