This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d03ccff57e KAFKA-15529 Fix race condition between isConsumed and 
position updates in AsyncKafkaConsumer fetch path (#21476)
5d03ccff57e is described below

commit 5d03ccff57ed398f020e9394bf575735f21e69cc
Author: majialong <[email protected]>
AuthorDate: Sat Apr 4 13:44:57 2026 +0800

    KAFKA-15529 Fix race condition between isConsumed and position updates in 
AsyncKafkaConsumer fetch path (#21476)
    
    Previously, `isConsumed` was not volatile and `drain()` set
    `isConsumed = true` before the `position` was updated. In the window
    between `drain()` and the `position` update, the background thread could
    race in and observe **_isConsumed == true but read a stale position_**,
    leading to duplicate fetch requests with the old offset.
    
    This PR declares `isConsumed` as volatile and reorders the operations so
    that the subscription `position` is always updated before `drain()` sets
    `isConsumed = true`, ensuring the background thread always sees the
    updated `position` when it observes `isConsumed == true`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../clients/consumer/internals/CompletedFetch.java |  9 ++++--
 .../clients/consumer/internals/FetchCollector.java |  6 ++++
 .../consumer/internals/FetchCollectorTest.java     | 32 ++++++++++++++++++++++
 gradle/spotbugs-exclude.xml                        |  3 ++
 4 files changed, 48 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 5345bdcc8e9..f6fc3beeb84 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -78,7 +78,8 @@ public class CompletedFetch {
     private boolean corruptLastRecord = false;
     private long nextFetchOffset;
     private Optional<Integer> lastEpoch;
-    private boolean isConsumed = false;
+    private volatile boolean isConsumed = false;
+    private boolean exhausted = false;
     private boolean initialized = false;
 
     CompletedFetch(Logger log,
@@ -121,6 +122,10 @@ public class CompletedFetch {
         return isConsumed;
     }
 
+    boolean isExhausted() {
+        return exhausted;
+    }
+
 
     /**
      * After each partition is parsed, we update the current metric totals 
with the total bytes
@@ -192,7 +197,7 @@ public class CompletedFetch {
                     // fetching the same batch repeatedly).
                     if (currentBatch != null)
                         nextFetchOffset = currentBatch.nextOffset();
-                    drain();
+                    exhausted = true;
                     return null;
                 }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
index 632939f3a37..9765d1a0387 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
@@ -186,6 +186,12 @@ public class FetchCollector<K, V> {
                     positionAdvanced = true;
                 }
 
+                // Drain after position update to ensure the background thread 
sees the updated position
+                // before it sees isConsumed=true. This prevents duplicate 
fetch requests for the old offset.
+                if (nextInLineFetch.isExhausted()) {
+                    nextInLineFetch.drain();
+                }
+
                 Long partitionLag = subscriptions.partitionLag(tp, 
fetchConfig.isolationLevel);
                 if (partitionLag != null)
                     metricsManager.recordPartitionLag(tp, partitionLag);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index 6b65fc53742..1f4e728794f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
@@ -71,7 +72,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -160,6 +163,35 @@ public class FetchCollectorTest {
         assertTrue(completedFetch.isConsumed());
     }
 
+    @Test
+    public void testPositionUpdatedBeforeDrainOnExhaustedFetch() {
+        // Set maxPollRecords to DEFAULT_RECORD_COUNT + 1 so the fetchRecords 
loop calls nextFetchedRecord
+        // one extra time, triggering exhaustion and drain within the same 
collectFetch.
+        buildDependencies(DEFAULT_RECORD_COUNT + 1);
+        assignAndSeek(topicAPartition0);
+
+        CompletedFetch completedFetch = spy(completedFetchBuilder
+            .recordCount(DEFAULT_RECORD_COUNT)
+            .build());
+
+        // Record the subscription position at the moment drain() is called.
+        AtomicLong positionAtDrainTime = new AtomicLong(-1);
+        doAnswer(invocation -> {
+            
positionAtDrainTime.set(subscriptions.position(topicAPartition0).offset);
+            invocation.callRealMethod();
+            return null;
+        }).when(completedFetch).drain();
+
+        fetchBuffer.add(completedFetch);
+
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+        assertEquals(DEFAULT_RECORD_COUNT, fetch.numRecords());
+        assertTrue(completedFetch.isConsumed());
+
+        // Verify that when drain() was invoked, the position had already been 
advanced.
+        assertEquals(DEFAULT_RECORD_COUNT, positionAtDrainTime.get());
+    }
+
     @Test
     public void testFetchWithReadReplica() {
         buildDependencies();
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 6581d229ba6..c4818c76c67 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -494,6 +494,7 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
         These are possibly real bugs, and have not been evaluated, they were 
just bulk excluded to unblock upgrading Spotbugs.
          -->
         <Or>
+            <Class 
name="org.apache.kafka.clients.consumer.internals.CompletedFetch"/>
             <Class name="org.apache.kafka.clients.producer.MockProducer"/>
             <Class 
name="org.apache.kafka.clients.producer.internals.ProducerBatch"/>
             <Class name="org.apache.kafka.common.utils.ChunkedBytesStream"/>
@@ -529,6 +530,7 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
         These are possibly real bugs, and have not been evaluated, they were 
just bulk excluded to unblock upgrading Spotbugs.
          -->
         <Or>
+            <Class 
name="org.apache.kafka.clients.consumer.internals.CompletedFetch"/>
             <Class 
name="org.apache.kafka.clients.producer.internals.ProducerBatch"/>
             <Class 
name="org.apache.kafka.clients.producer.internals.RecordAccumulator"/>
             <Class 
name="org.apache.kafka.common.security.kerberos.KerberosLogin"/>
@@ -573,6 +575,7 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
             <Class 
name="org.apache.kafka.clients.consumer.internals.AbstractMembershipManager"/>
             <Class 
name="org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer"/>
             <Class 
name="org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer"/>
+            <Class 
name="org.apache.kafka.clients.consumer.internals.CompletedFetch"/>
             <Class 
name="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"/>
             <Class 
name="org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager"/>
             <Class 
name="org.apache.kafka.clients.consumer.internals.StreamsMembershipManager"/>

Reply via email to