AndrewJSchofield commented on code in PR #17583:
URL: https://github.com/apache/kafka/pull/17583#discussion_r1817078939


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -149,15 +149,27 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        log.trace("Try to complete the delayed share fetch request for group {}, member {}, topic partitions {}",
-            shareFetchData.groupId(), shareFetchData.memberId(),
-            shareFetchData.partitionMaxBytes().keySet());
+        // There can be multiple threads which might invoke tryComplete for same share fetch request
+        // hence check if delay share fetch request is already completed. If yes, return true.
+        // However, this check alone cannot guarantee that request is really completed. It is possible that
+        // tryComplete is invoked by multiple threads and state has yet not updated. Hence, we need to check
+        // the forceComplete response as well.
+        if (isCompleted()) {
+            return true;
+        }
 
         topicPartitionDataFromTryComplete = acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty())
-            return forceComplete();
-        log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
+        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+            boolean result = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks.
+            if (!result) {
+                releasePartitionLocks(shareFetchData.groupId(), topicPartitionDataFromTryComplete.keySet());
+            }
+            return result;
+        }
+        log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +

Review Comment:
   Thanks. That seems much more appropriate.
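
For anyone reading this hunk without the rest of the class, here is a minimal sketch of the pattern the new comments describe: an early `isCompleted()` check, followed by releasing locks when `forceComplete()` reports that another caller got there first. This is not the Kafka implementation. `DelayedFetchSketch`, its string partition names, and the `lockedCount()` helper are hypothetical, and an `AtomicBoolean` is assumed as the completion flag purely for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a delayed operation; NOT Kafka's DelayedShareFetch.
class DelayedFetchSketch {
    // Assumed completion flag: only one caller can flip it from false to true.
    private final AtomicBoolean completed = new AtomicBoolean(false);
    // Illustrative bookkeeping of partitions this request currently holds locks on.
    private final Set<String> lockedPartitions = ConcurrentHashMap.newKeySet();
    private final Set<String> requested;

    DelayedFetchSketch(Set<String> requested) {
        this.requested = new HashSet<>(requested);
    }

    boolean isCompleted() {
        return completed.get();
    }

    // Returns true only for the single caller that actually completes the request.
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // The winner serves the fetch (omitted here) and drops the locks it holds.
    void onComplete() {
        lockedPartitions.clear();
    }

    // Pretend every requested partition can be locked; returns what was acquired.
    Set<String> acquirablePartitions() {
        lockedPartitions.addAll(requested);
        return new HashSet<>(requested);
    }

    void releasePartitionLocks(Set<String> partitions) {
        lockedPartitions.removeAll(partitions);
    }

    int lockedCount() {
        return lockedPartitions.size();
    }

    boolean tryComplete() {
        // Cheap early exit; on its own it cannot rule out a concurrent completion.
        if (isCompleted()) {
            return true;
        }
        Set<String> acquired = acquirablePartitions();
        if (!acquired.isEmpty()) {
            boolean result = forceComplete();
            // Another caller completed first, so give back the locks taken above.
            if (!result) {
                releasePartitionLocks(acquired);
            }
            return result;
        }
        return false;
    }

    public static void main(String[] args) {
        DelayedFetchSketch op = new DelayedFetchSketch(Set.of("topic-0", "topic-1"));
        System.out.println("first tryComplete:  " + op.tryComplete());
        System.out.println("second tryComplete: " + op.tryComplete());
        System.out.println("locks still held:   " + op.lockedCount());
    }
}
```

The point of the sketch is that the return value of `forceComplete()` is the only reliable signal of who completed the request, so any locks taken between the early check and that call have to be handed back on the losing path.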



