This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5d03ccff57e KAFKA-15529 Fix race condition between isConsumed and
position updates in AsyncKafkaConsumer fetch path (#21476)
5d03ccff57e is described below
commit 5d03ccff57ed398f020e9394bf575735f21e69cc
Author: majialong <[email protected]>
AuthorDate: Sat Apr 4 13:44:57 2026 +0800
KAFKA-15529 Fix race condition between isConsumed and position updates in
AsyncKafkaConsumer fetch path (#21476)
Previously, `isConsumed` was not volatile and `drain()` set
`isConsumed = true` before the `position` was updated. In the window
between `drain()` and the `position` update, the background thread could
race in and observe `isConsumed == true` but read a stale `position`,
leading to duplicate fetch requests with the old offset.
This PR declares `isConsumed` as volatile and reorders the operations so
that the subscription `position` is always updated before `drain()` sets
`isConsumed = true`, ensuring the background thread always sees the
updated position when it observes `isConsumed == true`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../clients/consumer/internals/CompletedFetch.java | 9 ++++--
.../clients/consumer/internals/FetchCollector.java | 6 ++++
.../consumer/internals/FetchCollectorTest.java | 32 ++++++++++++++++++++++
gradle/spotbugs-exclude.xml | 3 ++
4 files changed, 48 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 5345bdcc8e9..f6fc3beeb84 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -78,7 +78,8 @@ public class CompletedFetch {
private boolean corruptLastRecord = false;
private long nextFetchOffset;
private Optional<Integer> lastEpoch;
- private boolean isConsumed = false;
+ private volatile boolean isConsumed = false;
+ private boolean exhausted = false;
private boolean initialized = false;
CompletedFetch(Logger log,
@@ -121,6 +122,10 @@ public class CompletedFetch {
return isConsumed;
}
+ boolean isExhausted() {
+ return exhausted;
+ }
+
/**
* After each partition is parsed, we update the current metric totals
with the total bytes
@@ -192,7 +197,7 @@ public class CompletedFetch {
// fetching the same batch repeatedly).
if (currentBatch != null)
nextFetchOffset = currentBatch.nextOffset();
- drain();
+ exhausted = true;
return null;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
index 632939f3a37..9765d1a0387 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
@@ -186,6 +186,12 @@ public class FetchCollector<K, V> {
positionAdvanced = true;
}
+ // Drain after position update to ensure the background thread
sees the updated position
+ // before it sees isConsumed=true. This prevents duplicate
fetch requests for the old offset.
+ if (nextInLineFetch.isExhausted()) {
+ nextInLineFetch.drain();
+ }
+
Long partitionLag = subscriptions.partitionLag(tp,
fetchConfig.isolationLevel);
if (partitionLag != null)
metricsManager.recordPartitionLag(tp, partitionLag);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index 6b65fc53742..1f4e728794f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -57,6 +57,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
@@ -71,7 +72,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -160,6 +163,35 @@ public class FetchCollectorTest {
assertTrue(completedFetch.isConsumed());
}
+ @Test
+ public void testPositionUpdatedBeforeDrainOnExhaustedFetch() {
+ // Set maxPollRecords to DEFAULT_RECORD_COUNT + 1 so the fetchRecords
loop calls nextFetchedRecord
+ // one extra time, triggering exhaustion and drain within the same
collectFetch.
+ buildDependencies(DEFAULT_RECORD_COUNT + 1);
+ assignAndSeek(topicAPartition0);
+
+ CompletedFetch completedFetch = spy(completedFetchBuilder
+ .recordCount(DEFAULT_RECORD_COUNT)
+ .build());
+
+ // Record the subscription position at the moment drain() is called.
+ AtomicLong positionAtDrainTime = new AtomicLong(-1);
+ doAnswer(invocation -> {
+
positionAtDrainTime.set(subscriptions.position(topicAPartition0).offset);
+ invocation.callRealMethod();
+ return null;
+ }).when(completedFetch).drain();
+
+ fetchBuffer.add(completedFetch);
+
+ Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+ assertEquals(DEFAULT_RECORD_COUNT, fetch.numRecords());
+ assertTrue(completedFetch.isConsumed());
+
+ // Verify that when drain() was invoked, the position had already been
advanced.
+ assertEquals(DEFAULT_RECORD_COUNT, positionAtDrainTime.get());
+ }
+
@Test
public void testFetchWithReadReplica() {
buildDependencies();
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 6581d229ba6..c4818c76c67 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -494,6 +494,7 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
These are possibly real bugs, and have not been evaluated, they were
just bulk excluded to unblock upgrading Spotbugs.
-->
<Or>
+ <Class
name="org.apache.kafka.clients.consumer.internals.CompletedFetch"/>
<Class name="org.apache.kafka.clients.producer.MockProducer"/>
<Class
name="org.apache.kafka.clients.producer.internals.ProducerBatch"/>
<Class name="org.apache.kafka.common.utils.ChunkedBytesStream"/>
@@ -529,6 +530,7 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
These are possibly real bugs, and have not been evaluated, they were
just bulk excluded to unblock upgrading Spotbugs.
-->
<Or>
+ <Class
name="org.apache.kafka.clients.consumer.internals.CompletedFetch"/>
<Class
name="org.apache.kafka.clients.producer.internals.ProducerBatch"/>
<Class
name="org.apache.kafka.clients.producer.internals.RecordAccumulator"/>
<Class
name="org.apache.kafka.common.security.kerberos.KerberosLogin"/>
@@ -573,6 +575,7 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Class
name="org.apache.kafka.clients.consumer.internals.AbstractMembershipManager"/>
<Class
name="org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer"/>
<Class
name="org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer"/>
+ <Class
name="org.apache.kafka.clients.consumer.internals.CompletedFetch"/>
<Class
name="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"/>
<Class
name="org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager"/>
<Class
name="org.apache.kafka.clients.consumer.internals.StreamsMembershipManager"/>