apoorvmittal10 commented on code in PR #17583:
URL: https://github.com/apache/kafka/pull/17583#discussion_r1817079818


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -149,15 +149,27 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        log.trace("Try to complete the delayed share fetch request for group {}, member {}, topic partitions {}",
-            shareFetchData.groupId(), shareFetchData.memberId(),
-            shareFetchData.partitionMaxBytes().keySet());
+        // There can be multiple threads which might invoke tryComplete for same share fetch request
+        // hence check if delay share fetch request is already completed. If yes, return true.
+        // However, this check alone cannot guarantee that request is really completed. It is possible that
+        // tryComplete is invoked by multiple threads and state has yet not updated. Hence, we need to check
+        // the forceComplete response as well.
+        if (isCompleted()) {

Review Comment:
   So I rechecked and added a log line, and it shows that `tryComplete` is being called even when `completed` is true.
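   To make the guard in the diff concrete, here is a minimal Java sketch of the pattern, assuming a simplified stand-in for `DelayedOperation` (the class `GuardedDelayedOp` and its counters are hypothetical, not Kafka code). The early `isCompleted()` return covers repeat calls, while the return value of `forceComplete()`, backed by a compare-and-set, decides which caller actually runs `onComplete()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardSketch {

    // Hypothetical, simplified stand-in for a DelayedOperation subclass such as
    // DelayedShareFetch; this is not the real Kafka class.
    static class GuardedDelayedOp {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        final AtomicInteger onCompleteCalls = new AtomicInteger();
        final AtomicInteger guardHits = new AtomicInteger();

        boolean isCompleted() {
            return completed.get();
        }

        // Only the caller that flips the flag from false to true runs onComplete().
        boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onComplete();
                return true;
            }
            return false;
        }

        void onComplete() {
            onCompleteCalls.incrementAndGet();
        }

        boolean tryComplete() {
            // Guard from the diff: an already completed request needs no more work.
            if (isCompleted()) {
                guardHits.incrementAndGet();
                return true;
            }
            // The real code would acquire partitions and read data here (omitted).
            // Since the guard alone is not sufficient, report whether this caller
            // actually won the completion race.
            return forceComplete();
        }
    }

    public static void main(String[] args) {
        GuardedDelayedOp op = new GuardedDelayedOp();
        System.out.println(op.tryComplete()); // true: this call completes the op
        System.out.println(op.tryComplete()); // true: short-circuited by the guard
        System.out.println(op.onCompleteCalls.get() + " " + op.guardHits.get()); // 1 1
    }
}
```

   A second call returns through the guard, and `onComplete()` still runs exactly once.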
   
   Here is my understanding of DelayedOperation.scala:
   
   1. `tryComplete()` is always executed safely, i.e. from `safeTryComplete` or `safeTryCompleteOrElse`, both of which take a lock on the DelayedOperation itself, hence no two threads can execute `tryComplete()` simultaneously. Correct?
   2. You are right that `tryCompleteWatched` already has a `completed` check. But the issue does exist.
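   Point 1 can be modelled with a per-operation lock. The sketch below is a hypothetical Java approximation of `safeTryComplete` (not the Scala code in DelayedOperation.scala; `LockedDelayedOp` and `runConcurrently` are illustrative names): eight threads race on one operation, the lock keeps at most one of them inside `tryComplete()`, yet every caller still enters `tryComplete()`, including those that arrive after the operation has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class SafeTryCompleteSketch {

    // Hypothetical model: tryComplete() is only reached via safeTryComplete(),
    // which holds a lock owned by the operation itself.
    static class LockedDelayedOp {
        private final ReentrantLock lock = new ReentrantLock();
        private final AtomicBoolean completed = new AtomicBoolean(false);
        final AtomicInteger inside = new AtomicInteger();           // threads currently in tryComplete
        final AtomicInteger maxInside = new AtomicInteger();        // highest concurrency observed
        final AtomicInteger tryCompleteCalls = new AtomicInteger(); // total entries into tryComplete
        final AtomicInteger onCompleteCalls = new AtomicInteger();

        boolean isCompleted() {
            return completed.get();
        }

        boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onCompleteCalls.incrementAndGet();
                return true;
            }
            return false;
        }

        boolean tryComplete() {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            tryCompleteCalls.incrementAndGet();
            try {
                if (isCompleted()) {
                    return true; // late caller: operation was already completed
                }
                return forceComplete();
            } finally {
                inside.decrementAndGet();
            }
        }

        // Mirrors the idea of safeTryComplete: serialize callers on the operation's lock.
        boolean safeTryComplete() {
            lock.lock();
            try {
                return tryComplete();
            } finally {
                lock.unlock();
            }
        }
    }

    // Starts `threads` callers of safeTryComplete() on one operation and waits for them.
    static LockedDelayedOp runConcurrently(int threads) {
        LockedDelayedOp op = new LockedDelayedOp();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(op::safeTryComplete);
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return op;
    }

    public static void main(String[] args) {
        LockedDelayedOp op = runConcurrently(8);
        // max concurrency, total tryComplete entries, onComplete runs
        System.out.println(op.maxInside.get() + " " + op.tryCompleteCalls.get() + " " + op.onCompleteCalls.get()); // 1 8 1
    }
}
```

   So the lock rules out simultaneous execution but not repeated execution; in this model only the compare-and-set in `forceComplete()` keeps `onComplete()` to a single run.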
   
   This triggers only when there are multiple share consumers for the same group and the same topic partition. I traced the calls and found the following: the calls originate from `addToActionQueue`, defined in `onComplete` of DelayedShareFetch. Although the request goes through `tryCompleteWatched`, `tryComplete` is then called again even though the operation is already completed. The conditional variable in DelayedOperation etc. seems fine to me, so I'm not sure how this gets triggered.
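   One interleaving that could explain this, assuming the `completed` check in `tryCompleteWatched` runs before `safeTryComplete` takes the lock, is a check-then-act window: a watcher reads `isCompleted() == false`, another path completes the same operation, and the watcher then calls `tryComplete()` based on its stale read. The sketch replays that interleaving deterministically in a single thread; `Op` and `replayInterleaving` are hypothetical, and whether this is the actual trigger here is not confirmed.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class WatchedRaceSketch {

    // Simplified operation: the guard from the diff plus forceComplete's CAS.
    static class Op {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicBoolean completed = new AtomicBoolean(false);
        final AtomicInteger calledWhileCompleted = new AtomicInteger();
        final AtomicInteger onCompleteCalls = new AtomicInteger();

        boolean isCompleted() {
            return completed.get();
        }

        boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onCompleteCalls.incrementAndGet();
                return true;
            }
            return false;
        }

        boolean tryComplete() {
            if (isCompleted()) { // the guard added in the diff
                calledWhileCompleted.incrementAndGet();
                return true;
            }
            return forceComplete();
        }

        boolean safeTryComplete() {
            lock.lock();
            try {
                return tryComplete();
            } finally {
                lock.unlock();
            }
        }
    }

    // Replays one interleaving by hand: watcher A checks completion without the
    // lock, path B completes the operation, then A acts on its stale read.
    static Op replayInterleaving() {
        Op op = new Op();
        boolean seenCompletedByA = op.isCompleted(); // A: unlocked check, sees false
        op.safeTryComplete();                        // B: completes the same operation
        if (!seenCompletedByA) {
            op.safeTryComplete();                    // A: reaches tryComplete on a completed op
        }
        return op;
    }

    public static void main(String[] args) {
        Op op = replayInterleaving();
        System.out.println(op.calledWhileCompleted.get() + " " + op.onCompleteCalls.get()); // 1 1
    }
}
```

   If this window is the cause, the late call would return through the guard and `onComplete()` would still run only once.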
   
   ```
   [2024-10-25 17:34:32,670] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
   [2024-10-25 17:34:32,703] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
   [2024-10-25 17:34:32,754] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
   [2024-10-25 17:34:33,191] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
   [2024-10-25 17:34:33,391] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
   [2024-10-25 17:34:33,682] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
   [2024-10-25 17:34:34,363] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
   ```


