This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 40e9fcd7426 KAFKA-20119 Clarify that `Consumer#unsubscribe` does not 
trigger auto-commit (#21424)
40e9fcd7426 is described below

commit 40e9fcd742667e489547e1d0d7905f352616795a
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Apr 8 13:21:02 2026 +0800

    KAFKA-20119 Clarify that `Consumer#unsubscribe` does not trigger 
auto-commit (#21424)
    
    Document that unsubscribe() doesn't commit offsets even with auto-commit
    enabled, and add tests to verify this behavior.
    
    Reviewers: Lianet Magrans <[email protected]>, David Jacot
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../clients/consumer/PlaintextConsumerTest.java    | 43 +++++++++++++++
 .../kafka/clients/consumer/KafkaConsumer.java      |  4 ++
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 62 ++++++++++++++++++++++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 32 +++++++++++
 4 files changed, 141 insertions(+)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index ec9704c71ee..ed9cfc40ee1 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -1823,6 +1823,49 @@ public class PlaintextConsumerTest {
         return result.get();
     }
 
+    @ClusterTest
+    public void 
testClassicConsumerUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled() 
throws Exception {
+        
testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled() throws 
Exception {
+        
testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol.CONSUMER);
+    }
+
+    /**
+     * Verify that {@link Consumer#unsubscribe()} does not commit offsets even 
when
+     * {@code enable.auto.commit} is enabled. A second consumer using the same 
group ID
+     * should see no committed offsets after the first consumer unsubscribes.
+     */
+    private void 
testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol 
groupProtocol) throws Exception {
+        var numRecords = 10;
+        var groupId = "unsubscribe-no-commit-test";
+        sendRecords(cluster, TP, numRecords);
+
+        // Consumer 1: subscribe, consume records, then unsubscribe (without 
explicit commit)
+        Map<String, Object> config = new HashMap<>();
+        config.put(GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT));
+        config.put(GROUP_ID_CONFIG, groupId);
+        config.put(ENABLE_AUTO_COMMIT_CONFIG, true);
+
+        try (Consumer<byte[], byte[]> consumer1 = cluster.consumer(config)) {
+            consumer1.subscribe(List.of(TOPIC));
+            consumeRecords(consumer1, numRecords);
+
+            // Unsubscribe - this should NOT commit offsets even though 
auto-commit is enabled
+            consumer1.unsubscribe();
+        }
+
+        // Consumer 2: use the same group ID to check committed offsets
+        try (Consumer<byte[], byte[]> consumer2 = cluster.consumer(config)) {
+            consumer2.subscribe(List.of(TOPIC));
+            OffsetAndMetadata committed = 
consumer2.committed(Set.of(TP)).get(TP);
+            assertNull(committed,
+                    "unsubscribe() should not commit offsets even when 
auto-commit is enabled");
+        }
+    }
+
     private void awaitMetricsCleanup(
         Consumer<?, ?> consumer,
         String metricName,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 201c467cd28..1897bf54fad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -832,6 +832,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> 
{
     /**
      * Unsubscribe from topics currently subscribed with {@link 
#subscribe(Collection)} or {@link #subscribe(Pattern)}.
      * This also clears any partitions directly assigned through {@link 
#assign(Collection)}.
+     * <p>
+     * <b>Note:</b> Unlike {@link #close()}, this method does not commit the 
pending offsets before
+     * unsubscribing, even if {@code enable.auto.commit} is enabled. To avoid 
duplicate processing upon re-joining,
+     * it is recommended to explicitly call {@link #commitSync()} before 
invoking this method.
      *
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors (e.g. rebalance callback errors)
      */
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ae5dd65a826..da4a799fa5e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1790,6 +1790,68 @@ public class KafkaConsumerTest {
         client.requests().clear();
     }
 
+    /**
+     * Verify that unsubscribe() does not commit offsets even when auto-commit 
is enabled.
+     * Callers therefore need to explicitly call commitSync() before
+     * unsubscribing to avoid duplicate processing upon re-joining the group.
+     */
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @SuppressWarnings("unchecked")
+    public void 
testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled(GroupProtocol 
groupProtocol) {
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        Map<String, Integer> tpCounts = new HashMap<>();
+        tpCounts.put(topic, 1);
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
+
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+        // Create consumer with auto-commit enabled
+        consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
+
+        initializeSubscriptionWithSingleTopic(consumer, 
getConsumerRebalanceListener(consumer));
+
+        // Mock rebalance responses
+        prepareRebalance(client, node, assignor, List.of(tp0), null);
+
+        consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
+        consumer.poll(Duration.ZERO);
+
+        // Verify that the subscription is set up correctly
+        assertEquals(Set.of(topic), consumer.subscription());
+
+        // Mock a fetch response so that we have consumed some data
+        Map<TopicPartition, FetchInfo> fetches = new HashMap<>();
+        fetches.put(tp0, new FetchInfo(0, 10));
+        client.respondFrom(fetchResponse(fetches), node);
+        client.poll(0, time.milliseconds());
+
+        ConsumerRecords<String, String> records = (ConsumerRecords<String, 
String>) consumer.poll(Duration.ofMillis(1));
+        assertEquals(10, records.count());
+        assertEquals(10L, consumer.position(tp0));
+
+        // Clear previous requests to focus on unsubscribe behavior
+        client.requests().clear();
+
+        // Call unsubscribe - this should NOT commit offsets even though 
auto-commit is enabled
+        consumer.unsubscribe();
+
+        // Verify that subscription and assignment are both cleared
+        assertEquals(Collections.emptySet(), consumer.subscription());
+        assertEquals(Collections.emptySet(), consumer.assignment());
+
+        // Verify that no offset commit request was sent despite auto-commit 
being enabled
+        for (ClientRequest req : client.requests()) {
+            assertNotSame(ApiKeys.OFFSET_COMMIT, req.requestBuilder().apiKey(),
+                    "unsubscribe() should not commit offsets even when 
auto-commit is enabled");
+        }
+
+        client.requests().clear();
+    }
+
     // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
     //       Once it is implemented, this should use both group protocols.
     @ParameterizedTest
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 801942225b7..327f1b25afc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1935,6 +1935,38 @@ public class AsyncKafkaConsumerTest {
         
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
     }
 
+    /**
+     * Verify that unsubscribe() does not commit offsets even when auto-commit 
is enabled.
+     * Callers therefore need to explicitly call commitSync() before
+     * unsubscribing to avoid duplicate processing upon re-joining the group.
+     */
+    @Test
+    public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled() 
{
+        Properties props = requiredConsumerConfigAndGroupId("test-group");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+
+        consumer = newConsumer(props);
+
+        // Subscribe to a topic
+        completeTopicSubscriptionChangeEventSuccessfully();
+        consumer.subscribe(singleton("topic"));
+
+        // Clear any previous invocations to focus on unsubscribe behavior
+        clearInvocations(applicationEventHandler);
+
+        // Call unsubscribe - this should NOT commit offsets even though 
auto-commit is enabled
+        completeUnsubscribeApplicationEventSuccessfully();
+        consumer.unsubscribe();
+
+        // Verify that UnsubscribeEvent was sent
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
+
+        // Verify that no commit event (sync, async, or on-close) was sent 
despite auto-commit being enabled
+        verify(applicationEventHandler, 
never()).add(ArgumentMatchers.isA(SyncCommitEvent.class));
+        verify(applicationEventHandler, 
never()).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
+        verify(applicationEventHandler, 
never()).add(ArgumentMatchers.isA(CommitOnCloseEvent.class));
+    }
+
     @Test
     public void testSeekToBeginning() {
         Collection<TopicPartition> topics = Collections.singleton(new 
TopicPartition("test", 0));

Reply via email to