lucasbru commented on code in PR #20600:
URL: https://github.com/apache/kafka/pull/20600#discussion_r2421732182


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16296,6 +16299,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        // Commit the offset and test again

Review Comment:
   True. That's a copy/paste error from another test. I changed it to
   
   ```
           // Commit the offset, so that the latest state will be described below
   ```



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
+  @Test
+  def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streams = createStreamsGroup(
+      inputTopics = Set(testTopicName),
+      streamsGroupId = streamsGroupId
+    )
+    streams.poll(JDuration.ofMillis(500L))
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId

Review Comment:
   The bug is not really a race condition. In the old code, after replaying the
   records from the first heartbeat, the streams group does not yet have a
   `ConfiguredTopology`, so it is `NOT_READY`, and a snapshot of the group is
   created in that state. From then on the group keeps showing as `NOT_READY`,
   because we always describe the latest snapshot of the group, not its current
   in-memory representation. So even if subsequent heartbeats create a
   `ConfiguredTopology` and move the state from `NOT_READY` to `STABLE`, the
   latest snapshot continues to show `NOT_READY` until the persistent state of
   the group changes, which does not happen in this test.
   
   That's the confusing part about the bug. So this test will indeed fail 
before the fix. 
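
To make the snapshot behaviour concrete, here is a minimal toy model in Java. It is only a sketch of the mechanism described above: `Group`, `commitRecord`, `heartbeatConfiguresTopology` and `describe` are hypothetical names for illustration, not the group coordinator's classes or methods.

```java
import java.util.Optional;

/** Hypothetical, simplified model; not the Kafka group coordinator classes. */
public class SnapshotDescribeSketch {

    enum State { NOT_READY, STABLE }

    static final class Group {
        // Soft state: set in memory by a heartbeat, never written as a record.
        private Optional<String> configuredTopology = Optional.empty();
        // State captured the last time a record for this group was committed.
        private State lastSnapshot = null;

        State currentState() {
            return configuredTopology.isPresent() ? State.STABLE : State.NOT_READY;
        }

        // Committing a record snapshots whatever the state is at that moment.
        void commitRecord() {
            lastSnapshot = currentState();
        }

        // A later heartbeat configures the topology without producing a record.
        void heartbeatConfiguresTopology() {
            configuredTopology = Optional.of("configured");
        }

        // Describe reads the last committed snapshot, not the in-memory state.
        State describe() {
            return lastSnapshot;
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.commitRecord();                // first heartbeat: snapshot taken while NOT_READY
        group.heartbeatConfiguresTopology(); // later heartbeat: in-memory state becomes STABLE
        System.out.println("in-memory: " + group.currentState());
        System.out.println("described: " + group.describe());
    }
}
```

In this model the described state only catches up once another record is committed, which mirrors why the old code kept reporting `NOT_READY` in this test.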
   
   Also, I think it's a valid assertion to just expect the group to become 
`STABLE` eventually. This is an integration test, and doesn't need to reach 
into the internals to try to pick exactly the condition we are looking for.
   
   I could try to somehow assert that the group is never `NOT_READY`, but in 
fact it can be `NOT_READY` if the input topic isn't in the group coordinator 
metadata yet. That would then just be a flaky test.
   
   TLDR: all we want is that the group becomes stable eventually on its own, 
which is what this test is testing.
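
For readers unfamiliar with this style of assertion, the "eventually `STABLE`" check amounts to polling until a condition holds or a timeout expires. The Java sketch below is a hypothetical stand-in for that idea (the test itself uses `TestUtils.waitUntilTrue`); the helper's signature and the simulated state sequence are assumptions made for illustration.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; the real test uses TestUtils.waitUntilTrue. */
public class EventuallyStableSketch {

    // Polls the condition until it holds or the timeout elapses.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated describe results: NOT_READY twice, then STABLE.
        String[] observed = {"NOT_READY", "NOT_READY", "STABLE"};
        int[] call = {0};
        boolean stable = waitUntilTrue(
            () -> observed[Math.min(call[0]++, observed.length - 1)].equals("STABLE"),
            1000, 10);
        System.out.println("became STABLE: " + stable);
    }
}
```

A transient `NOT_READY` is tolerated here by design: the check only fails if the group never reaches `STABLE` within the timeout.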



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16801,7 +16903,7 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() {
             context.replay(record);
         }
         assignor.prepareGroupAssignment(
-            Map.of(memberId1, TasksTuple.EMPTY)
+            Map.of(memberId2, TasksTuple.EMPTY)

Review Comment:
   Actually, I replaced this with just `Map.of()`. If a member is not mentioned
   in the target assignment, it is treated as if it had `TasksTuple.EMPTY`, so
   it makes no difference whether we use member1 or member2 here. I also
   removed the replaying of records two lines above, because that is already
   done inside `context.streamsGroupHeartbeat`. Replaying the records is
   idempotent, but it is cleaner not to replay them twice.
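
As a rough illustration of why the three variants are equivalent, the Java sketch below models a target assignment lookup that falls back to an empty assignment for unknown members. `targetFor` and the `EMPTY` list are hypothetical stand-ins and do not reflect how `MockTaskAssignor` or `TasksTuple` are implemented.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical model of a target-assignment lookup; not the Kafka classes. */
public class TargetAssignmentSketch {

    // Stand-in for TasksTuple.EMPTY: a member with no tasks.
    static final List<Integer> EMPTY = List.of();

    // A member missing from the prepared assignment is treated as EMPTY.
    static List<Integer> targetFor(Map<String, List<Integer>> prepared, String memberId) {
        return prepared.getOrDefault(memberId, EMPTY);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> none = Map.of();
        Map<String, List<Integer>> onlyMember1 = Map.of("member1", EMPTY);
        Map<String, List<Integer>> onlyMember2 = Map.of("member2", EMPTY);

        // All three prepared assignments give member1 and member2 the same target.
        for (String member : List.of("member1", "member2")) {
            System.out.println(member + ": "
                + targetFor(none, member) + " "
                + targetFor(onlyMember1, member) + " "
                + targetFor(onlyMember2, member));
        }
    }
}
```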



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
+  @Test
+  def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streams = createStreamsGroup(
+      inputTopics = Set(testTopicName),
+      streamsGroupId = streamsGroupId
+    )
+    streams.poll(JDuration.ofMillis(500L))
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
+      }, "Streams group did not transition to STABLE before timeout")
+
+      // Verify the describe call works correctly
+      val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+      val group = describedGroups.get(streamsGroupId)

Review Comment:
   See above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
