This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 45c4bdc48a2 MINOR: Clarify docs for consumer leave options & 
rebalances (#22058)
45c4bdc48a2 is described below

commit 45c4bdc48a2b9d0fd6b72e1b55b754b8dc7a247e
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Apr 14 17:07:27 2026 -0400

    MINOR: Clarify docs for consumer leave options & rebalances (#22058)
    
    Clarify behaviour of close options for static and dyn members.
    
    Added integration test to validate how static members leave triggers a
    rebalance if closed with option to leave.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../clients/consumer/PlaintextConsumerTest.java    | 46 ++++++++++++++++++++++
 .../kafka/clients/consumer/CloseOptions.java       |  9 +++--
 .../internals/ConsumerHeartbeatRequestManager.java |  6 ++-
 .../internals/ConsumerMembershipManager.java       |  6 +--
 4 files changed, 60 insertions(+), 7 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index ed9cfc40ee1..6a1b174d188 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -1880,6 +1880,52 @@ public class PlaintextConsumerTest {
         }, "Metrics for removed partitions should be cleaned up");
     }
 
+    /**
+     * Tests that when a static member closes with {@link 
CloseOptions.GroupMembershipOperation#LEAVE_GROUP},
+     * the other members in the group receive a rebalance callback. This is in 
contrast to the default
+     * behavior where static members remain in the group on close (no 
rebalance triggered).
+     */
+    @ClusterTest
+    public void testAsyncStaticMemberCloseWithLeaveGroupTriggersRebalance() 
throws Exception {
+        var topicName = "test-static-member-leave-group";
+        var groupId = "test-group-" + UUID.randomUUID();
+        cluster.createTopic(topicName, 2, (short) 1);
+
+        Map<String, Object> consumer1Config = new HashMap<>();
+        consumer1Config.put(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+        consumer1Config.put(GROUP_ID_CONFIG, groupId);
+        consumer1Config.put(GROUP_INSTANCE_ID_CONFIG, "instance-1");
+
+        Map<String, Object> consumer2Config = new HashMap<>();
+        consumer2Config.put(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+        consumer2Config.put(GROUP_ID_CONFIG, groupId);
+        consumer2Config.put(GROUP_INSTANCE_ID_CONFIG, "instance-2");
+
+        var listener1 = new TestConsumerReassignmentListener();
+        var listener2 = new TestConsumerReassignmentListener();
+
+        try (Consumer<byte[], byte[]> consumer1 = 
cluster.consumer(consumer1Config);
+             Consumer<byte[], byte[]> consumer2 = 
cluster.consumer(consumer2Config)) {
+
+            consumer1.subscribe(List.of(topicName), listener1);
+            consumer2.subscribe(List.of(topicName), listener2);
+
+            awaitRebalance(consumer1, listener1);
+            awaitRebalance(consumer2, listener2);
+
+            var initialAssignedCalls = listener2.callsToAssigned;
+
+            // Consumer 1 closes with LEAVE_GROUP - this should trigger a 
rebalance
+            
consumer1.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
+            awaitRebalance(consumer2, listener2);
+
+            // Consumer 2 should have received another assignment callback due 
to the rebalance
+            assertTrue(listener2.callsToAssigned > initialAssignedCalls,
+                "Consumer 2 should have received a rebalance after static 
consumer 1 left the group permanently. " +
+                "Initial assigned calls: " + initialAssignedCalls + ", 
current: " + listener2.callsToAssigned);
+        }
+    }
+
     public static class SerializerImpl implements Serializer<byte[]> {
         private final ByteArraySerializer serializer = new 
ByteArraySerializer();
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
index 70619f69748..b0862ce76b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
@@ -31,11 +31,14 @@ import java.util.Optional;
  */
 public class CloseOptions {
     /**
-     * Enum to specify the group membership operation upon leaving group.
+     * Enum to specify the group membership operation upon leaving a group.
      *
      * <ul>
-     *   <li><b>{@code LEAVE_GROUP}</b>:  means the consumer will leave the 
group.</li>
-     *   <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in 
the group.</li>
+     *   <li><b>{@code LEAVE_GROUP}</b>: The consumer will leave the group. 
This is the default for dynamic members,
+     *       and can be used by static members when they want to permanently 
leave the group and trigger a rebalance.</li>
+     *   <li><b>{@code REMAIN_IN_GROUP}</b>: The consumer will remain in the 
group. This is the default for static members,
+     *       allowing them to rejoin quickly without triggering a rebalance. 
When used by dynamic members, no leave
+     *       heartbeat will be sent and the member will be removed by the 
coordinator after the session timeout expires.</li>
      *   <li><b>{@code DEFAULT}</b>: Applies the default behavior:
      *     <ul>
      *       <li>For <b>static members</b>: The consumer will remain in the 
group.</li>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index c5f95305a47..1f916f8b9cb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -216,8 +216,12 @@ public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestMan
     protected boolean shouldSendLeaveHeartbeatNow() {
         // If the consumer has dynamic membership,
         // we should skip the leaving heartbeat when leaveGroupOperation is 
REMAIN_IN_GROUP
-        if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP 
== membershipManager.leaveGroupOperation())
+        if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP 
== membershipManager.leaveGroupOperation()) {
+            logger.debug("Dynamic member {} closed with REMAIN_IN_GROUP. No 
leave heartbeat will be sent, " +
+                "the member will be removed by the coordinator after session 
timeout.",
+                membershipManager.memberId());
             return false;
+        }
         return membershipManager().state() == MemberState.LEAVING;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index f58c6c3f296..3266920aea5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -540,9 +540,9 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
     @Override
     public int leaveGroupEpoch() {
         boolean isStaticMember = groupInstanceId.isPresent();
-        // Currently, the server doesn't have a mechanism for static members 
to permanently leave the group.
-        // Therefore, we use LEAVE_GROUP_MEMBER_EPOCH to force the 
GroupMetadataManager to fence
-        // this member, effectively removing it from the group.
+        // The mechanism to make static members permanently leave the group is 
to
+        // send an HB to leave with the -1 epoch (used by dynamic members).
+        // This will make the group coordinator fence this member, effectively 
removing it from the group.
         if (LEAVE_GROUP == leaveGroupOperation) {
             return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
         }

Reply via email to