This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 40e9fcd7426 KAFKA-20119 Clarify that `Consumer#unsubscribe` does not
trigger auto-commit (#21424)
40e9fcd7426 is described below
commit 40e9fcd742667e489547e1d0d7905f352616795a
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Apr 8 13:21:02 2026 +0800
KAFKA-20119 Clarify that `Consumer#unsubscribe` does not trigger
auto-commit (#21424)
Document that unsubscribe() doesn't commit offsets even with auto-commit
enabled, and add test to verify this behavior
Reviewers: Lianet Magrans <[email protected]>, David Jacot
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../clients/consumer/PlaintextConsumerTest.java | 43 +++++++++++++++
.../kafka/clients/consumer/KafkaConsumer.java | 4 ++
.../kafka/clients/consumer/KafkaConsumerTest.java | 62 ++++++++++++++++++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 32 +++++++++++
4 files changed, 141 insertions(+)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index ec9704c71ee..ed9cfc40ee1 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -1823,6 +1823,49 @@ public class PlaintextConsumerTest {
return result.get();
}
+ @ClusterTest
+ public void
testClassicConsumerUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled()
throws Exception {
+
testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void
testAsyncConsumerUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled() throws
Exception {
+
testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol.CONSUMER);
+ }
+
+ /**
+ * Verify that {@link Consumer#unsubscribe()} does not commit offsets even
when
+ * {@code enable.auto.commit} is enabled. A second consumer using the same
group ID
+ * should see no committed offsets after the first consumer unsubscribes.
+ */
+ private void
testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol
groupProtocol) throws Exception {
+ var numRecords = 10;
+ var groupId = "unsubscribe-no-commit-test";
+ sendRecords(cluster, TP, numRecords);
+
+ // Consumer 1: subscribe, consume records, then unsubscribe (without
explicit commit)
+ Map<String, Object> config = new HashMap<>();
+ config.put(GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT));
+ config.put(GROUP_ID_CONFIG, groupId);
+ config.put(ENABLE_AUTO_COMMIT_CONFIG, true);
+
+ try (Consumer<byte[], byte[]> consumer1 = cluster.consumer(config)) {
+ consumer1.subscribe(List.of(TOPIC));
+ consumeRecords(consumer1, numRecords);
+
+ // Unsubscribe - this should NOT commit offsets even though
auto-commit is enabled
+ consumer1.unsubscribe();
+ }
+
+ // Consumer 2: use the same group ID to check committed offsets
+ try (Consumer<byte[], byte[]> consumer2 = cluster.consumer(config)) {
+ consumer2.subscribe(List.of(TOPIC));
+ OffsetAndMetadata committed =
consumer2.committed(Set.of(TP)).get(TP);
+ assertNull(committed,
+ "unsubscribe() should not commit offsets even when
auto-commit is enabled");
+ }
+ }
+
private void awaitMetricsCleanup(
Consumer<?, ?> consumer,
String metricName,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 201c467cd28..1897bf54fad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -832,6 +832,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
/**
* Unsubscribe from topics currently subscribed with {@link
#subscribe(Collection)} or {@link #subscribe(Pattern)}.
* This also clears any partitions directly assigned through {@link
#assign(Collection)}.
+ * <p>
+ * <b>Note:</b> Unlike {@link #close()}, this method does not commit the
pending offsets before
+ * unsubscribing, even if {@code enable.auto.commit} is enabled. To avoid
duplicate processing upon re-joining,
+ * it is recommended to explicitly call {@link #commitSync()} before
invoking this method.
*
* @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors (e.g. rebalance callback errors)
*/
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ae5dd65a826..da4a799fa5e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1790,6 +1790,68 @@ public class KafkaConsumerTest {
client.requests().clear();
}
+ /**
+ * Verify that unsubscribe() does not commit offsets even when auto-commit
is enabled.
+ * This ensures users are aware that they need to explicitly call
commitSync() before
+ * unsubscribing to avoid duplicate processing upon re-joining the group.
+ */
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @SuppressWarnings("unchecked")
+ public void
testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled(GroupProtocol
groupProtocol) {
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+
+ Map<String, Integer> tpCounts = new HashMap<>();
+ tpCounts.put(topic, 1);
+ initMetadata(client, tpCounts);
+ Node node = metadata.fetch().nodes().get(0);
+
+ ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+ // Create consumer with auto-commit enabled
+ consumer = newConsumer(groupProtocol, time, client, subscription,
metadata, assignor, true, groupInstanceId);
+
+ initializeSubscriptionWithSingleTopic(consumer,
getConsumerRebalanceListener(consumer));
+
+ // Mock rebalance responses
+ prepareRebalance(client, node, assignor, List.of(tp0), null);
+
+ consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
+ consumer.poll(Duration.ZERO);
+
+ // Verify that subscription are set up correctly
+ assertEquals(Set.of(topic), consumer.subscription());
+
+ // Mock a fetch response so that we have consumed some data
+ Map<TopicPartition, FetchInfo> fetches = new HashMap<>();
+ fetches.put(tp0, new FetchInfo(0, 10));
+ client.respondFrom(fetchResponse(fetches), node);
+ client.poll(0, time.milliseconds());
+
+ ConsumerRecords<String, String> records = (ConsumerRecords<String,
String>) consumer.poll(Duration.ofMillis(1));
+ assertEquals(10, records.count());
+ assertEquals(10L, consumer.position(tp0));
+
+ // Clear previous requests to focus on unsubscribe behavior
+ client.requests().clear();
+
+ // Call unsubscribe - this should NOT commit offsets even though
auto-commit is enabled
+ consumer.unsubscribe();
+
+ // Verify that subscription and assignment are both cleared
+ assertEquals(Collections.emptySet(), consumer.subscription());
+ assertEquals(Collections.emptySet(), consumer.assignment());
+
+ // Verify that no offset commit request was sent despite auto-commit
being enabled
+ for (ClientRequest req : client.requests()) {
+ assertNotSame(ApiKeys.OFFSET_COMMIT, req.requestBuilder().apiKey(),
+ "unsubscribe() should not commit offsets even when
auto-commit is enabled");
+ }
+
+ client.requests().clear();
+ }
+
// TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
// Once it is implemented, this should use both group protocols.
@ParameterizedTest
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 801942225b7..327f1b25afc 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1935,6 +1935,38 @@ public class AsyncKafkaConsumerTest {
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}
+ /**
+ * Verify that unsubscribe() does not commit offsets even when auto-commit
is enabled.
+ * This ensures users are aware that they need to explicitly call
commitSync() before
+ * unsubscribing to avoid duplicate processing upon re-joining the group.
+ */
+ @Test
+ public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled()
{
+ Properties props = requiredConsumerConfigAndGroupId("test-group");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+
+ consumer = newConsumer(props);
+
+ // Subscribe to a topic
+ completeTopicSubscriptionChangeEventSuccessfully();
+ consumer.subscribe(singleton("topic"));
+
+ // Clear any previous invocations to focus on unsubscribe behavior
+ clearInvocations(applicationEventHandler);
+
+ // Call unsubscribe - this should NOT commit offsets even though
auto-commit is enabled
+ completeUnsubscribeApplicationEventSuccessfully();
+ consumer.unsubscribe();
+
+ // Verify that UnsubscribeEvent was sent
+
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
+
+ // Verify that no commit event (sync or async) was sent despite
auto-commit being enabled
+ verify(applicationEventHandler,
never()).add(ArgumentMatchers.isA(SyncCommitEvent.class));
+ verify(applicationEventHandler,
never()).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
+ verify(applicationEventHandler,
never()).add(ArgumentMatchers.isA(CommitOnCloseEvent.class));
+ }
+
@Test
public void testSeekToBeginning() {
Collection<TopicPartition> topics = Collections.singleton(new
TopicPartition("test", 0));