This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d9c3b925884 KAFKA-20167 Introduce CloseOptions.DEFAULT for Kafka 
Streams (#21579)
d9c3b925884 is described below

commit d9c3b9258849487b29e915d058f61e08074d51e5
Author: Ken Huang <[email protected]>
AuthorDate: Thu May 28 00:49:16 2026 +0800

    KAFKA-20167 Introduce CloseOptions.DEFAULT for Kafka Streams (#21579)
    
    This patch is addressed the
    
    
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1284+Introduce+CloseOptions.DEFAULT+for+Kafka+Streams
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../consumer/internals/RequestManagers.java        |   1 +
 .../StreamsGroupHeartbeatRequestManager.java       |  47 +++++++-
 .../internals/StreamsMembershipManager.java        |  99 ++++++++++++-----
 .../events/ApplicationEventProcessor.java          |   2 +-
 .../StreamsGroupHeartbeatRequestManagerTest.java   |  51 +++++++++
 .../internals/StreamsMembershipManagerTest.java    | 118 ++++++++++++++++++++-
 .../KafkaStreamsCloseOptionsIntegrationTest.java   |  83 +++++++++++++++
 .../integration/utils/IntegrationTestUtils.java    |  43 ++++++++
 .../org/apache/kafka/streams/CloseOptions.java     |  34 ++++--
 .../org/apache/kafka/streams/KafkaStreams.java     |  29 +++--
 .../streams/processor/internals/StreamThread.java  |  22 +++-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   7 +-
 .../processor/internals/StreamThreadTest.java      |  93 ++++++++++++++++
 13 files changed, 567 insertions(+), 62 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index dca5067bf4f..36ebbb3bbf3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -222,6 +222,7 @@ public class RequestManagers implements Closeable {
                     if (streamsRebalanceData.isPresent()) {
                         streamsMembershipManager = new 
StreamsMembershipManager(
                             groupRebalanceConfig.groupId,
+                            groupRebalanceConfig.groupInstanceId,
                             streamsRebalanceData.get(),
                             subscriptions,
                             backgroundEventHandler,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index f7b551e812d..4080439283d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
 import static 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
 import static 
org.apache.kafka.clients.consumer.internals.RequestState.RETRY_BACKOFF_JITTER;
 
@@ -384,6 +385,13 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
             heartbeatState.reset();
             return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), 
Collections.singletonList(leaveHeartbeat));
         }
+        if (membershipManager.state() == MemberState.LEAVING && 
shouldSkipLeaveHeartbeat()) {
+            logger.info("Dynamic member {} skipping leave heartbeat 
(operation=REMAIN_IN_GROUP). " +
+                "The broker will remove the member from the group via session 
timeout.",
+                membershipManager.memberId());
+            membershipManager.onHeartbeatRequestSkipped();
+            return EMPTY;
+        }
         if (shouldHeartbeatBeforeIntervalExpires() || 
heartbeatRequestState.canSendRequest(currentTimeMs)) {
             NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequestAndHandleResponse(currentTimeMs);
             return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), 
Collections.singletonList(request));
@@ -411,7 +419,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
      */
     @Override
     public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
-        if (membershipManager.isLeavingGroup()) {
+        if (membershipManager.isLeavingGroup() && !shouldSkipLeaveHeartbeat()) 
{
             NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequestAndLogResponse(currentTimeMs);
             return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), 
List.of(request));
         }
@@ -458,7 +466,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     /**
      * A heartbeat should be sent without waiting for the heartbeat interval 
to expire if:
-     * - the member is leaving the group
+     * - the member should send a leave heartbeat (see {@link 
#shouldSendLeaveHeartbeat()})
      * or
      * - the member is joining the group or acknowledging the assignment and 
for both cases there is no heartbeat request
      *   in flight.
@@ -466,12 +474,41 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
      * @return true if a heartbeat should be sent before the interval expires, 
false otherwise
      */
     private boolean shouldHeartbeatBeforeIntervalExpires() {
-        return membershipManager.state() == MemberState.LEAVING
-            ||
-            (membershipManager.state() == MemberState.JOINING || 
membershipManager.state() == MemberState.ACKNOWLEDGING)
+        return shouldSendLeaveHeartbeat()
+            || (membershipManager.state() == MemberState.JOINING || 
membershipManager.state() == MemberState.ACKNOWLEDGING)
                 && !heartbeatRequestState.requestInFlight();
     }
 
+    /**
+     * Returns whether a leave group heartbeat should be sent. The leave 
heartbeat is skipped only
+     * when the member is dynamic (no group instance ID) and the operation is
+     * {@link 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation#REMAIN_IN_GROUP}.
+     * Static members always send the leave heartbeat (with epoch -2) so the 
broker holds the
+     * assignment until the session timeout.
+     *
+     * @return true if a leave heartbeat should be sent, false otherwise
+     */
+    private boolean shouldSendLeaveHeartbeat() {
+        if (shouldSkipLeaveHeartbeat()) {
+            logger.debug("Member {} skipping leave heartbeat (operation={}, 
static={}).",
+                membershipManager.memberId(),
+                membershipManager.leaveGroupOperation(),
+                membershipManager.groupInstanceId().isPresent());
+            return false;
+        }
+        return membershipManager.state() == MemberState.LEAVING;
+    }
+
+    /**
+     * Returns true if the leave heartbeat should be skipped: only when the 
member is dynamic
+     * (no group instance ID) and the operation is REMAIN_IN_GROUP. Static 
members always send
+     * a leave heartbeat (with epoch -2) so the broker can hold the assignment.
+     */
+    private boolean shouldSkipLeaveHeartbeat() {
+        return REMAIN_IN_GROUP == membershipManager.leaveGroupOperation()
+            && membershipManager.groupInstanceId().isEmpty();
+    }
+
     private void maybePropagateCoordinatorFatalErrorEvent() {
         coordinatorRequestManager.getAndClearFatalError()
             .ifPresent(fatalError -> backgroundEventHandler.add(new 
ErrorEvent(fatalError)));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index a378cb0570b..f2d402de171 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.CloseOptions;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
@@ -198,7 +199,7 @@ public class StreamsMembershipManager implements 
RequestManager {
     /**
      * Group instance ID to be used by a static member, provided when creating 
the current membership manager.
      */
-    private final Optional<String> groupInstanceId = Optional.empty();
+    private final Optional<String> groupInstanceId;
 
     /**
      * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
@@ -210,12 +211,20 @@ public class StreamsMembershipManager implements 
RequestManager {
 
     /**
      * If the member is currently leaving the group after a call to {@link 
#leaveGroup()} or
-     * {@link #leaveGroupOnClose()}, this will have a future that will 
complete when the ongoing leave operation
-     * completes (callbacks executed and heartbeat request to leave is sent 
out). This will be empty if the
-     * member is not leaving.
+     * {@link #leaveGroupOnClose(CloseOptions.GroupMembershipOperation)}, this 
will have a future that will 
+     * complete when the ongoing leave operation completes (callbacks executed 
and heartbeat request to leave 
+     * is sent out). This will be empty if the member is not leaving.
      */
     private Optional<CompletableFuture<Void>> leaveGroupInProgress = 
Optional.empty();
 
+    /**
+     * The operation the member will perform on leaving the group. Remains 
{@code DEFAULT} until the
+     * member is closing.
+     *
+     * @see CloseOptions.GroupMembershipOperation
+     */
+    private CloseOptions.GroupMembershipOperation leaveGroupOperation = 
CloseOptions.GroupMembershipOperation.DEFAULT;
+
     /**
      * Future that will complete when a stale member completes releasing its 
assignment after
      * leaving the group due to poll timer expired. Used to make sure that the 
member rejoins
@@ -293,6 +302,7 @@ public class StreamsMembershipManager implements 
RequestManager {
      * @param metrics                The metrics.
      */
     public StreamsMembershipManager(final String groupId,
+                                    final Optional<String> groupInstanceId,
                                     final StreamsRebalanceData 
streamsRebalanceData,
                                     final SubscriptionState subscriptionState,
                                     final BackgroundEventHandler 
backgroundEventHandler,
@@ -302,6 +312,7 @@ public class StreamsMembershipManager implements 
RequestManager {
         log = logContext.logger(StreamsMembershipManager.class);
         this.state = MemberState.UNSUBSCRIBED;
         this.groupId = groupId;
+        this.groupInstanceId = groupInstanceId;
         this.backgroundEventHandler = backgroundEventHandler;
         this.streamsRebalanceData = streamsRebalanceData;
         this.subscriptionState = subscriptionState;
@@ -347,12 +358,32 @@ public class StreamsMembershipManager implements 
RequestManager {
     }
 
     /**
-     * @return True if the member is preparing to leave the group (waiting for 
callbacks), or
-     * leaving (sending last heartbeat). This is used to skip proactively 
leaving the group when
-     * the poll timer expires.
+     * @return the operation the member will perform on leaving the group.
+     */
+    public CloseOptions.GroupMembershipOperation leaveGroupOperation() {
+        return leaveGroupOperation;
+    }
+
+    /**
+     * @return True if the member is preparing to leave the group or leaving 
and a leave heartbeat
+     *         should be sent. Returns false for dynamic members with 
REMAIN_IN_GROUP, which skip
+     *         the leave heartbeat entirely.
      */
     public boolean isLeavingGroup() {
-        return state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING;
+        if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP == 
leaveGroupOperation && groupInstanceId.isEmpty()) {
+            return false;
+        }
+        MemberState currentState = state();
+        boolean isLeavingState = currentState == MemberState.PREPARE_LEAVING 
|| currentState == MemberState.LEAVING;
+        boolean hasLeaveOperation =
+            // Default operation: both static and dynamic members will send a 
leave heartbeat
+            CloseOptions.GroupMembershipOperation.DEFAULT == 
leaveGroupOperation
+            // Leave group operation: both static and dynamic members will 
send a leave heartbeat
+            || CloseOptions.GroupMembershipOperation.LEAVE_GROUP == 
leaveGroupOperation
+            // Remain in group: static members will send a leave heartbeat 
with -2 epoch to signal
+            // that a member using this instance ID is temporarily gone and 
will rejoin within session timeout.
+            || groupInstanceId.isPresent();
+        return isLeavingState && hasLeaveOperation;
     }
 
     private boolean isNotInGroup() {
@@ -426,6 +457,7 @@ public class StreamsMembershipManager implements 
RequestManager {
         if (reconciliationInProgress) {
             rejoinedWhileReconciliationInProgress = true;
         }
+        leaveGroupOperation = CloseOptions.GroupMembershipOperation.DEFAULT;
         resetEpoch();
         transitionTo(MemberState.JOINING);
         clearCurrentTaskAssignment();
@@ -464,10 +496,26 @@ public class StreamsMembershipManager implements 
RequestManager {
     }
 
     private void finalizeLeaving() {
-        
updateMemberEpoch(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH);
+        updateMemberEpoch(leaveGroupEpoch());
         clearCurrentTaskAssignment();
     }
 
+    /**
+     * Returns the epoch to use in the leave group heartbeat. Static members 
use (LEAVE_GROUP_STATIC_MEMBER_EPOCH) 
+     * so the broker holds the assignment until session timeout, unless the 
operation is LEAVE_GROUP which forces 
+     * permanent removal.
+     */
+    public int leaveGroupEpoch() {
+        boolean isStaticMember = groupInstanceId.isPresent();
+        
+        if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP == 
leaveGroupOperation) {
+            return StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        }
+        return isStaticMember
+            ? StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH
+            : StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+    }
+
     /**
      * Transition to STALE to release assignments because the member has left 
the group due to
      * expired poll timer. This will trigger the onAllTasksLost callback. Once 
the callback
@@ -532,16 +580,17 @@ public class StreamsMembershipManager implements 
RequestManager {
     }
 
     /**
-     * Notify when the heartbeat request is skipped.
      * Transition out of the {@link MemberState#LEAVING} state even if the 
heartbeat was not sent.
-     * This will ensure that the member is not blocked on {@link 
MemberState#LEAVING} (best
+     * This will ensure that the member is not blocked on {@link 
MemberState#LEAVING}. (best
      * effort to send the request, without any response handling or retry 
logic)
      */
     public void onHeartbeatRequestSkipped() {
         if (state == MemberState.LEAVING) {
-            log.warn("Heartbeat to leave group cannot be sent (most probably 
due to coordinator " +
-                    "not known/available). Member {} with epoch {} will 
transition to {}.",
-                memberId, memberEpoch, MemberState.UNSUBSCRIBED);
+            if (isLeavingGroup()) {
+                log.warn("Heartbeat to leave group cannot be sent (most 
probably due to coordinator " +
+                        "not known/available). Member {} with epoch {} will 
transition to {}.",
+                    memberId, memberEpoch, MemberState.UNSUBSCRIBED);
+            }
             transitionTo(MemberState.UNSUBSCRIBED);
             maybeCompleteLeaveInProgress();
         }
@@ -899,23 +948,15 @@ public class StreamsMembershipManager implements 
RequestManager {
     }
 
     /**
-     * Leaves the group when the member closes.
-     *
-     * <p>
-     * This method does the following:
-     * <ol>
-     *     <li>Transitions member state to {@link 
MemberState#PREPARE_LEAVING}.</li>
-     *     <li>Skips the invocation of the revocation callback or lost 
callback.</li>
-     *     <li>Clears the current and target assignment, unsubscribes from all 
topics and
-     *     transitions the member state to {@link MemberState#LEAVING}.</li>
-     * </ol>
-     * States {@link MemberState#PREPARE_LEAVING} and {@link 
MemberState#LEAVING} cause the heartbeat request manager
-     * to send a leave group heartbeat.
-     * </p>
+     * Closes the member's participation in the group, honoring the requested 
{@link CloseOptions.GroupMembershipOperation}.
+     * Stores the operation and follows the normal leaving path; {@link 
StreamsGroupHeartbeatRequestManager}
+     * decides whether to send or skip the leave group heartbeat based on the 
operation.
      *
-     * @return future that will complete when the heartbeat to leave the group 
has been sent out.
+     * @param membershipOperation the requested close behavior
+     * @return future that will complete when the close operation is done
      */
-    public CompletableFuture<Void> leaveGroupOnClose() {
+    public CompletableFuture<Void> leaveGroupOnClose(final 
CloseOptions.GroupMembershipOperation membershipOperation) {
+        this.leaveGroupOperation = membershipOperation;
         return leaveGroup(true);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 76636c58d76..be76ac88ba3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -499,7 +499,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
             future.whenComplete(complete(event.future()));
         } else if (requestManagers.streamsMembershipManager.isPresent()) {
             log.debug("Signal the StreamsMembershipManager to leave the 
streams group since the member is closing");
-            CompletableFuture<Void> future = 
requestManagers.streamsMembershipManager.get().leaveGroupOnClose();
+            CompletableFuture<Void> future = 
requestManagers.streamsMembershipManager.get().leaveGroupOnClose(event.membershipOperation());
             future.whenComplete(complete(event.future()));
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 6c61f13eb3d..3ede3565c11 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
@@ -68,6 +69,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+import static 
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -332,6 +334,55 @@ class StreamsGroupHeartbeatRequestManagerTest {
         }
     }
 
+    @Test
+    public void testSkipLeaveHeartbeatForRemainInGroupWithDynamicMember() {
+        final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = 
createStreamsGroupHeartbeatRequestManager();
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+        when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+        
when(membershipManager.leaveGroupOperation()).thenReturn(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
+        final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(0, result.unsentRequests.size());
+        verify(membershipManager).onHeartbeatRequestSkipped();
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = CloseOptions.GroupMembershipOperation.class, names = 
{"DEFAULT", "REMAIN_IN_GROUP"})
+    public void testSendLeaveHeartbeatForStaticMember(final 
CloseOptions.GroupMembershipOperation operation) {
+        // Static members always send a leave heartbeat (with epoch -2) so the 
broker can hold the
+        // assignment until session timeout, regardless of the close operation.
+        final long heartbeatIntervalMs = 1234;
+        try (
+            final MockedConstruction<HeartbeatRequestState> ignored = 
mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+                    
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+                    when(mock.requestInFlight()).thenReturn(false);
+                })
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+            
when(membershipManager.leaveGroupOperation()).thenReturn(operation);
+            
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+            
when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_STATIC_MEMBER_EPOCH);
+            when(membershipManager.groupId()).thenReturn(GROUP_ID);
+            when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            verify(membershipManager, never()).onHeartbeatRequestSkipped();
+            final StreamsGroupHeartbeatRequest req =
+                (StreamsGroupHeartbeatRequest) 
result.unsentRequests.get(0).requestBuilder().build();
+            assertEquals(LEAVE_GROUP_STATIC_MEMBER_EPOCH, 
req.data().memberEpoch());
+            assertEquals(INSTANCE_ID, req.data().instanceId());
+        }
+    }
+
     @ParameterizedTest
     @EnumSource(value = MemberState.class, names = {"JOINING", 
"ACKNOWLEDGING"})
     public void testSendingHeartbeatIfMemberIsJoiningOrAcknowledging(final 
MemberState memberState) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index ae91b7fd0f0..5761eefd09e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.CloseOptions;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
@@ -125,6 +126,7 @@ public class StreamsMembershipManagerTest {
     public void setup() {
         membershipManager = new StreamsMembershipManager(
             GROUP_ID,
+            Optional.empty(),
             streamsRebalanceData, subscriptionState, backgroundEventHandler,
             new LogContext("test"),
             time,
@@ -1155,7 +1157,7 @@ public class StreamsMembershipManagerTest {
 
     @Test
     public void testLeaveGroupOnCloseWhenNotInGroup() {
-        testLeaveGroupWhenNotInGroup(membershipManager::leaveGroupOnClose);
+        testLeaveGroupWhenNotInGroup(() -> 
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT));
     }
 
     @Test
@@ -1231,7 +1233,7 @@ public class StreamsMembershipManagerTest {
 
     @Test
     public void testLeaveGroupOnCloseWhenNotInGroupAndFenced() {
-        
testLeaveGroupOnCloseWhenNotInGroupAndFenced(membershipManager::leaveGroupOnClose);
+        testLeaveGroupOnCloseWhenNotInGroupAndFenced(() -> 
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT));
     }
 
     private void testLeaveGroupOnCloseWhenNotInGroupAndFenced(final 
Supplier<CompletableFuture<Void>> leaveGroup) {
@@ -1274,7 +1276,8 @@ public class StreamsMembershipManagerTest {
         verifyInStatePrepareLeaving(membershipManager);
         final CompletableFuture<Void> onGroupLeftBeforeRevocationCallback = 
membershipManager.leaveGroup();
         assertEquals(onGroupLeft, onGroupLeftBeforeRevocationCallback);
-        final CompletableFuture<Void> 
onGroupLeftOnCloseBeforeRevocationCallback = 
membershipManager.leaveGroupOnClose();
+        final CompletableFuture<Void> 
onGroupLeftOnCloseBeforeRevocationCallback = 
+                
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
         assertEquals(onGroupLeft, onGroupLeftOnCloseBeforeRevocationCallback);
         onTasksRevokedCallbackExecuted.complete(null);
         verify(memberStateListener).onGroupAssignmentUpdated(Set.of());
@@ -1315,7 +1318,8 @@ public class StreamsMembershipManagerTest {
 
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
-        final CompletableFuture<Void> onGroupLeft = 
membershipManager.leaveGroupOnClose();
+        final CompletableFuture<Void> onGroupLeft = 
+                
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
 
         assertFalse(onGroupLeft.isDone());
         verifyInStateLeaving(membershipManager);
@@ -1324,7 +1328,8 @@ public class StreamsMembershipManagerTest {
         verify(backgroundEventHandler, 
never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class));
         final CompletableFuture<Void> 
onGroupLeftBeforeHeartbeatRequestGenerated = membershipManager.leaveGroup();
         assertEquals(onGroupLeft, onGroupLeftBeforeHeartbeatRequestGenerated);
-        final CompletableFuture<Void> 
onGroupLeftOnCloseBeforeHeartbeatRequestGenerated = 
membershipManager.leaveGroupOnClose();
+        final CompletableFuture<Void> 
onGroupLeftOnCloseBeforeHeartbeatRequestGenerated = 
+                
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
         assertEquals(onGroupLeft, 
onGroupLeftOnCloseBeforeHeartbeatRequestGenerated);
         assertFalse(onGroupLeft.isDone());
         membershipManager.onHeartbeatRequestGenerated();
@@ -1379,6 +1384,45 @@ public class StreamsMembershipManagerTest {
         
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
     }
 
+    @Test
+    public void testLeaveGroupOnCloseWithRemainInGroupSkipsLeaveHeartbeat() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        final Set<StreamsRebalanceData.TaskId> activeTasks =
+            Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
+        joining();
+        reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
+            
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+                activeTasks, Set.of(), Set.of());
+        acknowledging(onTasksAssignedCallbackExecuted);
+
+        final CompletableFuture<Void> onGroupLeft =
+            
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+
+        assertFalse(onGroupLeft.isDone());
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        assertEquals(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP, 
membershipManager.leaveGroupOperation());
+        verify(backgroundEventHandler, 
never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class));
+
+        membershipManager.onHeartbeatRequestSkipped();
+
+        assertTrue(onGroupLeft.isDone());
+        assertFalse(onGroupLeft.isCompletedExceptionally());
+        verifyInStateUnsubscribed(membershipManager);
+        verify(subscriptionState).unsubscribe();
+    }
+
+    @Test
+    public void testLeaveGroupOnCloseWithRemainInGroupWhenNotInGroup() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+
+        final CompletableFuture<Void> onGroupLeft =
+            
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+
+        assertTrue(onGroupLeft.isDone());
+        assertFalse(onGroupLeft.isCompletedExceptionally());
+    }
+
     @Test
     public void testOnHeartbeatRequestSkippedWhenInLeaving() {
         
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
"topic");
@@ -1404,6 +1448,70 @@ public class StreamsMembershipManagerTest {
         assertFalse(future.isCompletedExceptionally());
     }
 
+    @Test
+    public void testLeaveGroupEpochIsStaticMemberEpochForStaticMember() {
+        final StreamsMembershipManager staticMember = new 
StreamsMembershipManager(
+            GROUP_ID,
+            Optional.of("instance-1"),
+            streamsRebalanceData, subscriptionState, backgroundEventHandler,
+            new LogContext("test"), time, new Metrics(time)
+        );
+        
assertEquals(StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH, 
staticMember.leaveGroupEpoch());
+    }
+
+    @Test
+    public void 
testLeaveGroupEpochIsDynamicMemberEpochForStaticMemberWithLeaveGroupOperation() 
{
+        final StreamsMembershipManager staticMember = new 
StreamsMembershipManager(
+            GROUP_ID,
+            Optional.of("instance-1"),
+            streamsRebalanceData, subscriptionState, backgroundEventHandler,
+            new LogContext("test"), time, new Metrics(time)
+        );
+        staticMember.registerStateListener(memberStateListener);
+        
staticMember.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+        assertEquals(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, 
staticMember.leaveGroupEpoch());
+    }
+
+    @Test
+    public void 
testLeaveGroupEpochIsStaticMemberEpochForStaticMemberWithRemainInGroup() {
+        final StreamsMembershipManager staticMember = new 
StreamsMembershipManager(
+            GROUP_ID,
+            Optional.of("instance-1"),
+            streamsRebalanceData, subscriptionState, backgroundEventHandler,
+            new LogContext("test"), time, new Metrics(time)
+        );
+        staticMember.registerStateListener(memberStateListener);
+        
staticMember.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        
assertEquals(StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH, 
staticMember.leaveGroupEpoch());
+    }
+
+    @Test
+    public void 
testIsLeavingGroupReturnsTrueForStaticMemberWithRemainInGroupOperation() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
"topic");
+        final StreamsMembershipManager staticMember = new 
StreamsMembershipManager(
+            GROUP_ID,
+            Optional.of("instance-1"),
+            streamsRebalanceData, subscriptionState, backgroundEventHandler,
+            new LogContext("test"), time, new Metrics(time)
+        );
+        staticMember.registerStateListener(memberStateListener);
+        staticMember.onSubscriptionUpdated();
+        staticMember.onConsumerPoll();
+        assertEquals(MemberState.JOINING, staticMember.state());
+        
staticMember.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        assertEquals(MemberState.LEAVING, staticMember.state());
+        assertTrue(staticMember.isLeavingGroup());
+    }
+
+    @Test
+    public void 
testIsLeavingGroupReturnsFalseForDynamicMemberWithRemainInGroupOperation() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
"topic");
+        joining();
+        
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        assertFalse(membershipManager.isLeavingGroup());
+    }
+
     @Test
     public void testOnHeartbeatSuccessWhenInLeaving() {
         
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
"topic");
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index c3810422458..26aa359583c 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.streams.CloseOptions;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -57,8 +58,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyConsumerGroup;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyStreamGroup;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyStreamGroup;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 @Tag("integration")
 @Timeout(600)
@@ -163,6 +168,84 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
         waitForEmptyConsumerGroup(adminClient, 
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
     }
 
+    @Test
+    public void testCloseOptionsRemainInGroupClassicProtocol() throws 
Exception {
+        // Classic + REMAIN_IN_GROUP: member must stay in group (no leave 
heartbeat).
+        // The group should still have a member immediately after close because
+        // the session timeout is set to Integer.MAX_VALUE.
+        streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+        streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+        
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+        
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+            .withTimeout(Duration.ofSeconds(30)));
+
+        assertFalse(isEmptyConsumerGroup(adminClient, 
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
+            "Group should still have a member after REMAIN_IN_GROUP close 
(session timeout is MAX)");
+    }
+
+    @Test
+    public void testCloseOptionsDefaultClassicProtocol() throws Exception {
+        // Classic + DEFAULT: must behave like REMAIN_IN_GROUP (member stays 
in group).
+        streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+        streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+        
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+        
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.DEFAULT)
+            .withTimeout(Duration.ofSeconds(30)));
+
+        assertFalse(isEmptyConsumerGroup(adminClient, 
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
+            "Group should still have a member after DEFAULT close under 
Classic protocol");
+    }
+
+    @Test
+    public void testCloseOptionsLeaveGroupStreamsProtocol() throws Exception {
+        // Streams + LEAVE_GROUP: member must leave the group immediately.
+        streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+        streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
+        streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+        
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+        
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)
+            .withTimeout(Duration.ofSeconds(30)));
+
+        waitForEmptyStreamGroup(adminClient, 
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
+    }
+
+    @Test
+    public void testCloseOptionsDefaultStreamsProtocol() throws Exception {
+        // Streams + DEFAULT: dynamic member must leave the group (consistent 
with Streams protocol design).
+        streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+        streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
+        streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+        
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+        
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.DEFAULT)
+            .withTimeout(Duration.ofSeconds(30)));
+
+        waitForEmptyStreamGroup(adminClient, 
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
+    }
+
+    @Test
+    public void testCloseOptionsRemainInGroupStreamsProtocol() throws 
Exception {
+        // Streams + REMAIN_IN_GROUP: member must stay in group (no leave 
heartbeat sent).
+        streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+        streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name());
+        streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+        
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+        
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+            .withTimeout(Duration.ofSeconds(30)));
+
+        assertFalse(isEmptyStreamGroup(adminClient, 
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
+            "Group should still have a member after REMAIN_IN_GROUP close 
under Streams protocol");
+    }
+
     protected Topology setupTopologyWithoutIntermediateUserTopic() {
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 832e8eb1a06..1fb35121a92 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration.utils;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -984,6 +985,22 @@ public class IntegrationTestUtils {
         }
     }
 
+    private static class StreamGroupInactiveCondition implements TestCondition 
{
+        private final Admin adminClient;
+        private final String applicationId;
+
+        private StreamGroupInactiveCondition(final Admin adminClient,
+                                               final String applicationId) {
+            this.adminClient = adminClient;
+            this.applicationId = applicationId;
+        }
+
+        @Override
+        public boolean conditionMet() {
+            return isEmptyStreamGroup(adminClient, applicationId);
+        }
+    }
+
     public static void waitForEmptyConsumerGroup(final Admin adminClient,
                                                  final String applicationId,
                                                  final long timeoutMs) throws 
Exception {
@@ -994,6 +1011,16 @@ public class IntegrationTestUtils {
         );
     }
 
+    public static void waitForEmptyStreamGroup(final Admin adminClient,
+                                                 final String applicationId,
+                                                 final long timeoutMs) throws 
Exception {
+        TestUtils.waitForCondition(
+                new 
IntegrationTestUtils.StreamGroupInactiveCondition(adminClient, applicationId),
+                timeoutMs,
+                "Test stream group " + applicationId + " still active even 
after waiting " + timeoutMs + " ms."
+        );
+    }
+
     public static boolean isEmptyConsumerGroup(final Admin adminClient,
                                                final String applicationId) {
         try {
@@ -1010,6 +1037,22 @@ public class IntegrationTestUtils {
         }
     }
 
+    public static boolean isEmptyStreamGroup(final Admin adminClient,
+                                             final String applicationId) {
+        try {
+            final StreamsGroupDescription groupDescription =
+                    
adminClient.describeStreamsGroups(singletonList(applicationId))
+                            .describedGroups()
+                            .get(applicationId)
+                            .get();
+            return groupDescription.members().isEmpty();
+        } catch (final ExecutionException e) {
+            return e.getCause() instanceof GroupIdNotFoundException;
+        } catch (final InterruptedException e) {
+            return false;
+        }
+    }
+
     @SuppressWarnings("deprecation")
     private static StateListener getStateListener(final KafkaStreams streams) {
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java 
b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
index 2d74d1f668b..9e0158be9d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
+++ b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
@@ -25,22 +25,38 @@ public class CloseOptions {
      * Enum to specify the group membership operation upon closing the Kafka 
Streams application.
      *
      * <ul>
-     *   <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the 
group.</li>
-     *   <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will not leave 
the group explicitly.
-     *       Note that this option is ignored when using the streams group 
protocol
-     *       ({@code group.protocol=streams}); in that case, the consumer will 
always leave the group.</li>
+     *   <li><b>{@code DEFAULT}</b>: Applies the default behavior based on the 
active protocol and membership type:
+     *     <ul>
+     *       <li>For the <b>classic protocol</b>: The consumer will remain in 
the group.</li>
+     *       <li>For the <b>streams protocol</b> ({@code 
group.protocol=streams}):
+     *         <ul>
+     *           <li><b>Dynamic members</b>: The consumer will leave the group 
(consistent with the
+     *               protocol's design for dynamic members).</li>
+     *           <li><b>Static members</b>: The consumer will remain in the 
group until session timeout
+     *               (consistent with static membership semantics).</li>
+     *         </ul>
+     *       </li>
+     *     </ul>
+     *   </li>
+     *   <li><b>{@code LEAVE_GROUP}</b>: The consumer will explicitly leave 
the group, regardless of the
+     *       active protocol.</li>
+     *   <li><b>{@code REMAIN_IN_GROUP}</b>: The consumer will remain in the 
group, regardless of the
+     *       active protocol. Under the streams protocol, this enables a 
faster rebalance upon restart
+     *       for static members.</li>
      * </ul>
      */
     public enum GroupMembershipOperation {
+        DEFAULT,
         LEAVE_GROUP,
         REMAIN_IN_GROUP
     }
 
     /**
      * Specifies the group membership operation upon shutdown.
-     * By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be 
applied, which follows the KafkaStreams default behavior.
+     * By default, {@code GroupMembershipOperation.DEFAULT} will be applied, 
which adapts the behavior
+     * based on the active protocol.
      */
-    protected GroupMembershipOperation operation = 
GroupMembershipOperation.REMAIN_IN_GROUP;
+    protected GroupMembershipOperation operation = 
GroupMembershipOperation.DEFAULT;
 
     /**
      * Specifies the maximum amount of time to wait for the close process to 
complete.
@@ -69,7 +85,8 @@ public class CloseOptions {
     /**
      * Static method to create a {@code CloseOptions} with a specified group 
membership operation.
      *
-     * @param operation the group membership operation to apply. Must be one 
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+     * @param operation the group membership operation to apply. Must be one 
of {@code DEFAULT},
+     *                  {@code LEAVE_GROUP}, or {@code REMAIN_IN_GROUP}.
      * @return a new {@code CloseOptions} instance with the specified group 
membership operation.
      */
     public static CloseOptions groupMembershipOperation(final 
GroupMembershipOperation operation) {
@@ -90,7 +107,8 @@ public class CloseOptions {
     /**
      * Fluent method to set the group membership operation upon shutdown.
      *
-     * @param operation the group membership operation to apply. Must be one 
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+     * @param operation the group membership operation to apply. Must be one 
of {@code DEFAULT},
+     *                  {@code LEAVE_GROUP}, or {@code REMAIN_IN_GROUP}.
      * @return this {@code CloseOptions} instance.
      */
     public CloseOptions withGroupMembershipOperation(final 
GroupMembershipOperation operation) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b790f10e398..851fced582c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -489,7 +489,10 @@ public class KafkaStreams implements AutoCloseable {
             closeToError();
         }
         final StreamThread deadThread = (StreamThread) Thread.currentThread();
-        
deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        // Use DEFAULT so the consumer layer decides: classic protocol maps to 
REMAIN_IN_GROUP
+        // (avoiding an unnecessary rebalance before the replacement thread 
joins), while Streams
+        // protocol adapts to static vs dynamic membership.
+        
deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
         addStreamThread();
         if (throwable instanceof RuntimeException) {
             throw (RuntimeException) throwable;
@@ -1456,11 +1459,17 @@ public class KafkaStreams implements AutoCloseable {
      * Shutdown this {@code KafkaStreams} instance by signaling all the 
threads to stop, and then wait for them to join.
      * This will block until all threads have stopped.
      * <p>
-     * When using the classic protocol, the consumer will not leave the group 
explicitly. However, when using
-     * the streams group protocol ({@code group.protocol=streams}), the 
consumer will always leave the group.
+     * Uses {@link 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation#DEFAULT DEFAULT} 
behavior,
+     * which adapts based on the active protocol:
+     * <ul>
+     *   <li>Classic protocol: the consumer remains in the group (no explicit 
leave).</li>
+     *   <li>Streams protocol ({@code group.protocol=streams}): dynamic 
members leave the group;
+     *       static members (with {@code group.instance.id}) remain in the 
group and are removed
+     *       by the broker after the session timeout.</li>
+     * </ul>
      */
     public void close() {
-        close(Optional.empty(), 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        close(Optional.empty(), 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
     }
 
     private Thread shutdownHelper(
@@ -1604,8 +1613,14 @@ public class KafkaStreams implements AutoCloseable {
      * A {@code timeout} of {@link Duration#ZERO} (or any other zero duration) 
makes the close operation asynchronous.
      * Negative-duration timeouts are rejected.
      * <p>
-     * When using the classic protocol, the consumer will not leave the group 
explicitly. However, when using
-     * the streams group protocol ({@code group.protocol=streams}), the 
consumer will always leave the group.
+     * Uses {@link 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation#DEFAULT DEFAULT} 
behavior,
+     * which adapts based on the active protocol:
+     * <ul>
+     *   <li>Classic protocol: the consumer remains in the group (no explicit 
leave).</li>
+     *   <li>Streams protocol ({@code group.protocol=streams}): dynamic 
members leave the group;
+     *       static members (with {@code group.instance.id}) remain in the 
group and are removed
+     *       by the broker after the session timeout.</li>
+     * </ul>
      *
      * @param timeout how long to wait for the threads to shut down
      * @return {@code true} if all threads were successfully 
stopped&mdash;{@code false} if the timeout was reached
@@ -1620,7 +1635,7 @@ public class KafkaStreams implements AutoCloseable {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
 
-        return close(Optional.of(timeoutMs), 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        return close(Optional.of(timeoutMs), 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 882b427d1ca..6d1bf0c2937 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -100,6 +100,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
 import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
 import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
@@ -373,7 +374,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     // These are used to signal from outside the stream thread, but the 
variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
     private final 
AtomicReference<org.apache.kafka.streams.CloseOptions.GroupMembershipOperation> 
leaveGroupRequested =
-        new 
AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        new 
AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
     private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
     private final boolean eosEnabled;
     private final boolean processingThreadsEnabled;
@@ -1892,10 +1893,21 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             log.error("Failed to close changelog reader due to the following 
error:", e);
         }
         try {
-            final GroupMembershipOperation membershipOperation =
-                leaveGroupRequested.get() == 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP ? 
LEAVE_GROUP : REMAIN_IN_GROUP;
-            if (membershipOperation == REMAIN_IN_GROUP && 
streamsRebalanceData.isPresent()) {
-                log.info("The consumer will leave the group since the streams 
group protocol is used");
+            final 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation streamsOperation 
= leaveGroupRequested.get();
+            final GroupMembershipOperation membershipOperation;
+            if (streamsOperation == 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP) {
+                membershipOperation = LEAVE_GROUP;
+            } else if (streamsOperation == 
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP) 
{
+                membershipOperation = REMAIN_IN_GROUP;
+            } else {
+                // DEFAULT: adapt to the active protocol
+                if (streamsRebalanceData.isPresent()) {
+                    // Streams protocol: delegate to the consumer, which 
adapts to static vs dynamic
+                    membershipOperation = DEFAULT;
+                } else {
+                    // Classic protocol: remain in the group (preserves 
existing behavior)
+                    membershipOperation = REMAIN_IN_GROUP;
+                }
             }
             
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
         } catch (final Throwable e) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 11607240eb2..4ee975805dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -78,6 +78,7 @@ import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
 
 import java.net.InetSocketAddress;
 import java.time.Duration;
@@ -310,7 +311,7 @@ public class KafkaStreamsTest {
     }
 
     private void prepareConsumer(final StreamThread thread, final 
AtomicReference<StreamThread.State> state) {
-        doAnswer(invocation -> {
+        final Answer<Object> shutdownAnswer = invocation -> {
             supplier.consumer.close(
                 
org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
             );
@@ -325,7 +326,9 @@ public class KafkaStreamsTest {
             threadStateListenerCapture.getValue().onChange(thread, 
StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
             threadStateListenerCapture.getValue().onChange(thread, 
StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
             return null;
-        
}).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        };
+        
doAnswer(shutdownAnswer).when(thread).shutdown(CloseOptions.GroupMembershipOperation.DEFAULT);
+        
doAnswer(shutdownAnswer).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }
 
     private void prepareThreadLock(final StreamThread thread) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 4a393060d62..01aa4279c43 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -110,6 +110,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -1503,6 +1504,98 @@ public class StreamThreadTest {
         verify(taskManager).shutdown(true);
     }
 
+    @Test
+    public void shouldRouteDefaultToRemainInGroupForClassicProtocol() {
+        // Classic protocol: DEFAULT should map to consumer REMAIN_IN_GROUP
+        final TaskManager taskManager = mock(TaskManager.class);
+        final StreamsConfig config = new StreamsConfig(configProps(false, 
false));
+        final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        // buildStreamThread uses Optional.empty() for streamsRebalanceData 
(Classic protocol)
+        thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
+            .updateThreadMetadata(adminClientId(CLIENT_ID));
+
+        
thread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
+
+        final ArgumentCaptor<org.apache.kafka.clients.consumer.CloseOptions> 
captor =
+                
ArgumentCaptor.forClass(org.apache.kafka.clients.consumer.CloseOptions.class);
+        // buildStreamThread passes consumer as mainConsumer, so close() is 
called on consumer
+        verify(consumer).close(captor.capture());
+        assertEquals(
+                
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP,
+                captor.getValue().groupMembershipOperation()
+        );
+    }
+
+    @Test
+    public void shouldRouteDefaultToConsumerDefaultForStreamsProtocol() {
+        // Streams protocol: DEFAULT should map to consumer DEFAULT (dynamic 
member leaves)
+        final TaskManager taskManager = mock(TaskManager.class);
+        final StreamsConfig config = new StreamsConfig(configProps(false, 
false));
+        final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
+            UUID.randomUUID(), Optional.empty(), Optional.empty(), Map.of(), 
Map.of());
+        thread = new StreamThread(
+            mockTime, config, null,
+            mainConsumer, consumer,
+            changelogReader, null, taskManager, null,
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
+            new TopologyMetadata(internalTopologyBuilder, config),
+            PROCESS_ID, CLIENT_ID, new LogContext(""),
+            null, new AtomicLong(Long.MAX_VALUE), new LinkedList<>(),
+            null, HANDLER, null,
+            Optional.of(streamsRebalanceData), null, null
+        ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+        
thread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
+
+        final ArgumentCaptor<org.apache.kafka.clients.consumer.CloseOptions> 
captor =
+                
ArgumentCaptor.forClass(org.apache.kafka.clients.consumer.CloseOptions.class);
+        verify(mainConsumer).close(captor.capture());
+        assertEquals(
+                
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT,
+                captor.getValue().groupMembershipOperation()
+        );
+    }
+
+    @Test
+    public void shouldRouteRemainInGroupToRemainInGroupForStreamsProtocol() {
+        // Streams protocol: explicit REMAIN_IN_GROUP must always map to 
consumer REMAIN_IN_GROUP
+        final TaskManager taskManager = mock(TaskManager.class);
+        final StreamsConfig config = new StreamsConfig(configProps(false, 
false));
+        final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
+            UUID.randomUUID(), Optional.empty(), Optional.empty(), Map.of(), 
Map.of());
+        thread = new StreamThread(
+            mockTime, config, null,
+            mainConsumer, consumer,
+            changelogReader, null, taskManager, null,
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
+            new TopologyMetadata(internalTopologyBuilder, config),
+            PROCESS_ID, CLIENT_ID, new LogContext(""),
+            null, new AtomicLong(Long.MAX_VALUE), new LinkedList<>(),
+            null, HANDLER, null,
+            Optional.of(streamsRebalanceData), null, null
+        ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+        
thread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+
+        final ArgumentCaptor<org.apache.kafka.clients.consumer.CloseOptions> 
captor =
+                
ArgumentCaptor.forClass(org.apache.kafka.clients.consumer.CloseOptions.class);
+        verify(mainConsumer).close(captor.capture());
+        assertEquals(
+                
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP,
+                captor.getValue().groupMembershipOperation()
+        );
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void 
shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(final boolean 
processingThreadsEnabled) {

Reply via email to