This is an automated email from the ASF dual-hosted git repository.

squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b033b2e8f40 KAFKA-20114: Fix producer ID retry backoff race (#22204)
b033b2e8f40 is described below

commit b033b2e8f4017674fb873979588296affefbf72f
Author: ChickenchickenLove <[email protected]>
AuthorDate: Sat May 9 15:12:29 2026 +0900

    KAFKA-20114: Fix producer ID retry backoff race (#22204)
    
    ### Description
    In `RPCProducerIDManager`, there was a race condition between
    `maybeRequestNextBlock()` and `handleUnsuccessfulResponse()`, which may
    be called by different threads. This race condition could lead to a
    premature retry. To fix this issue, this patch reorders the operations in
    `maybeRequestNextBlock()`.
    
    ### Considered parts
    - It is difficult to add a unit test for this diff because the race
    condition cannot be controlled deterministically without relying on
    scheduling. Instead of adding a unit test, I added comments to the
    paired methods to clarify the intended ordering and concurrency
    assumptions.
    - Race condition between `if (nextProducerIdBlock.get() != null)` and `if
    (!requestInFlight.compareAndSet(false, true))`
      - After `if (nextProducerIdBlock.get() != null)` is checked, another
    thread may set `nextProducerIdBlock`. However, this does not cause a
    premature retry. In `sanityCheckResponse(...)`, the thread only sets
    `nextProducerIdBlock` and does not update `requestInFlight`. Therefore,
    CAS in `maybeRequestNextBlock()` will fail, and a premature retry
    will not occur.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../transaction/RPCProducerIdManager.java          | 46 ++++++++++++++--------
 1 file changed, 30 insertions(+), 16 deletions(-)

diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
index 72d70fd2e19..59ec7aff7c9 100644
--- 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
@@ -60,9 +60,7 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
     final AtomicReference<ProducerIdsBlock> nextProducerIdBlock = new 
AtomicReference<>(null);
     final AtomicReference<ProducerIdsBlock> currentProducerIdBlock = new 
AtomicReference<>(ProducerIdsBlock.EMPTY);
     private final AtomicBoolean requestInFlight = new AtomicBoolean(false);
-    
-    // Setting the value of backoffDeadlineMs should be handled only in the 
response handler thread.
-    // Otherwise, consider using compareAndSet() instead of set().
+
     private final AtomicLong backoffDeadlineMs = new AtomicLong(NO_RETRY);
 
     public RPCProducerIdManager(int brokerId,
@@ -102,7 +100,7 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
                     throw 
Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. 
Waiting for next block");
                 } else {
                     currentProducerIdBlock.set(block);
-                    requestInFlight.set(false);
+                    clearRequestInFlight(NO_RETRY);
                     iteration++;
                 }
             }
@@ -111,14 +109,27 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
     }
 
     private void maybeRequestNextBlock() {
+        if (nextProducerIdBlock.get() != null) {
+            return;
+        }
+
+        // KAFKA-20114 - Acquire requestInFlight before reading 
backoffDeadlineMs. The response handler
+        // updates backoffDeadlineMs before clearing requestInFlight, so a 
successful CAS
+        // after that clear observes the updated backoff and avoids a 
premature retry.
+        if (!requestInFlight.compareAndSet(false, true)) {
+            return;
+        }
+
         var retryTimestamp = backoffDeadlineMs.get();
-        if (retryTimestamp == NO_RETRY || time.milliseconds() >= 
retryTimestamp) {
-            // Send a request only if we reached the retry deadline, or if no 
deadline was set.
-            if (nextProducerIdBlock.get() == null &&
-                    requestInFlight.compareAndSet(false, true)) {
-                sendRequest();
-            }
+        var now = time.milliseconds();
+
+        // Don't send a request if there is a retry deadline and the deadline 
has not passed yet.
+        if (retryTimestamp != NO_RETRY && now < retryTimestamp) {
+            requestInFlight.set(false);
+            return;
         }
+
+        sendRequest();
     }
 
     protected void sendRequest() {
@@ -137,8 +148,7 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
             @Override
             public void onTimeout() {
                 log.warn("{} Timed out when requesting AllocateProducerIds 
from the controller.", logPrefix);
-                backoffDeadlineMs.set(NO_RETRY);
-                requestInFlight.set(false);
+                clearRequestInFlight(NO_RETRY);
             }
         });
     }
@@ -146,8 +156,7 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
     private void handleUnsuccessfulResponse() {
         // There is no need to compare and set because only one thread
         // handles the AllocateProducerIds response.
-        backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
-        requestInFlight.set(false);
+        clearRequestInFlight(time.milliseconds() + RETRY_BACKOFF_MS);
     }
 
     protected void handleAllocateProducerIdsResponse(ClientResponse 
clientResponse) {
@@ -186,8 +195,6 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
         }
         if (!successfulResponse) {
             handleUnsuccessfulResponse();
-        } else {
-            backoffDeadlineMs.set(NO_RETRY);
         }
     }
 
@@ -202,4 +209,11 @@ public class RPCProducerIdManager implements 
ProducerIdManager {
         }
         return false;
     }
+    
+    private void clearRequestInFlight(long newBackoffDeadlineMs) {
+        // KAFKA-20114 - Update the backoff before clearing requestInFlight. 
maybeRequestNextBlock
+        // relies on this ordering when it acquires requestInFlight before 
reading the deadline.
+        backoffDeadlineMs.set(newBackoffDeadlineMs);
+        requestInFlight.set(false);
+    }
 }

Reply via email to