jeffkbkim commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2027417821
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -1106,4 +1106,32 @@ public void testIsSubscribedToTopic() {
assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
}
+
+ @Test
+ public void testShutdownRequestedMethods() {
+ String memberId1 = "test-member-id1";
+ String memberId2 = "test-member-id2";
+ LogContext logContext = new LogContext();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+ GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard);
+
+ streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
+ streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
+
+ // Initially, shutdown should not be requested
+ assertFalse(streamsGroup.isShutdownRequested());
+
+ // Set shutdown requested
+ streamsGroup.maybeSetShutdownRequested(memberId1, true);
+ assertTrue(streamsGroup.isShutdownRequested());
+
+ // As long as group not empty, remain in shutdown requested state
+ streamsGroup.removeMember(memberId1);
+ assertTrue(streamsGroup.isShutdownRequested());
+
+ // As soon as the group is empty, clear the shutdown requested state
+ streamsGroup.removeMember(memberId2);
+ assertFalse(streamsGroup.isShutdownRequested());
+ }
Review Comment:
nit: newline
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2070,8 +2067,6 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
String processId,
Endpoint userEndpoint,
List<KeyValue> clientTags,
- List<TaskOffset> taskOffsets,
- List<TaskOffset> taskEndOffsets,
Review Comment:
These removals are unrelated to this PR, correct?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1049,4 +1057,21 @@ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
return describedGroup;
}
+ public void maybeSetShutdownRequested(final String memberId, final boolean shutdownApplication) {
Review Comment:
Should we just have a `setShutdownRequested` method and do the `if` check on the
caller's side?
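To make the suggestion concrete, here is a minimal sketch of the two shapes. It is not the PR's code: `ShutdownFlagSketch` is a hypothetical stand-in for `StreamsGroup`, and the body of `maybeSetShutdownRequested` is an assumption, since only its signature appears in the diff.

```java
// Hypothetical stand-in for StreamsGroup; only the flag handling is modelled.
final class ShutdownFlagSketch {
    // Assumed representation of the in-memory flag.
    private boolean shutdownRequested = false;

    // Shape in the PR (body assumed): the condition lives inside the group.
    void maybeSetShutdownRequested(final String memberId, final boolean shutdownApplication) {
        if (shutdownApplication) {
            shutdownRequested = true;
        }
    }

    // Suggested shape: an unconditional setter; the caller decides when to call it.
    void setShutdownRequested(final String memberId) {
        shutdownRequested = true;
    }

    boolean isShutdownRequested() {
        return shutdownRequested;
    }
}
```

With the second shape the call site would read `if (shutdownApplication) group.setShutdownRequested(memberId);`, which keeps the condition visible where the heartbeat is handled.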
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -197,6 +197,13 @@ public static class DeadlineAndEpoch {
*/
private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+ /**
+ * A flag to indicate whether a shutdown has been requested for this group.
+ * This has no direct effect inside the group coordinator, but is propagated to all members of the group.
+ * This is not persisted in the log.
Review Comment:
To check my understanding of this flag:
- We only clear it when the group becomes empty, which is when we assume all
members have acked.
- Since this is only soft state, we have no guarantee that all members will
eventually shut down. It is a best-effort nudge.
Is this correct?
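A minimal sketch of the reading above, assuming the flag is a plain in-memory field that is reset once the last member leaves. `SoftStateGroupSketch` and its methods are hypothetical and only mirror the behaviour exercised by `testShutdownRequestedMethods`, not the real `StreamsGroup`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the flag lifecycle; not the real StreamsGroup.
final class SoftStateGroupSketch {
    private final Set<String> members = new HashSet<>();
    // Soft state: lives only in memory and is never written to the log.
    private boolean shutdownRequested = false;

    void addMember(final String memberId) {
        members.add(memberId);
    }

    void requestShutdown() {
        shutdownRequested = true;
    }

    void removeMember(final String memberId) {
        members.remove(memberId);
        // Cleared only once the group is empty, i.e. when every member has left.
        if (members.isEmpty()) {
            shutdownRequested = false;
        }
    }

    boolean isShutdownRequested() {
        return shutdownRequested;
    }
}
```

Under this reading, a group object rebuilt from the log would start with the flag unset even if members remain, which is why the request would be best effort.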