This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d9c3b925884 KAFKA-20167 Introduce CloseOptions.DEFAULT for Kafka
Streams (#21579)
d9c3b925884 is described below
commit d9c3b9258849487b29e915d058f61e08074d51e5
Author: Ken Huang <[email protected]>
AuthorDate: Thu May 28 00:49:16 2026 +0800
KAFKA-20167 Introduce CloseOptions.DEFAULT for Kafka Streams (#21579)
This patch addresses
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1284+Introduce+CloseOptions.DEFAULT+for+Kafka+Streams
Reviewers: Lucas Brutschy <[email protected]>
---
.../consumer/internals/RequestManagers.java | 1 +
.../StreamsGroupHeartbeatRequestManager.java | 47 +++++++-
.../internals/StreamsMembershipManager.java | 99 ++++++++++++-----
.../events/ApplicationEventProcessor.java | 2 +-
.../StreamsGroupHeartbeatRequestManagerTest.java | 51 +++++++++
.../internals/StreamsMembershipManagerTest.java | 118 ++++++++++++++++++++-
.../KafkaStreamsCloseOptionsIntegrationTest.java | 83 +++++++++++++++
.../integration/utils/IntegrationTestUtils.java | 43 ++++++++
.../org/apache/kafka/streams/CloseOptions.java | 34 ++++--
.../org/apache/kafka/streams/KafkaStreams.java | 29 +++--
.../streams/processor/internals/StreamThread.java | 22 +++-
.../org/apache/kafka/streams/KafkaStreamsTest.java | 7 +-
.../processor/internals/StreamThreadTest.java | 93 ++++++++++++++++
13 files changed, 567 insertions(+), 62 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index dca5067bf4f..36ebbb3bbf3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -222,6 +222,7 @@ public class RequestManagers implements Closeable {
if (streamsRebalanceData.isPresent()) {
streamsMembershipManager = new
StreamsMembershipManager(
groupRebalanceConfig.groupId,
+ groupRebalanceConfig.groupInstanceId,
streamsRebalanceData.get(),
subscriptions,
backgroundEventHandler,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index f7b551e812d..4080439283d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -50,6 +50,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
import static
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
import static
org.apache.kafka.clients.consumer.internals.RequestState.RETRY_BACKOFF_JITTER;
@@ -384,6 +385,13 @@ public class StreamsGroupHeartbeatRequestManager
implements RequestManager {
heartbeatState.reset();
return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(leaveHeartbeat));
}
+ if (membershipManager.state() == MemberState.LEAVING &&
shouldSkipLeaveHeartbeat()) {
+ logger.info("Dynamic member {} skipping leave heartbeat
(operation=REMAIN_IN_GROUP). " +
+ "The broker will remove the member from the group via session
timeout.",
+ membershipManager.memberId());
+ membershipManager.onHeartbeatRequestSkipped();
+ return EMPTY;
+ }
if (shouldHeartbeatBeforeIntervalExpires() ||
heartbeatRequestState.canSendRequest(currentTimeMs)) {
NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequestAndHandleResponse(currentTimeMs);
return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(request));
@@ -411,7 +419,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
*/
@Override
public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
- if (membershipManager.isLeavingGroup()) {
+ if (membershipManager.isLeavingGroup() && !shouldSkipLeaveHeartbeat())
{
NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequestAndLogResponse(currentTimeMs);
return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
List.of(request));
}
@@ -458,7 +466,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
/**
* A heartbeat should be sent without waiting for the heartbeat interval
to expire if:
- * - the member is leaving the group
+ * - the member should send a leave heartbeat (see {@link
#shouldSendLeaveHeartbeat()})
* or
* - the member is joining the group or acknowledging the assignment and
for both cases there is no heartbeat request
* in flight.
@@ -466,12 +474,41 @@ public class StreamsGroupHeartbeatRequestManager
implements RequestManager {
* @return true if a heartbeat should be sent before the interval expires,
false otherwise
*/
private boolean shouldHeartbeatBeforeIntervalExpires() {
- return membershipManager.state() == MemberState.LEAVING
- ||
- (membershipManager.state() == MemberState.JOINING ||
membershipManager.state() == MemberState.ACKNOWLEDGING)
+ return shouldSendLeaveHeartbeat()
+ || (membershipManager.state() == MemberState.JOINING ||
membershipManager.state() == MemberState.ACKNOWLEDGING)
&& !heartbeatRequestState.requestInFlight();
}
+ /**
+ * Returns whether a leave group heartbeat should be sent. The leave
heartbeat is skipped only
+ * when the member is dynamic (no group instance ID) and the operation is
+ * {@link
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation#REMAIN_IN_GROUP}.
+ * Static members always send the leave heartbeat (with epoch -2) so the
broker holds the
+ * assignment until the session timeout.
+ *
+ * @return true if a leave heartbeat should be sent, false otherwise
+ */
+ private boolean shouldSendLeaveHeartbeat() {
+ if (shouldSkipLeaveHeartbeat()) {
+ logger.debug("Member {} skipping leave heartbeat (operation={},
static={}).",
+ membershipManager.memberId(),
+ membershipManager.leaveGroupOperation(),
+ membershipManager.groupInstanceId().isPresent());
+ return false;
+ }
+ return membershipManager.state() == MemberState.LEAVING;
+ }
+
+ /**
+ * Returns true if the leave heartbeat should be skipped: only when the
member is dynamic
+ * (no group instance ID) and the operation is REMAIN_IN_GROUP. Static
members always send
+ * a leave heartbeat (with epoch -2) so the broker can hold the assignment.
+ */
+ private boolean shouldSkipLeaveHeartbeat() {
+ return REMAIN_IN_GROUP == membershipManager.leaveGroupOperation()
+ && membershipManager.groupInstanceId().isEmpty();
+ }
+
private void maybePropagateCoordinatorFatalErrorEvent() {
coordinatorRequestManager.getAndClearFatalError()
.ifPresent(fatalError -> backgroundEventHandler.add(new
ErrorEvent(fatalError)));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index a378cb0570b..f2d402de171 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.CloseOptions;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
@@ -198,7 +199,7 @@ public class StreamsMembershipManager implements
RequestManager {
/**
* Group instance ID to be used by a static member, provided when creating
the current membership manager.
*/
- private final Optional<String> groupInstanceId = Optional.empty();
+ private final Optional<String> groupInstanceId;
/**
* Current epoch of the member. It will be set to 0 by the member, and
provided to the server
@@ -210,12 +211,20 @@ public class StreamsMembershipManager implements
RequestManager {
/**
* If the member is currently leaving the group after a call to {@link
#leaveGroup()} or
- * {@link #leaveGroupOnClose()}, this will have a future that will
complete when the ongoing leave operation
- * completes (callbacks executed and heartbeat request to leave is sent
out). This will be empty if the
- * member is not leaving.
+ * {@link #leaveGroupOnClose(CloseOptions.GroupMembershipOperation)}, this
will have a future that will
+ * complete when the ongoing leave operation completes (callbacks executed
and heartbeat request to leave
+ * is sent out). This will be empty if the member is not leaving.
*/
private Optional<CompletableFuture<Void>> leaveGroupInProgress =
Optional.empty();
+ /**
+ * The operation the member will perform on leaving the group. Remains
{@code DEFAULT} until the
+ * member is closing.
+ *
+ * @see CloseOptions.GroupMembershipOperation
+ */
+ private CloseOptions.GroupMembershipOperation leaveGroupOperation =
CloseOptions.GroupMembershipOperation.DEFAULT;
+
/**
* Future that will complete when a stale member completes releasing its
assignment after
* leaving the group due to poll timer expired. Used to make sure that the
member rejoins
@@ -293,6 +302,7 @@ public class StreamsMembershipManager implements
RequestManager {
* @param metrics The metrics.
*/
public StreamsMembershipManager(final String groupId,
+ final Optional<String> groupInstanceId,
final StreamsRebalanceData
streamsRebalanceData,
final SubscriptionState subscriptionState,
final BackgroundEventHandler
backgroundEventHandler,
@@ -302,6 +312,7 @@ public class StreamsMembershipManager implements
RequestManager {
log = logContext.logger(StreamsMembershipManager.class);
this.state = MemberState.UNSUBSCRIBED;
this.groupId = groupId;
+ this.groupInstanceId = groupInstanceId;
this.backgroundEventHandler = backgroundEventHandler;
this.streamsRebalanceData = streamsRebalanceData;
this.subscriptionState = subscriptionState;
@@ -347,12 +358,32 @@ public class StreamsMembershipManager implements
RequestManager {
}
/**
- * @return True if the member is preparing to leave the group (waiting for
callbacks), or
- * leaving (sending last heartbeat). This is used to skip proactively
leaving the group when
- * the poll timer expires.
+ * @return the operation the member will perform on leaving the group.
+ */
+ public CloseOptions.GroupMembershipOperation leaveGroupOperation() {
+ return leaveGroupOperation;
+ }
+
+ /**
+ * @return True if the member is preparing to leave the group or leaving
and a leave heartbeat
+ * should be sent. Returns false for dynamic members with
REMAIN_IN_GROUP, which skip
+ * the leave heartbeat entirely.
*/
public boolean isLeavingGroup() {
- return state == MemberState.PREPARE_LEAVING || state ==
MemberState.LEAVING;
+ if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP ==
leaveGroupOperation && groupInstanceId.isEmpty()) {
+ return false;
+ }
+ MemberState currentState = state();
+ boolean isLeavingState = currentState == MemberState.PREPARE_LEAVING
|| currentState == MemberState.LEAVING;
+ boolean hasLeaveOperation =
+ // Default operation: both static and dynamic members will send a
leave heartbeat
+ CloseOptions.GroupMembershipOperation.DEFAULT ==
leaveGroupOperation
+ // Leave group operation: both static and dynamic members will
send a leave heartbeat
+ || CloseOptions.GroupMembershipOperation.LEAVE_GROUP ==
leaveGroupOperation
+ // Remain in group: static members will send a leave heartbeat
with -2 epoch to signal
+ // that a member using this instance ID is temporarily gone and
will rejoin within session timeout.
+ || groupInstanceId.isPresent();
+ return isLeavingState && hasLeaveOperation;
}
private boolean isNotInGroup() {
@@ -426,6 +457,7 @@ public class StreamsMembershipManager implements
RequestManager {
if (reconciliationInProgress) {
rejoinedWhileReconciliationInProgress = true;
}
+ leaveGroupOperation = CloseOptions.GroupMembershipOperation.DEFAULT;
resetEpoch();
transitionTo(MemberState.JOINING);
clearCurrentTaskAssignment();
@@ -464,10 +496,26 @@ public class StreamsMembershipManager implements
RequestManager {
}
private void finalizeLeaving() {
-
updateMemberEpoch(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH);
+ updateMemberEpoch(leaveGroupEpoch());
clearCurrentTaskAssignment();
}
+ /**
+ * Returns the epoch to use in the leave group heartbeat. Static members
use (LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ * so the broker holds the assignment until session timeout, unless the
operation is LEAVE_GROUP which forces
+ * permanent removal.
+ */
+ public int leaveGroupEpoch() {
+ boolean isStaticMember = groupInstanceId.isPresent();
+
+ if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP ==
leaveGroupOperation) {
+ return StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+ }
+ return isStaticMember
+ ? StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH
+ : StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+ }
+
/**
* Transition to STALE to release assignments because the member has left
the group due to
* expired poll timer. This will trigger the onAllTasksLost callback. Once
the callback
@@ -532,16 +580,17 @@ public class StreamsMembershipManager implements
RequestManager {
}
/**
- * Notify when the heartbeat request is skipped.
* Transition out of the {@link MemberState#LEAVING} state even if the
heartbeat was not sent.
- * This will ensure that the member is not blocked on {@link
MemberState#LEAVING} (best
+ * This will ensure that the member is not blocked on {@link
MemberState#LEAVING}. (best
* effort to send the request, without any response handling or retry
logic)
*/
public void onHeartbeatRequestSkipped() {
if (state == MemberState.LEAVING) {
- log.warn("Heartbeat to leave group cannot be sent (most probably
due to coordinator " +
- "not known/available). Member {} with epoch {} will
transition to {}.",
- memberId, memberEpoch, MemberState.UNSUBSCRIBED);
+ if (isLeavingGroup()) {
+ log.warn("Heartbeat to leave group cannot be sent (most
probably due to coordinator " +
+ "not known/available). Member {} with epoch {} will
transition to {}.",
+ memberId, memberEpoch, MemberState.UNSUBSCRIBED);
+ }
transitionTo(MemberState.UNSUBSCRIBED);
maybeCompleteLeaveInProgress();
}
@@ -899,23 +948,15 @@ public class StreamsMembershipManager implements
RequestManager {
}
/**
- * Leaves the group when the member closes.
- *
- * <p>
- * This method does the following:
- * <ol>
- * <li>Transitions member state to {@link
MemberState#PREPARE_LEAVING}.</li>
- * <li>Skips the invocation of the revocation callback or lost
callback.</li>
- * <li>Clears the current and target assignment, unsubscribes from all
topics and
- * transitions the member state to {@link MemberState#LEAVING}.</li>
- * </ol>
- * States {@link MemberState#PREPARE_LEAVING} and {@link
MemberState#LEAVING} cause the heartbeat request manager
- * to send a leave group heartbeat.
- * </p>
+ * Closes the member's participation in the group, honoring the requested
{@link CloseOptions.GroupMembershipOperation}.
+ * Stores the operation and follows the normal leaving path; {@link
StreamsGroupHeartbeatRequestManager}
+ * decides whether to send or skip the leave group heartbeat based on the
operation.
*
- * @return future that will complete when the heartbeat to leave the group
has been sent out.
+ * @param membershipOperation the requested close behavior
+ * @return future that will complete when the close operation is done
*/
- public CompletableFuture<Void> leaveGroupOnClose() {
+ public CompletableFuture<Void> leaveGroupOnClose(final
CloseOptions.GroupMembershipOperation membershipOperation) {
+ this.leaveGroupOperation = membershipOperation;
return leaveGroup(true);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 76636c58d76..be76ac88ba3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -499,7 +499,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
future.whenComplete(complete(event.future()));
} else if (requestManagers.streamsMembershipManager.isPresent()) {
log.debug("Signal the StreamsMembershipManager to leave the
streams group since the member is closing");
- CompletableFuture<Void> future =
requestManagers.streamsMembershipManager.get().leaveGroupOnClose();
+ CompletableFuture<Void> future =
requestManagers.streamsMembershipManager.get().leaveGroupOnClose(event.membershipOperation());
future.whenComplete(complete(event.future()));
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 6c61f13eb3d..3ede3565c11 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
@@ -68,6 +69,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+import static
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -332,6 +334,55 @@ class StreamsGroupHeartbeatRequestManagerTest {
}
}
+ @Test
+ public void testSkipLeaveHeartbeatForRemainInGroupWithDynamicMember() {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager =
createStreamsGroupHeartbeatRequestManager();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+
when(membershipManager.leaveGroupOperation()).thenReturn(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(0, result.unsentRequests.size());
+ verify(membershipManager).onHeartbeatRequestSkipped();
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = CloseOptions.GroupMembershipOperation.class, names =
{"DEFAULT", "REMAIN_IN_GROUP"})
+ public void testSendLeaveHeartbeatForStaticMember(final
CloseOptions.GroupMembershipOperation operation) {
+ // Static members always send a leave heartbeat (with epoch -2) so the
broker can hold the
+ // assignment until session timeout, regardless of the close operation.
+ final long heartbeatIntervalMs = 1234;
+ try (
+ final MockedConstruction<HeartbeatRequestState> ignored =
mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.canSendRequest(time.milliseconds())).thenReturn(false);
+
when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
+ when(mock.requestInFlight()).thenReturn(false);
+ })
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+
when(membershipManager.leaveGroupOperation()).thenReturn(operation);
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_STATIC_MEMBER_EPOCH);
+ when(membershipManager.groupId()).thenReturn(GROUP_ID);
+ when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+
+ final NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ verify(membershipManager, never()).onHeartbeatRequestSkipped();
+ final StreamsGroupHeartbeatRequest req =
+ (StreamsGroupHeartbeatRequest)
result.unsentRequests.get(0).requestBuilder().build();
+ assertEquals(LEAVE_GROUP_STATIC_MEMBER_EPOCH,
req.data().memberEpoch());
+ assertEquals(INSTANCE_ID, req.data().instanceId());
+ }
+ }
+
@ParameterizedTest
@EnumSource(value = MemberState.class, names = {"JOINING",
"ACKNOWLEDGING"})
public void testSendingHeartbeatIfMemberIsJoiningOrAcknowledging(final
MemberState memberState) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index ae91b7fd0f0..5761eefd09e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.CloseOptions;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
@@ -125,6 +126,7 @@ public class StreamsMembershipManagerTest {
public void setup() {
membershipManager = new StreamsMembershipManager(
GROUP_ID,
+ Optional.empty(),
streamsRebalanceData, subscriptionState, backgroundEventHandler,
new LogContext("test"),
time,
@@ -1155,7 +1157,7 @@ public class StreamsMembershipManagerTest {
@Test
public void testLeaveGroupOnCloseWhenNotInGroup() {
- testLeaveGroupWhenNotInGroup(membershipManager::leaveGroupOnClose);
+ testLeaveGroupWhenNotInGroup(() ->
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT));
}
@Test
@@ -1231,7 +1233,7 @@ public class StreamsMembershipManagerTest {
@Test
public void testLeaveGroupOnCloseWhenNotInGroupAndFenced() {
-
testLeaveGroupOnCloseWhenNotInGroupAndFenced(membershipManager::leaveGroupOnClose);
+ testLeaveGroupOnCloseWhenNotInGroupAndFenced(() ->
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT));
}
private void testLeaveGroupOnCloseWhenNotInGroupAndFenced(final
Supplier<CompletableFuture<Void>> leaveGroup) {
@@ -1274,7 +1276,8 @@ public class StreamsMembershipManagerTest {
verifyInStatePrepareLeaving(membershipManager);
final CompletableFuture<Void> onGroupLeftBeforeRevocationCallback =
membershipManager.leaveGroup();
assertEquals(onGroupLeft, onGroupLeftBeforeRevocationCallback);
- final CompletableFuture<Void>
onGroupLeftOnCloseBeforeRevocationCallback =
membershipManager.leaveGroupOnClose();
+ final CompletableFuture<Void>
onGroupLeftOnCloseBeforeRevocationCallback =
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
assertEquals(onGroupLeft, onGroupLeftOnCloseBeforeRevocationCallback);
onTasksRevokedCallbackExecuted.complete(null);
verify(memberStateListener).onGroupAssignmentUpdated(Set.of());
@@ -1315,7 +1318,8 @@ public class StreamsMembershipManagerTest {
acknowledging(onTasksAssignedCallbackExecutedSetup);
- final CompletableFuture<Void> onGroupLeft =
membershipManager.leaveGroupOnClose();
+ final CompletableFuture<Void> onGroupLeft =
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
assertFalse(onGroupLeft.isDone());
verifyInStateLeaving(membershipManager);
@@ -1324,7 +1328,8 @@ public class StreamsMembershipManagerTest {
verify(backgroundEventHandler,
never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class));
final CompletableFuture<Void>
onGroupLeftBeforeHeartbeatRequestGenerated = membershipManager.leaveGroup();
assertEquals(onGroupLeft, onGroupLeftBeforeHeartbeatRequestGenerated);
- final CompletableFuture<Void>
onGroupLeftOnCloseBeforeHeartbeatRequestGenerated =
membershipManager.leaveGroupOnClose();
+ final CompletableFuture<Void>
onGroupLeftOnCloseBeforeHeartbeatRequestGenerated =
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
assertEquals(onGroupLeft,
onGroupLeftOnCloseBeforeHeartbeatRequestGenerated);
assertFalse(onGroupLeft.isDone());
membershipManager.onHeartbeatRequestGenerated();
@@ -1379,6 +1384,45 @@ public class StreamsMembershipManagerTest {
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
Set.of(), Set.of());
}
+ @Test
+ public void testLeaveGroupOnCloseWithRemainInGroupSkipsLeaveHeartbeat() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ final Set<StreamsRebalanceData.TaskId> activeTasks =
+ Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0,
PARTITION_0));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
+
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ activeTasks, Set.of(), Set.of());
+ acknowledging(onTasksAssignedCallbackExecuted);
+
+ final CompletableFuture<Void> onGroupLeft =
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+
+ assertFalse(onGroupLeft.isDone());
+ assertEquals(MemberState.LEAVING, membershipManager.state());
+ assertEquals(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP,
membershipManager.leaveGroupOperation());
+ verify(backgroundEventHandler,
never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class));
+
+ membershipManager.onHeartbeatRequestSkipped();
+
+ assertTrue(onGroupLeft.isDone());
+ assertFalse(onGroupLeft.isCompletedExceptionally());
+ verifyInStateUnsubscribed(membershipManager);
+ verify(subscriptionState).unsubscribe();
+ }
+
+ @Test
+ public void testLeaveGroupOnCloseWithRemainInGroupWhenNotInGroup() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+
+ final CompletableFuture<Void> onGroupLeft =
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+
+ assertTrue(onGroupLeft.isDone());
+ assertFalse(onGroupLeft.isCompletedExceptionally());
+ }
+
@Test
public void testOnHeartbeatRequestSkippedWhenInLeaving() {
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
"topic");
@@ -1404,6 +1448,70 @@ public class StreamsMembershipManagerTest {
assertFalse(future.isCompletedExceptionally());
}
+ @Test
+ public void testLeaveGroupEpochIsStaticMemberEpochForStaticMember() {
+ final StreamsMembershipManager staticMember = new
StreamsMembershipManager(
+ GROUP_ID,
+ Optional.of("instance-1"),
+ streamsRebalanceData, subscriptionState, backgroundEventHandler,
+ new LogContext("test"), time, new Metrics(time)
+ );
+
assertEquals(StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
staticMember.leaveGroupEpoch());
+ }
+
+ @Test
+ public void
testLeaveGroupEpochIsDynamicMemberEpochForStaticMemberWithLeaveGroupOperation()
{
+ final StreamsMembershipManager staticMember = new
StreamsMembershipManager(
+ GROUP_ID,
+ Optional.of("instance-1"),
+ streamsRebalanceData, subscriptionState, backgroundEventHandler,
+ new LogContext("test"), time, new Metrics(time)
+ );
+ staticMember.registerStateListener(memberStateListener);
+
staticMember.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+ assertEquals(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
staticMember.leaveGroupEpoch());
+ }
+
+ @Test
+ public void
testLeaveGroupEpochIsStaticMemberEpochForStaticMemberWithRemainInGroup() {
+ final StreamsMembershipManager staticMember = new
StreamsMembershipManager(
+ GROUP_ID,
+ Optional.of("instance-1"),
+ streamsRebalanceData, subscriptionState, backgroundEventHandler,
+ new LogContext("test"), time, new Metrics(time)
+ );
+ staticMember.registerStateListener(memberStateListener);
+
staticMember.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+
assertEquals(StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
staticMember.leaveGroupEpoch());
+ }
+
+ @Test
+ public void
testIsLeavingGroupReturnsTrueForStaticMemberWithRemainInGroupOperation() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
"topic");
+ final StreamsMembershipManager staticMember = new
StreamsMembershipManager(
+ GROUP_ID,
+ Optional.of("instance-1"),
+ streamsRebalanceData, subscriptionState, backgroundEventHandler,
+ new LogContext("test"), time, new Metrics(time)
+ );
+ staticMember.registerStateListener(memberStateListener);
+ staticMember.onSubscriptionUpdated();
+ staticMember.onConsumerPoll();
+ assertEquals(MemberState.JOINING, staticMember.state());
+
staticMember.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ assertEquals(MemberState.LEAVING, staticMember.state());
+ assertTrue(staticMember.isLeavingGroup());
+ }
+
+ @Test
+ public void
testIsLeavingGroupReturnsFalseForDynamicMemberWithRemainInGroupOperation() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
"topic");
+ joining();
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ assertEquals(MemberState.LEAVING, membershipManager.state());
+ assertFalse(membershipManager.isLeavingGroup());
+ }
+
@Test
public void testOnHeartbeatSuccessWhenInLeaving() {
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
"topic");
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index c3810422458..26aa359583c 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.CloseOptions;
+import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -57,8 +58,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyConsumerGroup;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyStreamGroup;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyStreamGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertFalse;
@Tag("integration")
@Timeout(600)
@@ -163,6 +168,84 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
waitForEmptyConsumerGroup(adminClient,
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
}
+ @Test
+ public void testCloseOptionsRemainInGroupClassicProtocol() throws
Exception {
+ // Classic + REMAIN_IN_GROUP: member must stay in group (no LeaveGroup
request).
+ // The group should still have a member immediately after close because
+ // the session timeout is set to Integer.MAX_VALUE.
+ streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+ streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
+
+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+ .withTimeout(Duration.ofSeconds(30)));
+
+ assertFalse(isEmptyConsumerGroup(adminClient,
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
+ "Group should still have a member after REMAIN_IN_GROUP close
(session timeout is MAX)");
+ }
+
+ @Test
+ public void testCloseOptionsDefaultClassicProtocol() throws Exception {
+ // Classic + DEFAULT: must behave like REMAIN_IN_GROUP (member stays
in group).
+ streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+ streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
+
+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.DEFAULT)
+ .withTimeout(Duration.ofSeconds(30)));
+
+ assertFalse(isEmptyConsumerGroup(adminClient,
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
+ "Group should still have a member after DEFAULT close under
Classic protocol");
+ }
+
+ @Test
+ public void testCloseOptionsLeaveGroupStreamsProtocol() throws Exception {
+ // Streams + LEAVE_GROUP: member must leave the group immediately.
+ streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+ streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
+ streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
+
+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)
+ .withTimeout(Duration.ofSeconds(30)));
+
+ waitForEmptyStreamGroup(adminClient,
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
+ }
+
+ @Test
+ public void testCloseOptionsDefaultStreamsProtocol() throws Exception {
+ // Streams + DEFAULT: dynamic member must leave the group (consistent
with Streams protocol design).
+ streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+ streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
+ streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
+
+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.DEFAULT)
+ .withTimeout(Duration.ofSeconds(30)));
+
+ waitForEmptyStreamGroup(adminClient,
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
+ }
+
+ @Test
+ public void testCloseOptionsRemainInGroupStreamsProtocol() throws
Exception {
+ // Streams + REMAIN_IN_GROUP: member must stay in group (no leave
heartbeat sent).
+ streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+ streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
+ streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
+
+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+ .withTimeout(Duration.ofSeconds(30)));
+
+ assertFalse(isEmptyStreamGroup(adminClient,
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
+ "Group should still have a member after REMAIN_IN_GROUP close
under Streams protocol");
+ }
+
protected Topology setupTopologyWithoutIntermediateUserTopic() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 832e8eb1a06..1fb35121a92 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration.utils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -984,6 +985,22 @@ public class IntegrationTestUtils {
}
}
+ private static class StreamGroupInactiveCondition implements TestCondition
{
+ private final Admin adminClient;
+ private final String applicationId;
+
+ private StreamGroupInactiveCondition(final Admin adminClient,
+ final String applicationId) {
+ this.adminClient = adminClient;
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ public boolean conditionMet() {
+ return isEmptyStreamGroup(adminClient, applicationId);
+ }
+ }
+
public static void waitForEmptyConsumerGroup(final Admin adminClient,
final String applicationId,
final long timeoutMs) throws
Exception {
@@ -994,6 +1011,16 @@ public class IntegrationTestUtils {
);
}
+ public static void waitForEmptyStreamGroup(final Admin adminClient,
+ final String applicationId,
+ final long timeoutMs) throws
Exception {
+ TestUtils.waitForCondition(
+ new
IntegrationTestUtils.StreamGroupInactiveCondition(adminClient, applicationId),
+ timeoutMs,
+ "Test stream group " + applicationId + " still active even
after waiting " + timeoutMs + " ms."
+ );
+ }
+
public static boolean isEmptyConsumerGroup(final Admin adminClient,
final String applicationId) {
try {
@@ -1010,6 +1037,22 @@ public class IntegrationTestUtils {
}
}
+ public static boolean isEmptyStreamGroup(final Admin adminClient,
+ final String applicationId) {
+ try {
+ final StreamsGroupDescription groupDescription =
+
adminClient.describeStreamsGroups(singletonList(applicationId))
+ .describedGroups()
+ .get(applicationId)
+ .get();
+ return groupDescription.members().isEmpty();
+ } catch (final ExecutionException e) {
+ return e.getCause() instanceof GroupIdNotFoundException;
+ } catch (final InterruptedException e) {
+ return false;
+ }
+ }
+
@SuppressWarnings("deprecation")
private static StateListener getStateListener(final KafkaStreams streams) {
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
index 2d74d1f668b..9e0158be9d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
+++ b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
@@ -25,22 +25,38 @@ public class CloseOptions {
* Enum to specify the group membership operation upon closing the Kafka
Streams application.
*
* <ul>
- * <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the
group.</li>
- * <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will not leave
the group explicitly.
- * Note that this option is ignored when using the streams group
protocol
- * ({@code group.protocol=streams}); in that case, the consumer will
always leave the group.</li>
+ * <li><b>{@code DEFAULT}</b>: Applies the default behavior based on the
active protocol and membership type:
+ * <ul>
+ * <li>For the <b>classic protocol</b>: The consumer will remain in
the group.</li>
+ * <li>For the <b>streams protocol</b> ({@code
group.protocol=streams}):
+ * <ul>
+ * <li><b>Dynamic members</b>: The consumer will leave the group
(consistent with the
+ * protocol's design for dynamic members).</li>
+ * <li><b>Static members</b>: The consumer will remain in the
group until session timeout
+ * (consistent with static membership semantics).</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * <li><b>{@code LEAVE_GROUP}</b>: The consumer will explicitly leave
the group, regardless of the
+ * active protocol.</li>
+ * <li><b>{@code REMAIN_IN_GROUP}</b>: The consumer will remain in the
group, regardless of the
+ * active protocol. Under the streams protocol, this enables a
faster rebalance upon restart
+ * for static members.</li>
* </ul>
*/
public enum GroupMembershipOperation {
+ DEFAULT,
LEAVE_GROUP,
REMAIN_IN_GROUP
}
/**
* Specifies the group membership operation upon shutdown.
- * By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be
applied, which follows the KafkaStreams default behavior.
+ * By default, {@code GroupMembershipOperation.DEFAULT} will be applied,
which adapts the behavior
+ * based on the active protocol.
*/
- protected GroupMembershipOperation operation =
GroupMembershipOperation.REMAIN_IN_GROUP;
+ protected GroupMembershipOperation operation =
GroupMembershipOperation.DEFAULT;
/**
* Specifies the maximum amount of time to wait for the close process to
complete.
@@ -69,7 +85,8 @@ public class CloseOptions {
/**
* Static method to create a {@code CloseOptions} with a specified group
membership operation.
*
- * @param operation the group membership operation to apply. Must be one
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+ * @param operation the group membership operation to apply. Must be one
of {@code DEFAULT},
+ * {@code LEAVE_GROUP}, or {@code REMAIN_IN_GROUP}.
* @return a new {@code CloseOptions} instance with the specified group
membership operation.
*/
public static CloseOptions groupMembershipOperation(final
GroupMembershipOperation operation) {
@@ -90,7 +107,8 @@ public class CloseOptions {
/**
* Fluent method to set the group membership operation upon shutdown.
*
- * @param operation the group membership operation to apply. Must be one
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+ * @param operation the group membership operation to apply. Must be one
of {@code DEFAULT},
+ * {@code LEAVE_GROUP}, or {@code REMAIN_IN_GROUP}.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withGroupMembershipOperation(final
GroupMembershipOperation operation) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b790f10e398..851fced582c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -489,7 +489,10 @@ public class KafkaStreams implements AutoCloseable {
closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
-
deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ // Use DEFAULT so StreamThread resolves it per protocol: classic maps to
REMAIN_IN_GROUP
+ // (avoiding an unnecessary rebalance before the replacement thread
joins), while Streams
+ // protocol passes DEFAULT to the consumer, which adapts to static vs dynamic.
+
deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
addStreamThread();
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
@@ -1456,11 +1459,17 @@ public class KafkaStreams implements AutoCloseable {
* Shutdown this {@code KafkaStreams} instance by signaling all the
threads to stop, and then wait for them to join.
* This will block until all threads have stopped.
* <p>
- * When using the classic protocol, the consumer will not leave the group
explicitly. However, when using
- * the streams group protocol ({@code group.protocol=streams}), the
consumer will always leave the group.
+ * Uses {@link
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation#DEFAULT DEFAULT}
behavior,
+ * which adapts based on the active protocol:
+ * <ul>
+ * <li>Classic protocol: the consumer remains in the group (no explicit
leave).</li>
+ * <li>Streams protocol ({@code group.protocol=streams}): dynamic
members leave the group;
+ * static members (with {@code group.instance.id}) remain in the
group and are removed
+ * by the broker after the session timeout.</li>
+ * </ul>
*/
public void close() {
- close(Optional.empty(),
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ close(Optional.empty(),
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
}
private Thread shutdownHelper(
@@ -1604,8 +1613,14 @@ public class KafkaStreams implements AutoCloseable {
* A {@code timeout} of {@link Duration#ZERO} (or any other zero duration)
makes the close operation asynchronous.
* Negative-duration timeouts are rejected.
* <p>
- * When using the classic protocol, the consumer will not leave the group
explicitly. However, when using
- * the streams group protocol ({@code group.protocol=streams}), the
consumer will always leave the group.
+ * Uses {@link
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation#DEFAULT DEFAULT}
behavior,
+ * which adapts based on the active protocol:
+ * <ul>
+ * <li>Classic protocol: the consumer remains in the group (no explicit
leave).</li>
+ * <li>Streams protocol ({@code group.protocol=streams}): dynamic
members leave the group;
+ * static members (with {@code group.instance.id}) remain in the
group and are removed
+ * by the broker after the session timeout.</li>
+ * </ul>
*
* @param timeout how long to wait for the threads to shut down
* @return {@code true} if all threads were successfully
stopped—{@code false} if the timeout was reached
@@ -1620,7 +1635,7 @@ public class KafkaStreams implements AutoCloseable {
throw new IllegalArgumentException("Timeout can't be negative.");
}
- return close(Optional.of(timeoutMs),
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ return close(Optional.of(timeoutMs),
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 882b427d1ca..6d1bf0c2937 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -100,6 +100,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
@@ -373,7 +374,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
// These are used to signal from outside the stream thread, but the
variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final
AtomicReference<org.apache.kafka.streams.CloseOptions.GroupMembershipOperation>
leaveGroupRequested =
- new
AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ new
AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
private final boolean eosEnabled;
private final boolean processingThreadsEnabled;
@@ -1892,10 +1893,21 @@ public class StreamThread extends Thread implements
ProcessingThread {
log.error("Failed to close changelog reader due to the following
error:", e);
}
try {
- final GroupMembershipOperation membershipOperation =
- leaveGroupRequested.get() ==
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP ?
LEAVE_GROUP : REMAIN_IN_GROUP;
- if (membershipOperation == REMAIN_IN_GROUP &&
streamsRebalanceData.isPresent()) {
- log.info("The consumer will leave the group since the streams
group protocol is used");
+ final
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation streamsOperation
= leaveGroupRequested.get();
+ final GroupMembershipOperation membershipOperation;
+ if (streamsOperation ==
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP) {
+ membershipOperation = LEAVE_GROUP;
+ } else if (streamsOperation ==
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
{
+ membershipOperation = REMAIN_IN_GROUP;
+ } else {
+ // DEFAULT: adapt to the active protocol
+ if (streamsRebalanceData.isPresent()) {
+ // Streams protocol: delegate to the consumer, which
adapts to static vs dynamic
+ membershipOperation = DEFAULT;
+ } else {
+ // Classic protocol: remain in the group (preserves
existing behavior)
+ membershipOperation = REMAIN_IN_GROUP;
+ }
}
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
} catch (final Throwable e) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 11607240eb2..4ee975805dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -78,6 +78,7 @@ import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.time.Duration;
@@ -310,7 +311,7 @@ public class KafkaStreamsTest {
}
private void prepareConsumer(final StreamThread thread, final
AtomicReference<StreamThread.State> state) {
- doAnswer(invocation -> {
+ final Answer<Object> shutdownAnswer = invocation -> {
supplier.consumer.close(
org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
);
@@ -325,7 +326,9 @@ public class KafkaStreamsTest {
threadStateListenerCapture.getValue().onChange(thread,
StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
threadStateListenerCapture.getValue().onChange(thread,
StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
return null;
-
}).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ };
+
doAnswer(shutdownAnswer).when(thread).shutdown(CloseOptions.GroupMembershipOperation.DEFAULT);
+
doAnswer(shutdownAnswer).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
private void prepareThreadLock(final StreamThread thread) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 4a393060d62..01aa4279c43 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -110,6 +110,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -1503,6 +1504,98 @@ public class StreamThreadTest {
verify(taskManager).shutdown(true);
}
+ @Test
+ public void shouldRouteDefaultToRemainInGroupForClassicProtocol() {
+ // Classic protocol: DEFAULT should map to consumer REMAIN_IN_GROUP
+ final TaskManager taskManager = mock(TaskManager.class);
+ final StreamsConfig config = new StreamsConfig(configProps(false,
false));
+ final ConsumerGroupMetadata consumerGroupMetadata =
mock(ConsumerGroupMetadata.class);
+ when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
+ topologyMetadata.buildAndRewriteTopology();
+ // buildStreamThread uses Optional.empty() for streamsRebalanceData
(Classic protocol)
+ thread = buildStreamThread(consumer, taskManager, config,
topologyMetadata)
+ .updateThreadMetadata(adminClientId(CLIENT_ID));
+
+
thread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
+
+ final ArgumentCaptor<org.apache.kafka.clients.consumer.CloseOptions>
captor =
+
ArgumentCaptor.forClass(org.apache.kafka.clients.consumer.CloseOptions.class);
+ // buildStreamThread passes consumer as mainConsumer, so close() is
called on consumer
+ verify(consumer).close(captor.capture());
+ assertEquals(
+
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP,
+ captor.getValue().groupMembershipOperation()
+ );
+ }
+
+ @Test
+ public void shouldRouteDefaultToConsumerDefaultForStreamsProtocol() {
+ // Streams protocol: DEFAULT should map to consumer DEFAULT (dynamic
member leaves)
+ final TaskManager taskManager = mock(TaskManager.class);
+ final StreamsConfig config = new StreamsConfig(configProps(false,
false));
+ final ConsumerGroupMetadata consumerGroupMetadata =
mock(ConsumerGroupMetadata.class);
+ when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
+ UUID.randomUUID(), Optional.empty(), Optional.empty(), Map.of(),
Map.of());
+ thread = new StreamThread(
+ mockTime, config, null,
+ mainConsumer, consumer,
+ changelogReader, null, taskManager, null,
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
+ new TopologyMetadata(internalTopologyBuilder, config),
+ PROCESS_ID, CLIENT_ID, new LogContext(""),
+ null, new AtomicLong(Long.MAX_VALUE), new LinkedList<>(),
+ null, HANDLER, null,
+ Optional.of(streamsRebalanceData), null, null
+ ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+
thread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.DEFAULT);
+
+ final ArgumentCaptor<org.apache.kafka.clients.consumer.CloseOptions>
captor =
+
ArgumentCaptor.forClass(org.apache.kafka.clients.consumer.CloseOptions.class);
+ verify(mainConsumer).close(captor.capture());
+ assertEquals(
+
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT,
+ captor.getValue().groupMembershipOperation()
+ );
+ }
+
+ @Test
+ public void shouldRouteRemainInGroupToRemainInGroupForStreamsProtocol() {
+ // Streams protocol: explicit REMAIN_IN_GROUP must always map to
consumer REMAIN_IN_GROUP
+ final TaskManager taskManager = mock(TaskManager.class);
+ final StreamsConfig config = new StreamsConfig(configProps(false,
false));
+ final ConsumerGroupMetadata consumerGroupMetadata =
mock(ConsumerGroupMetadata.class);
+ when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
+ UUID.randomUUID(), Optional.empty(), Optional.empty(), Map.of(),
Map.of());
+ thread = new StreamThread(
+ mockTime, config, null,
+ mainConsumer, consumer,
+ changelogReader, null, taskManager, null,
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
+ new TopologyMetadata(internalTopologyBuilder, config),
+ PROCESS_ID, CLIENT_ID, new LogContext(""),
+ null, new AtomicLong(Long.MAX_VALUE), new LinkedList<>(),
+ null, HANDLER, null,
+ Optional.of(streamsRebalanceData), null, null
+ ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+
thread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+
+ final ArgumentCaptor<org.apache.kafka.clients.consumer.CloseOptions>
captor =
+
ArgumentCaptor.forClass(org.apache.kafka.clients.consumer.CloseOptions.class);
+ verify(mainConsumer).close(captor.capture());
+ assertEquals(
+
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP,
+ captor.getValue().groupMembershipOperation()
+ );
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void
shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(final boolean
processingThreadsEnabled) {