This is an automated email from the ASF dual-hosted git repository.
squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b033b2e8f40 KAFKA-20114: Fix producer ID retry backoff race (#22204)
b033b2e8f40 is described below
commit b033b2e8f4017674fb873979588296affefbf72f
Author: ChickenchickenLove <[email protected]>
AuthorDate: Sat May 9 15:12:29 2026 +0900
KAFKA-20114: Fix producer ID retry backoff race (#22204)
### Description
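In `RPCProducerIdManager`, there was a race condition between
`maybeRequestNextBlock()` and `handleUnsuccessfulResponse()`, which may
be called by different threads. `maybeRequestNextBlock()` read
`backoffDeadlineMs` before acquiring `requestInFlight`. If the response
handler set a new backoff deadline and cleared `requestInFlight` in
between, the CAS could still succeed using the stale deadline, and a retry
would be sent before the backoff elapsed (a premature retry). To fix this
issue, this patch reorders the operations in `maybeRequestNextBlock()` so
that `requestInFlight` is acquired before `backoffDeadlineMs` is read.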
### Considerations
- It is difficult to add a unit test for this diff because the race
condition cannot be controlled deterministically without relying on
thread scheduling. Instead of adding a unit test, I added comments to the
paired methods to clarify the intended ordering and concurrency
assumptions.
- Race condition between `if (nextProducerIdBlock.get() != null)` and `if
(!requestInFlight.compareAndSet(false, true))`
  - After `if (nextProducerIdBlock.get() != null)` is checked, another
thread may set `nextProducerIdBlock`. However, this does not cause a
premature retry. In `sanityCheckResponse(...)`, that thread only sets
`nextProducerIdBlock` and does not update `requestInFlight`. Therefore,
the CAS in `maybeRequestNextBlock()` will fail, and a premature retry
will not occur.
Reviewers: Sean Quah <[email protected]>
---
.../transaction/RPCProducerIdManager.java | 46 ++++++++++++++--------
1 file changed, 30 insertions(+), 16 deletions(-)
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
index 72d70fd2e19..59ec7aff7c9 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
@@ -60,9 +60,7 @@ public class RPCProducerIdManager implements
ProducerIdManager {
final AtomicReference<ProducerIdsBlock> nextProducerIdBlock = new
AtomicReference<>(null);
final AtomicReference<ProducerIdsBlock> currentProducerIdBlock = new
AtomicReference<>(ProducerIdsBlock.EMPTY);
private final AtomicBoolean requestInFlight = new AtomicBoolean(false);
-
- // Setting the value of backoffDeadlineMs should be handled only in the
response handler thread.
- // Otherwise, consider using compareAndSet() instead of set().
+
private final AtomicLong backoffDeadlineMs = new AtomicLong(NO_RETRY);
public RPCProducerIdManager(int brokerId,
@@ -102,7 +100,7 @@ public class RPCProducerIdManager implements
ProducerIdManager {
throw
Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full.
Waiting for next block");
} else {
currentProducerIdBlock.set(block);
- requestInFlight.set(false);
+ clearRequestInFlight(NO_RETRY);
iteration++;
}
}
@@ -111,14 +109,27 @@ public class RPCProducerIdManager implements
ProducerIdManager {
}
private void maybeRequestNextBlock() {
+ if (nextProducerIdBlock.get() != null) {
+ return;
+ }
+
+ // KAFKA-20114 - Acquire requestInFlight before reading
backoffDeadlineMs. The response handler
+ // updates backoffDeadlineMs before clearing requestInFlight, so a
successful CAS
+ // after that clear observes the updated backoff and avoids a
premature retry.
+ if (!requestInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
var retryTimestamp = backoffDeadlineMs.get();
- if (retryTimestamp == NO_RETRY || time.milliseconds() >=
retryTimestamp) {
- // Send a request only if we reached the retry deadline, or if no
deadline was set.
- if (nextProducerIdBlock.get() == null &&
- requestInFlight.compareAndSet(false, true)) {
- sendRequest();
- }
+ var now = time.milliseconds();
+
+ // Don't send a request if there is a retry deadline and the deadline
has not passed yet.
+ if (retryTimestamp != NO_RETRY && now < retryTimestamp) {
+ requestInFlight.set(false);
+ return;
}
+
+ sendRequest();
}
protected void sendRequest() {
@@ -137,8 +148,7 @@ public class RPCProducerIdManager implements
ProducerIdManager {
@Override
public void onTimeout() {
log.warn("{} Timed out when requesting AllocateProducerIds
from the controller.", logPrefix);
- backoffDeadlineMs.set(NO_RETRY);
- requestInFlight.set(false);
+ clearRequestInFlight(NO_RETRY);
}
});
}
@@ -146,8 +156,7 @@ public class RPCProducerIdManager implements
ProducerIdManager {
private void handleUnsuccessfulResponse() {
// There is no need to compare and set because only one thread
// handles the AllocateProducerIds response.
- backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
- requestInFlight.set(false);
+ clearRequestInFlight(time.milliseconds() + RETRY_BACKOFF_MS);
}
protected void handleAllocateProducerIdsResponse(ClientResponse
clientResponse) {
@@ -186,8 +195,6 @@ public class RPCProducerIdManager implements
ProducerIdManager {
}
if (!successfulResponse) {
handleUnsuccessfulResponse();
- } else {
- backoffDeadlineMs.set(NO_RETRY);
}
}
@@ -202,4 +209,11 @@ public class RPCProducerIdManager implements
ProducerIdManager {
}
return false;
}
+
+ private void clearRequestInFlight(long newBackoffDeadlineMs) {
+ // KAFKA-20114 - Update the backoff before clearing requestInFlight.
maybeRequestNextBlock
+ // relies on this ordering when it acquires requestInFlight before
reading the deadline.
+ backoffDeadlineMs.set(newBackoffDeadlineMs);
+ requestInFlight.set(false);
+ }
}