lucasbru commented on code in PR #20600:
URL: https://github.com/apache/kafka/pull/20600#discussion_r2421732182
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16296,6 +16299,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testJoinEmptyStreamsGroupAndDescribe() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+ )));
+
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.groupMetadataManager.streamsGroup(groupId));
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setProcessId("process-id")
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+
+ StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(1500)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ // Commit the offset and test again
Review Comment:
True. That's a copy/paste error from another test. I changed it to
```
// Commit the offset, so that the latest state will be described below
```
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
+ @Test
+ def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+ val streamsGroupId = "stream_group_id"
+ val testTopicName = "test_topic"
+ val testNumPartitions = 1
+
+ val config = createConfig
+ client = Admin.create(config)
+
+ prepareTopics(List(testTopicName), testNumPartitions)
+ prepareRecords(testTopicName)
+
+ val streams = createStreamsGroup(
+ inputTopics = Set(testTopicName),
+ streamsGroupId = streamsGroupId
+ )
+ streams.poll(JDuration.ofMillis(500L))
+
+ try {
+ TestUtils.waitUntilTrue(() => {
+ val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
+ firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
Review Comment:
The bug is not really a race condition. In the old code, after the records
from the first heartbeat are replayed, the streams group does not have a
`ConfiguredTopology` yet, so it is `NOT_READY`, and a snapshot of the group is
created in that state. The group then keeps showing up as `NOT_READY`, because
we always describe the latest snapshot of the group, not its current in-memory
representation. So even though the following heartbeats create a
`ConfiguredTopology` and move the group from `NOT_READY` to `STABLE`, the
latest snapshot keeps showing `NOT_READY` until the persistent state of the
group changes, which does not happen in this test.
That's the confusing part about the bug. So this test does indeed fail
without the fix.
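To make the snapshot behavior concrete, here is a toy Java model. It is a hypothetical sketch: `ToyStreamsGroup`, `replayRecords`, `heartbeatWithoutRecords` and `describe` are illustrative names, not the group coordinator's real classes or methods. It assumes, as described above, that a snapshot is only taken when records are replayed and that describe reads the latest snapshot rather than the in-memory state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of "describe reads the latest snapshot";
// not the group coordinator's actual implementation.
public class SnapshotDescribeSketch {

    enum State { NOT_READY, STABLE }

    static final class ToyStreamsGroup {
        // Derived in memory by heartbeats; changing it writes no records.
        private boolean hasConfiguredTopology = false;
        // One entry per replay of records (i.e. per persistent-state change).
        private final List<State> snapshots = new ArrayList<>();

        State currentState() {
            return hasConfiguredTopology ? State.STABLE : State.NOT_READY;
        }

        // Replaying records changes persistent state and takes a snapshot.
        void replayRecords() {
            snapshots.add(currentState());
        }

        // A later heartbeat that only computes the configured topology in
        // memory and writes no records, so no new snapshot is taken.
        void heartbeatWithoutRecords() {
            hasConfiguredTopology = true;
        }

        // Describe reads the latest snapshot, not the in-memory state.
        State describe() {
            return snapshots.get(snapshots.size() - 1);
        }
    }

    public static void main(String[] args) {
        ToyStreamsGroup group = new ToyStreamsGroup();
        group.replayRecords();            // first heartbeat: snapshot taken while NOT_READY
        group.heartbeatWithoutRecords();  // in memory, the group is now STABLE
        System.out.println(group.currentState()); // STABLE
        System.out.println(group.describe());     // NOT_READY (stale snapshot)
        group.replayRecords();            // only a persistent change refreshes the snapshot
        System.out.println(group.describe());     // STABLE
    }
}
```

The point of the model is that nothing in the in-memory transition forces a new snapshot, so the described state lags until some unrelated persistent change happens.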
Also, I think it's a valid assertion to simply expect the group to become
`STABLE` eventually. This is an integration test, and it doesn't need to reach
into the internals to pinpoint exactly the condition we are looking for.
I could try to assert that the group is never `NOT_READY`, but it can
legitimately be `NOT_READY` while the input topic is not yet in the group
coordinator's metadata, so that assertion would just make the test flaky.
TL;DR: all we want is for the group to become stable eventually on its own,
which is what this test checks.
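The "eventually `STABLE`" style of assertion can be sketched as a polling helper that tolerates intermediate states. This is a hypothetical Java helper written in the spirit of the test's `TestUtils.waitUntilTrue` call; its name, parameters and behavior are assumptions for illustration, not Kafka's actual `TestUtils` signature.

```java
import java.util.function.BooleanSupplier;

public class EventuallySketch {

    // Hypothetical helper: polls the condition until it holds or the timeout
    // expires. Intermediate observations (e.g. NOT_READY while topic metadata
    // propagates) are tolerated; only never reaching the goal fails.
    static void waitUntilTrue(BooleanSupplier condition, String message,
                              long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated observations: NOT_READY twice, then STABLE.
        String[] observed = {"NOT_READY", "NOT_READY", "STABLE"};
        int[] poll = {0};
        waitUntilTrue(
                () -> observed[Math.min(poll[0]++, observed.length - 1)].equals("STABLE"),
                "Streams group did not transition to STABLE before timeout",
                5_000L, 10L);
        System.out.println("became stable after " + poll[0] + " polls"); // became stable after 3 polls
    }
}
```

An assertion of the form "never `NOT_READY`" would fail on the first two simulated observations even though the group behaves correctly, which is the flakiness the comment wants to avoid.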
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16801,7 +16903,7 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() {
context.replay(record);
}
assignor.prepareGroupAssignment(
- Map.of(memberId1, TasksTuple.EMPTY)
+ Map.of(memberId2, TasksTuple.EMPTY)
Review Comment:
Actually, I replaced this with just `Map.of()`. A member that is not mentioned
in the target assignment is treated as having `TasksTuple.EMPTY`, so it makes
no difference whether we use member1 or member2 here. I also removed the
replaying of records two lines above, because `context.streamsGroupHeartbeat`
already replays them. Replaying the records is idempotent, but it's cleaner
not to replay them twice.
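Both points can be illustrated with a small Java sketch. `ToyTasksTuple`, `targetFor` and `replay` are hypothetical stand-ins, not the coordinator's `TasksTuple` or its replay logic: a lookup that defaults missing members to an empty tuple makes `Map.of()` equivalent to listing either member with `EMPTY`, and modeling replay as an upsert makes replaying the same record twice a no-op.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TargetAssignmentSketch {

    // Hypothetical stand-in for a tasks tuple; only active task IDs are modeled.
    record ToyTasksTuple(Set<Integer> activeTasks) {
        static final ToyTasksTuple EMPTY = new ToyTasksTuple(Set.of());
    }

    // A member absent from the target assignment gets the empty tuple, so
    // Map.of() behaves like Map.of(memberId1, EMPTY) or Map.of(memberId2, EMPTY).
    static ToyTasksTuple targetFor(Map<String, ToyTasksTuple> target, String memberId) {
        return target.getOrDefault(memberId, ToyTasksTuple.EMPTY);
    }

    // Assumption: replaying an assignment record is modeled as an upsert,
    // so applying the same record twice leaves the state unchanged.
    static void replay(Map<String, ToyTasksTuple> state, String memberId, ToyTasksTuple tasks) {
        state.put(memberId, tasks);
    }

    public static void main(String[] args) {
        Map<String, ToyTasksTuple> empty = Map.of();
        Map<String, ToyTasksTuple> explicit = Map.of("member2", ToyTasksTuple.EMPTY);
        System.out.println(targetFor(empty, "member2").equals(targetFor(explicit, "member2"))); // true

        Map<String, ToyTasksTuple> state = new HashMap<>();
        ToyTasksTuple tasks = new ToyTasksTuple(Set.of(0, 1, 2));
        replay(state, "member1", tasks);
        Map<String, ToyTasksTuple> afterOnce = new HashMap<>(state);
        replay(state, "member1", tasks);
        System.out.println(state.equals(afterOnce)); // true
    }
}
```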
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
+ @Test
+ def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+ val streamsGroupId = "stream_group_id"
+ val testTopicName = "test_topic"
+ val testNumPartitions = 1
+
+ val config = createConfig
+ client = Admin.create(config)
+
+ prepareTopics(List(testTopicName), testNumPartitions)
+ prepareRecords(testTopicName)
+
+ val streams = createStreamsGroup(
+ inputTopics = Set(testTopicName),
+ streamsGroupId = streamsGroupId
+ )
+ streams.poll(JDuration.ofMillis(500L))
+
+ try {
+ TestUtils.waitUntilTrue(() => {
+ val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
+ firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
+ }, "Streams group did not transition to STABLE before timeout")
+
+ // Verify the describe call works correctly
+ val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+ val group = describedGroups.get(streamsGroupId)
Review Comment:
See above
--
This is an automated message from the Apache Git Service.