This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 45c4bdc48a2 MINOR: Clarify docs for consumer leave options &
rebalances (#22058)
45c4bdc48a2 is described below
commit 45c4bdc48a2b9d0fd6b72e1b55b754b8dc7a247e
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Apr 14 17:07:27 2026 -0400
MINOR: Clarify docs for consumer leave options & rebalances (#22058)
Clarify behaviour of close options for static and dynamic members.
Added an integration test to validate that a static member leaving the group
triggers a rebalance when it is closed with the option to leave.
Reviewers: Andrew Schofield <[email protected]>
---
.../clients/consumer/PlaintextConsumerTest.java | 46 ++++++++++++++++++++++
.../kafka/clients/consumer/CloseOptions.java | 9 +++--
.../internals/ConsumerHeartbeatRequestManager.java | 6 ++-
.../internals/ConsumerMembershipManager.java | 6 +--
4 files changed, 60 insertions(+), 7 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index ed9cfc40ee1..6a1b174d188 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -1880,6 +1880,52 @@ public class PlaintextConsumerTest {
}, "Metrics for removed partitions should be cleaned up");
}
+ /**
+ * Tests that when a static member closes with {@link
CloseOptions.GroupMembershipOperation#LEAVE_GROUP},
+ * the other members in the group receive a rebalance callback. This is in
contrast to the default
+ * behavior where static members remain in the group on close (no
rebalance triggered).
+ */
+ @ClusterTest
+ public void testAsyncStaticMemberCloseWithLeaveGroupTriggersRebalance()
throws Exception {
+ var topicName = "test-static-member-leave-group";
+ var groupId = "test-group-" + UUID.randomUUID();
+ cluster.createTopic(topicName, 2, (short) 1);
+
+ Map<String, Object> consumer1Config = new HashMap<>();
+ consumer1Config.put(GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+ consumer1Config.put(GROUP_ID_CONFIG, groupId);
+ consumer1Config.put(GROUP_INSTANCE_ID_CONFIG, "instance-1");
+
+ Map<String, Object> consumer2Config = new HashMap<>();
+ consumer2Config.put(GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+ consumer2Config.put(GROUP_ID_CONFIG, groupId);
+ consumer2Config.put(GROUP_INSTANCE_ID_CONFIG, "instance-2");
+
+ var listener1 = new TestConsumerReassignmentListener();
+ var listener2 = new TestConsumerReassignmentListener();
+
+ try (Consumer<byte[], byte[]> consumer1 =
cluster.consumer(consumer1Config);
+ Consumer<byte[], byte[]> consumer2 =
cluster.consumer(consumer2Config)) {
+
+ consumer1.subscribe(List.of(topicName), listener1);
+ consumer2.subscribe(List.of(topicName), listener2);
+
+ awaitRebalance(consumer1, listener1);
+ awaitRebalance(consumer2, listener2);
+
+ var initialAssignedCalls = listener2.callsToAssigned;
+
+ // Consumer 1 closes with LEAVE_GROUP - this should trigger a
rebalance
+
consumer1.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
+ awaitRebalance(consumer2, listener2);
+
+ // Consumer 2 should have received another assignment callback due
to the rebalance
+ assertTrue(listener2.callsToAssigned > initialAssignedCalls,
+ "Consumer 2 should have received a rebalance after static
consumer 1 left the group permanently. " +
+ "Initial assigned calls: " + initialAssignedCalls + ",
current: " + listener2.callsToAssigned);
+ }
+ }
+
public static class SerializerImpl implements Serializer<byte[]> {
private final ByteArraySerializer serializer = new
ByteArraySerializer();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
index 70619f69748..b0862ce76b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
@@ -31,11 +31,14 @@ import java.util.Optional;
*/
public class CloseOptions {
/**
- * Enum to specify the group membership operation upon leaving group.
+ * Enum to specify the group membership operation upon leaving a group.
*
* <ul>
- * <li><b>{@code LEAVE_GROUP}</b>: means the consumer will leave the
group.</li>
- * <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in
the group.</li>
+ * <li><b>{@code LEAVE_GROUP}</b>: The consumer will leave the group.
This is the default for dynamic members,
+ * and can be used by static members when they want to permanently
leave the group and trigger a rebalance.</li>
+ * <li><b>{@code REMAIN_IN_GROUP}</b>: The consumer will remain in the
group. This is the default for static members,
+ * allowing them to rejoin quickly without triggering a rebalance.
When used by dynamic members, no leave
+ * heartbeat will be sent and the member will be removed by the
coordinator after the session timeout expires.</li>
* <li><b>{@code DEFAULT}</b>: Applies the default behavior:
* <ul>
* <li>For <b>static members</b>: The consumer will remain in the
group.</li>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index c5f95305a47..1f916f8b9cb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -216,8 +216,12 @@ public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestMan
protected boolean shouldSendLeaveHeartbeatNow() {
// If the consumer has dynamic membership,
// we should skip the leaving heartbeat when leaveGroupOperation is
REMAIN_IN_GROUP
- if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP
== membershipManager.leaveGroupOperation())
+ if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP
== membershipManager.leaveGroupOperation()) {
+ logger.debug("Dynamic member {} closed with REMAIN_IN_GROUP. No
leave heartbeat will be sent, " +
+ "the member will be removed by the coordinator after session
timeout.",
+ membershipManager.memberId());
return false;
+ }
return membershipManager().state() == MemberState.LEAVING;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index f58c6c3f296..3266920aea5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -540,9 +540,9 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
@Override
public int leaveGroupEpoch() {
boolean isStaticMember = groupInstanceId.isPresent();
- // Currently, the server doesn't have a mechanism for static members
to permanently leave the group.
- // Therefore, we use LEAVE_GROUP_MEMBER_EPOCH to force the
GroupMetadataManager to fence
- // this member, effectively removing it from the group.
+ // The mechanism to make static members permanently leave the group is
to
+ // send an HB to leave with the -1 epoch (used by dynamic members).
+ // This will make the group coordinator fence this member, effectively
removing it from the group.
if (LEAVE_GROUP == leaveGroupOperation) {
return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
}